Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/aggregate/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,40 @@ pub fn hydrate<A: Aggregate>(entity: Entity) -> Result<A, RepositoryError> {
let mut agg = A::new_empty();
*agg.entity_mut() = entity;

// Take the events out of the entity so we can iterate them while holding a
// mutable borrow of `agg` during replay. `hydrate` is only ever called with
// a full-history entity, so `prefix_version == 0` and restoring via
// `load_from_history` reproduces the exact loaded version/committed_version.
let history = agg.entity_mut().take_events();

let upcasters = A::upcasters();
let events = if upcasters.is_empty() {
agg.entity().events().to_vec()
// Replay directly from `history`; it is restored verbatim below, so no
// clone of the stream is needed on the common (no-upcaster) path.
replay_into(&mut agg, &history)?;
history
} else {
upcast_events(agg.entity().events().to_vec(), upcasters)
.map_err(|err| RepositoryError::Replay(err.to_string()))?
// Upcasters may rewrite events for replay, but the durable history is
// unchanged: replay from the upcasted view, then restore the originals.
let upcasted = upcast_events(history.clone(), upcasters)
.map_err(|err| RepositoryError::Replay(err.to_string()))?;
replay_into(&mut agg, &upcasted)?;
history
};

agg.entity_mut().load_from_history(events);

Ok(agg)
}

fn replay_into<A: Aggregate>(agg: &mut A, events: &[EventRecord]) -> Result<(), RepositoryError> {
agg.entity_mut().set_replaying(true);
for event in &events {
for event in events {
if let Err(err) = agg.replay_event(event) {
agg.entity_mut().set_replaying(false);
return Err(RepositoryError::Replay(err.to_string()));
}
}
agg.entity_mut().set_replaying(false);

Ok(agg)
Ok(())
}
8 changes: 6 additions & 2 deletions src/bus/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl NatsPublisher {
}

impl AsyncMessagePublisher for NatsPublisher {
async fn publish(&self, message: Message) -> Result<(), TransportError> {
async fn publish(&self, mut message: Message) -> Result<(), TransportError> {
let subject = self.subject(&message);
let mut headers = async_nats::HeaderMap::new();
if let Some(id) = message.id() {
Expand All @@ -85,10 +85,14 @@ impl AsyncMessagePublisher for NatsPublisher {
headers.insert(key.as_str(), value.as_str());
}

// `message` is owned and dropped here, so move its payload out instead of
// cloning. `Bytes::from(Vec<u8>)` takes ownership of the buffer (no copy).
let payload = std::mem::take(&mut message.payload).into();

// Publish ack (the durable publish threshold): both awaits must succeed.
let ack_future = self
.jetstream
.publish_with_headers(subject, headers, message.payload.clone().into())
.publish_with_headers(subject, headers, payload)
.await
.map_err(|err| retryable("nats publish", err))?;
ack_future
Expand Down
21 changes: 21 additions & 0 deletions src/entity/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,27 @@ impl Entity {
&self.events
}

/// Take the in-memory events out of the entity, leaving it empty.
///
/// Used by replay paths that need to iterate events while also holding a
/// mutable borrow of the owning aggregate (the borrow checker forbids
/// borrowing `events` immutably and the aggregate mutably at once). The
/// caller is responsible for restoring history afterward — e.g. via
/// [`load_from_history`](Self::load_from_history) — so the entity's
/// `version`/`committed_version` invariants hold.
pub fn take_events(&mut self) -> Vec<EventRecord> {
std::mem::take(&mut self.events)
}

/// Put a previously [taken](Self::take_events) event history back without
/// recomputing `version`/`prefix_version`/`committed_version`. The caller
/// must restore the *same* events the entity already accounted for, so the
/// existing invariants continue to hold (used by replay paths that only
/// borrowed the events out to satisfy the borrow checker).
pub fn restore_history(&mut self, events: Vec<EventRecord>) {
self.events = events;
}

/// Returns events added since the entity was loaded (not yet persisted).
///
/// Slices relative to `prefix_version` so it is correct whether the entity
Expand Down
Loading
Loading