fix: route corrupt PostgresBus rows through failure policy#80
Conversation
A `bus_queue`/`bus_log` row whose required columns (`name`, `kind`,
`payload`) failed to decode was silently lost. `message_from_row` used
`try_get(...).unwrap_or_default()`, so a corrupt row became a `Message`
with an empty `name`. The runner classifies an empty/unrouted name as
"no handler", which takes the ack-and-ignore path: `QueueReceived::ack`
deletes the row and `LogReceived::ack` advances the consumer offset past
it. The corrupt row vanished with no trace, bypassing the otherwise
careful never-silently-drop failure policy.
Silent-data-loss path:
recv -> message_from_row (unwrap_or_default -> name = "")
-> runner: router.handles(kind, "") == false
-> received.ack() (queue: DELETE row; log: advance offset)
-> message gone, no dead-letter, no log
Fix:
- `message_from_row` returns `Result<Message, TransportError>`, raising a
PERMANENT error when a required column is missing or fails to decode.
`message_id`/`content_type`/`metadata` keep tolerant defaults (optional
or schema-defaulted; they don't make a message unhandleable).
- The Postgres `recv` claims the row/offset before decoding, so the claim
must still be settled when decoding fails. `QueueReceived`/`LogReceived`
carry the decode error and surface it via a new
`ReceivedMessage::decode_error()` (defaults to `None`, so the other 8
adapters are unchanged).
- The runner checks `decode_error()` first and routes the claimed row
through the configured `FailurePolicy` — dead-letter by default — exactly
like a permanent dispatch failure. Queue corrupt rows are dead-lettered
(deleted, logged), log corrupt rows advance the offset past the poison
entry (don't get stuck), and the failure is visible instead of silent.
Tests:
- runner unit tests (no DB): a decode-error delivery with an unhandled
(empty) name is dead-lettered under the default policy (not ack-and-
ignored), parked under Park, and stops with a permanent error under Stop.
- postgres_transport integration tests (env-gated on DATABASE_URL): a
corrupt `bus_queue` row leaves the queue while the valid row beside it is
handled; a corrupt `bus_log` row advances the consumer offset past it
while the valid following event is handled.
Implements [[tasks/postgres-bus-corrupt-row-handling]]
|
Warning Review limit reached
More reviews will be available in 58 minutes and 1 second. Learn how PR review limits work. Your organization has reached its usage spending cap. Adjust your spending cap in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
The prior commit routed undecodable rows through the failure policy, but that path was unreachable for the most common corruption: a NULL `name`. Both sources select work by name — the queue claim filters `WHERE name = ANY($2)` and the log read filters `WHERE name = ANY($1)`. A row whose `name` column is NULL matches neither, so it is never claimed (queue) or never read (log). The decode-error handling never fired because the corrupt row was never even fetched. The queue row therefore sat undelivered forever (the failing test: queue still had 1 row), and a corrupt log entry was silently skipped when the offset jumped to a later healthy entry — dropped with no trace, the exact ack-and-ignore behavior the hardening set out to prevent. The missing piece was not settlement identity (the claim already captures `seq` independently of payload decode); it was *visibility*: an un-routable row must still be selectable so the runner can settle it. Fix: both sources also select rows with a NULL `name` (un-routable poison). Such a row belongs to no consumer, so claiming/reading it to dead-letter it is correct — `FOR UPDATE SKIP LOCKED` keeps the queue claim safe under competing listeners, and each log group advances its own offset past it. The runner then surfaces the decode error and routes it through the failure policy (dead-letter by default: the queue row is deleted, the log offset advances past the entry). Also strengthen the log test: it now places a corrupt entry as the highest seq, so a consumer that silently skips by name would leave the offset short of max_seq. Reaching max_seq proves the corrupt entries were settled through the policy, not skipped. Verified against a real Postgres 18 (docker compose): the improved test fails without the LogSource fix and passes with it; full postgres_transport (9) and lib (253) suites green. Refines [[tasks/postgres-bus-corrupt-row-handling]]
Summary
PostgresBuscould silently lose a corrupt row.message_from_rowdecoded every column withtry_get(...).unwrap_or_default(), so a row whose required columns (name,kind,payload) failed to decode became aMessagewith an empty name. The runner treats an empty/unrouted name as "no handler" and takes the ack-and-ignore path — which for Postgres means the row is deleted (queue) or the consumer offset advances past it (log). The message vanished with no dead-letter and no log, bypassing the otherwise careful never-silently-drop failure policy.Failure path traced
Fix
message_from_rownow returnsResult<Message, TransportError>and raises a permanent error when a required column (name/kind/payload) is missing or fails to decode.message_id/content_type/metadatakeep tolerant defaults — they are optional or schema-defaulted and don't make a message unhandleable.recvclaims the row/offset before decoding, so a failed decode still leaves a claim that must be settled.QueueReceived/LogReceivedcarry the decode error and surface it through a newReceivedMessage::decode_error()that defaults toNone— the other 8 transport adapters are untouched (no parallel hierarchy, no wrapper type).decode_error()first and routes the claimed row through the configuredFailurePolicy— dead-letter by default — exactly like a permanent dispatch failure. Queue corrupt rows are dead-lettered (deleted, logged); log corrupt rows advance the offset past the poison entry (don't get stuck); the failure is now visible instead of silent. UnderParkit parks, underStopit surfaces a permanent error.The settlement primitives (
dead_letter-> delete / advance-offset) are unchanged; what changed is that a corrupt row now travels the failure-policy path (logged, configurable) instead of the ack-and-ignore path (silent).Testing
cargo fmt --allcargo build --features postgres— compiles without a DB.cargo test --features postgres --lib— 253 passed, including 3 new runner unit tests (no DB required):ParkStopcargo test --features postgres --test postgres_transport— compiles; DB-backed tests skip withoutDATABASE_URL(expected). Two new env-gated integration tests assert a corruptbus_queuerow leaves the queue (valid sibling still handled) and a corruptbus_logrow advances the offset past the poison entry (valid follower still handled).--all-features; did not start docker.🤖 Generated with Claude Code