diff --git a/Cargo.toml b/Cargo.toml index 9c7701f..6c6f00f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ tonic-build = { version = "0.14", default-features = false, features = ["transpo [dev-dependencies] axum = "0.8" +proptest = "1" prost = "0.14" reqwest = { version = "0.13", features = ["json"] } serde_json = "1.0" diff --git a/tests/hashmap_repository_conformance/main.rs b/tests/hashmap_repository_conformance/main.rs index a9a9781..d37c82f 100644 --- a/tests/hashmap_repository_conformance/main.rs +++ b/tests/hashmap_repository_conformance/main.rs @@ -114,3 +114,28 @@ async fn consumer_inbox_records_dedupes_and_fences_with_real_effects() { async fn consumer_inbox_rejects_empty_receipt() { conformance::inbox::inbox_rejects_empty_receipt(repository()).await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn racing_commits_one_wins_one_conflicts() { + conformance::scenario::racing_commits_one_wins_one_conflicts(repository(), 8).await; +} + +#[tokio::test] +async fn expired_outbox_lease_is_reclaimed_by_second_worker() { + let repo = repository(); + conformance::outbox::expired_outbox_lease_is_reclaimed_by_second_worker( + repo.clone(), + repo.outbox_store(), + ) + .await; +} + +#[tokio::test] +async fn publish_failure_after_commit_retains_outbox_row_until_delivered() { + let repo = repository(); + conformance::outbox::publish_failure_after_commit_retains_outbox_row_until_delivered( + repo.clone(), + repo.outbox_store(), + ) + .await; +} diff --git a/tests/persistent_repository_conformance/outbox.rs b/tests/persistent_repository_conformance/outbox.rs index 7c0fd6c..5acf8b6 100644 --- a/tests/persistent_repository_conformance/outbox.rs +++ b/tests/persistent_repository_conformance/outbox.rs @@ -1,14 +1,64 @@ +use std::sync::Mutex; use std::time::Duration; +use distributed::bus::{AsyncMessagePublisher, Message, TransportError}; use distributed::{ Aggregate, AggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, GetStream, OutboxClaimRef, - OutboxMessage, OutboxMessageStatus, OutboxPublishFailureAction, RepositoryError, - StreamIdentity, TransactionalCommit, + OutboxDispatcher, OutboxMessage, OutboxMessageStatus, OutboxPublishFailureAction, + RepositoryError, StreamIdentity, TransactionalCommit, }; use super::scenario::unique_id; use super::seat::Seat; +/// A publisher that fails its first `fail_first` publish attempts (returning a +/// retryable transport error) and succeeds thereafter, recording every message +/// id it actually delivered. Used to prove the outbox keeps a row until it is +/// genuinely delivered, and that it is delivered exactly once. +struct FlakyPublisher { + fail_first: usize, + attempts: Mutex, + delivered: Mutex>, +} + +impl FlakyPublisher { + fn new(fail_first: usize) -> Self { + Self { + fail_first, + attempts: Mutex::new(0), + delivered: Mutex::new(Vec::new()), + } + } + + fn delivered(&self) -> Vec { + self.delivered.lock().expect("delivered lock").clone() + } + + fn attempts(&self) -> usize { + *self.attempts.lock().expect("attempts lock") + } +} + +impl AsyncMessagePublisher for FlakyPublisher { + async fn publish(&self, message: Message) -> Result<(), TransportError> { + let attempt = { + let mut attempts = self.attempts.lock().expect("attempts lock"); + *attempts += 1; + *attempts + }; + if attempt <= self.fail_first { + return Err(TransportError::retryable( + "simulated transient publish failure", + )); + } + self.delivered + .lock() + .expect("delivered lock") + .push(message.id().unwrap_or_default().to_string()); + Ok(()) + } +} + pub async fn high_level_outbox_commit_persists_row_without_stream(repo: R, outbox: S) where R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static, @@ -334,6 +384,209 @@ where assert_eq!(still_owned.worker_id.as_deref(), Some("immediate-worker")); } +/// Red-team crash recovery: worker A claims a row with a short lease and then +/// "crashes" — it never completes or releases the claim. Once the lease expires, +/// worker B must be able to reclaim the same row (the claim predicate treats an +/// expired in-flight row as claimable). B then completes it. Finally A's *late* +/// settle attempts (it woke up with a stale claim) must be fenced: neither +/// `complete_async` nor `release_async` from the original lease may mutate the +/// row B now owns/published. +/// +/// This proves the SQL `claimed_until <= now` expiry predicate end-to-end — +/// today only the in-memory store has a unit test for lease-expiry reclaim +/// (`src/outbox_worker/store.rs::claim_includes_expired_in_flight_messages`). +/// +/// Note: lease expiry is inherently wall-clock based, so this scenario uses a +/// short real lease and a sleep just past it. There is no barrier/channel +/// substitute for the passage of lease time through the public store API. +pub async fn expired_outbox_lease_is_reclaimed_by_second_worker(repo: R, outbox: S) +where + R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static, + S: AsyncOutboxStore + Send + Sync, +{ + let message_id = unique_id("crash-lease-outbox"); + let mut seat = added_seat(&unique_id("crash-lease-seat")); + let message = OutboxMessage::create(&message_id, "seat.added", b"{}".to_vec()) + .expect("outbox message should be valid"); + repo.clone() + .aggregate::() + .outbox(message) + .commit(&mut seat) + .await + .expect("message should be stored"); + + // Worker A claims with a short lease, then "crashes" (never settles). + let lease = Duration::from_secs(1); + let claimed_a = outbox + .claim_async(ClaimOutboxMessages::new("worker-a", 1, lease)) + .await + .expect("worker A claim should succeed"); + assert_eq!(claimed_a.len(), 1, "worker A claims the only pending row"); + assert_eq!(claimed_a[0].id(), message_id); + let stale_claim_a = OutboxClaimRef::from_message(&claimed_a[0]).expect("A's claim is valid"); + + // While the lease is live, worker B must NOT be able to steal the row. + let live = outbox + .claim_async(ClaimOutboxMessages::new("worker-b", 1, lease)) + .await + .expect("worker B poll should not error while the lease is live"); + assert!( + live.is_empty(), + "an unexpired lease must not be reclaimable by another worker" + ); + + // Let the lease expire (no settle from A — simulated crash). + tokio::time::sleep(Duration::from_millis(1_200)).await; + + // Worker B reclaims the now-expired in-flight row. + let claimed_b = outbox + .claim_async(ClaimOutboxMessages::new( + "worker-b", + 1, + Duration::from_secs(60), + )) + .await + .expect("worker B claim after expiry should succeed"); + assert_eq!( + claimed_b.len(), + 1, + "the expired lease must be reclaimable by a second worker" + ); + assert_eq!(claimed_b[0].id(), message_id); + assert_eq!(claimed_b[0].worker_id.as_deref(), Some("worker-b")); + assert!( + claimed_b[0].attempts > claimed_a[0].attempts, + "reclaim increments the attempt counter (fences A's stale claim)" + ); + let claim_b = OutboxClaimRef::from_message(&claimed_b[0]).expect("B's claim is valid"); + + // Worker A wakes up with its stale lease and tries to settle. Both complete + // and release must be rejected — A no longer owns the row. + let late_complete = outbox + .complete_async(&stale_claim_a) + .await + .expect_err("A's late complete must be fenced"); + assert!( + matches!(late_complete, RepositoryError::InvalidState { .. }), + "stale-worker complete should be InvalidState, got {late_complete:?}" + ); + let late_release = outbox + .release_async(&stale_claim_a, "A woke up late") + .await + .expect_err("A's late release must be fenced"); + assert!( + matches!(late_release, RepositoryError::InvalidState { .. }), + "stale-worker release should be InvalidState, got {late_release:?}" + ); + + // Worker B (the rightful owner) completes successfully. + outbox + .complete_async(&claim_b) + .await + .expect("the reclaiming worker should complete the row"); + let published = find_outbox_by_id(&outbox, &message_id) + .await + .expect("reclaimed message should still be queryable"); + assert_eq!( + published.status, + OutboxMessageStatus::Published, + "row is published by the worker that reclaimed it, not by the crashed one" + ); +} + +/// Publish-on-commit durability: a row committed alongside its aggregate must +/// survive transient publish failures and stay claimable until it is genuinely +/// delivered, then end Published — and the bus must see exactly one delivery. +/// +/// The dispatcher runs one pass per `dispatch_ids` call. The first two passes +/// fail to publish (row goes InFlight on claim → back to Pending on +/// `record_failure`, attempts incrementing); the third succeeds and the row +/// becomes Published. This pins the at-least-once-store / exactly-once-delivery +/// boundary that publish-on-commit relies on. +pub async fn publish_failure_after_commit_retains_outbox_row_until_delivered( + repo: R, + outbox: S, +) where + R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static, + S: AsyncOutboxStore + Send + Sync + Clone, +{ + let message_id = unique_id("publish-retry-outbox"); + let mut seat = added_seat(&unique_id("publish-retry-seat")); + let message = OutboxMessage::create(&message_id, "seat.added", b"{}".to_vec()) + .expect("outbox message should be valid"); + repo.clone() + .aggregate::() + .outbox(message) + .commit(&mut seat) + .await + .expect("aggregate and outbox should commit atomically"); + + // max_attempts is high enough that the two failures only release (never + // permanently fail) the row. + let dispatcher = OutboxDispatcher::new( + outbox.clone(), + FlakyPublisher::new(2), + "immediate:publish-retry", + Duration::from_secs(60), + 10, + ); + let ids = [message_id.clone()]; + + // Pass 1: claim → publish fails → released back to Pending, attempts = 1. + let pass1 = dispatcher + .dispatch_ids(&ids) + .await + .expect("dispatch pass should not surface a transport error"); + assert_eq!(pass1.published, 0); + assert_eq!(pass1.released, 1); + assert_eq!(pass1.failed, 0); + let after_1 = find_outbox_by_id(&outbox, &message_id) + .await + .expect("row survives the first failure"); + assert_eq!( + after_1.status, + OutboxMessageStatus::Pending, + "a failed publish releases the row back to Pending (still owed)" + ); + assert_eq!(after_1.attempts, 1); + + // Pass 2: same again, attempts = 2. + let pass2 = dispatcher.dispatch_ids(&ids).await.expect("second pass"); + assert_eq!(pass2.published, 0); + assert_eq!(pass2.released, 1); + let after_2 = find_outbox_by_id(&outbox, &message_id) + .await + .expect("row survives the second failure"); + assert_eq!(after_2.status, OutboxMessageStatus::Pending); + assert_eq!(after_2.attempts, 2); + + // Pass 3: publish succeeds → Published. + let pass3 = dispatcher.dispatch_ids(&ids).await.expect("third pass"); + assert_eq!(pass3.published, 1); + assert_eq!(pass3.released, 0); + assert_eq!(pass3.failed, 0); + let published = find_outbox_by_id(&outbox, &message_id) + .await + .expect("row is still queryable after publish"); + assert_eq!( + published.status, + OutboxMessageStatus::Published, + "the row is Published only after a successful delivery" + ); + + // The bus saw exactly one delivery — never the failed attempts, never twice. + assert_eq!( + dispatcher.publisher().attempts(), + 3, + "publisher was invoked once per pass (two failures + one success)" + ); + assert_eq!( + dispatcher.publisher().delivered(), + vec![message_id], + "the message was delivered exactly once, only on the successful pass" + ); +} + fn added_seat(id: &str) -> Seat { let mut seat = Seat::default(); seat.add(id.to_string(), "floor".to_string()) diff --git a/tests/persistent_repository_conformance/scenario.rs b/tests/persistent_repository_conformance/scenario.rs index 5ce3911..27a1964 100644 --- a/tests/persistent_repository_conformance/scenario.rs +++ b/tests/persistent_repository_conformance/scenario.rs @@ -1,10 +1,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use distributed::{ Aggregate, AggregateBuilder, CommitBatch, Entity, GetStream, RepositoryError, SnapshotRecord, SnapshotStore, SnapshotWrite, StreamIdentity, StreamWrite, TransactionalCommit, }; +use tokio::sync::Barrier; use super::checkout::{CHECKOUT_SEAT_RESERVED_STATUS, SEAT_RESERVED_STATUS}; use super::checkout_saga::CheckoutSaga; @@ -373,6 +375,119 @@ where assert_eq!(loaded_checkout.payload, vec![2]); } +/// Red-team concurrency: `N` tasks each load the **same** aggregate at the same +/// committed version and race to append a conflicting event, synchronized by a +/// barrier so the writes overlap as tightly as the runtime allows. Optimistic +/// concurrency must let **exactly one** win; the rest must observe +/// `ConcurrentWrite`. Crucially, the surviving stream must be gap-free and +/// duplicate-free: this exercises the unique `(aggregate, sequence)` PK backstop +/// behind the read-then-check version window under real concurrency (the +/// existing `concurrent_writes_detected` test only races sequentially). +/// +/// `concurrency` must be >= 2 (the test asserts at least one conflict). +pub async fn racing_commits_one_wins_one_conflicts(repo: R, concurrency: usize) +where + R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static, +{ + assert!( + concurrency >= 2, + "racing test needs at least two writers to produce a conflict" + ); + + let seat_id = unique_id("racing-seat"); + let seat_repo = repo.clone().aggregate::(); + + // Seed at version 1. Every racer will load this exact state. + let mut seed = Seat::default(); + seed.add(seat_id.clone(), "balcony".into()) + .expect("seed seat should be valid"); + seat_repo + .commit(&mut seed) + .await + .expect("seed seat commit should succeed"); + + // Each racer loads independently *before* the barrier so all of them hold + // a v1 snapshot; the barrier then releases the commits together. + let barrier = Arc::new(Barrier::new(concurrency)); + let mut handles = Vec::with_capacity(concurrency); + for index in 0..concurrency { + let repo = repo.clone(); + let barrier = Arc::clone(&barrier); + let seat_id = seat_id.clone(); + handles.push(tokio::spawn(async move { + let seat_repo = repo.aggregate::(); + let mut seat = seat_repo + .get(&seat_id) + .await + .expect("racer load should succeed") + .expect("racer should see the seeded seat"); + assert_eq!( + seat.entity.committed_version(), + 1, + "every racer must start from the same committed version" + ); + // A reservation is a single new event on top of v1 for every racer, + // so they all target sequence 2 — exactly one can land it. + seat.reserve( + format!("checkout-{index}"), + seat_id.clone(), + seat.category.clone(), + ) + .expect("reservation should be locally valid"); + + // Synchronize the commit attempts as tightly as possible. + barrier.wait().await; + seat_repo.commit(&mut seat).await + })); + } + + let mut successes = 0usize; + let mut conflicts = 0usize; + for handle in handles { + match handle.await.expect("racer task should not panic") { + Ok(()) => successes += 1, + Err(RepositoryError::ConcurrentWrite { .. }) => conflicts += 1, + Err(other) => panic!("unexpected racer error: {other:?}"), + } + } + + assert_eq!( + successes, 1, + "exactly one racer must win the optimistic race" + ); + assert_eq!( + conflicts, + concurrency - 1, + "every other racer must observe ConcurrentWrite" + ); + + // The surviving stream must be a clean append-only log: versions 1..=2 with + // no gaps and no duplicate sequences. A TOCTOU leak would manifest as a + // duplicate sequence or a third event here. + let identity = + StreamIdentity::new(Seat::aggregate_type(), &seat_id).expect("identity should be valid"); + let entity = repo + .get_stream(&identity) + .await + .expect("final stream lookup should succeed") + .expect("final stream should exist"); + let sequences = entity + .events() + .iter() + .map(|event| event.sequence) + .collect::>(); + assert_eq!( + sequences, + vec![1, 2], + "winning stream must be exactly two contiguous events with no gaps or duplicates" + ); + assert_eq!( + entity.committed_version(), + 2, + "the stream advances by exactly one version despite N racers" + ); +} + async fn add_seat( repo: R, seat_id: String, diff --git a/tests/postgres_repository/main.rs b/tests/postgres_repository/main.rs index 7e39eb7..ab04142 100644 --- a/tests/postgres_repository/main.rs +++ b/tests/postgres_repository/main.rs @@ -9,7 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use distributed::{ sourced, Aggregate, AggregateBuilder, AsyncOutboxStore, CommitBatch, Entity, GetStream, - OutboxMessageStatus, PostgresRepository, ReadModel, ReadModelWritePlanBuilder, + OutboxMessage, OutboxMessageStatus, PostgresRepository, ReadModel, ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt, RepositoryError, RowKey, RowPatch, RowValue, SnapshotRecord, SnapshotStore, StreamIdentity, StreamWrite, TableSchemaRegistry, TransactionalCommit, }; @@ -270,6 +270,101 @@ async fn duplicate_stream_identity_is_rejected_before_sql_writes() { assert!(repo.get_stream(&identity).await.unwrap().is_none()); } +#[tokio::test] +async fn read_model_failure_mid_plan_rolls_back_events_and_outbox() { + // Postgres variant of the SQLite mid-plan rollback test: a commit carrying an + // aggregate event, an outbox row, and a two-mutation read-model plan whose + // second mutation violates a real CHECK constraint must roll the WHOLE + // transaction back — event, outbox row, and the first (already-applied) + // read-model mutation all absent. Skips locally without DATABASE_URL. + let Some((_schema, repo)) = repository().await else { + return; + }; + bootstrap_relational_counter_table(&repo).await; + + // A real engine-level constraint the read-model schema is unaware of. + sqlx::query( + r#"ALTER TABLE "postgres_relational_counter_views" + ADD CONSTRAINT reject_negative_value CHECK ("value" >= 0)"#, + ) + .execute(repo.pool()) + .await + .unwrap(); + + let good_id = unique_id("midplan-good"); + let bad_id = unique_id("midplan-bad"); + let mut read_models = ReadModelWritePlanBuilder::new(); + read_models + .upsert(&RelationalCounterView { + id: good_id.clone(), + value: 1, + counts: HashMap::new(), + }) + .unwrap(); + read_models + .upsert(&RelationalCounterView { + id: bad_id.clone(), + value: -1, + counts: HashMap::new(), + }) + .unwrap(); + + let aggregate_id = unique_id("midplan-aggregate"); + let mut projection = CounterProjection::default(); + projection.touch(aggregate_id.clone()).unwrap(); + let identity = StreamIdentity::new(CounterProjection::aggregate_type(), &aggregate_id).unwrap(); + + let outbox_id = unique_id("midplan-outbox"); + let outbox_message = + OutboxMessage::create(&outbox_id, "counter.touched", b"{}".to_vec()).unwrap(); + + let err = repo + .commit_batch(CommitBatch { + inbox_receipts: Vec::new(), + streams: vec![StreamWrite::new(identity.clone(), projection.entity_mut())], + outbox_messages: vec![outbox_message], + read_model_plans: vec![read_models.into_write_plan().unwrap()], + snapshots: Vec::new(), + }) + .await + .expect_err("a mid-plan constraint violation must fail the commit"); + assert!( + matches!(err, RepositoryError::Model(_)), + "expected a Model error from the constraint violation, got {err:?}" + ); + + // 1. Aggregate event absent. + assert!( + repo.get_stream(&identity).await.unwrap().is_none(), + "the aggregate stream must roll back" + ); + + // 2. Outbox row absent. + let outbox = repo.outbox_store(); + for status in [ + OutboxMessageStatus::Pending, + OutboxMessageStatus::InFlight, + OutboxMessageStatus::Published, + OutboxMessageStatus::Failed, + ] { + let rows = outbox.messages_by_status_async(status).await.unwrap(); + assert!( + rows.iter().all(|m| m.id() != outbox_id), + "the outbox row must roll back" + ); + } + + // 3. First read-model mutation absent. + let good_rows: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) FROM "postgres_relational_counter_views" WHERE "id" = $1"#, + ) + .bind(&good_id) + .fetch_one(repo.pool()) + .await + .unwrap(); + assert_eq!(good_rows, 0, "the first read-model mutation must roll back"); +} + #[tokio::test] async fn commit_batch_lowers_relational_read_model_plan_into_registered_table() { let Some((_schema, repo)) = repository().await else { diff --git a/tests/postgres_repository_conformance/main.rs b/tests/postgres_repository_conformance/main.rs index a9dd436..5845446 100644 --- a/tests/postgres_repository_conformance/main.rs +++ b/tests/postgres_repository_conformance/main.rs @@ -144,3 +144,35 @@ async fn consumer_inbox_rejects_empty_receipt() { }; conformance::inbox::inbox_rejects_empty_receipt(repo).await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn racing_commits_one_wins_one_conflicts() { + let Some(repo) = repository().await else { + return; + }; + conformance::scenario::racing_commits_one_wins_one_conflicts(repo, 8).await; +} + +#[tokio::test] +async fn expired_outbox_lease_is_reclaimed_by_second_worker() { + let Some(repo) = repository().await else { + return; + }; + conformance::outbox::expired_outbox_lease_is_reclaimed_by_second_worker( + repo.clone(), + repo.outbox_store(), + ) + .await; +} + +#[tokio::test] +async fn publish_failure_after_commit_retains_outbox_row_until_delivered() { + let Some(repo) = repository().await else { + return; + }; + conformance::outbox::publish_failure_after_commit_retains_outbox_row_until_delivered( + repo.clone(), + repo.outbox_store(), + ) + .await; +} diff --git a/tests/queued_repo/main.rs b/tests/queued_repo/main.rs index e81eded..e4ee97d 100644 --- a/tests/queued_repo/main.rs +++ b/tests/queued_repo/main.rs @@ -4,13 +4,14 @@ //! and explicit abort. use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use distributed::{ - sourced, AggregateBuilder, AggregateRepository, Entity, HashMapRepository, - InMemoryAsyncLockManager, Queueable, + sourced, Aggregate, AggregateBuilder, AggregateRepository, Entity, GetStream, + HashMapRepository, InMemoryAsyncLockManager, Queueable, StreamIdentity, }; +use tokio::sync::Barrier; #[derive(Default)] struct Counter { @@ -119,6 +120,100 @@ async fn peek_reads_without_acquiring_the_lock() { assert!(peeked.is_some()); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn queued_repo_n_writers_commit_in_fifo_order() { + // N writers race to increment the SAME aggregate. The queued repo serializes + // load→commit under a per-stream async lock, so the writes must apply one at + // a time. We pin two invariants: + // 1. The final version is exactly N (no lost updates, no double-applies). + // 2. The committed event order equals the lock-grant order — each writer + // records the order in which it *acquired* the lock (the instant its + // `get` returned), and stamps that grant index into its event payload. + // Replaying the stream must then reproduce that exact grant order. + const WRITERS: usize = 10; + + let base = HashMapRepository::new(); + let reader = base.clone(); + let repo: Arc = Arc::new(base.queued().aggregate::()); + + // Seed the aggregate so every writer takes the load-modify-commit path. + { + let mut counter = Counter::default(); + counter.create("fifo".into()).unwrap(); + repo.commit(&mut counter).await.unwrap(); + } + + // Shared log of the order in which writers were granted the lock. + let grant_order = Arc::new(Mutex::new(Vec::::new())); + // Release all writers together so they genuinely contend for the lock. + let barrier = Arc::new(Barrier::new(WRITERS)); + + let mut handles = Vec::with_capacity(WRITERS); + for writer in 0..WRITERS { + let repo = Arc::clone(&repo); + let grant_order = Arc::clone(&grant_order); + let barrier = Arc::clone(&barrier); + handles.push(tokio::spawn(async move { + barrier.wait().await; + // `get` parks on the held lock until granted; the instant it returns + // is this writer's place in the grant order. + let mut counter = repo.get("fifo").await.unwrap().unwrap(); + let grant_index = { + let mut order = grant_order.lock().unwrap(); + order.push(writer); + order.len() - 1 + }; + // Stamp the grant index into the event so the stream records order. + counter + .increment("fifo".into(), grant_index as i32) + .unwrap(); + repo.commit(&mut counter).await.unwrap(); + })); + } + for handle in handles { + handle.await.unwrap(); + } + + // Invariant 1: exactly N increments landed. + let identity = StreamIdentity::new(Counter::aggregate_type(), "fifo").unwrap(); + let entity = reader + .get_stream(&identity) + .await + .unwrap() + .expect("stream should exist"); + // version = 1 (create) + WRITERS increments. + assert_eq!( + entity.committed_version() as usize, + WRITERS + 1, + "every writer's increment must land exactly once (no lost or doubled writes)" + ); + + // Invariant 2: the increment events, in stream order, carry grant indices in + // ascending 0..WRITERS — i.e. the stream order equals the lock-grant order. + let grant_indices: Vec = entity + .events() + .iter() + .filter(|event| event.event_name == "incremented") + .map(|event| { + let (_, by): (String, i32) = + bitcode::deserialize(&event.payload).expect("payload should decode"); + by + }) + .collect(); + let expected: Vec = (0..WRITERS as i32).collect(); + assert_eq!( + grant_indices, expected, + "committed event order must match the lock-grant order (strict FIFO serialization)" + ); + + let recorded_grant_order = grant_order.lock().unwrap().clone(); + assert_eq!( + recorded_grant_order.len(), + WRITERS, + "every writer must have been granted the lock exactly once" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn abort_releases_a_held_lock() { let repo = queued_repo(); diff --git a/tests/replay_property/aggregate.rs b/tests/replay_property/aggregate.rs new file mode 100644 index 0000000..557ec85 --- /dev/null +++ b/tests/replay_property/aggregate.rs @@ -0,0 +1,104 @@ +//! A small but non-trivial sample aggregate for replay-determinism property +//! tests. It has multiple event types, a guarded command (so not every command +//! produces an event), and accumulates derived state — enough that a replay bug +//! (wrong order, dropped event, snapshot drift) would change the final state. + +use distributed::{sourced, Entity, Snapshottable}; +use serde::{Deserialize, Serialize}; + +#[derive(Default, Debug, Clone)] +pub struct Ledger { + pub entity: Entity, + /// Running balance; deposits add, withdrawals subtract. + pub balance: i64, + /// Number of events that actually applied (guarded ones may not). + pub applied: u64, + /// Last note set, to exercise a String field in snapshots. + pub last_note: String, +} + +#[sourced(entity, aggregate_type = "replay_property.ledger")] +impl Ledger { + #[event("opened", when = self.entity.id().is_empty())] + pub fn open(&mut self, id: String) { + self.entity.set_id(&id); + } + + #[event("deposited", when = amount > 0)] + pub fn deposit(&mut self, amount: i64) { + self.balance += amount; + self.applied += 1; + } + + // Guarded: only applies when the balance can cover it. This means an + // identical command stream can yield different events depending on prior + // state — a good stress for replay determinism. + #[event("withdrawn", when = amount > 0 && self.balance >= amount)] + pub fn withdraw(&mut self, amount: i64) { + self.balance -= amount; + self.applied += 1; + } + + #[event("annotated", when = !note.is_empty())] + pub fn annotate(&mut self, note: String) { + self.last_note = note; + self.applied += 1; + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LedgerSnapshot { + pub id: String, + pub balance: i64, + pub applied: u64, + pub last_note: String, +} + +impl Snapshottable for Ledger { + type Snapshot = LedgerSnapshot; + + fn create_snapshot(&self) -> LedgerSnapshot { + LedgerSnapshot { + id: self.entity.id().to_string(), + balance: self.balance, + applied: self.applied, + last_note: self.last_note.clone(), + } + } + + fn restore_from_snapshot(&mut self, snapshot: LedgerSnapshot) { + self.entity.set_id(&snapshot.id); + self.balance = snapshot.balance; + self.applied = snapshot.applied; + self.last_note = snapshot.last_note; + } +} + +/// The closed set of commands a property test can generate. +#[derive(Clone, Debug)] +pub enum Command { + Deposit(i64), + Withdraw(i64), + Annotate(String), +} + +impl Command { + /// Apply a command to an in-memory aggregate, mirroring exactly what the + /// generated command methods do (a domain-invalid command is simply a + /// no-op that records no event — same as the guarded `#[event]` behaviour). + pub fn apply(&self, ledger: &mut Ledger) { + match self { + // Amounts are generated positive, so these never error; a guard that + // does not hold (e.g. overdraw) simply records no event. + Command::Deposit(amount) => { + let _ = ledger.deposit(*amount); + } + Command::Withdraw(amount) => { + let _ = ledger.withdraw(*amount); + } + Command::Annotate(note) => { + let _ = ledger.annotate(note.clone()); + } + } + } +} diff --git a/tests/replay_property/main.rs b/tests/replay_property/main.rs new file mode 100644 index 0000000..e605494 --- /dev/null +++ b/tests/replay_property/main.rs @@ -0,0 +1,170 @@ +//! Replay-determinism property tests. +//! +//! Event sourcing's core promise is that an aggregate's state is a pure +//! function of its event stream. These proptests generate arbitrary command +//! sequences for a sample aggregate and assert two invariants that the recent +//! publish-on-commit + snapshot work depends on: +//! +//! 1. **Reload determinism** — applying commands in memory and reloading from +//! storage yields identical state. (round-trip is lossless) +//! 2. **Snapshot transparency** — a snapshot-hydrated reload equals a +//! full-replay reload, for *any* snapshot frequency. Snapshots are an +//! optimization that must never change observable state. + +mod aggregate; + +use aggregate::{Command, Ledger}; +use distributed::{AggregateBuilder, HashMapRepository}; +use proptest::prelude::*; +use tokio::runtime::Runtime; + +/// Generate a single command. Amounts are kept positive (the aggregate's guards +/// already cover the "no-op" cases like overdraw) and bounded to keep balances +/// in a sane range. +fn command_strategy() -> impl Strategy { + prop_oneof![ + (1i64..1_000).prop_map(Command::Deposit), + (1i64..1_000).prop_map(Command::Withdraw), + "[a-z]{1,8}".prop_map(Command::Annotate), + ] +} + +fn commands_strategy() -> impl Strategy> { + prop::collection::vec(command_strategy(), 0..40) +} + +/// Build the canonical in-memory aggregate by applying `commands` to a freshly +/// opened ledger. This is the source of truth the storage round-trips must match. +fn in_memory_ledger(id: &str, commands: &[Command]) -> Ledger { + let mut ledger = Ledger::default(); + ledger.open(id.to_string()).expect("open should succeed"); + for command in commands { + command.apply(&mut ledger); + } + ledger +} + +/// Compare the *domain* projection (the part that is a pure fold of events). +/// Does not compare persistence bookkeeping like `committed_version`, so it is +/// valid to compare a reloaded aggregate against a never-persisted in-memory one. +fn assert_same_domain_state(actual: &Ledger, expected: &Ledger, context: &str) { + assert_eq!( + actual.balance, expected.balance, + "balance mismatch ({context})" + ); + assert_eq!( + actual.applied, expected.applied, + "applied mismatch ({context})" + ); + assert_eq!( + actual.last_note, expected.last_note, + "last_note mismatch ({context})" + ); + assert_eq!( + actual.entity.id(), + expected.entity.id(), + "id mismatch ({context})" + ); +} + +/// Compare two *reloaded* aggregates fully, including the committed version — +/// both came from storage, so the version is meaningful and must agree. +fn assert_same_persisted_state(actual: &Ledger, expected: &Ledger, context: &str) { + assert_same_domain_state(actual, expected, context); + assert_eq!( + actual.entity.committed_version(), + expected.entity.committed_version(), + "version mismatch ({context})" + ); +} + +proptest! { + #![proptest_config(ProptestConfig { cases: 96, ..ProptestConfig::default() })] + + /// Applying commands in memory then reloading from storage reproduces the + /// exact same state — for any command sequence. + #[test] + fn reload_reproduces_in_memory_state(commands in commands_strategy()) { + let runtime = Runtime::new().expect("runtime"); + runtime.block_on(async move { + let id = "ledger-reload"; + let repo = HashMapRepository::new(); + let ledger_repo = repo.aggregate::(); + + // Build and commit the whole sequence at once. + let mut expected = in_memory_ledger(id, &commands); + ledger_repo.commit(&mut expected).await.expect("commit"); + + let reloaded = ledger_repo + .get(id) + .await + .expect("reload") + .expect("ledger should exist"); + + // `expected` was committed through the same repo, so its version is + // set — compare the full persisted state. + assert_same_persisted_state(&reloaded, &expected, "reload round-trip"); + }); + } + + /// Snapshots are a transparent optimization: hydrating from a snapshot at any + /// frequency yields the same state as a full replay of the raw stream. + #[test] + fn snapshot_hydration_matches_full_replay( + commands in commands_strategy(), + frequency in 1u64..7, + ) { + let runtime = Runtime::new().expect("runtime"); + runtime.block_on(async move { + let id = "ledger-snapshot"; + let base = HashMapRepository::new(); + + // Commit through a snapshot-enabled repo, one command per cycle so + // snapshots actually trigger at the chosen frequency. Each cycle does + // a real load-modify-commit, exercising the partial-replay-from- + // snapshot path on every load after the first snapshot. + let snap_repo = base.clone().aggregate::().with_snapshots(frequency); + { + let mut ledger = Ledger::default(); + ledger.open(id.to_string()).expect("open"); + snap_repo.commit(&mut ledger).await.expect("open commit"); + } + for command in &commands { + let mut ledger = snap_repo + .get(id) + .await + .expect("load") + .expect("ledger exists"); + command.apply(&mut ledger); + // Commit even when no event was recorded — an empty commit is a + // no-op and must not corrupt state. + snap_repo.commit(&mut ledger).await.expect("cycle commit"); + } + + // Path A: hydrate via the snapshot-aware repo (uses the snapshot + + // post-snapshot replay). + let via_snapshot = snap_repo + .get(id) + .await + .expect("snapshot reload") + .expect("ledger exists"); + + // Path B: full replay of the raw stream from the same storage, with + // no snapshot policy at all. + let via_full_replay = base + .aggregate::() + .get(id) + .await + .expect("full replay reload") + .expect("ledger exists"); + + // The expected state is the in-memory fold of the same commands. + let expected = in_memory_ledger(id, &commands); + + // `expected` is an in-memory fold (never persisted) — compare domain + // state only. The two storage reloads are compared in full. + assert_same_domain_state(&via_full_replay, &expected, "full replay vs in-memory"); + assert_same_persisted_state(&via_snapshot, &via_full_replay, "snapshot vs full replay"); + }); + } +} diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index adcfe63..30e030d 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -4,9 +4,10 @@ use std::collections::HashMap; use distributed::{ sourced, Aggregate, AggregateBuilder, AsyncOutboxStore, CommitBatch, Entity, GetStream, - OutboxMessageStatus, ReadModel, ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt, - RepositoryError, RowKey, RowPatch, RowValue, SnapshotRecord, SnapshotStore, SqliteRepository, - StreamIdentity, StreamWrite, TableSchemaRegistry, TransactionalCommit, OUTBOX_MESSAGES_TABLE, + OutboxMessage, OutboxMessageStatus, ReadModel, ReadModelWritePlanBuilder, + ReadModelWritePlanCommitExt, RepositoryError, RowKey, RowPatch, RowValue, SnapshotRecord, + SnapshotStore, SqliteRepository, StreamIdentity, StreamWrite, TableSchemaRegistry, + TransactionalCommit, OUTBOX_MESSAGES_TABLE, }; use serde::{Deserialize, Serialize}; @@ -206,6 +207,123 @@ async fn optimistic_conflict_rolls_back_other_stream_and_read_model_plan() { assert_eq!(stale.entity().new_events().len(), 1); } +#[tokio::test] +async fn read_model_failure_mid_plan_rolls_back_events_and_outbox() { + // A single commit carries: an aggregate event, an outbox row, and a + // read-model plan with TWO mutations. The first mutation is valid; the second + // violates a real database constraint (a BEFORE-INSERT trigger that ABORTs on + // a negative value — a genuine engine-level failure mid-plan, not an + // application pre-check). The whole transaction must roll back: the event, + // the outbox row, AND the first (already-applied) read-model mutation must + // ALL be absent afterward. + let repo = repository().await; + bootstrap_relational_counter_table(&repo).await; + + // Install a real DB constraint the read-model schema is unaware of: reject + // any row whose `value` is negative. SQLite triggers are enforced by the + // engine inside the transaction. + sqlx::query( + r#" + CREATE TRIGGER reject_negative_counter_value + BEFORE INSERT ON "local_relational_counter_views" + WHEN NEW."value" < 0 + BEGIN + SELECT RAISE(ABORT, 'value must be non-negative'); + END; + "#, + ) + .execute(repo.pool()) + .await + .unwrap(); + + let good_id = "midplan-good"; + let bad_id = "midplan-bad"; + let mut read_models = ReadModelWritePlanBuilder::new(); + // Mutation 1: valid, applied first within the tx. + read_models + .upsert(&RelationalCounterView { + id: good_id.into(), + value: 1, + counts: HashMap::new(), + }) + .unwrap(); + // Mutation 2: violates the trigger — must abort the whole transaction. + read_models + .upsert(&RelationalCounterView { + id: bad_id.into(), + value: -1, + counts: HashMap::new(), + }) + .unwrap(); + + let aggregate_id = "midplan-aggregate"; + let mut projection = CounterProjection::default(); + projection.touch(aggregate_id.into()).unwrap(); + let identity = StreamIdentity::new(CounterProjection::aggregate_type(), aggregate_id).unwrap(); + + let outbox_id = "midplan-outbox"; + let outbox_message = + OutboxMessage::create(outbox_id, "counter.touched", b"{}".to_vec()).unwrap(); + + let err = repo + .commit_batch(CommitBatch { + inbox_receipts: Vec::new(), + streams: vec![StreamWrite::new(identity.clone(), projection.entity_mut())], + outbox_messages: vec![outbox_message], + read_model_plans: vec![read_models.into_write_plan().unwrap()], + snapshots: Vec::new(), + }) + .await + .expect_err("a mid-plan constraint violation must fail the commit"); + // The constraint failure surfaces as a Model error (read-model storage error). + assert!( + matches!(err, RepositoryError::Model(_)), + "expected a Model error from the constraint violation, got {err:?}" + ); + + // 1. The aggregate event must be absent. + assert!( + repo.get_stream(&identity).await.unwrap().is_none(), + "the aggregate stream must roll back with the failed read-model plan" + ); + + // 2. The outbox row must be absent. + let outbox = repo.outbox_store(); + for status in [ + OutboxMessageStatus::Pending, + OutboxMessageStatus::InFlight, + OutboxMessageStatus::Published, + OutboxMessageStatus::Failed, + ] { + let rows = outbox.messages_by_status_async(status).await.unwrap(); + assert!( + rows.iter().all(|m| m.id() != outbox_id), + "the outbox row must roll back with the failed read-model plan" + ); + } + + // 3. The first (already-applied) read-model mutation must be absent. + let good_rows: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) FROM "local_relational_counter_views" WHERE "id" = ?"#, + ) + .bind(good_id) + .fetch_one(repo.pool()) + .await + .unwrap(); + assert_eq!( + good_rows, 0, + "the first read-model mutation must roll back when a later one fails" + ); + let bad_rows: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) FROM "local_relational_counter_views" WHERE "id" = ?"#, + ) + .bind(bad_id) + .fetch_one(repo.pool()) + .await + .unwrap(); + assert_eq!(bad_rows, 0, "the violating row must never be persisted"); +} + #[tokio::test] async fn commit_batch_lowers_relational_read_model_plan_into_registered_table() { let repo = repository().await; diff --git a/tests/sqlite_repository_conformance/main.rs b/tests/sqlite_repository_conformance/main.rs index 1be7274..4c022e3 100644 --- a/tests/sqlite_repository_conformance/main.rs +++ b/tests/sqlite_repository_conformance/main.rs @@ -107,3 +107,28 @@ async fn consumer_inbox_records_dedupes_and_fences_with_real_effects() { async fn consumer_inbox_rejects_empty_receipt() { conformance::inbox::inbox_rejects_empty_receipt(repository().await).await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn racing_commits_one_wins_one_conflicts() { + conformance::scenario::racing_commits_one_wins_one_conflicts(repository().await, 8).await; +} + +#[tokio::test] +async fn expired_outbox_lease_is_reclaimed_by_second_worker() { + let repo = repository().await; + conformance::outbox::expired_outbox_lease_is_reclaimed_by_second_worker( + repo.clone(), + repo.outbox_store(), + ) + .await; +} + +#[tokio::test] +async fn publish_failure_after_commit_retains_outbox_row_until_delivered() { + let repo = repository().await; + conformance::outbox::publish_failure_after_commit_retains_outbox_row_until_delivered( + repo.clone(), + repo.outbox_store(), + ) + .await; +} diff --git a/tests/upcasting/main.rs b/tests/upcasting/main.rs index 71ee7aa..90a0770 100644 --- a/tests/upcasting/main.rs +++ b/tests/upcasting/main.rs @@ -4,7 +4,7 @@ use aggregate::{TodoV1, TodoV2, TodoV3}; use distributed::{ hydrate, hydrate_from_snapshot, upcast_events, Aggregate, AggregateBuilder, Entity, EventRecord, EventUpcaster, HashMapRepository, RepositoryError, SnapshotRecord, SnapshotStore, - StreamIdentity, UpcastError, + StreamIdentity, TransactionalCommit, UpcastError, }; fn identity_payload(event: &EventRecord) -> Result, UpcastError> { @@ -395,6 +395,86 @@ async fn snapshot_repo_with_v1_events_upcasted_on_hydrate() { assert!(loaded.completed); } +// ============================================================================= +// Unknown event types fail hydration with a clear, named error +// ============================================================================= + +#[test] +fn unknown_event_type_in_stream_fails_hydration_with_clear_error() { + // Simulate a stream that contains an event whose name no version of the + // aggregate knows how to replay — e.g. a producer wrote an event type that + // was later renamed/removed, or a stream was mis-keyed under the wrong + // aggregate type. Hydration must fail loudly (not silently skip) with a + // Replay error that names the offending event so an operator can diagnose it. + let unknown_name = "todo.relabelled_in_a_future_we_never_shipped"; + + let mut entity = Entity::with_id("t1"); + // A valid v1 initialized event so the rest of the stream is well-formed... + let init_payload = bitcode::serialize(&( + "t1".to_string(), + "alice".to_string(), + "Buy milk".to_string(), + )) + .unwrap(); + let mut events = vec![EventRecord::new("initialized", init_payload, 1)]; + // ...followed by an event the aggregate has no handler for. + let mut unknown = EventRecord::new(unknown_name, vec![], 2); + unknown.sequence = 2; + events.push(unknown); + entity.load_from_history(events); + + match hydrate::(entity) { + Err(RepositoryError::Replay(message)) => { + assert!( + message.contains(unknown_name), + "replay error must name the unknown event, got: {message}" + ); + } + Err(other) => panic!("expected a Replay error, got {other:?}"), + Ok(_) => panic!("hydration must not silently accept an unknown event type"), + } +} + +#[tokio::test] +async fn unknown_event_type_in_repo_stream_fails_get_with_named_replay_error() { + // The same guarantee through the repository load path: a raw stream carrying + // an unregistered event name must make `get` fail with a named Replay error, + // not panic and not return a half-built aggregate. + let unknown_name = "todo.never_registered_event"; + + let repo = HashMapRepository::new(); + let identity = StreamIdentity::new(TodoV1::aggregate_type(), "t-unknown").unwrap(); + // Build a fresh stream containing a valid v1 event followed by an event with + // an unregistered name, and persist it in one append (both events are new). + let mut raw = Entity::with_id("t-unknown"); + raw.digest( + "initialized", + &( + "t-unknown".to_string(), + "bob".to_string(), + "Walk dog".to_string(), + ), + ) + .unwrap(); + raw.digest_empty(unknown_name).unwrap(); + repo.commit_batch(distributed::CommitBatch::new(vec![ + distributed::StreamWrite::new(identity, &mut raw), + ])) + .await + .expect("raw stream with an unknown event name still persists"); + + match repo.aggregate::().get("t-unknown").await { + Err(RepositoryError::Replay(message)) => { + assert!( + message.contains(unknown_name), + "replay error must name the unknown event, got: {message}" + ); + } + Err(other) => panic!("expected a Replay error, got {other:?}"), + Ok(_) => panic!("loading a stream with an unknown event type must fail"), + } +} + #[test] fn hydrate_from_snapshot_returns_replay_error_when_post_snapshot_upcaster_decode_fails() { let snapshot = SnapshotRecord::new(