Skip to content

perf: reduce commit/hydrate hot-path round trips and clones#87

Open
patrickleet wants to merge 3 commits into
mainfrom
hardening/commit-hydrate-efficiency
Open

perf: reduce commit/hydrate hot-path round trips and clones#87
patrickleet wants to merge 3 commits into
mainfrom
hardening/commit-hydrate-efficiency

Conversation

@patrickleet

Copy link
Copy Markdown
Collaborator

Implements the four efficiency fixes from the 2026-06-10 review
(A3/A4/A5/B6). Each change is behavior-preserving — same conflict
detection, same ordering, same hydrated state. Deferred items
(B1/B2/B3) are intentionally untouched.

Changes

A3 — batched commit inserts (postgres_repo, sqlite_repo)

commit_batch issued one INSERT per event and one per outbox message
inside the open transaction, holding row/index locks across many round
trips. Each is now a single multi-row INSERT built with
QueryBuilder::push_values (Postgres), and the same chunked under
SQLite's bound-parameter limit (SQLite). The per-stream
SELECT MAX(sequence) version pre-check is kept — it enforces
sequence contiguity, which the unique PK alone does not catch for
expected < actual gaps. Conflict detection is unchanged: an event
unique violation still surfaces as ConcurrentWrite (Postgres re-reads
the conflicting stream's version over the pool since a failed statement
aborts the tx; SQLite re-reads in-tx, as SQLite does not abort on a
constraint error), and an outbox unique violation still maps to
DuplicateOutboxMessageInBatch.

A4 — hydrate clone (aggregate, entity, snapshot)

hydrate() deep-cloned the entire event history via events().to_vec()
on every replay, even with no upcasters, purely for the borrow checker.
Events are now taken out of the owned entity, replayed from a local Vec,
and restored via load_from_history. The no-upcaster path replays with
zero clones; the upcaster path keeps its single bounded clone with the
durable history restored verbatim. The snapshot tail path
(hydrate_prepared_snapshot) likewise replays the common path directly
from a filtered borrow instead of .cloned()-collecting the
post-snapshot events, restoring the full history so new_events()
slicing on the next commit is unchanged. PR #81's snapshot+tail fetch
behavior is preserved. Adds Entity::take_events / restore_history.

A5 — get_streams N+1 (postgres_repo, sqlite_repo)

get_streams looped awaiting a query per identity. Replaced with one
WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY aggregate_id, sequence query per aggregate type (SQLite uses a bound
IN list, having no array type), splitting the flat result into
entities client-side. get_all callers already accept storage-order
results.

B6 — NATS publish clone (bus/nats)

publish() owns the Message and drops it immediately, but cloned
message.payload before publishing. Now moves the buffer out with
std::mem::take and converts zero-copy via Bytes::from(Vec<u8>).

Testing

  • cargo fmt --all — clean
  • cargo build (default), --features sqlite, --features postgres — all clean
  • cargo test --features sqlite — all green (conformance, repo, snapshot, upcaster suites)
  • Real Postgres via Docker (compose.yaml, postgres:18):
    cargo test --features postgres --test postgres_repository --test postgres_repository_conformance --test event_store
    plus the full cargo test --features postgres suite — all green. The
    batched-insert and get_streams changes were validated against real
    Postgres (the outbox ::jsonb builder fragment was caught and fixed
    here).
  • No new clippy warnings introduced (the 3 pre-existing warnings predate this branch).
  • --all-features not attempted (requires librdkafka cmake), per task scope.

🤖 Generated with Claude Code

hydrate() previously deep-cloned the entire event history via
events().to_vec() on every replay even with no upcasters, purely to
satisfy the borrow checker. Take the events out of the owned entity,
replay from the local Vec, and restore via load_from_history. The
no-upcaster path now replays with zero clones; the upcaster path keeps
its single bounded clone (the durable history is restored verbatim).

The snapshot tail path (hydrate_prepared_snapshot) did a similar
.cloned() to collect post-snapshot events. Replay the common
(no-upcaster) path directly from a filtered borrow so the tail is no
longer cloned, while still restoring the full history so new_events()
slicing on the next commit is unchanged. PR #81's snapshot+tail
fetch behavior is preserved.

Adds Entity::take_events / Entity::restore_history for the
borrow-out-then-restore pattern.

Hydrated state is identical: same replay order, same final
version/committed_version, same upcaster semantics.
commit_batch issued one INSERT per event and one INSERT per outbox
message inside the open transaction, holding row/index locks across
many round trips. Batch each into a single multi-row INSERT built with
QueryBuilder::push_values (postgres) — and the same, chunked under
SQLite's bound-parameter limit (sqlite). The per-stream version
pre-check is kept: it enforces sequence contiguity, which the unique
PK alone does not catch for expected<actual gaps.

Conflict detection is unchanged. An event unique violation still maps
to ConcurrentWrite (postgres re-reads the conflicting stream's version
over the pool because a failed statement aborts the tx; sqlite re-reads
in-tx since SQLite does not abort on a constraint error). An outbox
unique violation still maps to DuplicateOutboxMessageInBatch.

get_streams looped awaiting a query per identity (N+1). Replace with one
WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY
aggregate_id, sequence query per aggregate type (sqlite uses a bound IN
list, having no array type), splitting the flat result into entities
client-side. Callers of get_all already accept storage-order results.

Ordering and hydrated state are unchanged.
publish() owns the Message and drops it immediately after, but cloned
message.payload before handing it to JetStream. Move the buffer out with
std::mem::take and convert via Bytes::from(Vec<u8>) (zero-copy) instead.

Behavior is unchanged: the same bytes are published.

Implements [[tasks/commit-hydrate-hot-path-efficiency]]
@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Warning

Review limit reached

@patrickleet, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 20 minutes and 19 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more credits in the billing tab to continue.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0d5cdafb-0248-4dfa-acdf-e8ccef5d3b90

📥 Commits

Reviewing files that changed from the base of the PR and between 95e4318 and 0dca88c.

📒 Files selected for processing (6)
  • src/aggregate/aggregate.rs
  • src/bus/nats.rs
  • src/entity/entity.rs
  • src/postgres_repo/mod.rs
  • src/snapshot/repository.rs
  • src/sqlite_repo/mod.rs
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch hardening/commit-hydrate-efficiency

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant