refactor: shared-helpers dedup batch (commit validation, bus helpers, test helpers, macro codegen)#88
refactor: shared-helpers dedup batch (commit validation, bus helpers, test helpers, macro codegen)#88patrickleet wants to merge 6 commits into
Conversation
The six commit-batch validation fns (reject_duplicate_streams, reject_duplicate_outbox_messages, validate_entity_id_matches_identity, validate_prepared_appends, validate_supported_event_codec, validate_snapshot_identity) were duplicated byte-for-byte between src/sqlx_repo/mod.rs and src/hashmap_repo/repository.rs. They have no sqlx dependency; only the cfg gate forced the copy. Move them to a single ungated home, src/repository/validation.rs, since they validate repository-level types (StreamWrite, PreparedEventAppend, OutboxMessage, CommitBatch). All three backends (hashmap, sqlite, postgres) now import the one copy via crate::repository, so the definition of a valid CommitBatch cannot drift between backends. No behavior change: the moved fns are byte-identical to the originals; the error taxonomy work from #83 in sqlx_repo is untouched. Implements [[tasks/shared-helpers-dedup-batch]]
Three small transport helpers were copy-pasted across the bus adapters:
- kind_str/kind_from_str (5x: nats, kafka, rabbitmq, postgres_bus,
knative_bus) -> MessageKind::as_str / MessageKind::from_str_lossy on
the type's owning module, src/bus/message.rs.
- retryable(context, err) (5x: nats, kafka, rabbitmq, nats_bus,
rabbit_bus) -> one pub(crate) fn in src/bus/error.rs, where
TransportError lives.
- subject/topic/routing-key prefix stripping (3x: nats, kafka,
rabbit_bus) -> strip_address_prefix free fn next to Message.
Each helper now has one definition in the module that owns the concept.
The crate-internal re-exports and the retryable/strip fns are gated to
the transport features that consume them so non-transport builds stay
warning-free. No behavior change: the string tokens, the
event-default-on-unknown parse, and the "{context}: {err}" format are
identical to the originals; #80's postgres corrupt-row handling and the
in-flight NATS publish path are untouched beyond the helper swap.
Implements [[tasks/shared-helpers-dedup-batch]]
aggregate! (expand_aggregate) and #[sourced] (expand_sourced) emitted a byte-for-byte identical 'impl distributed::Aggregate' block: same ReplayError = String, same entity/entity_mut/replay_event bodies, and the same optional aggregate_type and upcasters methods. Only the replay match arms are built differently upstream. Extract one aggregate_impl_tokens(type_name, entity_field, aggregate_type_method, replay_arms, upcasters_method) helper that emits the impl block; both macros now call it. This prevents the replay semantics of the two entry points from drifting. Works with the post-#82 expand_* -> syn::Result shape; each caller still places #upcaster_wrappers where it already did. Verified byte-identical: a golden dump of both macros' to_string() output (covering no-payload, single-arg, multi-arg replay arms and aggregate_type) matched the pre-refactor output exactly. The large line delta is rustfmt re-indenting the two replay-arm closures after collecting them to a Vec. Implements [[tasks/shared-helpers-dedup-batch]]
The four broker test mains each redefined the same scaffolding: recording_for / named_recording_for (kafka, rabbitmq, postgres), run_token, and the run-token-based unique (nats, rabbitmq). Move the genuinely-identical helpers into tests/transport_conformance/mod.rs and include it via the established #[path] mechanism (the in-memory harness already does this). The per-transport scenarios stay in their own files, and load-bearing differences are preserved: kafka keeps its own nanos-based unique (it persists topics across runs), and each transport keeps its own env-skip helper (the env var name and message differ). #80's postgres corrupt-row test is untouched. Validated against real brokers via docker compose: postgres 9, rabbitmq 5, nats 5, kafka 5 tests pass; transport_in_memory still passes (12). Net ~150 duplicated lines removed in favor of one shared copy. Implements [[tasks/shared-helpers-dedup-batch]]
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR refactors shared code generation, messaging utilities, and validation logic across the codebase. It extracts a common ChangesProc-macro shared helper extraction
Bus messaging utilities and transport adapter refactoring
Repository validation module extraction
Test infrastructure consolidation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/nats_transport/main.rs (1)
132-169: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winRemove duplicate helper functions.
These functions are identical to the shared
recording_forandnamed_recording_forhelpers intransport_conformance/mod.rs. Import and use the shared versions instead to complete the deduplication goal.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/nats_transport/main.rs` around lines 132 - 169, Delete the local helper functions recording_service and named_recording_service and instead import and call the shared helpers recording_for and named_recording_for from the transport_conformance module; update all local references to recording_service(...) -> recording_for(...) and named_recording_service(...) -> named_recording_for(...), and add the necessary use/import for transport_conformance::recording_for and transport_conformance::named_recording_for so the file compiles.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/nats_transport/main.rs`:
- Around line 17-20: Import the shared helpers from the conformance module and
remove the duplicated local implementations: add use
conformance::{recording_for, named_recording_for}; at the top alongside unique,
delete the local functions recording_service and named_recording_service (the
functions around lines 132–169), and update any call sites that referenced
recording_service/named_recording_service to call
recording_for/named_recording_for instead so tests use the shared helpers.
---
Outside diff comments:
In `@tests/nats_transport/main.rs`:
- Around line 132-169: Delete the local helper functions recording_service and
named_recording_service and instead import and call the shared helpers
recording_for and named_recording_for from the transport_conformance module;
update all local references to recording_service(...) -> recording_for(...) and
named_recording_service(...) -> named_recording_for(...), and add the necessary
use/import for transport_conformance::recording_for and
transport_conformance::named_recording_for so the file compiles.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cad2aced-bc67-41fa-b757-ecff8a0c7c75
📒 Files selected for processing (22)
distributed_macros/src/lib.rssrc/bus/error.rssrc/bus/kafka.rssrc/bus/knative_bus.rssrc/bus/message.rssrc/bus/mod.rssrc/bus/nats.rssrc/bus/nats_bus.rssrc/bus/postgres_bus.rssrc/bus/rabbit_bus.rssrc/bus/rabbitmq.rssrc/hashmap_repo/repository.rssrc/postgres_repo/mod.rssrc/repository/mod.rssrc/repository/validation.rssrc/sqlite_repo/mod.rssrc/sqlx_repo/mod.rstests/kafka_transport/main.rstests/nats_transport/main.rstests/postgres_transport/main.rstests/rabbitmq_transport/main.rstests/transport_conformance/mod.rs
The NATS test still defined local recording_service/named_recording_service that were byte-identical to the shared recording_for/named_recording_for in transport_conformance — import them (aliased to the local names) and delete the duplicates, completing the test-helper dedup. Addresses CodeRabbit review on PR #88. Refines [[tasks/shared-helpers-dedup-batch]]
…TS test Drop the alias and call recording_for/named_recording_for directly at the call sites, matching the kafka/rabbitmq/postgres transport tests for consistency. Refines [[tasks/shared-helpers-dedup-batch]]
Batch of four small, independent de-duplications from the 2026-06-10 review (
tasks/shared-helpers-dedup-batch). Each gives a duplicated helper one definition in the module that owns the concept. No public API change; no behavior change.1. Commit-batch validation (drift risk) — ~90 lines removed
Six validation fns (
reject_duplicate_streams,reject_duplicate_outbox_messages,validate_entity_id_matches_identity,validate_prepared_appends,validate_supported_event_codec,validate_snapshot_identity) were duplicated byte-for-byte betweensrc/sqlx_repo/mod.rsandsrc/hashmap_repo/repository.rs. They have no sqlx dependency — only the cfg gate forced the copy.Moved to a single ungated home,
src/repository/validation.rs(they validate the repository typesStreamWrite/PreparedEventAppend/OutboxMessage/CommitBatch). All three backends (hashmap, sqlite, postgres) now import the one copy viacrate::repository, so the definition of a validCommitBatchcannot drift between backends. The error-taxonomy work from #83 insqlx_repois untouched.2. Bus helpers — ~80 lines removed
kind_str/kind_from_str(5x: nats, kafka, rabbitmq, postgres_bus, knative_bus) →MessageKind::as_str/MessageKind::from_str_lossyon the type's owning modulesrc/bus/message.rs.retryable(context, err)(5x: nats, kafka, rabbitmq, nats_bus, rabbit_bus) → onepub(crate) fninsrc/bus/error.rs, whereTransportErrorlives.strip_address_prefixfree fn next toMessage.The crate-internal re-exports and the transport-only fns are feature-gated to the consuming transports so non-transport builds stay warning-free. #80's postgres corrupt-row handling and the in-flight NATS publish path are untouched beyond the helper swap.
3. Transport test helpers — ~150 lines removed (one shared copy added)
The four broker test mains each redefined the same scaffolding. Moved the genuinely-identical helpers (
recording_for,named_recording_for,run_token, the run-token-basedunique) intotests/transport_conformance/mod.rs, included via the established#[path]mechanism (the in-memory harness already does this).Load-bearing differences preserved: kafka keeps its own nanos-based
unique(it persists topics across runs); each transport keeps its own env-skip helper (env var name + message differ); per-transport scenarios stay in their own files. #80's postgres corrupt-row test is untouched.4. Macro codegen — ~35 lines deduped
aggregate!(expand_aggregate) and#[sourced](expand_sourced) emitted a byte-for-byte identicalimpl distributed::Aggregateblock. Extracted oneaggregate_impl_tokens(...)helper used by both, preventing replay-semantics drift. Fits the post-#82expand_* -> syn::Resultshape; each caller still places#upcaster_wrapperswhere it did.Testing
All local (sqlite no Docker; brokers via
docker compose).cargo fmt --all --check— clean.cargo builddefault +--features sqlite|postgres|nats|rabbitmq|kafka— all clean (kafka built; librdkafka via cmake).cargo test --features sqlite— full suite green, including commit-validation conformance across hashmap + sqlite (271 lib tests + every sqlite test target, 0 failures).cargo test -p distributed_macros— 50 pass. Macro output verified byte-identical: a golden dump of both macros'to_string()output (covering no-payload / single-arg / multi-arg replay arms and theaggregate_typemethod) matched the pre-refactor output exactly (diffempty).docker compose up -dwith env URLs: postgres 9, rabbitmq 5, nats 5, kafka 5 transport tests pass;transport_in_memorystill 12.docker compose down --remove-orphansafter.🤖 Generated with Claude Code
Summary by CodeRabbit
Refactor
Tests