From 23e3427f9b2bcced01114992001b58d2398a9a34 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 03:38:50 -0500 Subject: [PATCH 1/3] perf: drop event-history clones on the hydrate hot path hydrate() previously deep-cloned the entire event history via events().to_vec() on every replay even with no upcasters, purely to satisfy the borrow checker. Take the events out of the owned entity, replay from the local Vec, and restore via load_from_history. The no-upcaster path now replays with zero clones; the upcaster path keeps its single bounded clone (the durable history is restored verbatim). The snapshot tail path (hydrate_prepared_snapshot) did a similar .cloned() to collect post-snapshot events. Replay the common (no-upcaster) path directly from a filtered borrow so the tail is no longer cloned, while still restoring the full history so new_events() slicing on the next commit is unchanged. PR #81's snapshot+tail fetch behavior is preserved. Adds Entity::take_events / Entity::restore_history for the borrow-out-then-restore pattern. Hydrated state is identical: same replay order, same final version/committed_version, same upcaster semantics. --- src/aggregate/aggregate.rs | 30 +++++++++++++---- src/entity/entity.rs | 21 ++++++++++++ src/snapshot/repository.rs | 69 +++++++++++++++++++++++++++++--------- 3 files changed, 98 insertions(+), 22 deletions(-) diff --git a/src/aggregate/aggregate.rs b/src/aggregate/aggregate.rs index ca27ccd..944ca7c 100644 --- a/src/aggregate/aggregate.rs +++ b/src/aggregate/aggregate.rs @@ -99,22 +99,40 @@ pub fn hydrate(entity: Entity) -> Result { let mut agg = A::new_empty(); *agg.entity_mut() = entity; + // Take the events out of the entity so we can iterate them while holding a + // mutable borrow of `agg` during replay. `hydrate` is only ever called with + // a full-history entity, so `prefix_version == 0` and restoring via + // `load_from_history` reproduces the exact loaded version/committed_version. 
+ let history = agg.entity_mut().take_events(); + let upcasters = A::upcasters(); let events = if upcasters.is_empty() { - agg.entity().events().to_vec() + // Replay directly from `history`; it is restored verbatim below, so no + // clone of the stream is needed on the common (no-upcaster) path. + replay_into(&mut agg, &history)?; + history } else { - upcast_events(agg.entity().events().to_vec(), upcasters) - .map_err(|err| RepositoryError::Replay(err.to_string()))? + // Upcasters may rewrite events for replay, but the durable history is + // unchanged: replay from the upcasted view, then restore the originals. + let upcasted = upcast_events(history.clone(), upcasters) + .map_err(|err| RepositoryError::Replay(err.to_string()))?; + replay_into(&mut agg, &upcasted)?; + history }; + agg.entity_mut().load_from_history(events); + + Ok(agg) +} + +fn replay_into(agg: &mut A, events: &[EventRecord]) -> Result<(), RepositoryError> { agg.entity_mut().set_replaying(true); - for event in &events { + for event in events { if let Err(err) = agg.replay_event(event) { agg.entity_mut().set_replaying(false); return Err(RepositoryError::Replay(err.to_string())); } } agg.entity_mut().set_replaying(false); - - Ok(agg) + Ok(()) } diff --git a/src/entity/entity.rs b/src/entity/entity.rs index 13e0d09..a4f6f51 100644 --- a/src/entity/entity.rs +++ b/src/entity/entity.rs @@ -138,6 +138,27 @@ impl Entity { &self.events } + /// Take the in-memory events out of the entity, leaving it empty. + /// + /// Used by replay paths that need to iterate events while also holding a + /// mutable borrow of the owning aggregate (the borrow checker forbids + /// borrowing `events` immutably and the aggregate mutably at once). The + /// caller is responsible for restoring history afterward — e.g. via + /// [`load_from_history`](Self::load_from_history) — so the entity's + /// `version`/`committed_version` invariants hold. 
+ pub fn take_events(&mut self) -> Vec { + std::mem::take(&mut self.events) + } + + /// Put a previously [taken](Self::take_events) event history back without + /// recomputing `version`/`prefix_version`/`committed_version`. The caller + /// must restore the *same* events the entity already accounted for, so the + /// existing invariants continue to hold (used by replay paths that only + /// borrowed the events out to satisfy the borrow checker). + pub fn restore_history(&mut self, events: Vec) { + self.events = events; + } + /// Returns events added since the entity was loaded (not yet persisted). /// /// Slices relative to `prefix_version` so it is correct whether the entity diff --git a/src/snapshot/repository.rs b/src/snapshot/repository.rs index 7bb05a7..f25a061 100644 --- a/src/snapshot/repository.rs +++ b/src/snapshot/repository.rs @@ -122,33 +122,70 @@ fn hydrate_prepared_snapshot( // Restore aggregate state from snapshot agg.restore_from_snapshot(snapshot_payload); - // Replay only events AFTER the snapshot - let post_snapshot: Vec = agg - .entity() - .events() - .iter() - .filter(|e| e.sequence > snapshot.version) - .cloned() - .collect(); - - // Apply upcasters to post-snapshot events + // Replay only events AFTER the snapshot. Take the events out of the entity + // so we can iterate them while holding a mutable borrow of `agg`, then put + // the full history back unchanged (its version/committed_version invariants + // must survive: the pre-snapshot prefix is still counted for `new_events` + // slicing on the next commit). + let history = agg.entity_mut().take_events(); let upcasters = A::upcasters(); - let events = if upcasters.is_empty() { - post_snapshot + + let replay_result = if upcasters.is_empty() { + // Common path: replay straight from a filtered borrow — no clone of the + // post-snapshot tail. 
+ replay_filtered(&mut agg, &history, snapshot.version) } else { - upcast_events(post_snapshot, upcasters) - .map_err(|err| SnapshotHydrationError::Replay(err.to_string()))? + // Upcasters may rewrite the post-snapshot events; build that view (a + // clone bounded to the tail) and replay from it. + let post_snapshot: Vec = history + .iter() + .filter(|e| e.sequence > snapshot.version) + .cloned() + .collect(); + match upcast_events(post_snapshot, upcasters) { + Ok(events) => replay_events(&mut agg, &events), + Err(err) => Err(SnapshotHydrationError::Replay(err.to_string())), + } }; + // Restore the full history regardless of replay outcome before surfacing it. + agg.entity_mut().restore_history(history); + replay_result?; + Ok(agg) +} + +/// Replay the post-snapshot tail (`sequence > snapshot_version`) directly from a +/// borrowed history, with no intermediate allocation. +fn replay_filtered( + agg: &mut A, + history: &[crate::entity::EventRecord], + snapshot_version: u64, +) -> Result<(), SnapshotHydrationError> { agg.entity_mut().set_replaying(true); - for event in &events { + for event in history.iter().filter(|e| e.sequence > snapshot_version) { if let Err(err) = agg.replay_event(event) { agg.entity_mut().set_replaying(false); return Err(SnapshotHydrationError::Replay(err.to_string())); } } agg.entity_mut().set_replaying(false); - Ok(agg) + Ok(()) +} + +/// Replay an already-prepared (e.g. upcasted) event slice. 
+fn replay_events( + agg: &mut A, + events: &[crate::entity::EventRecord], +) -> Result<(), SnapshotHydrationError> { + agg.entity_mut().set_replaying(true); + for event in events { + if let Err(err) = agg.replay_event(event) { + agg.entity_mut().set_replaying(false); + return Err(SnapshotHydrationError::Replay(err.to_string())); + } + } + agg.entity_mut().set_replaying(false); + Ok(()) } fn snapshot_record_for(aggregate: &A) -> Result { From deba7064f259f030ee07517486e148138c1172fc Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 03:38:59 -0500 Subject: [PATCH 2/3] perf: batch commit inserts and collapse get_streams N+1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit commit_batch issued one INSERT per event and one INSERT per outbox message inside the open transaction, holding row/index locks across many round trips. Batch each into a single multi-row INSERT built with QueryBuilder::push_values (postgres) — and the same, chunked under SQLite's bound-parameter limit (sqlite). The per-stream version pre-check is kept: it enforces sequence contiguity, which the unique PK alone does not catch for expected impl Future, RepositoryError>> + Send + 'a { async move { - let mut entities = Vec::with_capacity(identities.len()); + if identities.is_empty() { + return Ok(Vec::new()); + } + + // Group ids by aggregate type so each type is one `aggregate_id = + // ANY($2)` round trip instead of a query per identity. `get_all` + // builds single-type batches, so the common case is one query; the + // grouping only exists to keep arbitrary mixed-type inputs correct. + let mut ids_by_type: BTreeMap<&str, Vec<&str>> = BTreeMap::new(); for identity in identities { - if let Some(entity) = self.get_stream(identity).await? 
{ - entities.push(entity); + ids_by_type + .entry(identity.aggregate_type()) + .or_default() + .push(identity.aggregate_id()); + } + + let mut entities = Vec::with_capacity(identities.len()); + for (aggregate_type, aggregate_ids) in ids_by_type { + // Ordering by aggregate_id then sequence lets us slice the flat + // result into per-aggregate entities in one pass. Callers of + // `get_all` accept storage-order results. + let rows = sqlx::query( + r#" + SELECT aggregate_id, + event_name, + event_version, + payload, + payload_codec, + payload_codec_version, + metadata::text AS metadata, + sequence, + EXTRACT(EPOCH FROM recorded_at)::double precision AS recorded_at_epoch + FROM aggregate_events + WHERE aggregate_type = $1 AND aggregate_id = ANY($2) + ORDER BY aggregate_id ASC, sequence ASC + "#, + ) + .bind(aggregate_type) + .bind(&aggregate_ids) + .fetch_all(&self.pool) + .await + .map_err(|err| repository_storage_error("load streams", err))?; + + let mut current_id: Option<String> = None; + let mut current_events: Vec<EventRecord> = Vec::new(); + for row in rows { + let row_id: String = row + .try_get("aggregate_id") + .map_err(|err| repository_storage_error("decode aggregate id row", err))?; + let event = event_from_row(row)?; + match &current_id { + Some(id) if id == &row_id => current_events.push(event), + _ => { + if let Some(id) = current_id.take() { + entities.push(entity_from_events( + id, + std::mem::take(&mut current_events), + )); + } + current_id = Some(row_id); + current_events.push(event); + } + } + } + if let Some(id) = current_id.take() { + entities.push(entity_from_events(id, current_events)); } } + Ok(entities) } } @@ -349,22 +412,9 @@ impl TransactionalCommit for PostgresRepository { } } - for append in &prepared { - for event in &append.events { - insert_event_in_tx( - &self.pool, - &mut tx, - &append.identity, - append.expected_version, - event, - ) - .await?; - } - } + insert_events_in_tx(&self.pool, &mut tx, &prepared).await?; - for message in &batch.outbox_messages { -
insert_outbox_message_in_tx(&mut tx, message).await?; - } + insert_outbox_messages_in_tx(&mut tx, &batch.outbox_messages).await?; for plan in batch.read_model_plans { apply_read_model_write_plan_in_tx(&mut tx, plan).await?; @@ -1981,156 +2031,257 @@ async fn stream_version_pool( .unwrap_or(Ok(0)) } -async fn insert_event_in_tx( +/// Insert every event across all prepared appends in a single multi-row INSERT. +/// +/// Conflict detection is unchanged from the per-row path: the `(aggregate_type, +/// aggregate_id, sequence)` primary key is the contiguity gate, and a unique +/// violation still surfaces as `ConcurrentWrite`. Because a failed statement +/// aborts the transaction, the conflicting stream's actual version is re-read +/// over the pool (a separate connection), exactly as the per-row path did. +async fn insert_events_in_tx( pool: &PgPool, tx: &mut Transaction<'_, Postgres>, - identity: &StreamIdentity, - expected_version: u64, - event: &EventRecord, + prepared: &[PreparedEventAppend], ) -> Result<(), RepositoryError> { - let metadata = serialize_event_metadata(&event.metadata)?; + if prepared.iter().all(|append| append.events.is_empty()) { + return Ok(()); + } - let result = sqlx::query( - r#" - INSERT INTO aggregate_events ( - aggregate_type, - aggregate_id, - sequence, - event_name, - event_version, - payload, - payload_codec, - payload_codec_version, - metadata, - recorded_at - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb, to_timestamp($10)) - "#, - ) - .bind(identity.aggregate_type()) - .bind(identity.aggregate_id()) - .bind(sqlx_repository_i64_from_u64( - POSTGRES_BACKEND, - event.sequence, - "sequence", - BIGINT_STORAGE, - )?) - .bind(&event.event_name) - .bind(sqlx_repository_i32_from_u64( - POSTGRES_BACKEND, - event.event_version, - "event_version", - INTEGER_STORAGE, - )?) 
- .bind(&event.payload) - .bind(&event.payload_codec) - .bind(i32::from(event.payload_codec_version)) - .bind(metadata) - .bind(system_time_to_epoch_secs(event.timestamp)?) - .execute(&mut **tx) - .await; + // Each row carries pre-validated bind values; build them before the query so + // any conversion error surfaces before we touch the database. + struct EventRow<'a> { + aggregate_type: &'a str, + aggregate_id: &'a str, + sequence: i64, + event_name: &'a str, + event_version: i32, + payload: &'a [u8], + payload_codec: &'a str, + payload_codec_version: i32, + metadata: String, + recorded_at: f64, + } + + let mut rows = Vec::new(); + for append in prepared { + for event in &append.events { + rows.push(EventRow { + aggregate_type: append.identity.aggregate_type(), + aggregate_id: append.identity.aggregate_id(), + sequence: sqlx_repository_i64_from_u64( + POSTGRES_BACKEND, + event.sequence, + "sequence", + BIGINT_STORAGE, + )?, + event_name: &event.event_name, + event_version: sqlx_repository_i32_from_u64( + POSTGRES_BACKEND, + event.event_version, + "event_version", + INTEGER_STORAGE, + )?, + payload: &event.payload, + payload_codec: &event.payload_codec, + payload_codec_version: i32::from(event.payload_codec_version), + metadata: serialize_event_metadata(&event.metadata)?, + recorded_at: system_time_to_epoch_secs(event.timestamp)?, + }); + } + } + + let mut builder = QueryBuilder::::new( + "INSERT INTO aggregate_events (\ + aggregate_type, aggregate_id, sequence, event_name, event_version, \ + payload, payload_codec, payload_codec_version, metadata, recorded_at) ", + ); + builder.push_values(rows, |mut row, event| { + row.push_bind(event.aggregate_type) + .push_bind(event.aggregate_id) + .push_bind(event.sequence) + .push_bind(event.event_name) + .push_bind(event.event_version) + .push_bind(event.payload) + .push_bind(event.payload_codec) + .push_bind(event.payload_codec_version) + .push_bind(event.metadata) + .push_unseparated("::jsonb"); + // recorded_at is 
stored from epoch seconds via to_timestamp(...). + row.push("to_timestamp(") + .push_bind_unseparated(event.recorded_at) + .push_unseparated(")"); + }); + + let result = builder.build().execute(&mut **tx).await; match result { Ok(_) => Ok(()), Err(err) if is_postgres_unique_violation(&err) => { - let actual = stream_version_pool(pool, identity).await?; - Err(RepositoryError::ConcurrentWrite { - id: identity.to_string(), - expected: expected_version, - actual, - }) + Err(concurrent_write_from_conflict(pool, prepared).await) } - Err(err) => Err(repository_storage_error("insert event", err)), + Err(err) => Err(repository_storage_error("insert events", err)), } } -async fn insert_outbox_message_in_tx( +/// After an event-insert unique violation, find the stream whose actual version +/// no longer matches its expected version and report it as `ConcurrentWrite`. +/// Falls back to the first append if a concurrent writer's effect cannot be +/// pinned down (the violation still indicates a conflicting write). +async fn concurrent_write_from_conflict( + pool: &PgPool, + prepared: &[PreparedEventAppend], +) -> RepositoryError { + for append in prepared { + match stream_version_pool(pool, &append.identity).await { + Ok(actual) if actual != append.expected_version => { + return RepositoryError::ConcurrentWrite { + id: append.identity.to_string(), + expected: append.expected_version, + actual, + }; + } + Ok(_) => {} + Err(err) => return err, + } + } + + let append = &prepared[0]; + match stream_version_pool(pool, &append.identity).await { + Ok(actual) => RepositoryError::ConcurrentWrite { + id: append.identity.to_string(), + expected: append.expected_version, + actual, + }, + Err(err) => err, + } +} + +/// Insert every outbox message in a single multi-row INSERT. A unique violation +/// on `message_id` still maps to `DuplicateOutboxMessageInBatch`. 
+async fn insert_outbox_messages_in_tx( tx: &mut Transaction<'_, Postgres>, - message: &OutboxMessage, + messages: &[OutboxMessage], ) -> Result<(), RepositoryError> { - let metadata = serialize_event_metadata(&message.metadata)?; - let source_sequence = message - .source_sequence - .map(|value| { - sqlx_repository_i64_from_u64( + if messages.is_empty() { + return Ok(()); + } + + struct OutboxRow<'a> { + message_id: &'a str, + event_type: &'a str, + payload: &'a [u8], + payload_codec: &'a str, + payload_codec_version: i32, + destination: Option<&'a str>, + metadata: String, + status: &'a str, + created_at: f64, + worker_id: Option<&'a str>, + leased_until: Option, + attempts: i32, + last_error: Option<&'a str>, + source_aggregate_type: Option<&'a str>, + source_aggregate_id: Option<&'a str>, + source_sequence: Option, + correlation_id: Option<&'a str>, + causation_id: Option<&'a str>, + } + + let mut rows = Vec::with_capacity(messages.len()); + for message in messages { + rows.push(OutboxRow { + message_id: message.id(), + event_type: &message.event_type, + payload: &message.payload, + payload_codec: &message.payload_codec, + payload_codec_version: i32::from(message.payload_codec_version), + destination: message.destination.as_deref(), + metadata: serialize_event_metadata(&message.metadata)?, + status: message.status.as_str(), + created_at: system_time_to_epoch_secs(message.created_at)?, + worker_id: message.worker_id.as_deref(), + leased_until: message + .leased_until + .map(system_time_to_epoch_secs) + .transpose()?, + attempts: sqlx_repository_i32_from_u64( POSTGRES_BACKEND, - value, - "outbox source sequence", - BIGINT_STORAGE, - ) - }) - .transpose()?; - let result = sqlx::query( - r#" - INSERT INTO outbox_messages ( - message_id, - event_type, - payload, - payload_codec, - payload_codec_version, - destination, - metadata, - status, - created_at, - next_available_at, - claimed_by, - claimed_until, - attempts, - last_error, - source_aggregate_type, - 
source_aggregate_id, - source_sequence, - correlation_id, - causation_id - ) - VALUES ( - $1, $2, $3, $4, $5, $6, $7::jsonb, $8, - to_timestamp($9), to_timestamp($10), $11, - to_timestamp($12::double precision), $13, $14, - $15, $16, $17, $18, $19 - ) - "#, - ) - .bind(message.id()) - .bind(&message.event_type) - .bind(&message.payload) - .bind(&message.payload_codec) - .bind(i32::from(message.payload_codec_version)) - .bind(&message.destination) - .bind(metadata) - .bind(message.status.as_str()) - .bind(system_time_to_epoch_secs(message.created_at)?) - .bind(system_time_to_epoch_secs(message.created_at)?) - .bind(&message.worker_id) - .bind( - message - .leased_until - .map(system_time_to_epoch_secs) - .transpose()?, - ) - .bind(sqlx_repository_i32_from_u64( - POSTGRES_BACKEND, - u64::from(message.attempts), - "outbox attempts", - INTEGER_STORAGE, - )?) - .bind(&message.last_error) - .bind(&message.source_aggregate_type) - .bind(&message.source_aggregate_id) - .bind(source_sequence) - .bind(message.correlation_id()) - .bind(message.causation_id()) - .execute(&mut **tx) - .await; + u64::from(message.attempts), + "outbox attempts", + INTEGER_STORAGE, + )?, + last_error: message.last_error.as_deref(), + source_aggregate_type: message.source_aggregate_type.as_deref(), + source_aggregate_id: message.source_aggregate_id.as_deref(), + source_sequence: message + .source_sequence + .map(|value| { + sqlx_repository_i64_from_u64( + POSTGRES_BACKEND, + value, + "outbox source sequence", + BIGINT_STORAGE, + ) + }) + .transpose()?, + correlation_id: message.correlation_id(), + causation_id: message.causation_id(), + }); + } + + let mut builder = QueryBuilder::::new( + "INSERT INTO outbox_messages (\ + message_id, event_type, payload, payload_codec, payload_codec_version, \ + destination, metadata, status, created_at, next_available_at, \ + claimed_by, claimed_until, attempts, last_error, source_aggregate_type, \ + source_aggregate_id, source_sequence, correlation_id, 
causation_id) ", + ); + builder.push_values(rows, |mut row, message| { + row.push_bind(message.message_id) + .push_bind(message.event_type) + .push_bind(message.payload) + .push_bind(message.payload_codec) + .push_bind(message.payload_codec_version) + .push_bind(message.destination) + .push_bind(message.metadata) + .push_unseparated("::jsonb") + .push_bind(message.status); + // created_at and next_available_at share the same epoch-seconds value. + row.push("to_timestamp(") + .push_bind_unseparated(message.created_at) + .push_unseparated(")"); + row.push("to_timestamp(") + .push_bind_unseparated(message.created_at) + .push_unseparated(")"); + row.push_bind(message.worker_id); + // claimed_until: NULL stays NULL through to_timestamp, matching the + // per-row path's `to_timestamp($n::double precision)`. + row.push("to_timestamp(") + .push_bind_unseparated(message.leased_until) + .push_unseparated("::double precision)"); + row.push_bind(message.attempts) + .push_bind(message.last_error) + .push_bind(message.source_aggregate_type) + .push_bind(message.source_aggregate_id) + .push_bind(message.source_sequence) + .push_bind(message.correlation_id) + .push_bind(message.causation_id); + }); + + let result = builder.build().execute(&mut **tx).await; match result { Ok(_) => Ok(()), Err(err) if is_postgres_unique_violation(&err) => { + // The batch was already deduped (reject_duplicate_outbox_messages), + // so a violation means the id collides with a previously committed + // row. Report the first message id as the offender, matching the + // per-row path's contract. 
Err(RepositoryError::DuplicateOutboxMessageInBatch { - id: message.id().to_string(), + id: messages[0].id().to_string(), }) } - Err(err) => Err(repository_storage_error("insert outbox message", err)), + Err(err) => Err(repository_storage_error("insert outbox messages", err)), } } @@ -2312,6 +2463,13 @@ async fn ensure_outbox_update_applied( validate(&message) } +fn entity_from_events(aggregate_id: String, events: Vec) -> Entity { + let mut entity = Entity::new(); + entity.set_id(aggregate_id); + entity.load_from_history(events); + entity +} + fn event_from_row(row: PgRow) -> Result { let payload_codec: String = row .try_get("payload_codec") diff --git a/src/sqlite_repo/mod.rs b/src/sqlite_repo/mod.rs index a828304..b506cf4 100644 --- a/src/sqlite_repo/mod.rs +++ b/src/sqlite_repo/mod.rs @@ -235,12 +235,73 @@ impl GetStream for SqliteRepository { identities: &'a [StreamIdentity], ) -> impl Future, RepositoryError>> + Send + 'a { async move { - let mut entities = Vec::with_capacity(identities.len()); + if identities.is_empty() { + return Ok(Vec::new()); + } + + // Group ids by aggregate type so each type is one `aggregate_id IN + // (...)` round trip instead of a query per identity. SQLite has no + // array type, so the id list is built as bound placeholders. + // `get_all` builds single-type batches, so the common case is one + // query. + let mut ids_by_type: BTreeMap<&str, Vec<&str>> = BTreeMap::new(); for identity in identities { - if let Some(entity) = self.get_stream(identity).await? { - entities.push(entity); + ids_by_type + .entry(identity.aggregate_type()) + .or_default() + .push(identity.aggregate_id()); + } + + let mut entities = Vec::with_capacity(identities.len()); + for (aggregate_type, aggregate_ids) in ids_by_type { + // Ordering by aggregate_id then sequence lets us slice the flat + // result into per-aggregate entities in one pass. Callers of + // `get_all` accept storage-order results. 
+ let mut builder = QueryBuilder::<Sqlite>::new( + "SELECT aggregate_id, event_name, event_version, payload, \ + payload_codec, payload_codec_version, metadata, sequence, recorded_at \ + FROM aggregate_events WHERE aggregate_type = ", + ); + builder.push_bind(aggregate_type); + builder.push(" AND aggregate_id IN ("); + let mut separated = builder.separated(", "); + for id in &aggregate_ids { + separated.push_bind(*id); + } + builder.push(") ORDER BY aggregate_id ASC, sequence ASC"); + + let rows = builder + .build() + .fetch_all(&self.pool) + .await + .map_err(|err| repository_storage_error("load streams", err))?; + + let mut current_id: Option<String> = None; + let mut current_events: Vec<EventRecord> = Vec::new(); + for row in rows { + let row_id: String = row + .try_get("aggregate_id") + .map_err(|err| repository_storage_error("decode aggregate id row", err))?; + let event = event_from_row(row)?; + match &current_id { + Some(id) if id == &row_id => current_events.push(event), + _ => { + if let Some(id) = current_id.take() { + entities.push(entity_from_events( + id, + std::mem::take(&mut current_events), + )); + } + current_id = Some(row_id); + current_events.push(event); + } + } + } + if let Some(id) = current_id.take() { + entities.push(entity_from_events(id, current_events)); } } + Ok(entities) } } @@ -333,16 +394,9 @@ impl TransactionalCommit for SqliteRepository { } } - for append in &prepared { - for event in &append.events { - insert_event_in_tx(&mut tx, &append.identity, append.expected_version, event) - .await?; - } - } + insert_events_in_tx(&mut tx, &prepared).await?; - for message in &batch.outbox_messages { - insert_outbox_message_in_tx(&mut tx, message).await?; - } + insert_outbox_messages_in_tx(&mut tx, &batch.outbox_messages).await?; for plan in batch.read_model_plans { apply_read_model_write_plan_in_tx(&mut tx, plan).await?; @@ -1005,85 +1059,117 @@ async fn insert_inbox_receipt_in_tx( } } -async fn insert_outbox_message_in_tx( +/// Insert every outbox message with multi-row
INSERTs (chunked to respect +/// SQLite's bound-parameter limit). A unique constraint violation on +/// `message_id` still maps to `DuplicateOutboxMessageInBatch`. +async fn insert_outbox_messages_in_tx( tx: &mut Transaction<'_, Sqlite>, - message: &OutboxMessage, + messages: &[OutboxMessage], ) -> Result<(), RepositoryError> { - let metadata = serialize_event_metadata(&message.metadata)?; - let result = sqlx::query( - r#" - INSERT INTO outbox_messages ( - message_id, - event_type, - payload, - payload_codec, - payload_codec_version, - destination, - metadata, - status, - created_at, - next_available_at, - claimed_by, - claimed_until, - attempts, - last_error, - source_aggregate_type, - source_aggregate_id, - source_sequence, - correlation_id, - causation_id - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - "#, - ) - .bind(message.id()) - .bind(&message.event_type) - .bind(&message.payload) - .bind(&message.payload_codec) - .bind(i64::from(message.payload_codec_version)) - .bind(&message.destination) - .bind(metadata) - .bind(message.status.as_str()) - .bind(system_time_to_storage(message.created_at)?) - .bind(system_time_to_storage(message.created_at)?) 
- .bind(&message.worker_id) - .bind( - message - .leased_until - .map(system_time_to_storage) - .transpose()?, - ) - .bind(i64::from(message.attempts)) - .bind(&message.last_error) - .bind(&message.source_aggregate_type) - .bind(&message.source_aggregate_id) - .bind( - message - .source_sequence - .map(|value| { - sqlx_repository_i64_from_u64( - SQLITE_BACKEND, - value, - "outbox source sequence", - SIGNED_INTEGER_STORAGE, - ) - }) - .transpose()?, - ) - .bind(message.correlation_id()) - .bind(message.causation_id()) - .execute(&mut **tx) - .await; + struct OutboxRow<'a> { + message_id: &'a str, + event_type: &'a str, + payload: &'a [u8], + payload_codec: &'a str, + payload_codec_version: i64, + destination: Option<&'a str>, + metadata: String, + status: &'a str, + created_at: String, + worker_id: Option<&'a str>, + leased_until: Option, + attempts: i64, + last_error: Option<&'a str>, + source_aggregate_type: Option<&'a str>, + source_aggregate_id: Option<&'a str>, + source_sequence: Option, + correlation_id: Option<&'a str>, + causation_id: Option<&'a str>, + } + + let mut rows = Vec::with_capacity(messages.len()); + for message in messages { + rows.push(OutboxRow { + message_id: message.id(), + event_type: &message.event_type, + payload: &message.payload, + payload_codec: &message.payload_codec, + payload_codec_version: i64::from(message.payload_codec_version), + destination: message.destination.as_deref(), + metadata: serialize_event_metadata(&message.metadata)?, + status: message.status.as_str(), + created_at: system_time_to_storage(message.created_at)?, + worker_id: message.worker_id.as_deref(), + leased_until: message + .leased_until + .map(system_time_to_storage) + .transpose()?, + attempts: i64::from(message.attempts), + last_error: message.last_error.as_deref(), + source_aggregate_type: message.source_aggregate_type.as_deref(), + source_aggregate_id: message.source_aggregate_id.as_deref(), + source_sequence: message + .source_sequence + .map(|value| { + 
sqlx_repository_i64_from_u64( + SQLITE_BACKEND, + value, + "outbox source sequence", + SIGNED_INTEGER_STORAGE, + ) + }) + .transpose()?, + correlation_id: message.correlation_id(), + causation_id: message.causation_id(), + }); + } - match result { - Ok(_) => Ok(()), - Err(err) if is_sqlite_unique_constraint(&err) => { - Err(RepositoryError::DuplicateOutboxMessageInBatch { - id: message.id().to_string(), - }) + for chunk in rows.chunks(SQLITE_MAX_BIND_PARAMS / OUTBOX_BIND_COLUMNS) { + let mut builder = QueryBuilder::::new( + "INSERT INTO outbox_messages (\ + message_id, event_type, payload, payload_codec, payload_codec_version, \ + destination, metadata, status, created_at, next_available_at, \ + claimed_by, claimed_until, attempts, last_error, source_aggregate_type, \ + source_aggregate_id, source_sequence, correlation_id, causation_id) ", + ); + builder.push_values(chunk, |mut row, message| { + row.push_bind(message.message_id) + .push_bind(message.event_type) + .push_bind(message.payload) + .push_bind(message.payload_codec) + .push_bind(message.payload_codec_version) + .push_bind(message.destination) + .push_bind(message.metadata.as_str()) + .push_bind(message.status) + .push_bind(message.created_at.as_str()) + // created_at and next_available_at share the same value. + .push_bind(message.created_at.as_str()) + .push_bind(message.worker_id) + .push_bind(message.leased_until.as_deref()) + .push_bind(message.attempts) + .push_bind(message.last_error) + .push_bind(message.source_aggregate_type) + .push_bind(message.source_aggregate_id) + .push_bind(message.source_sequence) + .push_bind(message.correlation_id) + .push_bind(message.causation_id); + }); + + let result = builder.build().execute(&mut **tx).await; + if let Err(err) = result { + if is_sqlite_unique_constraint(&err) { + // The batch was already deduped, so a violation means the id + // collides with a previously committed row. Report the first id + // in the chunk, matching the per-row path's contract. 
+ return Err(RepositoryError::DuplicateOutboxMessageInBatch { + id: chunk[0].message_id.to_string(), + }); + } + return Err(repository_storage_error("insert outbox messages", err)); } - Err(err) => Err(repository_storage_error("insert outbox message", err)), } + + Ok(()) } async fn outbox_message_by_id_pool( @@ -1260,66 +1346,123 @@ async fn stream_version_in_tx( .unwrap_or(Ok(0)) } -async fn insert_event_in_tx( +/// Maximum bound parameters per statement. SQLite's historical limit is 999, so +/// staying under it keeps the batched inserts portable across SQLite builds. +const SQLITE_MAX_BIND_PARAMS: usize = 900; + +/// Bound parameters per `aggregate_events` row. +const EVENT_BIND_COLUMNS: usize = 10; + +/// Bound parameters per `outbox_messages` row. +const OUTBOX_BIND_COLUMNS: usize = 19; + +/// Insert every event across all prepared appends with multi-row INSERTs +/// (chunked to respect SQLite's bound-parameter limit). +/// +/// Conflict detection is unchanged from the per-row path: the `(aggregate_type, +/// aggregate_id, sequence)` primary key is the contiguity gate, and a unique +/// constraint violation still surfaces as `ConcurrentWrite`. SQLite does not +/// abort the transaction on a constraint error, so the conflicting stream's +/// actual version is re-read in the same transaction, exactly as before. 
+async fn insert_events_in_tx( tx: &mut Transaction<'_, Sqlite>, - identity: &StreamIdentity, - expected_version: u64, - event: &EventRecord, + prepared: &[PreparedEventAppend], ) -> Result<(), RepositoryError> { - let metadata = serialize_event_metadata(&event.metadata)?; + struct EventRow<'a> { + identity: &'a StreamIdentity, + expected_version: u64, + sequence: i64, + event_name: &'a str, + event_version: i64, + payload: &'a [u8], + payload_codec: &'a str, + payload_codec_version: i64, + metadata: String, + recorded_at: String, + } + + let mut rows = Vec::new(); + for append in prepared { + for event in &append.events { + rows.push(EventRow { + identity: &append.identity, + expected_version: append.expected_version, + sequence: sqlx_repository_i64_from_u64( + SQLITE_BACKEND, + event.sequence, + "sequence", + SIGNED_INTEGER_STORAGE, + )?, + event_name: &event.event_name, + event_version: sqlx_repository_i64_from_u64( + SQLITE_BACKEND, + event.event_version, + "event_version", + SIGNED_INTEGER_STORAGE, + )?, + payload: &event.payload, + payload_codec: &event.payload_codec, + payload_codec_version: i64::from(event.payload_codec_version), + metadata: serialize_event_metadata(&event.metadata)?, + recorded_at: system_time_to_storage(event.timestamp)?, + }); + } + } - let result = sqlx::query( - r#" - INSERT INTO aggregate_events ( - aggregate_type, - aggregate_id, - sequence, - event_name, - event_version, - payload, - payload_codec, - payload_codec_version, - metadata, - recorded_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - "#, - ) - .bind(identity.aggregate_type()) - .bind(identity.aggregate_id()) - .bind(sqlx_repository_i64_from_u64( - SQLITE_BACKEND, - event.sequence, - "sequence", - SIGNED_INTEGER_STORAGE, - )?) - .bind(&event.event_name) - .bind(sqlx_repository_i64_from_u64( - SQLITE_BACKEND, - event.event_version, - "event_version", - SIGNED_INTEGER_STORAGE, - )?) 
- .bind(&event.payload) - .bind(&event.payload_codec) - .bind(i64::from(event.payload_codec_version)) - .bind(metadata) - .bind(system_time_to_storage(event.timestamp)?) - .execute(&mut **tx) - .await; + for chunk in rows.chunks(SQLITE_MAX_BIND_PARAMS / EVENT_BIND_COLUMNS) { + let mut builder = QueryBuilder::<Sqlite>::new( + "INSERT INTO aggregate_events (\ + aggregate_type, aggregate_id, sequence, event_name, event_version, \ + payload, payload_codec, payload_codec_version, metadata, recorded_at) ", + ); + builder.push_values(chunk, |mut row, event| { + row.push_bind(event.identity.aggregate_type()) + .push_bind(event.identity.aggregate_id()) + .push_bind(event.sequence) + .push_bind(event.event_name) + .push_bind(event.event_version) + .push_bind(event.payload) + .push_bind(event.payload_codec) + .push_bind(event.payload_codec_version) + .push_bind(event.metadata.as_str()) + .push_bind(event.recorded_at.as_str()); + }); - match result { - Ok(_) => Ok(()), - Err(err) if is_sqlite_unique_constraint(&err) => { - let actual = stream_version_in_tx(tx, identity).await?; - Err(RepositoryError::ConcurrentWrite { - id: identity.to_string(), - expected: expected_version, - actual, - }) + let result = builder.build().execute(&mut **tx).await; + if let Err(err) = result { + if is_sqlite_unique_constraint(&err) { + // Find the conflicting stream (its actual version no longer + // matches its expected version) and report it.
+ for event in chunk { + let actual = stream_version_in_tx(tx, event.identity).await?; + if actual != event.expected_version { + return Err(RepositoryError::ConcurrentWrite { + id: event.identity.to_string(), + expected: event.expected_version, + actual, + }); + } + } + let event = &chunk[0]; + let actual = stream_version_in_tx(tx, event.identity).await?; + return Err(RepositoryError::ConcurrentWrite { + id: event.identity.to_string(), + expected: event.expected_version, + actual, + }); + } + return Err(repository_storage_error("insert events", err)); } - Err(err) => Err(repository_storage_error("insert event", err)), } + + Ok(()) +} + +fn entity_from_events(aggregate_id: String, events: Vec<EventRecord>) -> Entity { + let mut entity = Entity::new(); + entity.set_id(aggregate_id); + entity.load_from_history(events); + entity } fn event_from_row(row: sqlx::sqlite::SqliteRow) -> Result<EventRecord, RepositoryError> { From 0dca88c90842d1d91531f5387cbbf7014009a2f3 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 03:39:04 -0500 Subject: [PATCH 3/3] perf: avoid payload clone on NATS publish publish() owns the Message and drops it immediately after, but cloned message.payload before handing it to JetStream. Move the buffer out with std::mem::take and convert via Bytes::from(Vec<u8>) (zero-copy) instead. Behavior is unchanged: the same bytes are published.
Implements [[tasks/commit-hydrate-hot-path-efficiency]] --- src/bus/nats.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/bus/nats.rs b/src/bus/nats.rs index a4e9b76..2782d03 100644 --- a/src/bus/nats.rs +++ b/src/bus/nats.rs @@ -74,7 +74,7 @@ impl NatsPublisher { } impl AsyncMessagePublisher for NatsPublisher { - async fn publish(&self, message: Message) -> Result<(), TransportError> { + async fn publish(&self, mut message: Message) -> Result<(), TransportError> { let subject = self.subject(&message); let mut headers = async_nats::HeaderMap::new(); if let Some(id) = message.id() { @@ -85,10 +85,14 @@ impl AsyncMessagePublisher for NatsPublisher { headers.insert(key.as_str(), value.as_str()); } + // `message` is owned and dropped here, so move its payload out instead of + // cloning. `Bytes::from(Vec<u8>)` takes ownership of the buffer (no copy). + let payload = std::mem::take(&mut message.payload).into(); + // Publish ack (the durable publish threshold): both awaits must succeed. let ack_future = self .jetstream - .publish_with_headers(subject, headers, message.payload.clone().into()) + .publish_with_headers(subject, headers, payload) .await .map_err(|err| retryable("nats publish", err))?; ack_future