Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tonic-build = { version = "0.14", default-features = false, features = ["transpo

[dev-dependencies]
axum = "0.8"
proptest = "1"
prost = "0.14"
reqwest = { version = "0.13", features = ["json"] }
serde_json = "1.0"
Expand Down
25 changes: 25 additions & 0 deletions tests/hashmap_repository_conformance/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,28 @@ async fn consumer_inbox_records_dedupes_and_fences_with_real_effects() {
async fn consumer_inbox_rejects_empty_receipt() {
conformance::inbox::inbox_rejects_empty_receipt(repository()).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn racing_commits_one_wins_one_conflicts() {
conformance::scenario::racing_commits_one_wins_one_conflicts(repository(), 8).await;
}

#[tokio::test]
async fn expired_outbox_lease_is_reclaimed_by_second_worker() {
let repo = repository();
conformance::outbox::expired_outbox_lease_is_reclaimed_by_second_worker(
repo.clone(),
repo.outbox_store(),
)
.await;
}

#[tokio::test]
async fn publish_failure_after_commit_retains_outbox_row_until_delivered() {
let repo = repository();
conformance::outbox::publish_failure_after_commit_retains_outbox_row_until_delivered(
repo.clone(),
repo.outbox_store(),
)
.await;
}
257 changes: 255 additions & 2 deletions tests/persistent_repository_conformance/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,64 @@
use std::sync::Mutex;
use std::time::Duration;

use distributed::bus::{AsyncMessagePublisher, Message, TransportError};
use distributed::{
Aggregate, AggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, GetStream, OutboxClaimRef,
OutboxMessage, OutboxMessageStatus, OutboxPublishFailureAction, RepositoryError,
StreamIdentity, TransactionalCommit,
OutboxDispatcher, OutboxMessage, OutboxMessageStatus, OutboxPublishFailureAction,
RepositoryError, StreamIdentity, TransactionalCommit,
};

use super::scenario::unique_id;
use super::seat::Seat;

/// A publisher that fails its first `fail_first` publish attempts (returning a
/// retryable transport error) and succeeds thereafter, recording every message
/// id it actually delivered. Used to prove the outbox keeps a row until it is
/// genuinely delivered, and that it is delivered exactly once.
struct FlakyPublisher {
fail_first: usize,
attempts: Mutex<usize>,
delivered: Mutex<Vec<String>>,
}

impl FlakyPublisher {
fn new(fail_first: usize) -> Self {
Self {
fail_first,
attempts: Mutex::new(0),
delivered: Mutex::new(Vec::new()),
}
}

fn delivered(&self) -> Vec<String> {
self.delivered.lock().expect("delivered lock").clone()
}

fn attempts(&self) -> usize {
*self.attempts.lock().expect("attempts lock")
}
}

impl AsyncMessagePublisher for FlakyPublisher {
async fn publish(&self, message: Message) -> Result<(), TransportError> {
let attempt = {
let mut attempts = self.attempts.lock().expect("attempts lock");
*attempts += 1;
*attempts
};
if attempt <= self.fail_first {
return Err(TransportError::retryable(
"simulated transient publish failure",
));
}
self.delivered
.lock()
.expect("delivered lock")
.push(message.id().unwrap_or_default().to_string());
Ok(())
}
}

pub async fn high_level_outbox_commit_persists_row_without_stream<R, S>(repo: R, outbox: S)
where
R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -334,6 +384,209 @@ where
assert_eq!(still_owned.worker_id.as_deref(), Some("immediate-worker"));
}

/// Red-team crash recovery: worker A claims a row with a short lease and then
/// "crashes" — it never completes or releases the claim. Once the lease expires,
/// worker B must be able to reclaim the same row (the claim predicate treats an
/// expired in-flight row as claimable). B then completes it. Finally A's *late*
/// settle attempts (it woke up with a stale claim) must be fenced: neither
/// `complete_async` nor `release_async` from the original lease may mutate the
/// row B now owns/published.
///
/// This proves the SQL `claimed_until <= now` expiry predicate end-to-end —
/// today only the in-memory store has a unit test for lease-expiry reclaim
/// (`src/outbox_worker/store.rs::claim_includes_expired_in_flight_messages`).
///
/// Note: lease expiry is inherently wall-clock based, so this scenario uses a
/// short real lease and a sleep just past it. There is no barrier/channel
/// substitute for the passage of lease time through the public store API.
pub async fn expired_outbox_lease_is_reclaimed_by_second_worker<R, S>(repo: R, outbox: S)
where
R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static,
S: AsyncOutboxStore + Send + Sync,
{
let message_id = unique_id("crash-lease-outbox");
let mut seat = added_seat(&unique_id("crash-lease-seat"));
let message = OutboxMessage::create(&message_id, "seat.added", b"{}".to_vec())
.expect("outbox message should be valid");
repo.clone()
.aggregate::<Seat>()
.outbox(message)
.commit(&mut seat)
.await
.expect("message should be stored");

// Worker A claims with a short lease, then "crashes" (never settles).
let lease = Duration::from_secs(1);
let claimed_a = outbox
.claim_async(ClaimOutboxMessages::new("worker-a", 1, lease))
.await
.expect("worker A claim should succeed");
assert_eq!(claimed_a.len(), 1, "worker A claims the only pending row");
assert_eq!(claimed_a[0].id(), message_id);
let stale_claim_a = OutboxClaimRef::from_message(&claimed_a[0]).expect("A's claim is valid");

// While the lease is live, worker B must NOT be able to steal the row.
let live = outbox
.claim_async(ClaimOutboxMessages::new("worker-b", 1, lease))
.await
.expect("worker B poll should not error while the lease is live");
assert!(
live.is_empty(),
"an unexpired lease must not be reclaimable by another worker"
);

// Let the lease expire (no settle from A — simulated crash).
tokio::time::sleep(Duration::from_millis(1_200)).await;

// Worker B reclaims the now-expired in-flight row.
let claimed_b = outbox
.claim_async(ClaimOutboxMessages::new(
"worker-b",
1,
Duration::from_secs(60),
))
.await
.expect("worker B claim after expiry should succeed");
assert_eq!(
claimed_b.len(),
1,
"the expired lease must be reclaimable by a second worker"
);
assert_eq!(claimed_b[0].id(), message_id);
assert_eq!(claimed_b[0].worker_id.as_deref(), Some("worker-b"));
assert!(
claimed_b[0].attempts > claimed_a[0].attempts,
"reclaim increments the attempt counter (fences A's stale claim)"
);
let claim_b = OutboxClaimRef::from_message(&claimed_b[0]).expect("B's claim is valid");

// Worker A wakes up with its stale lease and tries to settle. Both complete
// and release must be rejected — A no longer owns the row.
let late_complete = outbox
.complete_async(&stale_claim_a)
.await
.expect_err("A's late complete must be fenced");
assert!(
matches!(late_complete, RepositoryError::InvalidState { .. }),
"stale-worker complete should be InvalidState, got {late_complete:?}"
);
let late_release = outbox
.release_async(&stale_claim_a, "A woke up late")
.await
.expect_err("A's late release must be fenced");
assert!(
matches!(late_release, RepositoryError::InvalidState { .. }),
"stale-worker release should be InvalidState, got {late_release:?}"
);

// Worker B (the rightful owner) completes successfully.
outbox
.complete_async(&claim_b)
.await
.expect("the reclaiming worker should complete the row");
let published = find_outbox_by_id(&outbox, &message_id)
.await
.expect("reclaimed message should still be queryable");
assert_eq!(
published.status,
OutboxMessageStatus::Published,
"row is published by the worker that reclaimed it, not by the crashed one"
);
}

/// Publish-on-commit durability: a row committed alongside its aggregate must
/// survive transient publish failures and stay claimable until it is genuinely
/// delivered, then end Published — and the bus must see exactly one delivery.
///
/// The dispatcher runs one pass per `dispatch_ids` call. The first two passes
/// fail to publish (row goes InFlight on claim → back to Pending on
/// `record_failure`, attempts incrementing); the third succeeds and the row
/// becomes Published. This pins the at-least-once-store / exactly-once-delivery
/// boundary that publish-on-commit relies on.
pub async fn publish_failure_after_commit_retains_outbox_row_until_delivered<R, S>(
repo: R,
outbox: S,
) where
R: GetStream + TransactionalCommit + Clone + Send + Sync + 'static,
S: AsyncOutboxStore + Send + Sync + Clone,
{
let message_id = unique_id("publish-retry-outbox");
let mut seat = added_seat(&unique_id("publish-retry-seat"));
let message = OutboxMessage::create(&message_id, "seat.added", b"{}".to_vec())
.expect("outbox message should be valid");
repo.clone()
.aggregate::<Seat>()
.outbox(message)
.commit(&mut seat)
.await
.expect("aggregate and outbox should commit atomically");

// max_attempts is high enough that the two failures only release (never
// permanently fail) the row.
let dispatcher = OutboxDispatcher::new(
outbox.clone(),
FlakyPublisher::new(2),
"immediate:publish-retry",
Duration::from_secs(60),
10,
);
let ids = [message_id.clone()];

// Pass 1: claim → publish fails → released back to Pending, attempts = 1.
let pass1 = dispatcher
.dispatch_ids(&ids)
.await
.expect("dispatch pass should not surface a transport error");
assert_eq!(pass1.published, 0);
assert_eq!(pass1.released, 1);
assert_eq!(pass1.failed, 0);
let after_1 = find_outbox_by_id(&outbox, &message_id)
.await
.expect("row survives the first failure");
assert_eq!(
after_1.status,
OutboxMessageStatus::Pending,
"a failed publish releases the row back to Pending (still owed)"
);
assert_eq!(after_1.attempts, 1);

// Pass 2: same again, attempts = 2.
let pass2 = dispatcher.dispatch_ids(&ids).await.expect("second pass");
assert_eq!(pass2.published, 0);
assert_eq!(pass2.released, 1);
let after_2 = find_outbox_by_id(&outbox, &message_id)
.await
.expect("row survives the second failure");
assert_eq!(after_2.status, OutboxMessageStatus::Pending);
assert_eq!(after_2.attempts, 2);

// Pass 3: publish succeeds → Published.
let pass3 = dispatcher.dispatch_ids(&ids).await.expect("third pass");
assert_eq!(pass3.published, 1);
assert_eq!(pass3.released, 0);
assert_eq!(pass3.failed, 0);
let published = find_outbox_by_id(&outbox, &message_id)
.await
.expect("row is still queryable after publish");
assert_eq!(
published.status,
OutboxMessageStatus::Published,
"the row is Published only after a successful delivery"
);

// The bus saw exactly one delivery — never the failed attempts, never twice.
assert_eq!(
dispatcher.publisher().attempts(),
3,
"publisher was invoked once per pass (two failures + one success)"
);
assert_eq!(
dispatcher.publisher().delivered(),
vec![message_id],
"the message was delivered exactly once, only on the successful pass"
);
}

fn added_seat(id: &str) -> Seat {
let mut seat = Seat::default();
seat.add(id.to_string(), "floor".to_string())
Expand Down
Loading
Loading