From a23e11fd96cb4f2ee6d57a2422b594f05f5a0ddd Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 00:14:48 -0500 Subject: [PATCH 1/4] refactor!: give RepositoryError and LockError a retryability signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit repository_storage_error mapped every sqlx failure — connection refused, pool timeout, SQLITE_BUSY, and constraint violations alike — to RepositoryError::Model(String). RepositoryError had no storage shape and no source(), and From/From collapsed to strings. Downstream, HandlerError::transport_error_kind classified every Repository(_) as Retryable, so a deterministic model error redelivered forever while a transient DB outage was indistinguishable from it. Add a Storage { operation, retryable, source } variant to RepositoryError plus a kind() -> RetryClass method, mirroring TransportError. RetryClass lives in the lock module (the lowest layer both errors reach) so repository does not depend on the bus. repository_storage_error now classifies sqlx errors via is_sqlx_transient (connection/pool/timeout and SQLITE_BUSY -> retryable; constraint/decode -> permanent) and preserves the source. The From impls map to Storage, keeping the source and the transient/permanent distinction. ConcurrentWrite and NotFound stay retryable (race resolved by a later attempt); Model/Replay/invalid-identity/InvalidState are permanent. HandlerError::transport_error_kind now defers to RepositoryError::kind() instead of bucketing all Repository(_) as retryable. LockError gains a Busy variant and the same kind()/is_retryable() signal: Poisoned is permanent (broken invariant); contention, lease loss, and busy are retryable. The existing AcquireFailed/ReleaseFailed lease mappings were already retryable under this classification. Storage carries a boxed source, so RepositoryError drops Clone/PartialEq/Eq (they were test-only). The handful of assert_eq! comparisons became matches! on the variant. Both enums are now #[non_exhaustive] so future variants are non-breaking. BREAKING CHANGE: RepositoryError no longer derives Clone/PartialEq/Eq, both RepositoryError and LockError are #[non_exhaustive], and From /From now produce RepositoryError::Storage instead of ::Model. Implements [[tasks/repository-error-taxonomy]] --- src/commit_builder/mod.rs | 6 +- src/hashmap_repo/repository.rs | 8 +- src/lock/error.rs | 91 +++++++++++ src/lock/mod.rs | 2 +- src/microsvc/error.rs | 63 +++++++- src/outbox/commit.rs | 5 +- src/outbox_worker/store.rs | 10 +- src/repository/error.rs | 148 +++++++++++++++++- src/snapshot/repository.rs | 5 +- src/sqlx_repo/mod.rs | 34 +++- .../scenario.rs | 25 ++- tests/postgres_repository/main.rs | 26 ++- tests/repository_api/main.rs | 11 +- tests/sqlite_repository/main.rs | 14 +- 14 files changed, 399 insertions(+), 49 deletions(-) diff --git a/src/commit_builder/mod.rs b/src/commit_builder/mod.rs index a85bf6b..040cdbb 100644 --- a/src/commit_builder/mod.rs +++ b/src/commit_builder/mod.rs @@ -733,9 +733,9 @@ mod tests { .await .unwrap_err(); - assert_eq!( - err, - RepositoryError::Model("injected async batch failure".into()) + assert!( + matches!(&err, RepositoryError::Model(message) if message == "injected async batch failure"), + "unexpected error: {err}" ); assert_eq!(agg.entity().committed_version(), 0); assert_eq!(agg.entity().new_events().len(), 1); diff --git a/src/hashmap_repo/repository.rs b/src/hashmap_repo/repository.rs index 6581cf1..89f0946 100644 --- a/src/hashmap_repo/repository.rs +++ b/src/hashmap_repo/repository.rs @@ -509,11 +509,9 @@ mod tests { ])) .await .unwrap_err(); - assert_eq!( - err, - RepositoryError::DuplicateStreamInBatch { - id: identity("same-id").to_string(), - } + assert!( + matches!(&err, RepositoryError::DuplicateStreamInBatch { id } if *id == identity("same-id").to_string()), + "unexpected error: {err}" ); assert!(repo diff --git a/src/lock/error.rs b/src/lock/error.rs index e8ad82c..8b81a97 100644 --- a/src/lock/error.rs +++ b/src/lock/error.rs @@ -1,7 +1,42 @@ use std::fmt; +/// Whether a failure is worth retrying as-is. +/// +/// This mirrors the transport layer's +/// [`TransportErrorKind`](crate::bus::TransportErrorKind): a `Retryable` failure +/// is transient (a later attempt may succeed), a `Permanent` failure is +/// deterministic (re-running the identical operation cannot change the outcome). +/// It lives in the lock module — the lowest layer that needs it — so both +/// [`LockError`] and [`RepositoryError`](crate::repository::RepositoryError) can +/// share one vocabulary without the repository layer depending on the bus. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum RetryClass { + /// A transient failure. Retrying the same operation may succeed. + Retryable, + /// A deterministic failure. Retrying the identical operation will not help. + Permanent, +} + +impl RetryClass { + /// Whether this class is retryable. + pub fn is_retryable(self) -> bool { + matches!(self, RetryClass::Retryable) + } + + /// Whether this class is permanent. + pub fn is_permanent(self) -> bool { + matches!(self, RetryClass::Permanent) + } +} + /// Error type for lock operations. +/// +/// Each variant carries a retry classification via [`LockError::kind`]: +/// contention and lease loss are transient (retry the acquire), whereas a +/// poisoned primitive is a process-fatal invariant violation that retrying +/// cannot clear. #[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] pub enum LockError { /// The underlying lock primitive was poisoned (e.g. a thread panicked while holding it). Poisoned(String), @@ -11,10 +46,39 @@ pub enum LockError { ReleaseFailed(String), /// The lock expired (e.g. a distributed lock TTL elapsed). Expired(String), + /// A transient backend condition (e.g. SQLite `SQLITE_BUSY`/`SQLITE_LOCKED` + /// or a pool-acquire timeout) prevented the operation. Retrying may succeed. + Busy(String), /// Any other lock error. Other(String), } +impl LockError { + /// Classify this error for retry purposes. + /// + /// `Poisoned` is permanent — a poisoned primitive signals a broken invariant + /// that a retry cannot clear. Contention (`AcquireFailed`, `Busy`), lease + /// loss (`Expired`), and transient release failures are retryable. `Other` + /// is conservatively retryable: an unclassified lock failure is more often + /// transient infrastructure than a deterministic bug, and at-least-once + /// semantics make a redundant retry safe. + pub fn kind(&self) -> RetryClass { + match self { + LockError::Poisoned(_) => RetryClass::Permanent, + LockError::AcquireFailed(_) + | LockError::ReleaseFailed(_) + | LockError::Expired(_) + | LockError::Busy(_) + | LockError::Other(_) => RetryClass::Retryable, + } + } + + /// Whether this error is retryable. + pub fn is_retryable(&self) -> bool { + self.kind().is_retryable() + } +} + impl fmt::Display for LockError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -22,9 +86,36 @@ impl fmt::Display for LockError { LockError::AcquireFailed(msg) => write!(f, "lock acquire failed: {}", msg), LockError::ReleaseFailed(msg) => write!(f, "lock release failed: {}", msg), LockError::Expired(msg) => write!(f, "lock expired: {}", msg), + LockError::Busy(msg) => write!(f, "lock busy: {}", msg), LockError::Other(msg) => write!(f, "lock error: {}", msg), } } } impl std::error::Error for LockError {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn contention_and_lease_loss_are_retryable() { + for err in [ + LockError::AcquireFailed("contended".into()), + LockError::ReleaseFailed("transient".into()), + LockError::Expired("ttl elapsed".into()), + LockError::Busy("database is locked".into()), + LockError::Other("unknown".into()), + ] { + assert_eq!(err.kind(), RetryClass::Retryable, "{err}"); + assert!(err.is_retryable()); + } + } + + #[test] + fn poisoned_is_permanent() { + let err = LockError::Poisoned("map poisoned".into()); + assert_eq!(err.kind(), RetryClass::Permanent); + assert!(!err.is_retryable()); + } +} diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 5cbd7af..165e9a8 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -43,7 +43,7 @@ mod sqlx_common; pub use async_in_memory::{InMemoryAsyncLock, InMemoryAsyncLockFuture, InMemoryAsyncLockManager}; pub use async_lock::AsyncLock; pub use async_lock_manager::AsyncLockManager; -pub use error::LockError; +pub use error::{LockError, RetryClass}; #[cfg(feature = "postgres")] pub use postgres_lock::{PostgresLock, PostgresLockManager}; #[cfg(feature = "sqlite")] diff --git a/src/microsvc/error.rs b/src/microsvc/error.rs index 410a418..edc9e17 100644 --- a/src/microsvc/error.rs +++ b/src/microsvc/error.rs @@ -4,6 +4,7 @@ use std::error::Error; use std::fmt; use crate::bus::{PayloadDecodeError, TransportError, TransportErrorKind}; +use crate::lock::RetryClass; use crate::{repository::RepositoryError, EventRecordError}; /// Error type for command handler operations. @@ -95,16 +96,23 @@ impl HandlerError { /// Classify this error for transport retry purposes (retryable vs permanent). /// - /// Transient failures (repository errors, not-found, otherwise-unclassified) - /// are retryable: in an at-least-once event-driven system a not-found is - /// usually an out-of-order delivery race a later redelivery resolves. - /// Deterministic failures (unknown routing, decode, rejection, auth, guard) - /// are permanent — redelivering the identical message cannot change them. + /// A repository error is no longer one blanket "retryable" bucket: it defers + /// to [`RepositoryError::kind`], so a transient storage outage (connection + /// refused, pool timeout, `SQLITE_BUSY`) stays retryable while a deterministic + /// model/read-model/decode fault is permanent — otherwise the latter would be + /// redelivered forever. `NotFound` and `Other` remain retryable: in an + /// at-least-once system a not-found is usually an out-of-order delivery race a + /// later redelivery resolves, and an unclassified error is more often + /// transient infrastructure than a deterministic bug. Deterministic failures + /// (unknown routing, decode, rejection, auth, guard) are permanent — + /// redelivering the identical message cannot change them. pub(crate) fn transport_error_kind(&self) -> TransportErrorKind { match self { - HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => { - TransportErrorKind::Retryable - } + HandlerError::Repository(err) => match err.kind() { + RetryClass::Retryable => TransportErrorKind::Retryable, + RetryClass::Permanent => TransportErrorKind::Permanent, + }, + HandlerError::NotFound(_) | HandlerError::Other(_) => TransportErrorKind::Retryable, HandlerError::UnknownCommand(_) | HandlerError::DecodeFailed(_) | HandlerError::Rejected(_) @@ -152,6 +160,45 @@ mod tests { } } + /// A deterministic repository fault (a model error, a permanent storage + /// failure such as a constraint violation) must NOT be classified retryable, + /// or the runner would redeliver the identical message forever. + #[test] + fn deterministic_repository_errors_are_permanent() { + let io = std::io::Error::new(std::io::ErrorKind::InvalidData, "bad row"); + for error in [ + HandlerError::Repository(RepositoryError::Model("invalid model state".into())), + HandlerError::Repository(RepositoryError::Replay("version mismatch".into())), + HandlerError::Repository(RepositoryError::permanent_storage("insert event", io)), + ] { + assert_eq!( + error.transport_error_kind(), + TransportErrorKind::Permanent, + "{error}" + ); + } + } + + /// A transient storage failure (connection refused, pool timeout, busy) must + /// stay retryable so a recovered backend gets the redelivery. + #[test] + fn transient_repository_storage_errors_are_retryable() { + let io = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused"); + let error = HandlerError::Repository(RepositoryError::retryable_storage("load stream", io)); + assert_eq!(error.transport_error_kind(), TransportErrorKind::Retryable); + + // A concurrency conflict is retryable: another writer won the race. + let conflict = HandlerError::Repository(RepositoryError::ConcurrentWrite { + id: "agg-1".into(), + expected: 1, + actual: 2, + }); + assert_eq!( + conflict.transport_error_kind(), + TransportErrorKind::Retryable + ); + } + #[test] fn from_handler_error_preserves_classification_and_source() { let err: TransportError = HandlerError::Rejected("invalid".into()).into(); diff --git a/src/outbox/commit.rs b/src/outbox/commit.rs index c9f225f..2dbdf3b 100644 --- a/src/outbox/commit.rs +++ b/src/outbox/commit.rs @@ -283,7 +283,10 @@ mod tests { let err = repo.outbox(event).commit(&mut aggregate).await.unwrap_err(); - assert_eq!(err, RepositoryError::Model("outbox write failed".into())); + assert!( + matches!(&err, RepositoryError::Model(message) if message == "outbox write failed"), + "unexpected error: {err}" + ); assert_eq!(aggregate.entity.committed_version(), 0); assert_eq!(aggregate.entity.new_events().len(), 1); assert_eq!( diff --git a/src/outbox_worker/store.rs b/src/outbox_worker/store.rs index 63d8678..8494e7d 100644 --- a/src/outbox_worker/store.rs +++ b/src/outbox_worker/store.rs @@ -795,9 +795,6 @@ mod tests { let store = HashMapOutboxStore { storage: Default::default(), }; - let expected = RepositoryError::NotFound { - id: "missing".into(), - }; let claim = OutboxClaimRef { message_id: "missing".into(), worker_id: "worker-1".into(), @@ -805,9 +802,10 @@ mod tests { attempt: 1, }; - assert_eq!(store.complete(&claim).unwrap_err(), expected); - assert_eq!(store.release(&claim, "error").unwrap_err(), expected); - assert_eq!(store.fail(&claim, "error").unwrap_err(), expected); + let is_missing = |err: RepositoryError| matches!(&err, RepositoryError::NotFound { id } if id == "missing"); + assert!(is_missing(store.complete(&claim).unwrap_err())); + assert!(is_missing(store.release(&claim, "error").unwrap_err())); + assert!(is_missing(store.fail(&claim, "error").unwrap_err())); } #[tokio::test] diff --git a/src/repository/error.rs b/src/repository/error.rs index a9cc549..454393b 100644 --- a/src/repository/error.rs +++ b/src/repository/error.rs @@ -1,10 +1,12 @@ +use std::error::Error; use std::fmt; -use crate::lock::LockError; +use crate::lock::{LockError, RetryClass}; use crate::read_model::ReadModelError; use crate::EventRecordError; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] +#[non_exhaustive] pub enum RepositoryError { LockPoisoned(&'static str), Lock(LockError), @@ -48,6 +50,97 @@ pub enum RepositoryError { }, Replay(String), Model(String), + /// A storage backend (event store, read model, snapshot store) failed to + /// complete an operation. Unlike [`RepositoryError::Model`] — a deterministic + /// modeling/decoding fault — this carries an explicit retry classification so + /// callers can distinguish a transient outage (connection refused, pool + /// timeout, `SQLITE_BUSY`) from a deterministic failure (constraint + /// violation, malformed row) without string-sniffing the message. + /// + /// The optional `source` preserves the underlying error for diagnostics and + /// dead-letter metadata; it is exposed through [`Error::source`]. + Storage { + /// The operation that failed (e.g. `"sqlite insert event"`). + operation: String, + /// Whether retrying the same operation may succeed. + retryable: bool, + /// The underlying backend error, if available. + source: Option>, + }, +} + +impl RepositoryError { + /// Construct a retryable storage failure carrying its source. + pub fn retryable_storage( + operation: impl Into, + source: impl Error + Send + Sync + 'static, + ) -> Self { + RepositoryError::Storage { + operation: operation.into(), + retryable: true, + source: Some(Box::new(source)), + } + } + + /// Construct a permanent storage failure carrying its source. + pub fn permanent_storage( + operation: impl Into, + source: impl Error + Send + Sync + 'static, + ) -> Self { + RepositoryError::Storage { + operation: operation.into(), + retryable: false, + source: Some(Box::new(source)), + } + } + + /// Classify this error for retry purposes. + /// + /// The contract a runner relies on: a retryable error should be redelivered + /// (a later attempt may succeed); a permanent error should not, because + /// re-running the identical operation cannot change a deterministic outcome. + /// + /// - `Storage { retryable, .. }` reports the classification captured when the + /// backend error was mapped (connection/pool/timeout → retryable; + /// constraint/decode → permanent). + /// - `Lock` defers to [`LockError::kind`]. + /// - `ConcurrentWrite` is **retryable**: an optimistic-concurrency conflict + /// means another writer won the race; reloading and reapplying typically + /// succeeds. This preserves the prior behavior where it fell into the + /// retryable bucket. + /// - `NotFound` is retryable: under at-least-once delivery it is usually an + /// out-of-order race a later redelivery resolves. + /// - The deterministic faults (`Model`, `Replay`, invalid identity/receipt, + /// `InvalidState`, duplicate-in-batch, `LockPoisoned`) are permanent. + pub fn kind(&self) -> RetryClass { + match self { + RepositoryError::Storage { retryable, .. } => { + if *retryable { + RetryClass::Retryable + } else { + RetryClass::Permanent + } + } + RepositoryError::Lock(err) => err.kind(), + RepositoryError::ConcurrentWrite { .. } | RepositoryError::NotFound { .. } => { + RetryClass::Retryable + } + RepositoryError::LockPoisoned(_) + | RepositoryError::DuplicateStreamInBatch { .. } + | RepositoryError::DuplicateOutboxMessageInBatch { .. } + | RepositoryError::DuplicateInboxReceipt { .. } + | RepositoryError::InvalidInboxReceipt { .. } + | RepositoryError::InvalidStreamIdentity { .. } + | RepositoryError::InvalidState { .. } + | RepositoryError::Replay(_) + | RepositoryError::Model(_) => RetryClass::Permanent, + } + } + + /// Whether this error is retryable. + pub fn is_retryable(&self) -> bool { + self.kind().is_retryable() + } } impl fmt::Display for RepositoryError { @@ -109,11 +202,35 @@ impl fmt::Display for RepositoryError { ), RepositoryError::Replay(message) => write!(f, "replay error: {}", message), RepositoryError::Model(message) => write!(f, "model error: {}", message), + RepositoryError::Storage { + operation, + retryable, + source, + } => { + let class = if *retryable { "retryable" } else { "permanent" }; + match source { + Some(source) => { + write!(f, "storage error ({class}) during {operation}: {source}") + } + None => write!(f, "storage error ({class}) during {operation}"), + } + } } } } -impl std::error::Error for RepositoryError {} +impl Error for RepositoryError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + RepositoryError::Lock(err) => Some(err), + RepositoryError::Storage { + source: Some(source), + .. + } => Some(source.as_ref()), + _ => None, + } + } +} impl From for RepositoryError { fn from(err: LockError) -> Self { @@ -123,12 +240,33 @@ impl From for RepositoryError { impl From for RepositoryError { fn from(err: ReadModelError) -> Self { - RepositoryError::Model(err.to_string()) + // Map to `Storage` so the read-model error keeps a retry signal and its + // source instead of collapsing to an opaque `Model` string. Only a lock + // failure carries a transient/permanent distinction we can recover here; + // every other read-model variant is deterministic (a concurrency + // conflict, serde/metadata fault, or not-found will fail the same way on + // redelivery). `ReadModelError::Storage` is itself a stringified backend + // error with no preserved retry signal — without changing `read_model` + // it is classified permanent, which is the safe default (it cannot loop + // forever; it surfaces to the failure policy). + let retryable = matches!(&err, ReadModelError::Lock(lock) if lock.is_retryable()); + RepositoryError::Storage { + operation: "read model".into(), + retryable, + source: Some(Box::new(err)), + } } } impl From for RepositoryError { fn from(err: EventRecordError) -> Self { - RepositoryError::Model(err.to_string()) + // Event (de)serialization faults are deterministic: the same bytes will + // fail the same way on redelivery. Classify as permanent storage, but + // preserve the source for diagnostics rather than stringifying it away. + RepositoryError::Storage { + operation: "event record".into(), + retryable: false, + source: Some(Box::new(err)), + } } } diff --git a/src/snapshot/repository.rs b/src/snapshot/repository.rs index dd8f8de..d4df186 100644 --- a/src/snapshot/repository.rs +++ b/src/snapshot/repository.rs @@ -319,7 +319,10 @@ mod tests { let err = snapshot_repo.commit(&mut aggregate).await.unwrap_err(); - assert_eq!(err, RepositoryError::Model("snapshot write failed".into())); + assert!( + matches!(&err, RepositoryError::Model(message) if message == "snapshot write failed"), + "unexpected error: {err}" + ); assert!(snapshot_repo .repo() .saw_snapshot diff --git a/src/sqlx_repo/mod.rs b/src/sqlx_repo/mod.rs index a202f89..ca717bc 100644 --- a/src/sqlx_repo/mod.rs +++ b/src/sqlx_repo/mod.rs @@ -253,12 +253,44 @@ pub(crate) fn is_postgres_unique_violation(err: &sqlx::Error) -> bool { } } +/// Whether a `sqlx::Error` represents a transient condition worth retrying. +/// +/// Connection loss, pool exhaustion, and acquire/I/O timeouts are infrastructure +/// hiccups: the same statement may succeed once the backend recovers. SQLite +/// `SQLITE_BUSY`/`SQLITE_LOCKED` contention is likewise transient. Everything +/// else — most notably a `Database` error such as a constraint violation or a +/// malformed-row decode — is deterministic: re-running the identical statement +/// against the same data cannot change the outcome, so it is classified +/// permanent. Treating an unknown failure as permanent is the safe default: a +/// permanent classification hands the message to the failure policy instead of +/// redelivering it forever. +pub(crate) fn is_sqlx_transient(err: &sqlx::Error) -> bool { + // Connection / pool / timeout failures are transient regardless of backend. + if matches!( + err, + sqlx::Error::PoolTimedOut | sqlx::Error::PoolClosed | sqlx::Error::Io(_) + ) { + return true; + } + // SQLite serializes writers; busy/locked contention is retryable, not failure. + #[cfg(feature = "sqlite")] + if is_sqlite_busy(err) { + return true; + } + false +} + pub(crate) fn repository_storage_error( backend: &str, operation: &str, err: sqlx::Error, ) -> RepositoryError { - RepositoryError::Model(format!("{backend} {operation} failed: {err}")) + let retryable = is_sqlx_transient(&err); + RepositoryError::Storage { + operation: format!("{backend} {operation}"), + retryable, + source: Some(Box::new(err)), + } } #[cfg(any(feature = "postgres", feature = "sqlite"))] diff --git a/tests/persistent_repository_conformance/scenario.rs b/tests/persistent_repository_conformance/scenario.rs index 5ce3911..8bbffba 100644 --- a/tests/persistent_repository_conformance/scenario.rs +++ b/tests/persistent_repository_conformance/scenario.rs @@ -233,11 +233,13 @@ where .await .expect_err("duplicate stream should be rejected"); - assert_eq!( - err, - RepositoryError::DuplicateStreamInBatch { - id: format!("{}:{id}", Seat::aggregate_type()) - } + assert!( + matches!( + &err, + RepositoryError::DuplicateStreamInBatch { id: dup } + if *dup == format!("{}:{id}", Seat::aggregate_type()) + ), + "unexpected error: {err}" ); assert!(repo .get_stream(&identity) @@ -303,7 +305,18 @@ where .await .expect_err("unsupported codec should be rejected"); assert!( - matches!(err, RepositoryError::Model(message) if message.contains("unsupported payload codec")) + matches!( + &err, + RepositoryError::Storage { + retryable: false, + .. + } + ), + "unexpected error: {err}" + ); + assert!( + err.to_string().contains("unsupported payload codec"), + "unexpected error: {err}" ); assert!(repo .get_stream(&identity) diff --git a/tests/postgres_repository/main.rs b/tests/postgres_repository/main.rs index 7e39eb7..38c9014 100644 --- a/tests/postgres_repository/main.rs +++ b/tests/postgres_repository/main.rs @@ -261,11 +261,13 @@ async fn duplicate_stream_identity_is_rejected_before_sql_writes() { .await .unwrap_err(); - assert_eq!( - err, - RepositoryError::DuplicateStreamInBatch { - id: format!("{}:{id}", Counter::aggregate_type()) - } + assert!( + matches!( + &err, + RepositoryError::DuplicateStreamInBatch { id: dup } + if *dup == format!("{}:{id}", Counter::aggregate_type()) + ), + "unexpected error: {err}" ); assert!(repo.get_stream(&identity).await.unwrap().is_none()); } @@ -513,8 +515,20 @@ async fn unsupported_codec_rows_fail_on_read() { let err = repo.get_stream(&identity).await.unwrap_err(); assert!( - matches!(err, RepositoryError::Model(message) if message.contains("unsupported payload codec")) + matches!( + &err, + RepositoryError::Storage { + retryable: false, + .. + } + ), + "unexpected error: {err}" + ); + assert!( + err.to_string().contains("unsupported payload codec"), + "unexpected error: {err}" ); + assert!(!err.is_retryable()); } #[tokio::test] diff --git a/tests/repository_api/main.rs b/tests/repository_api/main.rs index ddfc4fd..1934211 100644 --- a/tests/repository_api/main.rs +++ b/tests/repository_api/main.rs @@ -123,11 +123,12 @@ async fn batch_rejects_duplicate_stream_identity_before_write() { .await .unwrap_err(); - assert_eq!( - err, - RepositoryError::DuplicateStreamInBatch { - id: "async.alpha:duplicate".into() - } + assert!( + matches!( + &err, + RepositoryError::DuplicateStreamInBatch { id } if id == "async.alpha:duplicate" + ), + "unexpected error: {err}" ); assert!(repo.get_stream(&identity).await.unwrap().is_none()); } diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index adcfe63..b697c97 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -437,8 +437,20 @@ async fn unsupported_codec_rows_fail_on_read() { let err = repo.get_stream(&identity).await.unwrap_err(); assert!( - matches!(err, RepositoryError::Model(message) if message.contains("unsupported payload codec")) + matches!( + &err, + RepositoryError::Storage { + retryable: false, + .. + } + ), + "unexpected error: {err}" + ); + assert!( + err.to_string().contains("unsupported payload codec"), + "unexpected error: {err}" ); + assert!(!err.is_retryable()); } #[tokio::test] From 0adf64173ea1fbad41b40342fd6ea09d41c06b6b Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 01:20:47 -0500 Subject: [PATCH 2/4] test(knative): use a genuinely retryable error for the 503 redeliver case MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `retryable_failure_returns_503` ingress test handler returned `RepositoryError::Model("transient")`. After the error-taxonomy rework, `Model` is — correctly — a deterministic fault classified Permanent, so the ingress mapped it to 422 (do-not-retry), failing the assertion. This was a stale expectation asserting the old redeliver-forever bug, not a misclassification: a `Model` error is deterministic and re-running the identical message cannot change its outcome, so it MUST be permanent (the `deterministic_repository_errors_are_permanent` unit test pins this). The fix is in the test, not the classification logic. The handler now returns `RepositoryError::retryable_storage("load stream", ...)` — a transient storage outage (connection refused) that genuinely may succeed on redelivery — which is what a "temporarily failed" handler should signal. The 503 redeliver semantics the test names are now honestly exercised, and the deterministic-permanent path is covered by `order.rejected` -> 422. Refines [[tasks/repository-error-taxonomy]] --- tests/knative_cloudevents/main.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/knative_cloudevents/main.rs b/tests/knative_cloudevents/main.rs index d7ac06d..3328452 100644 --- a/tests/knative_cloudevents/main.rs +++ b/tests/knative_cloudevents/main.rs @@ -28,8 +28,16 @@ async fn spawn_server() -> (String, Arc>>) { }) .event("order.temporarily_failed") .handle(|_ctx: &Context<()>| async move { + // A transient storage outage (connection refused / pool timeout): + // the same message may succeed on redelivery, so it must classify + // retryable. A deterministic `Model` fault would (correctly) be + // permanent — that is the `order.rejected` path below. + let io = std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ); Err(HandlerError::Repository( - distributed::RepositoryError::Model("transient".into()), + distributed::RepositoryError::retryable_storage("load stream", io), )) }) .event("order.rejected") From 900edd2463497a7ed124dc909809fbf93bcbbcf0 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 01:32:11 -0500 Subject: [PATCH 3/4] test: expect permanent Storage error for read-model constraint violation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A read-model CHECK-constraint violation is a deterministic, non-retryable fault. With the reworked error taxonomy it now surfaces as the new `RepositoryError::Storage { retryable: false, .. }` variant rather than the old `RepositoryError::Model(String)`, so re-running the identical commit is never retried forever. PR #85 added `read_model_failure_mid_plan_rolls_back_events_and_outbox` to both the sqlite and postgres repository suites asserting the old `Model` error. Update both assertions to expect the permanent `Storage` variant (and `!err.is_retryable()`), matching the corrected classification and the idiom already used by this PR's own backend-storage tests. The rollback invariant the test actually guards — events, outbox row, and the first read-model mutation all absent after the failed commit — is unchanged. Refines [[tasks/repository-error-taxonomy]] --- tests/postgres_repository/main.rs | 14 ++++++++++++-- tests/sqlite_repository/main.rs | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/tests/postgres_repository/main.rs b/tests/postgres_repository/main.rs index 79d714a..a611f09 100644 --- a/tests/postgres_repository/main.rs +++ b/tests/postgres_repository/main.rs @@ -329,10 +329,20 @@ async fn read_model_failure_mid_plan_rolls_back_events_and_outbox() { }) .await .expect_err("a mid-plan constraint violation must fail the commit"); + // A read-model CHECK-constraint violation is a deterministic, non-retryable + // fault, so it surfaces as the permanent `Storage` variant (not `Model`): + // re-running the identical write cannot change the outcome. assert!( - matches!(err, RepositoryError::Model(_)), - "expected a Model error from the constraint violation, got {err:?}" + matches!( + &err, + RepositoryError::Storage { + retryable: false, + .. + } + ), + "expected a permanent Storage error from the constraint violation, got {err:?}" ); + assert!(!err.is_retryable()); // 1. Aggregate event absent. assert!( diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index 6a07630..a935aa3 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -275,11 +275,20 @@ async fn read_model_failure_mid_plan_rolls_back_events_and_outbox() { }) .await .expect_err("a mid-plan constraint violation must fail the commit"); - // The constraint failure surfaces as a Model error (read-model storage error). + // A read-model CHECK-constraint violation is a deterministic, non-retryable + // fault, so it surfaces as the permanent `Storage` variant (not `Model`): + // re-running the identical write cannot change the outcome. assert!( - matches!(err, RepositoryError::Model(_)), - "expected a Model error from the constraint violation, got {err:?}" + matches!( + &err, + RepositoryError::Storage { + retryable: false, + .. + } + ), + "expected a permanent Storage error from the constraint violation, got {err:?}" ); + assert!(!err.is_retryable()); // 1. The aggregate event must be absent. assert!( From deb6369423e97eb38ff4d2f77e186dc2c190c54c Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 02:59:46 -0500 Subject: [PATCH 4/4] fix(repo): classify Postgres deadlock/serialization SQLSTATEs as retryable 40001 (serialization_failure) and 40P01 (deadlock_detected) are transient write-race outcomes that should be retried, not handed to the failure policy. is_sqlx_transient previously only matched pool/timeout/IO and SQLite busy, so these fell through as permanent. Mirrors the existing 23505 unique-violation detection (no feature gate: SQLite never carries these SQLSTATEs). Addresses CodeRabbit review on PR #83. Refines [[tasks/repository-error-taxonomy]] --- src/sqlx_repo/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sqlx_repo/mod.rs b/src/sqlx_repo/mod.rs index ca717bc..b3638c9 100644 --- a/src/sqlx_repo/mod.rs +++ b/src/sqlx_repo/mod.rs @@ -277,6 +277,14 @@ pub(crate) fn is_sqlx_transient(err: &sqlx::Error) -> bool { if is_sqlite_busy(err) { return true; } + // Postgres serialization_failure (40001) / deadlock_detected (40P01): the + // transaction lost a write race and should be retried, not handed to the + // failure policy. SQLite never carries these SQLSTATEs, so no feature gate. + if let sqlx::Error::Database(db_err) = err { + if matches!(db_err.code().as_deref(), Some("40001" | "40P01")) { + return true; + } + } false }