Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/commit_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,9 @@ mod tests {
.await
.unwrap_err();

assert_eq!(
err,
RepositoryError::Model("injected async batch failure".into())
assert!(
matches!(&err, RepositoryError::Model(message) if message == "injected async batch failure"),
"unexpected error: {err}"
);
assert_eq!(agg.entity().committed_version(), 0);
assert_eq!(agg.entity().new_events().len(), 1);
Expand Down
8 changes: 3 additions & 5 deletions src/hashmap_repo/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,9 @@ mod tests {
]))
.await
.unwrap_err();
assert_eq!(
err,
RepositoryError::DuplicateStreamInBatch {
id: identity("same-id").to_string(),
}
assert!(
matches!(&err, RepositoryError::DuplicateStreamInBatch { id } if *id == identity("same-id").to_string()),
"unexpected error: {err}"
);

assert!(repo
Expand Down
91 changes: 91 additions & 0 deletions src/lock/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,42 @@
use std::fmt;

/// Whether a failure is worth retrying as-is.
///
/// This mirrors the transport layer's
/// [`TransportErrorKind`](crate::bus::TransportErrorKind): a `Retryable` failure
/// is transient (a later attempt may succeed), a `Permanent` failure is
/// deterministic (re-running the identical operation cannot change the outcome).
/// It lives in the lock module — the lowest layer that needs it — so both
/// [`LockError`] and [`RepositoryError`](crate::repository::RepositoryError) can
/// share one vocabulary without the repository layer depending on the bus.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RetryClass {
/// A transient failure. Retrying the same operation may succeed.
Retryable,
/// A deterministic failure. Retrying the identical operation will not help.
Permanent,
}

impl RetryClass {
/// Whether this class is retryable.
pub fn is_retryable(self) -> bool {
matches!(self, RetryClass::Retryable)
}

/// Whether this class is permanent.
pub fn is_permanent(self) -> bool {
matches!(self, RetryClass::Permanent)
}
}

/// Error type for lock operations.
///
/// Each variant carries a retry classification via [`LockError::kind`]:
/// contention and lease loss are transient (retry the acquire), whereas a
/// poisoned primitive is a process-fatal invariant violation that retrying
/// cannot clear.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum LockError {
/// The underlying lock primitive was poisoned (e.g. a thread panicked while holding it).
Poisoned(String),
Expand All @@ -11,20 +46,76 @@ pub enum LockError {
ReleaseFailed(String),
/// The lock expired (e.g. a distributed lock TTL elapsed).
Expired(String),
/// A transient backend condition (e.g. SQLite `SQLITE_BUSY`/`SQLITE_LOCKED`
/// or a pool-acquire timeout) prevented the operation. Retrying may succeed.
Busy(String),
/// Any other lock error.
Other(String),
}

impl LockError {
/// Classify this error for retry purposes.
///
/// `Poisoned` is permanent — a poisoned primitive signals a broken invariant
/// that a retry cannot clear. Contention (`AcquireFailed`, `Busy`), lease
/// loss (`Expired`), and transient release failures are retryable. `Other`
/// is conservatively retryable: an unclassified lock failure is more often
/// transient infrastructure than a deterministic bug, and at-least-once
/// semantics make a redundant retry safe.
pub fn kind(&self) -> RetryClass {
match self {
LockError::Poisoned(_) => RetryClass::Permanent,
LockError::AcquireFailed(_)
| LockError::ReleaseFailed(_)
| LockError::Expired(_)
| LockError::Busy(_)
| LockError::Other(_) => RetryClass::Retryable,
}
}

/// Whether this error is retryable.
pub fn is_retryable(&self) -> bool {
self.kind().is_retryable()
}
}

impl fmt::Display for LockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LockError::Poisoned(msg) => write!(f, "lock poisoned: {}", msg),
LockError::AcquireFailed(msg) => write!(f, "lock acquire failed: {}", msg),
LockError::ReleaseFailed(msg) => write!(f, "lock release failed: {}", msg),
LockError::Expired(msg) => write!(f, "lock expired: {}", msg),
LockError::Busy(msg) => write!(f, "lock busy: {}", msg),
LockError::Other(msg) => write!(f, "lock error: {}", msg),
}
}
}

impl std::error::Error for LockError {}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn contention_and_lease_loss_are_retryable() {
for err in [
LockError::AcquireFailed("contended".into()),
LockError::ReleaseFailed("transient".into()),
LockError::Expired("ttl elapsed".into()),
LockError::Busy("database is locked".into()),
LockError::Other("unknown".into()),
] {
assert_eq!(err.kind(), RetryClass::Retryable, "{err}");
assert!(err.is_retryable());
}
}

#[test]
fn poisoned_is_permanent() {
let err = LockError::Poisoned("map poisoned".into());
assert_eq!(err.kind(), RetryClass::Permanent);
assert!(!err.is_retryable());
}
}
2 changes: 1 addition & 1 deletion src/lock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod sqlx_common;
pub use async_in_memory::{InMemoryAsyncLock, InMemoryAsyncLockFuture, InMemoryAsyncLockManager};
pub use async_lock::AsyncLock;
pub use async_lock_manager::AsyncLockManager;
pub use error::LockError;
pub use error::{LockError, RetryClass};
#[cfg(feature = "postgres")]
pub use postgres_lock::{PostgresLock, PostgresLockManager};
#[cfg(feature = "sqlite")]
Expand Down
63 changes: 55 additions & 8 deletions src/microsvc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::error::Error;
use std::fmt;

use crate::bus::{PayloadDecodeError, TransportError, TransportErrorKind};
use crate::lock::RetryClass;
use crate::{repository::RepositoryError, EventRecordError};

/// Error type for command handler operations.
Expand Down Expand Up @@ -113,16 +114,23 @@ impl HandlerError {

/// Classify this error for transport retry purposes (retryable vs permanent).
///
/// Transient failures (repository errors, not-found, otherwise-unclassified)
/// are retryable: in an at-least-once event-driven system a not-found is
/// usually an out-of-order delivery race a later redelivery resolves.
/// Deterministic failures (unknown routing, decode, rejection, auth, guard)
/// are permanent — redelivering the identical message cannot change them.
/// A repository error is no longer one blanket "retryable" bucket: it defers
/// to [`RepositoryError::kind`], so a transient storage outage (connection
/// refused, pool timeout, `SQLITE_BUSY`) stays retryable while a deterministic
/// model/read-model/decode fault is permanent — otherwise the latter would be
/// redelivered forever. `NotFound` and `Other` remain retryable: in an
/// at-least-once system a not-found is usually an out-of-order delivery race a
/// later redelivery resolves, and an unclassified error is more often
/// transient infrastructure than a deterministic bug. Deterministic failures
/// (unknown routing, decode, rejection, auth, guard) are permanent —
/// redelivering the identical message cannot change them.
pub(crate) fn transport_error_kind(&self) -> TransportErrorKind {
match self {
HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => {
TransportErrorKind::Retryable
}
HandlerError::Repository(err) => match err.kind() {
RetryClass::Retryable => TransportErrorKind::Retryable,
RetryClass::Permanent => TransportErrorKind::Permanent,
},
HandlerError::NotFound(_) | HandlerError::Other(_) => TransportErrorKind::Retryable,
HandlerError::UnknownCommand(_)
| HandlerError::DecodeFailed(_)
| HandlerError::Rejected(_)
Expand Down Expand Up @@ -170,6 +178,45 @@ mod tests {
}
}

/// A deterministic repository fault (a model error, a permanent storage
/// failure such as a constraint violation) must NOT be classified retryable,
/// or the runner would redeliver the identical message forever.
#[test]
fn deterministic_repository_errors_are_permanent() {
let io = std::io::Error::new(std::io::ErrorKind::InvalidData, "bad row");
for error in [
HandlerError::Repository(RepositoryError::Model("invalid model state".into())),
HandlerError::Repository(RepositoryError::Replay("version mismatch".into())),
HandlerError::Repository(RepositoryError::permanent_storage("insert event", io)),
] {
assert_eq!(
error.transport_error_kind(),
TransportErrorKind::Permanent,
"{error}"
);
}
}

/// A transient storage failure (connection refused, pool timeout, busy) must
/// stay retryable so a recovered backend gets the redelivery.
#[test]
fn transient_repository_storage_errors_are_retryable() {
let io = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused");
let error = HandlerError::Repository(RepositoryError::retryable_storage("load stream", io));
assert_eq!(error.transport_error_kind(), TransportErrorKind::Retryable);

// A concurrency conflict is retryable: another writer won the race.
let conflict = HandlerError::Repository(RepositoryError::ConcurrentWrite {
id: "agg-1".into(),
expected: 1,
actual: 2,
});
assert_eq!(
conflict.transport_error_kind(),
TransportErrorKind::Retryable
);
}

#[test]
fn from_handler_error_preserves_classification_and_source() {
let err: TransportError = HandlerError::Rejected("invalid".into()).into();
Expand Down
5 changes: 4 additions & 1 deletion src/outbox/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ mod tests {

let err = repo.outbox(event).commit(&mut aggregate).await.unwrap_err();

assert_eq!(err, RepositoryError::Model("outbox write failed".into()));
assert!(
matches!(&err, RepositoryError::Model(message) if message == "outbox write failed"),
"unexpected error: {err}"
);
assert_eq!(aggregate.entity.committed_version(), 0);
assert_eq!(aggregate.entity.new_events().len(), 1);
assert_eq!(
Expand Down
10 changes: 4 additions & 6 deletions src/outbox_worker/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,19 +795,17 @@ mod tests {
let store = HashMapOutboxStore {
storage: Default::default(),
};
let expected = RepositoryError::NotFound {
id: "missing".into(),
};
let claim = OutboxClaimRef {
message_id: "missing".into(),
worker_id: "worker-1".into(),
leased_until: SystemTime::now(),
attempt: 1,
};

assert_eq!(store.complete(&claim).unwrap_err(), expected);
assert_eq!(store.release(&claim, "error").unwrap_err(), expected);
assert_eq!(store.fail(&claim, "error").unwrap_err(), expected);
let is_missing = |err: RepositoryError| matches!(&err, RepositoryError::NotFound { id } if id == "missing");
assert!(is_missing(store.complete(&claim).unwrap_err()));
assert!(is_missing(store.release(&claim, "error").unwrap_err()));
assert!(is_missing(store.fail(&claim, "error").unwrap_err()));
}

#[tokio::test]
Expand Down
Loading
Loading