perf: reduce commit/hydrate hot-path round trips and clones #87
patrickleet wants to merge 3 commits into
hydrate() previously deep-cloned the entire event history via events().to_vec() on every replay even with no upcasters, purely to satisfy the borrow checker. Take the events out of the owned entity, replay from the local Vec, and restore via load_from_history. The no-upcaster path now replays with zero clones; the upcaster path keeps its single bounded clone (the durable history is restored verbatim).

The snapshot tail path (hydrate_prepared_snapshot) did a similar .cloned() to collect post-snapshot events. Replay the common (no-upcaster) path directly from a filtered borrow so the tail is no longer cloned, while still restoring the full history so new_events() slicing on the next commit is unchanged. PR #81's snapshot+tail fetch behavior is preserved.

Adds Entity::take_events / Entity::restore_history for the borrow-out-then-restore pattern.

Hydrated state is identical: same replay order, same final version/committed_version, same upcaster semantics.
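The borrow-out-then-restore pattern described above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the crate's code: `Event`, `Counter`, and the field layout are hypothetical, and the signatures given to `take_events` / `restore_history` are guesses based only on their names.

```rust
use std::mem;

/// Hypothetical event type; the real event record is richer.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    sequence: u64,
    delta: i64,
}

/// Hypothetical entity that owns its event history.
#[derive(Debug, Default)]
struct Entity {
    events: Vec<Event>,
}

impl Entity {
    /// Move the history out, leaving an empty Vec behind. No element is cloned.
    /// (Assumed signature.)
    fn take_events(&mut self) -> Vec<Event> {
        mem::take(&mut self.events)
    }

    /// Put a previously taken history back verbatim. (Assumed signature.)
    fn restore_history(&mut self, events: Vec<Event>) {
        self.events = events;
    }
}

/// Hypothetical aggregate whose state is derived by replaying events.
#[derive(Debug, Default)]
struct Counter {
    entity: Entity,
    total: i64,
    version: u64,
}

impl Counter {
    fn apply(&mut self, event: &Event) {
        self.total += event.delta;
        self.version = event.sequence;
    }

    /// Replay without cloning: take the events, iterate the local Vec, restore.
    fn hydrate(&mut self) {
        let events = self.entity.take_events();
        for event in &events {
            // `events` is a local, so `self` can be borrowed mutably here.
            self.apply(event);
        }
        self.entity.restore_history(events);
    }
}
```

Because the `Vec` is moved rather than copied, the heap buffer holding the history is the same allocation before and after `hydrate()`, which is what makes the no-upcaster path clone-free in this sketch.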
commit_batch issued one INSERT per event and one INSERT per outbox message inside the open transaction, holding row/index locks across many round trips. Batch each into a single multi-row INSERT built with QueryBuilder::push_values on postgres, and the same on sqlite, chunked under SQLite's bound-parameter limit. The per-stream version pre-check is kept: it enforces sequence contiguity, which the unique PK alone does not catch for expected < actual gaps.

Conflict detection is unchanged. An event unique violation still maps to ConcurrentWrite (postgres re-reads the conflicting stream's version over the pool because a failed statement aborts the tx; sqlite re-reads in-tx since SQLite does not abort on a constraint error). An outbox unique violation still maps to DuplicateOutboxMessageInBatch.

get_streams looped awaiting a query per identity (N+1). Replace with one WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY aggregate_id, sequence query per aggregate type (sqlite uses a bound IN list, having no array type), splitting the flat result into entities client-side. Callers of get_all already accept storage-order results. Ordering and hydrated state are unchanged.
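The SQLite-side chunking of multi-row INSERTs mentioned above could look something like the sketch below. It uses only the standard library and builds the SQL text by hand instead of going through `QueryBuilder::push_values`. The table name `events`, the `payload` column, the `EventRow` struct, and the helper names are all hypothetical. The cap of 999 is an assumed conservative choice: it was SQLite's default `SQLITE_MAX_VARIABLE_NUMBER` before 3.32.0 (newer builds default to 32766).

```rust
/// Assumed conservative cap on bound parameters per statement.
const MAX_BIND_PARAMS: usize = 999;

/// Hypothetical event row; only the column count matters for chunking.
#[allow(dead_code)] // fields are not read in this sketch
struct EventRow {
    aggregate_type: String,
    aggregate_id: String,
    sequence: i64,
    payload: String,
}

const COLUMNS_PER_ROW: usize = 4;

/// Largest number of rows that fits in one statement under the cap.
fn rows_per_chunk(max_params: usize, columns_per_row: usize) -> usize {
    (max_params / columns_per_row).max(1)
}

/// Build the SQL text for one multi-row INSERT with `row_count` rows.
fn insert_sql(row_count: usize) -> String {
    let row = format!("({})", vec!["?"; COLUMNS_PER_ROW].join(", "));
    let values = vec![row; row_count].join(", ");
    format!(
        "INSERT INTO events (aggregate_type, aggregate_id, sequence, payload) VALUES {values}"
    )
}

/// Pair each chunk of rows with the statement text that inserts it.
/// Every statement stays at or under MAX_BIND_PARAMS placeholders.
fn plan_inserts(rows: &[EventRow]) -> Vec<(String, &[EventRow])> {
    rows.chunks(rows_per_chunk(MAX_BIND_PARAMS, COLUMNS_PER_ROW))
        .map(|chunk| (insert_sql(chunk.len()), chunk))
        .collect()
}
```

With four columns per row and a cap of 999, each statement carries at most 249 rows, so a 500-event batch becomes three statements instead of 500 round trips. All chunks would still run inside the same transaction, so atomicity of the batch is unaffected by the split.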
publish() owns the Message and drops it immediately after, but cloned message.payload before handing it to JetStream. Move the buffer out with std::mem::take and convert via Bytes::from(Vec<u8>) (zero-copy) instead. Behavior is unchanged: the same bytes are published.

Implements [[tasks/commit-hydrate-hot-path-efficiency]]
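As a rough illustration of the move-instead-of-clone change, the sketch below uses only the standard library. `Message`, its fields, and `send` are hypothetical stand-ins; in the real code the moved `Vec<u8>` would be handed to `Bytes::from` and then to the JetStream client, which this example does not model.

```rust
use std::mem;

/// Hypothetical message type; the real one carries more fields.
struct Message {
    subject: String,
    payload: Vec<u8>,
}

/// Stand-in for the client call that takes ownership of the body.
/// With the `bytes` crate, this is where `Bytes::from(body)` would go.
fn send(subject: &str, body: Vec<u8>) -> (String, Vec<u8>) {
    (subject.to_owned(), body)
}

/// Owns `message`, so the payload can be moved out rather than cloned.
fn publish(mut message: Message) -> (String, Vec<u8>) {
    // Leaves an empty Vec in `message.payload`; the buffer itself is not copied.
    let body = mem::take(&mut message.payload);
    send(&message.subject, body)
}
```

`mem::take` swaps in `Vec::new()`, which does not allocate, so the only cost is moving three words. The same call also works when only a `&mut Message` is available, which a plain field move would not allow.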
Implements the four efficiency fixes from the 2026-06-10 review
(A3/A4/A5/B6). Each change is behavior-preserving — same conflict
detection, same ordering, same hydrated state. Deferred items
(B1/B2/B3) are intentionally untouched.
Changes
A3: batched commit inserts (`postgres_repo`, `sqlite_repo`)

`commit_batch` issued one `INSERT` per event and one per outbox message inside the open transaction, holding row/index locks across many round trips. Each is now a single multi-row `INSERT` built with `QueryBuilder::push_values` (Postgres), and the same chunked under SQLite's bound-parameter limit (SQLite). The per-stream `SELECT MAX(sequence)` version pre-check is kept: it enforces sequence contiguity, which the unique PK alone does not catch for `expected < actual` gaps. Conflict detection is unchanged: an event unique violation still surfaces as `ConcurrentWrite` (Postgres re-reads the conflicting stream's version over the pool since a failed statement aborts the tx; SQLite re-reads in-tx, as SQLite does not abort on a constraint error), and an outbox unique violation still maps to `DuplicateOutboxMessageInBatch`.

A4: hydrate clone (`aggregate`, `entity`, `snapshot`)

`hydrate()` deep-cloned the entire event history via `events().to_vec()` on every replay, even with no upcasters, purely for the borrow checker. Events are now taken out of the owned entity, replayed from a local Vec, and restored via `load_from_history`. The no-upcaster path replays with zero clones; the upcaster path keeps its single bounded clone with the durable history restored verbatim. The snapshot tail path (`hydrate_prepared_snapshot`) likewise replays the common path directly from a filtered borrow instead of `.cloned()`-collecting the post-snapshot events, restoring the full history so `new_events()` slicing on the next commit is unchanged. PR #81's snapshot+tail fetch behavior is preserved. Adds `Entity::take_events` / `restore_history`.

A5: `get_streams` N+1 (`postgres_repo`, `sqlite_repo`)

`get_streams` looped awaiting a query per identity. Replaced with one `WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY aggregate_id, sequence` query per aggregate type (SQLite uses a bound `IN` list, having no array type), splitting the flat result into entities client-side. `get_all` callers already accept storage-order results.

B6: NATS publish clone (`bus/nats`)

`publish()` owns the `Message` and drops it immediately, but cloned `message.payload` before publishing. Now moves the buffer out with `std::mem::take` and converts zero-copy via `Bytes::from(Vec<u8>)`.

Testing
- `cargo fmt --all`: clean
- `cargo build` (default), `--features sqlite`, `--features postgres`: all clean
- `cargo test --features sqlite`: all green (conformance, repo, snapshot, upcaster suites)
- Postgres (`compose.yaml`, `postgres:18`): `cargo test --features postgres --test postgres_repository --test postgres_repository_conformance --test event_store` plus the full `cargo test --features postgres` suite, all green. The batched-insert and `get_streams` changes were validated against real Postgres (the outbox `::jsonb` builder fragment was caught and fixed here).
- `--all-features` not attempted (requires librdkafka cmake), per task scope.

🤖 Generated with Claude Code