refactor: shared-helpers dedup batch (commit validation, bus helpers, test helpers, macro codegen) #88

Open
patrickleet wants to merge 6 commits into main from hardening/shared-helpers-dedup

patrickleet (Collaborator) commented Jun 11, 2026

Batch of four small, independent de-duplications from the 2026-06-10 review (tasks/shared-helpers-dedup-batch). Each gives a duplicated helper one definition in the module that owns the concept. No public API change; no behavior change.

1. Commit-batch validation (drift risk) — ~90 lines removed

Six validation fns (reject_duplicate_streams, reject_duplicate_outbox_messages, validate_entity_id_matches_identity, validate_prepared_appends, validate_supported_event_codec, validate_snapshot_identity) were duplicated byte-for-byte between src/sqlx_repo/mod.rs and src/hashmap_repo/repository.rs. They have no sqlx dependency — only the cfg gate forced the copy.

Moved to a single ungated home, src/repository/validation.rs (they validate the repository types StreamWrite/PreparedEventAppend/OutboxMessage/CommitBatch). All three backends (hashmap, sqlite, postgres) now import the one copy via crate::repository, so the definition of a valid CommitBatch cannot drift between backends. The error-taxonomy work from #83 in sqlx_repo is untouched.
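
To illustrate why a single ungated definition removes the drift risk, here is a minimal sketch of one such validator. The `StreamWrite` fields, the `ValidationError` type, and the function signature are assumptions made for illustration; the PR does not show the real definitions.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the crate's `StreamWrite`; the real type
/// carries more than a stream id.
#[derive(Debug, Clone)]
pub struct StreamWrite {
    pub stream_id: String,
}

/// Hypothetical error type; the crate's real error taxonomy is not shown in the PR.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationError {
    DuplicateStream(String),
}

/// Rejects a batch that writes to the same stream more than once.
/// It depends only on repository-level types, so it needs no backend cfg gate.
pub fn reject_duplicate_streams(writes: &[StreamWrite]) -> Result<(), ValidationError> {
    let mut seen = HashSet::new();
    for write in writes {
        // `insert` returns false when the id was already present.
        if !seen.insert(write.stream_id.as_str()) {
            return Err(ValidationError::DuplicateStream(write.stream_id.clone()));
        }
    }
    Ok(())
}

/// Two illustrative backends: both delegate to the one shared check, so they
/// cannot disagree about what a valid batch is.
pub fn hashmap_commit(writes: &[StreamWrite]) -> Result<usize, ValidationError> {
    reject_duplicate_streams(writes)?;
    Ok(writes.len())
}

pub fn sqlite_commit(writes: &[StreamWrite]) -> Result<usize, ValidationError> {
    reject_duplicate_streams(writes)?;
    Ok(writes.len())
}
```

With per-backend copies, a fix applied to one copy could silently leave the others accepting a batch that the first rejects. With one definition, that divergence is not expressible.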

2. Bus helpers — ~80 lines removed

  • kind_str/kind_from_str (5x: nats, kafka, rabbitmq, postgres_bus, knative_bus) → MessageKind::as_str / MessageKind::from_str_lossy on the type's owning module src/bus/message.rs.
  • retryable(context, err) (5x: nats, kafka, rabbitmq, nats_bus, rabbit_bus) → one pub(crate) fn in src/bus/error.rs, where TransportError lives.
  • subject/topic/routing-key prefix stripping (3x: nats, kafka, rabbit_bus) → strip_address_prefix free fn next to Message.

The crate-internal re-exports and the transport-only fns are feature-gated to the consuming transports so non-transport builds stay warning-free. #80's postgres corrupt-row handling and the in-flight NATS publish path are untouched beyond the helper swap.
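
The three shared bus helpers listed above could look roughly like the sketch below. Only the helper names and the behaviors described in this PR are taken from the source (unknown tokens parse as an event; errors are formatted as context, a colon, then the error). The `MessageKind` variants, the wire tokens, the `TransportError` shape, and the exact signatures are assumptions.

```rust
use std::fmt::Display;

/// Hypothetical variants; the PR only confirms that an event kind exists and
/// is the default for unknown tokens.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    Event,
    Command,
}

impl MessageKind {
    /// Wire token for this kind (the token strings here are assumed).
    pub fn as_str(self) -> &'static str {
        match self {
            MessageKind::Event => "event",
            MessageKind::Command => "command",
        }
    }

    /// Lossy parse: anything unrecognized falls back to `Event`.
    pub fn from_str_lossy(token: &str) -> Self {
        match token {
            "command" => MessageKind::Command,
            _ => MessageKind::Event,
        }
    }
}

/// Hypothetical error shape standing in for the crate's `TransportError`.
#[derive(Debug, PartialEq, Eq)]
pub enum TransportError {
    Retryable(String),
}

/// Builds a retryable error whose message has the "{context}: {err}" shape.
pub fn retryable(context: &str, err: impl Display) -> TransportError {
    TransportError::Retryable(format!("{}: {}", context, err))
}

/// Removes an optional prefix from a subject, topic, or routing key.
/// Returns the address unchanged when no prefix is set or it does not match.
pub fn strip_address_prefix<'a>(address: &'a str, prefix: Option<&str>) -> &'a str {
    match prefix {
        Some(p) => address.strip_prefix(p).unwrap_or(address),
        None => address,
    }
}
```

Putting `as_str` and `from_str_lossy` on the type itself keeps the token table next to the enum, so adding a variant forces the `match` in `as_str` to be updated in one place rather than in five adapters.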

3. Transport test helpers — ~150 lines removed (one shared copy added)

The four broker test mains each redefined the same scaffolding. Moved the genuinely-identical helpers (recording_for, named_recording_for, run_token, the run-token-based unique) into tests/transport_conformance/mod.rs, included via the established #[path] mechanism (the in-memory harness already does this).

Load-bearing differences preserved: kafka keeps its own nanos-based unique (it persists topics across runs); each transport keeps its own env-skip helper (env var name + message differ); per-transport scenarios stay in their own files. #80's postgres corrupt-row test is untouched.
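
As a rough sketch of the run-token idea behind the shared `unique` helper: one token is generated per test process, and each call appends a counter so names do not collide within or across runs. The derivation of the token (process id plus start time) and both signatures are assumptions; the PR does not show the real bodies.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::{SystemTime, UNIX_EPOCH};

/// One token per test-process run. Hypothetical derivation: pid plus the
/// wall-clock nanos at first use, rendered as hex.
pub fn run_token() -> &'static str {
    static TOKEN: OnceLock<String> = OnceLock::new();
    TOKEN
        .get_or_init(|| {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);
            format!("{:x}{:x}", std::process::id(), nanos)
        })
        .as_str()
}

/// Returns a name that is unique within this run (counter) and very unlikely
/// to collide with other runs (run token).
pub fn unique(prefix: &str) -> String {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", prefix, run_token(), n)
}
```

A transport test file would then pull the shared module in with a `#[path = "..."] mod conformance;` declaration (the module name `conformance` appears in the review below; the relative path is whatever the test layout requires) and call `conformance::unique("orders")`.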

4. Macro codegen — ~35 lines deduped

aggregate! (expand_aggregate) and #[sourced] (expand_sourced) emitted a byte-for-byte identical impl distributed::Aggregate block. Extracted one aggregate_impl_tokens(...) helper used by both, preventing replay-semantics drift. Fits the post-#82 expand_* -> syn::Result shape; each caller still places #upcaster_wrappers where it did.
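
The shape of this extraction can be shown with a std-only analogue in which plain strings stand in for token streams. This is not the real helper: the actual `aggregate_impl_tokens` emits tokens via the proc-macro toolchain and takes additional parameters, and the emitted text below is a placeholder rather than the real trait body.

```rust
/// String-based stand-in for the shared emitter. The real helper returns a
/// token stream; the body text here is illustrative only.
fn aggregate_impl_tokens(type_name: &str, entity_field: &str, replay_arms: &[String]) -> String {
    let mut out = String::new();
    out.push_str(&format!("impl distributed::Aggregate for {} {{\n", type_name));
    out.push_str("    type ReplayError = String;\n");
    out.push_str(&format!("    // entity accessors read self.{}\n", entity_field));
    out.push_str("    // replay_event match arms:\n");
    for arm in replay_arms {
        out.push_str(&format!("    //   {}\n", arm));
    }
    out.push_str("}\n");
    out
}

/// Stand-in for the `aggregate!` entry point: derives its arms from event
/// names, then delegates the impl block to the shared emitter.
fn expand_aggregate(type_name: &str, events: &[&str]) -> String {
    let arms: Vec<String> = events
        .iter()
        .map(|event| format!("\"{}\" => apply_{}()", event, event.to_lowercase()))
        .collect();
    aggregate_impl_tokens(type_name, "entity", &arms)
}

/// Stand-in for the `#[sourced]` entry point: derives its arms from explicit
/// (event, handler) pairs, then delegates to the same emitter.
fn expand_sourced(type_name: &str, handlers: &[(&str, &str)]) -> String {
    let arms: Vec<String> = handlers
        .iter()
        .map(|(event, method)| format!("\"{}\" => {}()", event, method))
        .collect();
    aggregate_impl_tokens(type_name, "entity", &arms)
}
```

Because both entry points only differ in how they build the arms, comparing their rendered output for equivalent inputs is the same kind of golden check described under Testing.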

Testing

All run locally (sqlite needs no Docker; brokers run via docker compose).

  • cargo fmt --all --check — clean.
  • cargo build default + --features sqlite|postgres|nats|rabbitmq|kafka — all clean (kafka built; librdkafka via cmake).
  • cargo test --features sqlite — full suite green, including commit-validation conformance across hashmap + sqlite (271 lib tests + every sqlite test target, 0 failures).
  • cargo test -p distributed_macros — 50 pass. Macro output verified byte-identical: a golden dump of both macros' to_string() output (covering no-payload / single-arg / multi-arg replay arms and the aggregate_type method) matched the pre-refactor output exactly (diff empty).
  • Real-broker validation via docker compose up -d with env URLs: postgres 9, rabbitmq 5, nats 5, kafka 5 transport tests pass; transport_in_memory still passes all 12. Ran docker compose down --remove-orphans afterwards.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Refactor

    • Consolidated macro codegen and unified message-kind serialization and retryable error handling across transports for consistency.
    • Moved repository validation into a shared module and reorganized imports to improve maintainability.
  • Tests

    • Centralized transport test helpers to eliminate duplication and standardize integration tests.

The six commit-batch validation fns (reject_duplicate_streams,
reject_duplicate_outbox_messages, validate_entity_id_matches_identity,
validate_prepared_appends, validate_supported_event_codec,
validate_snapshot_identity) were duplicated byte-for-byte between
src/sqlx_repo/mod.rs and src/hashmap_repo/repository.rs. They have no
sqlx dependency; only the cfg gate forced the copy.

Move them to a single ungated home, src/repository/validation.rs, since
they validate repository-level types (StreamWrite, PreparedEventAppend,
OutboxMessage, CommitBatch). All three backends (hashmap, sqlite,
postgres) now import the one copy via crate::repository, so the
definition of a valid CommitBatch cannot drift between backends.

No behavior change: the moved fns are byte-identical to the originals;
the error taxonomy work from #83 in sqlx_repo is untouched.

Implements [[tasks/shared-helpers-dedup-batch]]

Three small transport helpers were copy-pasted across the bus adapters:

- kind_str/kind_from_str (5x: nats, kafka, rabbitmq, postgres_bus,
  knative_bus) -> MessageKind::as_str / MessageKind::from_str_lossy on
  the type's owning module, src/bus/message.rs.
- retryable(context, err) (5x: nats, kafka, rabbitmq, nats_bus,
  rabbit_bus) -> one pub(crate) fn in src/bus/error.rs, where
  TransportError lives.
- subject/topic/routing-key prefix stripping (3x: nats, kafka,
  rabbit_bus) -> strip_address_prefix free fn next to Message.

Each helper now has one definition in the module that owns the concept.
The crate-internal re-exports and the retryable/strip fns are gated to
the transport features that consume them so non-transport builds stay
warning-free. No behavior change: the string tokens, the
event-default-on-unknown parse, and the "{context}: {err}" format are
identical to the originals; #80's postgres corrupt-row handling and the
in-flight NATS publish path are untouched beyond the helper swap.

Implements [[tasks/shared-helpers-dedup-batch]]

aggregate! (expand_aggregate) and #[sourced] (expand_sourced) emitted a
byte-for-byte identical 'impl distributed::Aggregate' block: same
ReplayError = String, same entity/entity_mut/replay_event bodies, and
the same optional aggregate_type and upcasters methods. Only the replay
match arms are built differently upstream.

Extract one aggregate_impl_tokens(type_name, entity_field,
aggregate_type_method, replay_arms, upcasters_method) helper that emits
the impl block; both macros now call it. This prevents the replay
semantics of the two entry points from drifting. Works with the post-#82
expand_* -> syn::Result shape; each caller still places #upcaster_wrappers
where it already did.

Verified byte-identical: a golden dump of both macros' to_string() output
(covering no-payload, single-arg, multi-arg replay arms and aggregate_type)
matched the pre-refactor output exactly. The large line delta is rustfmt
re-indenting the two replay-arm closures after collecting them to a Vec.

Implements [[tasks/shared-helpers-dedup-batch]]

The four broker test mains each redefined the same scaffolding:
recording_for / named_recording_for (kafka, rabbitmq, postgres),
run_token, and the run-token-based unique (nats, rabbitmq). Move the
genuinely-identical helpers into tests/transport_conformance/mod.rs and
include it via the established #[path] mechanism (the in-memory harness
already does this).

The per-transport scenarios stay in their own files, and load-bearing
differences are preserved: kafka keeps its own nanos-based unique (it
persists topics across runs), and each transport keeps its own env-skip
helper (the env var name and message differ). #80's postgres corrupt-row
test is untouched.

Validated against real brokers via docker compose: postgres 9, rabbitmq
5, nats 5, kafka 5 tests pass; transport_in_memory still passes (12).
Net ~150 duplicated lines removed in favor of one shared copy.

Implements [[tasks/shared-helpers-dedup-batch]]

coderabbitai Bot commented Jun 11, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2a425b49-bd39-4e1b-af18-21be64796f23

📥 Commits

Reviewing files that changed from the base of the PR and between e3f8ed4 and 9a7df32.

📒 Files selected for processing (1)
  • tests/nats_transport/main.rs

📝 Walkthrough

This PR refactors shared code generation, messaging utilities, and validation logic across the codebase. It extracts a common aggregate_impl_tokens helper used by both proc-macros, centralizes bus messaging utilities (MessageKind serialization, error formatting, address-prefix handling), moves backend-agnostic repository validation to a new module, and consolidates transport test infrastructure into shared helpers.

Changes

Proc-macro shared helper extraction

| Layer / File(s) | Summary |
| --- | --- |
| Shared aggregate impl token generation: distributed_macros/src/lib.rs | A new aggregate_impl_tokens helper generates the common impl distributed::Aggregate block. Both aggregate! and #[sourced] now collect replay match arms as Vec<TokenStream2> and delegate impl generation to this helper instead of inlining separate quote! blocks. |

Bus messaging utilities and transport adapter refactoring

| Layer / File(s) | Summary |
| --- | --- |
| Core MessageKind and bus helper utilities: src/bus/message.rs, src/bus/error.rs | MessageKind adds as_str() and from_str_lossy() for wire-token conversion, a new retryable(context, err) helper constructs consistent error messages, and strip_address_prefix(address, prefix) handles optional prefix removal from transport addresses. |
| Bus module conditional re-exports: src/bus/mod.rs | retryable and strip_address_prefix are conditionally re-exported as pub(crate) under NATS/Kafka/RabbitMQ features. |
| Kafka and NATS adapter refactoring: src/bus/kafka.rs, src/bus/nats.rs | Kafka and NATS adapters switch from local kind_str/kind_from_str matchers and inline prefix logic to MessageKind::as_str(), MessageKind::from_str_lossy(), and strip_address_prefix(). |
| RabbitMQ and Postgres adapter refactoring: src/bus/rabbitmq.rs, src/bus/postgres_bus.rs, src/bus/rabbit_bus.rs | RabbitMQ, Postgres, and Rabbit-bus adapters adopt shared utilities for message-kind serialization and error construction; Rabbit-bus uses the centralized strip_address_prefix helper. |
| Knative, Nats-bus, and minor cleanup: src/bus/knative_bus.rs, src/bus/nats_bus.rs | Knative adapter uses MessageKind::as_str() and removes local kind_str; Nats-bus imports are reorganized. |

Repository validation module extraction

| Layer / File(s) | Summary |
| --- | --- |
| Backend-agnostic validation module creation: src/repository/mod.rs, src/repository/validation.rs | New validation.rs module defines six crate-internal validator functions for stream/outbox deduplication, entity-identity consistency, prepared-event sequencing, event codec support, and snapshot validation. The module is wired into repository/mod.rs with conditional re-exports for Postgres and SQLite builds. |
| Remove validators from sqlx_repo and wire into sqlite/postgres: src/sqlx_repo/mod.rs, src/sqlite_repo/mod.rs, src/postgres_repo/mod.rs | The six validator functions are removed from sqlx_repo, imports are trimmed, and sqlite_repo and postgres_repo now import validators from crate::repository instead. |
| Hashmap repository import reorganization: src/hashmap_repo/repository.rs | Imports are reorganized to use validators from crate::repository; validate_snapshot_write helper is repositioned and test code receives an explicit StreamWrite import. |

Test infrastructure consolidation

| Layer / File(s) | Summary |
| --- | --- |
| Broker-test helper functions: tests/transport_conformance/mod.rs | New functions recording_for(), named_recording_for(), run_token(), and unique() provide shared mechanisms for collision-resistant test identifiers and message recording across transport tests. |
| Transport tests using shared helpers: tests/kafka_transport/main.rs, tests/nats_transport/main.rs, tests/postgres_transport/main.rs, tests/rabbitmq_transport/main.rs | Kafka, NATS, Postgres, and RabbitMQ transport tests now import shared helpers from transport_conformance and remove locally-defined recording and uniqueness utilities. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • hops-ops/distributed#51: Builds on the extracted crate::bus core types by adding MessageKind::as_str/from_str_lossy and shared bus helper functions that extend the messaging foundation introduced in that PR.

Poem

🐰 Through macro halls and bus refactors clean,
Shared tokens emerge, validators convene,
Test helpers hop from scope to scope—
One hopeful crate, less duplication's rope!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 70.97% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main refactoring effort: consolidating duplicated helper code across four areas (commit validation, bus helpers, test helpers, macro codegen) into shared, single-source-of-truth locations. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/nats_transport/main.rs (1)

132-169: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Remove duplicate helper functions.

The local recording_service and named_recording_service functions are identical to the shared recording_for and named_recording_for helpers in transport_conformance/mod.rs. Import and use the shared versions instead to complete the deduplication goal.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/nats_transport/main.rs` around lines 132 - 169, Delete the local helper
functions recording_service and named_recording_service and instead import and
call the shared helpers recording_for and named_recording_for from the
transport_conformance module; update all local references to
recording_service(...) -> recording_for(...) and named_recording_service(...) ->
named_recording_for(...), and add the necessary use/import for
transport_conformance::recording_for and
transport_conformance::named_recording_for so the file compiles.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/nats_transport/main.rs`:
- Around line 17-20: Import the shared helpers from the conformance module and
remove the duplicated local implementations: add use
conformance::{recording_for, named_recording_for}; at the top alongside unique,
delete the local functions recording_service and named_recording_service (the
functions around lines 132–169), and update any call sites that referenced
recording_service/named_recording_service to call
recording_for/named_recording_for instead so tests use the shared helpers.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cad2aced-bc67-41fa-b757-ecff8a0c7c75

📥 Commits

Reviewing files that changed from the base of the PR and between 95e4318 and e3f8ed4.

📒 Files selected for processing (22)
  • distributed_macros/src/lib.rs
  • src/bus/error.rs
  • src/bus/kafka.rs
  • src/bus/knative_bus.rs
  • src/bus/message.rs
  • src/bus/mod.rs
  • src/bus/nats.rs
  • src/bus/nats_bus.rs
  • src/bus/postgres_bus.rs
  • src/bus/rabbit_bus.rs
  • src/bus/rabbitmq.rs
  • src/hashmap_repo/repository.rs
  • src/postgres_repo/mod.rs
  • src/repository/mod.rs
  • src/repository/validation.rs
  • src/sqlite_repo/mod.rs
  • src/sqlx_repo/mod.rs
  • tests/kafka_transport/main.rs
  • tests/nats_transport/main.rs
  • tests/postgres_transport/main.rs
  • tests/rabbitmq_transport/main.rs
  • tests/transport_conformance/mod.rs

Comment thread tests/nats_transport/main.rs Outdated
The NATS test still defined local recording_service/named_recording_service
that were byte-identical to the shared recording_for/named_recording_for in
transport_conformance — import them (aliased to the local names) and delete the
duplicates, completing the test-helper dedup.

Addresses CodeRabbit review on PR #88.

Refines [[tasks/shared-helpers-dedup-batch]]

…TS test

Drop the alias and call recording_for/named_recording_for directly at the call
sites, matching the kafka/rabbitmq/postgres transport tests for consistency.

Refines [[tasks/shared-helpers-dedup-batch]]