diff --git a/src/bus/postgres_bus.rs b/src/bus/postgres_bus.rs index 7cfff55..1c6ccc2 100644 --- a/src/bus/postgres_bus.rs +++ b/src/bus/postgres_bus.rs @@ -93,22 +93,47 @@ fn kind_from_str(value: &str) -> MessageKind { } } -fn message_from_row(row: &sqlx::postgres::PgRow) -> Message { +/// Reconstruct a [`Message`] from a claimed `bus_queue`/`bus_log` row. +/// +/// A row that fails to decode a **required** column (`name`, `kind`, `payload`) +/// is corrupt and can never be handled: returning a placeholder `Message` with an +/// empty name would route it to the runner's ack-and-ignore path, silently +/// deleting the row (queue) or advancing the offset past it (log) with no trace. +/// Instead this returns a **permanent** [`TransportError`] so the caller surfaces +/// it as a decode failure and the runner routes the claimed row through the +/// failure policy (dead-letter by default) like any other permanent failure. +/// +/// `message_id`, `content_type`, and `metadata` keep tolerant defaults: they are +/// optional or have schema defaults, so a missing/garbled value there does not +/// make the message unhandleable. +fn message_from_row(row: &sqlx::postgres::PgRow) -> Result { + fn decode_err(column: &str, err: sqlx::Error) -> TransportError { + TransportError::permanent(format!( + "postgres bus corrupt row: required column '{column}' failed to decode: {err}" + )) + } + + let name: String = row.try_get("name").map_err(|err| decode_err("name", err))?; + let kind: String = row.try_get("kind").map_err(|err| decode_err("kind", err))?; + let payload: Vec = row + .try_get("payload") + .map_err(|err| decode_err("payload", err))?; + let metadata_json: String = row.try_get("metadata").unwrap_or_else(|_| "[]".to_string()); let metadata = serde_json::from_str::>(&metadata_json).unwrap_or_default(); - Message { + Ok(Message { id: row .try_get::, _>("message_id") .unwrap_or(None), - name: row.try_get("name").unwrap_or_default(), - kind: kind_from_str(&row.try_get::("kind").unwrap_or_default()), - payload: row.try_get("payload").unwrap_or_default(), + name, + kind: kind_from_str(&kind), + payload, content_type: row .try_get("content_type") .unwrap_or_else(|_| "application/json".to_string()), metadata, - } + }) } /// Postgres [`Bus`] + [`BusConsumer`]. Cheap to clone (the pool is an `Arc`). @@ -280,12 +305,20 @@ impl AsyncMessageSource for QueueSource { type Received = QueueReceived; async fn recv(&mut self) -> Result, TransportError> { + // Claim the next row whose `name` matches a subscribed command, OR whose + // `name` is NULL. A NULL name is un-routable corruption: it belongs to no + // consumer, so `name = ANY($2)` would never match it and it would sit in + // the queue forever, never claimed, never settled. Claiming it here lets + // the runner route it through the failure policy (dead-letter by default, + // which deletes the row) instead of leaving a poison row that blocks the + // queue from draining. `FOR UPDATE SKIP LOCKED` keeps the claim safe under + // competing listeners — only one settles the orphaned row. let row = sqlx::query( "UPDATE bus_queue SET locked_until = now() + ($1 * interval '1 second'), \ attempts = attempts + 1 \ WHERE seq = ( \ SELECT seq FROM bus_queue \ - WHERE name = ANY($2) AND available_at <= now() \ + WHERE (name = ANY($2) OR name IS NULL) AND available_at <= now() \ AND (locked_until IS NULL OR locked_until < now()) \ ORDER BY seq FOR UPDATE SKIP LOCKED LIMIT 1 \ ) \ @@ -299,21 +332,41 @@ impl AsyncMessageSource for QueueSource { Ok(row.map(|row| { let seq: i64 = row.try_get("seq").unwrap_or_default(); + let (message, decode_error) = decode_or_placeholder(&row); QueueReceived { pool: self.pool.clone(), seq, - message: message_from_row(&row), + message, + decode_error, } })) } } +/// Split a decode result into the handle's `(message, decode_error)` fields. +/// +/// On failure the handle still needs *some* `Message` to return from +/// [`ReceivedMessage::message`]; this placeholder is never dispatched because the +/// runner sees `decode_error()` first and routes the claim through the failure +/// policy. The placeholder name is deliberately empty — the decode error carries +/// the diagnostic. +fn decode_or_placeholder(row: &sqlx::postgres::PgRow) -> (Message, Option) { + match message_from_row(row) { + Ok(message) => (message, None), + Err(error) => ( + Message::new("", MessageKind::Event, Vec::new()), + Some(error), + ), + } +} + /// A claimed `bus_queue` row: `ack` deletes it (done); `nack` makes it available /// again (redelivery); `dead_letter`/`park` delete it (stop redelivery). pub struct QueueReceived { pool: PgPool, seq: i64, message: Message, + decode_error: Option, } impl QueueReceived { @@ -332,6 +385,10 @@ impl ReceivedMessage for QueueReceived { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } + async fn ack(self) -> Result<(), TransportError> { self.delete().await } @@ -366,9 +423,16 @@ impl AsyncMessageSource for LogSource { type Received = LogReceived; async fn recv(&mut self) -> Result, TransportError> { + // Read the next entry past this consumer's offset whose `name` matches a + // subscribed event, OR whose `name` is NULL. A NULL name is un-routable + // corruption: `name = ANY($1)` would skip past it silently when the offset + // jumps to a later healthy entry, dropping the poison record with no trace. + // Surfacing it lets the runner route it through the failure policy + // (dead-letter by default, which advances the offset past it) so the + // corrupt entry is settled, not silently skipped. let row = sqlx::query( "SELECT seq, name, message_id, kind, payload, content_type, metadata FROM bus_log \ - WHERE name = ANY($1) \ + WHERE (name = ANY($1) OR name IS NULL) \ AND seq > COALESCE((SELECT last_seq FROM bus_offset WHERE consumer = $2), 0) \ ORDER BY seq LIMIT 1", ) @@ -380,11 +444,13 @@ impl AsyncMessageSource for LogSource { Ok(row.map(|row| { let seq: i64 = row.try_get("seq").unwrap_or_default(); + let (message, decode_error) = decode_or_placeholder(&row); LogReceived { pool: self.pool.clone(), consumer: self.consumer.clone(), seq, - message: message_from_row(&row), + message, + decode_error, } })) } @@ -398,6 +464,7 @@ pub struct LogReceived { consumer: String, seq: i64, message: Message, + decode_error: Option, } impl LogReceived { @@ -421,6 +488,10 @@ impl ReceivedMessage for LogReceived { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } + async fn ack(self) -> Result<(), TransportError> { self.advance_offset().await } diff --git a/src/bus/runner.rs b/src/bus/runner.rs index f8e4a25..e1ddb3f 100644 --- a/src/bus/runner.rs +++ b/src/bus/runner.rs @@ -54,6 +54,33 @@ where I: Send, { while let Some(received) = source.recv().await? { + // A delivery the transport could not decode is a permanent failure: it + // carries no valid message to dispatch, and it must NOT be treated as an + // empty message (which would route to ack-and-ignore below and silently + // drop a corrupt row). Route it through the failure policy directly, the + // same as a permanent dispatch failure, so it is dead-lettered/parked. + if let Some(error) = received.decode_error() { + match options.failure_policy.resolve(error) { + FailureAction::Nack => { + let reason = error.to_string(); + received.nack(&reason).await? + } + FailureAction::DeadLetter => { + let reason = error.to_string(); + received.dead_letter(&reason).await? + } + FailureAction::Park => { + let reason = error.to_string(); + received.park(&reason).await? + } + FailureAction::LogAndAck => { + eprintln!("[bus::runner] dropping undecodable message after permanent failure: {error}"); + received.ack().await? + } + FailureAction::Stop => return Err(TransportError::permanent(error.to_string())), + } + continue; + } // No handler for this message: intentionally ignore (ack) rather than // dead-letter, so unrelated fan-out events don't pile into the DLQ. if !router.handles(received.message().kind, received.message().name()) { @@ -161,6 +188,7 @@ mod tests { message: Message, recorder: Arc, settle_ok: bool, + decode_error: Option, } impl FakeReceived { @@ -178,6 +206,9 @@ mod tests { fn message(&self) -> &Message { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } async fn ack(self) -> Result<(), TransportError> { self.settle(Event::Ack) } @@ -197,6 +228,9 @@ mod tests { recorder: Arc, settle_ok: bool, recv_error: bool, + // When set, every received message reports this as a decode failure, + // modeling a transport that claims a row/offset before decoding it. + decode_error: bool, } impl AsyncMessageSource for FakeSource { @@ -205,10 +239,13 @@ mod tests { if self.recv_error { return Err(TransportError::retryable("recv failed")); } + let decode_error = self.decode_error; Ok(self.queue.pop_front().map(|message| FakeReceived { message, recorder: self.recorder.clone(), settle_ok: self.settle_ok, + decode_error: decode_error + .then(|| TransportError::permanent("corrupt row: name failed to decode")), })) } } @@ -277,6 +314,7 @@ mod tests { recorder: recorder.clone(), settle_ok, recv_error, + decode_error: false, }; let outcome = block_on(run_source(svc, source, options)); RunResult { @@ -484,6 +522,66 @@ mod tests { ); } + fn run_decode_error(messages: Vec, options: RunOptions) -> RunResult { + let recorder = Recorder::new(); + let svc = router(&recorder); + let source = FakeSource { + queue: messages.into_iter().collect(), + recorder: recorder.clone(), + settle_ok: true, + recv_error: false, + decode_error: true, + }; + let outcome = block_on(run_source(svc, source, options)); + RunResult { + outcome, + events: recorder.events(), + } + } + + #[test] + fn corrupt_row_dead_letters_under_default_policy_not_acked_and_ignored() { + // The corrupt delivery has an unhandled (empty) name, which would + // otherwise fall into the ack-and-ignore path and silently drop it. The + // decode error must instead route it through the failure policy. + let result = run_decode_error(vec![event_message("", None)], RunOptions::idempotent()); + assert!(result.outcome.is_ok()); + assert_eq!(result.events.len(), 1); + match &result.events[0] { + Event::DeadLetter(reason) => assert!(reason.contains("corrupt row")), + other => panic!("expected dead-letter, got {other:?}"), + } + // It was never handled and never plain-acked. + assert!(!result.events.iter().any(|e| matches!(e, Event::Handled(_)))); + assert!(!result.events.contains(&Event::Ack)); + } + + #[test] + fn corrupt_row_parks_under_park_policy() { + let result = run_decode_error( + vec![event_message("", None)], + RunOptions::idempotent().with_failure_policy(FailurePolicy::Park), + ); + assert!(result.outcome.is_ok()); + assert!(matches!(result.events.first(), Some(Event::Park(_)))); + } + + #[test] + fn corrupt_row_stops_under_stop_policy_with_permanent_error() { + let result = run_decode_error( + vec![event_message("", None)], + RunOptions::idempotent().with_failure_policy(FailurePolicy::Stop), + ); + let err = result + .outcome + .expect_err("stop policy surfaces the decode error"); + assert!(err.is_permanent()); + assert!( + result.events.is_empty(), + "stop does not settle the corrupt row" + ); + } + #[test] fn run_source_future_is_send() { // Guards the documented multi-threaded-executor contract for the common @@ -496,6 +594,7 @@ mod tests { recorder, settle_ok: true, recv_error: false, + decode_error: false, }; let future = run_source(svc, source, RunOptions::idempotent()); assert_send(&future); diff --git a/src/bus/source.rs b/src/bus/source.rs index d1f6492..c858686 100644 --- a/src/bus/source.rs +++ b/src/bus/source.rs @@ -46,6 +46,20 @@ pub trait ReceivedMessage: Send { /// The canonical message to dispatch. fn message(&self) -> &Message; + /// A permanent decode failure for this delivery, if the transport could not + /// reconstruct the message from its stored representation. + /// + /// Defaults to `None`: most adapters either decode successfully or fail the + /// whole `recv`. An adapter that can claim a row/offset *before* decoding it + /// (so the claim must be settled even when decoding fails) returns the + /// classified error here. The runner treats `Some(err)` as a permanent + /// failure routed through the [`FailurePolicy`](super::FailurePolicy) — the + /// same path as a permanent dispatch failure — so a corrupt row is + /// dead-lettered/parked rather than ack-and-ignored as an empty message. + fn decode_error(&self) -> Option<&TransportError> { + None + } + /// Acknowledge successful handling. The transport removes the message. /// /// The runner calls this only after consumer execution has succeeded (and, diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index da5c5b2..16ccefb 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -355,3 +355,170 @@ async fn bus_subscribe_uses_named_service_as_consumer_group() { vec!["e0".to_string(), "e1".to_string(), "e2".to_string()] ); } + +// ---- corrupt-row handling: a row that fails to decode must NOT vanish ---- + +/// Make the most recently enqueued `bus_queue` row corrupt by nulling its +/// required `name` column, so `try_get::` fails with `UnexpectedNull` +/// when the source claims it — the exact decode failure a real corruption (a +/// migration mishap, a manual edit, a driver/type mismatch) produces. +async fn corrupt_latest_queue_name(pool: &sqlx::PgPool) { + sqlx::query("ALTER TABLE bus_queue ALTER COLUMN name DROP NOT NULL") + .execute(pool) + .await + .expect("drop not null"); + sqlx::query("UPDATE bus_queue SET name = NULL WHERE seq = (SELECT max(seq) FROM bus_queue)") + .execute(pool) + .await + .expect("null out name"); +} + +async fn corrupt_latest_log_name(pool: &sqlx::PgPool) { + sqlx::query("ALTER TABLE bus_log ALTER COLUMN name DROP NOT NULL") + .execute(pool) + .await + .expect("drop not null"); + sqlx::query("UPDATE bus_log SET name = NULL WHERE seq = (SELECT max(seq) FROM bus_log)") + .execute(pool) + .await + .expect("null out name"); +} + +/// A corrupt `bus_queue` row is routed through the failure policy (dead-letter by +/// default → the row is deleted) rather than being decoded into an empty-named +/// message and silently ack-and-ignored. The valid row beside it is still +/// handled, and the run drains to completion. +#[tokio::test] +async fn bus_listen_dead_letters_corrupt_queue_row_not_silently() { + let Some(schema) = postgres::PostgresTestSchema::create_from_env("bus_corrupt_q", SKIP).await + else { + return; + }; + let repo = schema.repository().await; + let pool = repo.pool().clone(); + let bus = PostgresBus::new(pool.clone()).group("orders"); + bus.ensure_tables().await.expect("ensure tables"); + + // A poison row (name will be nulled) and a healthy row. + bus.send_message( + Message::new("order.initialize", MessageKind::Command, b"{}".to_vec()).with_id("poison"), + ) + .await + .expect("send poison"); + corrupt_latest_queue_name(&pool).await; + bus.send_message( + Message::new("order.initialize", MessageKind::Command, b"{}".to_vec()).with_id("ok"), + ) + .await + .expect("send ok"); + + let rec = Arc::new(Mutex::new(Vec::new())); + bus.listen( + recording_for("order.initialize", MessageKind::Command, rec.clone()), + RunOptions::idempotent(), + ) + .await + .expect("listen drains without surfacing the corrupt row as a fatal error"); + + // The healthy command was handled; the corrupt row was never dispatched as + // an empty-named message. + let handled = rec.lock().unwrap().clone(); + assert_eq!( + handled, + vec!["ok".to_string()], + "only the valid row handled" + ); + + // The corrupt row did not vanish into ack-and-ignore *and* did not get stuck + // redelivering forever: under the default dead-letter policy it leaves the + // queue. The queue is fully drained. + let remaining: i64 = sqlx::query_scalar("SELECT count(*) FROM bus_queue") + .fetch_one(&pool) + .await + .expect("count queue"); + assert_eq!( + remaining, 0, + "corrupt row routed through policy, not redelivered forever" + ); +} + +/// A corrupt `bus_log` row is routed through the failure policy (dead-letter by +/// default → the consumer offset advances past it) rather than silently +/// ack-and-ignored. The consumer makes progress to the healthy entry after it. +#[tokio::test] +async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { + let Some(schema) = postgres::PostgresTestSchema::create_from_env("bus_corrupt_l", SKIP).await + else { + return; + }; + let repo = schema.repository().await; + let pool = repo.pool().clone(); + let producer = PostgresBus::new(pool.clone()); + producer.ensure_tables().await.expect("ensure tables"); + + // Layout (by seq): poison, ok, poison. The trailing poison is the highest + // seq, so a consumer that *silently skips* corrupt entries (matching only by + // name) would stop its offset at the healthy `ok` entry and never reach the + // last seq — the offset would fall short of max_seq and this test would fail. + // Reaching max_seq proves the corrupt entries were settled through the policy + // (offset advanced past them), not skipped because their name no longer matched. + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()).with_id("poison"), + ) + .await + .expect("publish leading poison"); + corrupt_latest_log_name(&pool).await; + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()).with_id("ok"), + ) + .await + .expect("publish ok"); + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()) + .with_id("poison-tail"), + ) + .await + .expect("publish trailing poison"); + corrupt_latest_log_name(&pool).await; + + let rec = Arc::new(Mutex::new(Vec::new())); + PostgresBus::new(pool.clone()) + .group("projections") + .subscribe( + recording_for("order.initialized", MessageKind::Event, rec.clone()), + RunOptions::idempotent(), + ) + .await + .expect("subscribe drains past the corrupt entries"); + + // The healthy event between the poison entries was handled — the consumer did + // not get stuck on a corrupt row, and no corrupt row was dispatched as an + // empty-named message. + let handled = rec.lock().unwrap().clone(); + assert_eq!( + handled, + vec!["ok".to_string()], + "only the valid event handled" + ); + + // The offset advanced past every entry, including the trailing corrupt one + // (dead-letter advances the log offset). If the corrupt entries were skipped + // silently by name, the offset would stop at the `ok` entry, short of max_seq. + let offset: Option = + sqlx::query_scalar("SELECT last_seq FROM bus_offset WHERE consumer = 'projections'") + .fetch_optional(&pool) + .await + .expect("read offset"); + let max_seq: i64 = sqlx::query_scalar("SELECT max(seq) FROM bus_log") + .fetch_one(&pool) + .await + .expect("max seq"); + assert_eq!( + offset, + Some(max_seq), + "offset advanced past the trailing corrupt entry, not stuck or skipped-silently" + ); +}