From bd658a9fb59bc026e7f34b9ab13d273b042491a5 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Wed, 10 Jun 2026 23:51:45 -0500 Subject: [PATCH 1/2] fix: route corrupt PostgresBus rows through failure policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A `bus_queue`/`bus_log` row whose required columns (`name`, `kind`, `payload`) failed to decode was silently lost. `message_from_row` used `try_get(...).unwrap_or_default()`, so a corrupt row became a `Message` with an empty `name`. The runner classifies an empty/unrouted name as "no handler", which takes the ack-and-ignore path: `QueueReceived::ack` deletes the row and `LogReceived::ack` advances the consumer offset past it. The corrupt row vanished with no trace, bypassing the otherwise careful never-silently-drop failure policy. Silent-data-loss path: recv -> message_from_row (unwrap_or_default -> name = "") -> runner: router.handles(kind, "") == false -> received.ack() (queue: DELETE row; log: advance offset) -> message gone, no dead-letter, no log Fix: - `message_from_row` returns `Result`, raising a PERMANENT error when a required column is missing or fails to decode. `message_id`/`content_type`/`metadata` keep tolerant defaults (optional or schema-defaulted; they don't make a message unhandleable). - The Postgres `recv` claims the row/offset before decoding, so the claim must still be settled when decoding fails. `QueueReceived`/`LogReceived` carry the decode error and surface it via a new `ReceivedMessage::decode_error()` (defaults to `None`, so the other 8 adapters are unchanged). - The runner checks `decode_error()` first and routes the claimed row through the configured `FailurePolicy` — dead-letter by default — exactly like a permanent dispatch failure. Queue corrupt rows are dead-lettered (deleted, logged), log corrupt rows advance the offset past the poison entry (don't get stuck), and the failure is visible instead of silent. Tests: - runner unit tests (no DB): a decode-error delivery with an unhandled (empty) name is dead-lettered under the default policy (not ack-and- ignored), parked under Park, and stops with a permanent error under Stop. - postgres_transport integration tests (env-gated on DATABASE_URL): a corrupt `bus_queue` row leaves the queue while the valid row beside it is handled; a corrupt `bus_log` row advances the consumer offset past it while the valid following event is handled. Implements [[tasks/postgres-bus-corrupt-row-handling]] --- src/bus/postgres_bus.rs | 72 +++++++++++++-- src/bus/runner.rs | 99 ++++++++++++++++++++ src/bus/source.rs | 14 +++ tests/postgres_transport/main.rs | 151 +++++++++++++++++++++++++++++++ 4 files changed, 328 insertions(+), 8 deletions(-) diff --git a/src/bus/postgres_bus.rs b/src/bus/postgres_bus.rs index 7cfff55..b2a3294 100644 --- a/src/bus/postgres_bus.rs +++ b/src/bus/postgres_bus.rs @@ -93,22 +93,47 @@ fn kind_from_str(value: &str) -> MessageKind { } } -fn message_from_row(row: &sqlx::postgres::PgRow) -> Message { +/// Reconstruct a [`Message`] from a claimed `bus_queue`/`bus_log` row. +/// +/// A row that fails to decode a **required** column (`name`, `kind`, `payload`) +/// is corrupt and can never be handled: returning a placeholder `Message` with an +/// empty name would route it to the runner's ack-and-ignore path, silently +/// deleting the row (queue) or advancing the offset past it (log) with no trace. +/// Instead this returns a **permanent** [`TransportError`] so the caller surfaces +/// it as a decode failure and the runner routes the claimed row through the +/// failure policy (dead-letter by default) like any other permanent failure. +/// +/// `message_id`, `content_type`, and `metadata` keep tolerant defaults: they are +/// optional or have schema defaults, so a missing/garbled value there does not +/// make the message unhandleable. +fn message_from_row(row: &sqlx::postgres::PgRow) -> Result { + fn decode_err(column: &str, err: sqlx::Error) -> TransportError { + TransportError::permanent(format!( + "postgres bus corrupt row: required column '{column}' failed to decode: {err}" + )) + } + + let name: String = row.try_get("name").map_err(|err| decode_err("name", err))?; + let kind: String = row.try_get("kind").map_err(|err| decode_err("kind", err))?; + let payload: Vec = row + .try_get("payload") + .map_err(|err| decode_err("payload", err))?; + let metadata_json: String = row.try_get("metadata").unwrap_or_else(|_| "[]".to_string()); let metadata = serde_json::from_str::>(&metadata_json).unwrap_or_default(); - Message { + Ok(Message { id: row .try_get::, _>("message_id") .unwrap_or(None), - name: row.try_get("name").unwrap_or_default(), - kind: kind_from_str(&row.try_get::("kind").unwrap_or_default()), - payload: row.try_get("payload").unwrap_or_default(), + name, + kind: kind_from_str(&kind), + payload, content_type: row .try_get("content_type") .unwrap_or_else(|_| "application/json".to_string()), metadata, - } + }) } /// Postgres [`Bus`] + [`BusConsumer`]. Cheap to clone (the pool is an `Arc`). @@ -299,21 +324,41 @@ impl AsyncMessageSource for QueueSource { Ok(row.map(|row| { let seq: i64 = row.try_get("seq").unwrap_or_default(); + let (message, decode_error) = decode_or_placeholder(&row); QueueReceived { pool: self.pool.clone(), seq, - message: message_from_row(&row), + message, + decode_error, } })) } } +/// Split a decode result into the handle's `(message, decode_error)` fields. +/// +/// On failure the handle still needs *some* `Message` to return from +/// [`ReceivedMessage::message`]; this placeholder is never dispatched because the +/// runner sees `decode_error()` first and routes the claim through the failure +/// policy. The placeholder name is deliberately empty — the decode error carries +/// the diagnostic. +fn decode_or_placeholder(row: &sqlx::postgres::PgRow) -> (Message, Option) { + match message_from_row(row) { + Ok(message) => (message, None), + Err(error) => ( + Message::new("", MessageKind::Event, Vec::new()), + Some(error), + ), + } +} + /// A claimed `bus_queue` row: `ack` deletes it (done); `nack` makes it available /// again (redelivery); `dead_letter`/`park` delete it (stop redelivery). pub struct QueueReceived { pool: PgPool, seq: i64, message: Message, + decode_error: Option, } impl QueueReceived { @@ -332,6 +377,10 @@ impl ReceivedMessage for QueueReceived { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } + async fn ack(self) -> Result<(), TransportError> { self.delete().await } @@ -380,11 +429,13 @@ impl AsyncMessageSource for LogSource { Ok(row.map(|row| { let seq: i64 = row.try_get("seq").unwrap_or_default(); + let (message, decode_error) = decode_or_placeholder(&row); LogReceived { pool: self.pool.clone(), consumer: self.consumer.clone(), seq, - message: message_from_row(&row), + message, + decode_error, } })) } @@ -398,6 +449,7 @@ pub struct LogReceived { consumer: String, seq: i64, message: Message, + decode_error: Option, } impl LogReceived { @@ -421,6 +473,10 @@ impl ReceivedMessage for LogReceived { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } + async fn ack(self) -> Result<(), TransportError> { self.advance_offset().await } diff --git a/src/bus/runner.rs b/src/bus/runner.rs index f8e4a25..e1ddb3f 100644 --- a/src/bus/runner.rs +++ b/src/bus/runner.rs @@ -54,6 +54,33 @@ where I: Send, { while let Some(received) = source.recv().await? { + // A delivery the transport could not decode is a permanent failure: it + // carries no valid message to dispatch, and it must NOT be treated as an + // empty message (which would route to ack-and-ignore below and silently + // drop a corrupt row). Route it through the failure policy directly, the + // same as a permanent dispatch failure, so it is dead-lettered/parked. + if let Some(error) = received.decode_error() { + match options.failure_policy.resolve(error) { + FailureAction::Nack => { + let reason = error.to_string(); + received.nack(&reason).await? + } + FailureAction::DeadLetter => { + let reason = error.to_string(); + received.dead_letter(&reason).await? + } + FailureAction::Park => { + let reason = error.to_string(); + received.park(&reason).await? + } + FailureAction::LogAndAck => { + eprintln!("[bus::runner] dropping undecodable message after permanent failure: {error}"); + received.ack().await? + } + FailureAction::Stop => return Err(TransportError::permanent(error.to_string())), + } + continue; + } // No handler for this message: intentionally ignore (ack) rather than // dead-letter, so unrelated fan-out events don't pile into the DLQ. if !router.handles(received.message().kind, received.message().name()) { @@ -161,6 +188,7 @@ mod tests { message: Message, recorder: Arc, settle_ok: bool, + decode_error: Option, } impl FakeReceived { @@ -178,6 +206,9 @@ mod tests { fn message(&self) -> &Message { &self.message } + fn decode_error(&self) -> Option<&TransportError> { + self.decode_error.as_ref() + } async fn ack(self) -> Result<(), TransportError> { self.settle(Event::Ack) } @@ -197,6 +228,9 @@ mod tests { recorder: Arc, settle_ok: bool, recv_error: bool, + // When set, every received message reports this as a decode failure, + // modeling a transport that claims a row/offset before decoding it. + decode_error: bool, } impl AsyncMessageSource for FakeSource { @@ -205,10 +239,13 @@ mod tests { if self.recv_error { return Err(TransportError::retryable("recv failed")); } + let decode_error = self.decode_error; Ok(self.queue.pop_front().map(|message| FakeReceived { message, recorder: self.recorder.clone(), settle_ok: self.settle_ok, + decode_error: decode_error + .then(|| TransportError::permanent("corrupt row: name failed to decode")), })) } } @@ -277,6 +314,7 @@ mod tests { recorder: recorder.clone(), settle_ok, recv_error, + decode_error: false, }; let outcome = block_on(run_source(svc, source, options)); RunResult { @@ -484,6 +522,66 @@ mod tests { ); } + fn run_decode_error(messages: Vec, options: RunOptions) -> RunResult { + let recorder = Recorder::new(); + let svc = router(&recorder); + let source = FakeSource { + queue: messages.into_iter().collect(), + recorder: recorder.clone(), + settle_ok: true, + recv_error: false, + decode_error: true, + }; + let outcome = block_on(run_source(svc, source, options)); + RunResult { + outcome, + events: recorder.events(), + } + } + + #[test] + fn corrupt_row_dead_letters_under_default_policy_not_acked_and_ignored() { + // The corrupt delivery has an unhandled (empty) name, which would + // otherwise fall into the ack-and-ignore path and silently drop it. The + // decode error must instead route it through the failure policy. + let result = run_decode_error(vec![event_message("", None)], RunOptions::idempotent()); + assert!(result.outcome.is_ok()); + assert_eq!(result.events.len(), 1); + match &result.events[0] { + Event::DeadLetter(reason) => assert!(reason.contains("corrupt row")), + other => panic!("expected dead-letter, got {other:?}"), + } + // It was never handled and never plain-acked. + assert!(!result.events.iter().any(|e| matches!(e, Event::Handled(_)))); + assert!(!result.events.contains(&Event::Ack)); + } + + #[test] + fn corrupt_row_parks_under_park_policy() { + let result = run_decode_error( + vec![event_message("", None)], + RunOptions::idempotent().with_failure_policy(FailurePolicy::Park), + ); + assert!(result.outcome.is_ok()); + assert!(matches!(result.events.first(), Some(Event::Park(_)))); + } + + #[test] + fn corrupt_row_stops_under_stop_policy_with_permanent_error() { + let result = run_decode_error( + vec![event_message("", None)], + RunOptions::idempotent().with_failure_policy(FailurePolicy::Stop), + ); + let err = result + .outcome + .expect_err("stop policy surfaces the decode error"); + assert!(err.is_permanent()); + assert!( + result.events.is_empty(), + "stop does not settle the corrupt row" + ); + } + #[test] fn run_source_future_is_send() { // Guards the documented multi-threaded-executor contract for the common @@ -496,6 +594,7 @@ mod tests { recorder, settle_ok: true, recv_error: false, + decode_error: false, }; let future = run_source(svc, source, RunOptions::idempotent()); assert_send(&future); diff --git a/src/bus/source.rs b/src/bus/source.rs index d1f6492..c858686 100644 --- a/src/bus/source.rs +++ b/src/bus/source.rs @@ -46,6 +46,20 @@ pub trait ReceivedMessage: Send { /// The canonical message to dispatch. fn message(&self) -> &Message; + /// A permanent decode failure for this delivery, if the transport could not + /// reconstruct the message from its stored representation. + /// + /// Defaults to `None`: most adapters either decode successfully or fail the + /// whole `recv`. An adapter that can claim a row/offset *before* decoding it + /// (so the claim must be settled even when decoding fails) returns the + /// classified error here. The runner treats `Some(err)` as a permanent + /// failure routed through the [`FailurePolicy`](super::FailurePolicy) — the + /// same path as a permanent dispatch failure — so a corrupt row is + /// dead-lettered/parked rather than ack-and-ignored as an empty message. + fn decode_error(&self) -> Option<&TransportError> { + None + } + /// Acknowledge successful handling. The transport removes the message. /// /// The runner calls this only after consumer execution has succeeded (and, diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index da5c5b2..51461ed 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -355,3 +355,154 @@ async fn bus_subscribe_uses_named_service_as_consumer_group() { vec!["e0".to_string(), "e1".to_string(), "e2".to_string()] ); } + +// ---- corrupt-row handling: a row that fails to decode must NOT vanish ---- + +/// Make the most recently enqueued `bus_queue` row corrupt by nulling its +/// required `name` column, so `try_get::` fails with `UnexpectedNull` +/// when the source claims it — the exact decode failure a real corruption (a +/// migration mishap, a manual edit, a driver/type mismatch) produces. +async fn corrupt_latest_queue_name(pool: &sqlx::PgPool) { + sqlx::query("ALTER TABLE bus_queue ALTER COLUMN name DROP NOT NULL") + .execute(pool) + .await + .expect("drop not null"); + sqlx::query("UPDATE bus_queue SET name = NULL WHERE seq = (SELECT max(seq) FROM bus_queue)") + .execute(pool) + .await + .expect("null out name"); +} + +async fn corrupt_latest_log_name(pool: &sqlx::PgPool) { + sqlx::query("ALTER TABLE bus_log ALTER COLUMN name DROP NOT NULL") + .execute(pool) + .await + .expect("drop not null"); + sqlx::query("UPDATE bus_log SET name = NULL WHERE seq = (SELECT max(seq) FROM bus_log)") + .execute(pool) + .await + .expect("null out name"); +} + +/// A corrupt `bus_queue` row is routed through the failure policy (dead-letter by +/// default → the row is deleted) rather than being decoded into an empty-named +/// message and silently ack-and-ignored. The valid row beside it is still +/// handled, and the run drains to completion. +#[tokio::test] +async fn bus_listen_dead_letters_corrupt_queue_row_not_silently() { + let Some(schema) = postgres::PostgresTestSchema::create_from_env("bus_corrupt_q", SKIP).await + else { + return; + }; + let repo = schema.repository().await; + let pool = repo.pool().clone(); + let bus = PostgresBus::new(pool.clone()).group("orders"); + bus.ensure_tables().await.expect("ensure tables"); + + // A poison row (name will be nulled) and a healthy row. + bus.send_message( + Message::new("order.initialize", MessageKind::Command, b"{}".to_vec()).with_id("poison"), + ) + .await + .expect("send poison"); + corrupt_latest_queue_name(&pool).await; + bus.send_message( + Message::new("order.initialize", MessageKind::Command, b"{}".to_vec()).with_id("ok"), + ) + .await + .expect("send ok"); + + let rec = Arc::new(Mutex::new(Vec::new())); + bus.listen( + recording_for("order.initialize", MessageKind::Command, rec.clone()), + RunOptions::idempotent(), + ) + .await + .expect("listen drains without surfacing the corrupt row as a fatal error"); + + // The healthy command was handled; the corrupt row was never dispatched as + // an empty-named message. + let handled = rec.lock().unwrap().clone(); + assert_eq!( + handled, + vec!["ok".to_string()], + "only the valid row handled" + ); + + // The corrupt row did not vanish into ack-and-ignore *and* did not get stuck + // redelivering forever: under the default dead-letter policy it leaves the + // queue. The queue is fully drained. + let remaining: i64 = sqlx::query_scalar("SELECT count(*) FROM bus_queue") + .fetch_one(&pool) + .await + .expect("count queue"); + assert_eq!( + remaining, 0, + "corrupt row routed through policy, not redelivered forever" + ); +} + +/// A corrupt `bus_log` row is routed through the failure policy (dead-letter by +/// default → the consumer offset advances past it) rather than silently +/// ack-and-ignored. The consumer makes progress to the healthy entry after it. +#[tokio::test] +async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { + let Some(schema) = postgres::PostgresTestSchema::create_from_env("bus_corrupt_l", SKIP).await + else { + return; + }; + let repo = schema.repository().await; + let pool = repo.pool().clone(); + let producer = PostgresBus::new(pool.clone()); + producer.ensure_tables().await.expect("ensure tables"); + + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()).with_id("poison"), + ) + .await + .expect("publish poison"); + corrupt_latest_log_name(&pool).await; + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()).with_id("ok"), + ) + .await + .expect("publish ok"); + + let rec = Arc::new(Mutex::new(Vec::new())); + PostgresBus::new(pool.clone()) + .group("projections") + .subscribe( + recording_for("order.initialized", MessageKind::Event, rec.clone()), + RunOptions::idempotent(), + ) + .await + .expect("subscribe drains past the corrupt entry"); + + // The healthy event after the poison entry was handled — the consumer did not + // get stuck on the corrupt row, and the corrupt row was not dispatched as an + // empty-named message. + let handled = rec.lock().unwrap().clone(); + assert_eq!( + handled, + vec!["ok".to_string()], + "only the valid event handled" + ); + + // The offset advanced past both entries (dead-letter advances the log offset). + let offset: Option = + sqlx::query_scalar("SELECT last_seq FROM bus_offset WHERE consumer = 'projections'") + .fetch_optional(&pool) + .await + .expect("read offset"); + let max_seq: i64 = sqlx::query_scalar("SELECT max(seq) FROM bus_log") + .fetch_one(&pool) + .await + .expect("max seq"); + assert_eq!( + offset, + Some(max_seq), + "offset advanced past the corrupt entry, not stuck or skipped-silently" + ); +} From a616fd440dc527769274a1422df72a2db20613e6 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 01:22:45 -0500 Subject: [PATCH 2/2] fix: claim corrupt name-NULL postgres bus rows so they settle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prior commit routed undecodable rows through the failure policy, but that path was unreachable for the most common corruption: a NULL `name`. Both sources select work by name — the queue claim filters `WHERE name = ANY($2)` and the log read filters `WHERE name = ANY($1)`. A row whose `name` column is NULL matches neither, so it is never claimed (queue) or never read (log). The decode-error handling never fired because the corrupt row was never even fetched. The queue row therefore sat undelivered forever (the failing test: queue still had 1 row), and a corrupt log entry was silently skipped when the offset jumped to a later healthy entry — dropped with no trace, the exact ack-and-ignore behavior the hardening set out to prevent. The missing piece was not settlement identity (the claim already captures `seq` independently of payload decode); it was *visibility*: an un-routable row must still be selectable so the runner can settle it. Fix: both sources also select rows with a NULL `name` (un-routable poison). Such a row belongs to no consumer, so claiming/reading it to dead-letter it is correct — `FOR UPDATE SKIP LOCKED` keeps the queue claim safe under competing listeners, and each log group advances its own offset past it. The runner then surfaces the decode error and routes it through the failure policy (dead-letter by default: the queue row is deleted, the log offset advances past the entry). Also strengthen the log test: it now places a corrupt entry as the highest seq, so a consumer that silently skips by name would leave the offset short of max_seq. Reaching max_seq proves the corrupt entries were settled through the policy, not skipped. Verified against a real Postgres 18 (docker compose): the improved test fails without the LogSource fix and passes with it; full postgres_transport (9) and lib (253) suites green. Refines [[tasks/postgres-bus-corrupt-row-handling]] --- src/bus/postgres_bus.rs | 19 +++++++++++++++++-- tests/postgres_transport/main.rs | 28 ++++++++++++++++++++++------ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/bus/postgres_bus.rs b/src/bus/postgres_bus.rs index b2a3294..1c6ccc2 100644 --- a/src/bus/postgres_bus.rs +++ b/src/bus/postgres_bus.rs @@ -305,12 +305,20 @@ impl AsyncMessageSource for QueueSource { type Received = QueueReceived; async fn recv(&mut self) -> Result, TransportError> { + // Claim the next row whose `name` matches a subscribed command, OR whose + // `name` is NULL. A NULL name is un-routable corruption: it belongs to no + // consumer, so `name = ANY($2)` would never match it and it would sit in + // the queue forever, never claimed, never settled. Claiming it here lets + // the runner route it through the failure policy (dead-letter by default, + // which deletes the row) instead of leaving a poison row that blocks the + // queue from draining. `FOR UPDATE SKIP LOCKED` keeps the claim safe under + // competing listeners — only one settles the orphaned row. let row = sqlx::query( "UPDATE bus_queue SET locked_until = now() + ($1 * interval '1 second'), \ attempts = attempts + 1 \ WHERE seq = ( \ SELECT seq FROM bus_queue \ - WHERE name = ANY($2) AND available_at <= now() \ + WHERE (name = ANY($2) OR name IS NULL) AND available_at <= now() \ AND (locked_until IS NULL OR locked_until < now()) \ ORDER BY seq FOR UPDATE SKIP LOCKED LIMIT 1 \ ) \ @@ -415,9 +423,16 @@ impl AsyncMessageSource for LogSource { type Received = LogReceived; async fn recv(&mut self) -> Result, TransportError> { + // Read the next entry past this consumer's offset whose `name` matches a + // subscribed event, OR whose `name` is NULL. A NULL name is un-routable + // corruption: `name = ANY($1)` would skip past it silently when the offset + // jumps to a later healthy entry, dropping the poison record with no trace. + // Surfacing it lets the runner route it through the failure policy + // (dead-letter by default, which advances the offset past it) so the + // corrupt entry is settled, not silently skipped. let row = sqlx::query( "SELECT seq, name, message_id, kind, payload, content_type, metadata FROM bus_log \ - WHERE name = ANY($1) \ + WHERE (name = ANY($1) OR name IS NULL) \ AND seq > COALESCE((SELECT last_seq FROM bus_offset WHERE consumer = $2), 0) \ ORDER BY seq LIMIT 1", ) diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index 51461ed..16ccefb 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -456,12 +456,18 @@ async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { let producer = PostgresBus::new(pool.clone()); producer.ensure_tables().await.expect("ensure tables"); + // Layout (by seq): poison, ok, poison. The trailing poison is the highest + // seq, so a consumer that *silently skips* corrupt entries (matching only by + // name) would stop its offset at the healthy `ok` entry and never reach the + // last seq — the offset would fall short of max_seq and this test would fail. + // Reaching max_seq proves the corrupt entries were settled through the policy + // (offset advanced past them), not skipped because their name no longer matched. producer .publish_message( Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()).with_id("poison"), ) .await - .expect("publish poison"); + .expect("publish leading poison"); corrupt_latest_log_name(&pool).await; producer .publish_message( @@ -469,6 +475,14 @@ async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { ) .await .expect("publish ok"); + producer + .publish_message( + Message::new("order.initialized", MessageKind::Event, b"{}".to_vec()) + .with_id("poison-tail"), + ) + .await + .expect("publish trailing poison"); + corrupt_latest_log_name(&pool).await; let rec = Arc::new(Mutex::new(Vec::new())); PostgresBus::new(pool.clone()) @@ -478,10 +492,10 @@ async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { RunOptions::idempotent(), ) .await - .expect("subscribe drains past the corrupt entry"); + .expect("subscribe drains past the corrupt entries"); - // The healthy event after the poison entry was handled — the consumer did not - // get stuck on the corrupt row, and the corrupt row was not dispatched as an + // The healthy event between the poison entries was handled — the consumer did + // not get stuck on a corrupt row, and no corrupt row was dispatched as an // empty-named message. let handled = rec.lock().unwrap().clone(); assert_eq!( @@ -490,7 +504,9 @@ async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { "only the valid event handled" ); - // The offset advanced past both entries (dead-letter advances the log offset). + // The offset advanced past every entry, including the trailing corrupt one + // (dead-letter advances the log offset). If the corrupt entries were skipped + // silently by name, the offset would stop at the `ok` entry, short of max_seq. let offset: Option = sqlx::query_scalar("SELECT last_seq FROM bus_offset WHERE consumer = 'projections'") .fetch_optional(&pool) @@ -503,6 +519,6 @@ async fn bus_subscribe_dead_letters_corrupt_log_row_not_silently() { assert_eq!( offset, Some(max_seq), - "offset advanced past the corrupt entry, not stuck or skipped-silently" + "offset advanced past the trailing corrupt entry, not stuck or skipped-silently" ); }