Skip to content

fix: route corrupt PostgresBus rows through failure policy#80

Merged
patrickleet merged 2 commits into
mainfrom
hardening/postgres-bus-corrupt-row
Jun 11, 2026
Merged

fix: route corrupt PostgresBus rows through failure policy#80
patrickleet merged 2 commits into
mainfrom
hardening/postgres-bus-corrupt-row

Conversation

@patrickleet

Copy link
Copy Markdown
Collaborator

Summary

PostgresBus could silently lose a corrupt row. message_from_row decoded every column with try_get(...).unwrap_or_default(), so a row whose required columns (name, kind, payload) failed to decode became a Message with an empty name. The runner treats an empty/unrouted name as "no handler" and takes the ack-and-ignore path — which for Postgres means the row is deleted (queue) or the consumer offset advances past it (log). The message vanished with no dead-letter and no log, bypassing the otherwise careful never-silently-drop failure policy.

Failure path traced

QueueSource::recv / LogSource::recv
  -> message_from_row()  (unwrap_or_default -> name = "")
  -> runner: router.handles(kind, "") == false      (src/bus/runner.rs)
  -> received.ack().await                            (ack-and-ignore branch)
       QueueReceived::ack -> DELETE FROM bus_queue   (row gone)
       LogReceived::ack   -> advance bus_offset      (skipped forever)
  -> corrupt row gone, no dead-letter, no trace

Fix

  • message_from_row now returns Result<Message, TransportError> and raises a permanent error when a required column (name/kind/payload) is missing or fails to decode. message_id/content_type/metadata keep tolerant defaults — they are optional or schema-defaulted and don't make a message unhandleable.
  • The Postgres recv claims the row/offset before decoding, so a failed decode still leaves a claim that must be settled. QueueReceived/LogReceived carry the decode error and surface it through a new ReceivedMessage::decode_error() that defaults to None — the other 8 transport adapters are untouched (no parallel hierarchy, no wrapper type).
  • The runner checks decode_error() first and routes the claimed row through the configured FailurePolicydead-letter by default — exactly like a permanent dispatch failure. Queue corrupt rows are dead-lettered (deleted, logged); log corrupt rows advance the offset past the poison entry (don't get stuck); the failure is now visible instead of silent. Under Park it parks, under Stop it surfaces a permanent error.

The settlement primitives (dead_letter -> delete / advance-offset) are unchanged; what changed is that a corrupt row now travels the failure-policy path (logged, configurable) instead of the ack-and-ignore path (silent).

Testing

  • cargo fmt --all
  • cargo build --features postgres — compiles without a DB.
  • cargo test --features postgres --lib253 passed, including 3 new runner unit tests (no DB required):
    • corrupt row dead-letters under the default policy (not ack-and-ignored, never handled, never plain-acked)
    • corrupt row parks under Park
    • corrupt row stops with a permanent error under Stop
  • cargo test --features postgres --test postgres_transport — compiles; DB-backed tests skip without DATABASE_URL (expected). Two new env-gated integration tests assert a corrupt bus_queue row leaves the queue (valid sibling still handled) and a corrupt bus_log row advances the offset past the poison entry (valid follower still handled).
  • Did not run --all-features; did not start docker.

🤖 Generated with Claude Code

A `bus_queue`/`bus_log` row whose required columns (`name`, `kind`,
`payload`) failed to decode was silently lost. `message_from_row` used
`try_get(...).unwrap_or_default()`, so a corrupt row became a `Message`
with an empty `name`. The runner classifies an empty/unrouted name as
"no handler", which takes the ack-and-ignore path: `QueueReceived::ack`
deletes the row and `LogReceived::ack` advances the consumer offset past
it. The corrupt row vanished with no trace, bypassing the otherwise
careful never-silently-drop failure policy.

Silent-data-loss path:
  recv -> message_from_row (unwrap_or_default -> name = "")
       -> runner: router.handles(kind, "") == false
       -> received.ack()  (queue: DELETE row; log: advance offset)
       -> message gone, no dead-letter, no log

Fix:
- `message_from_row` returns `Result<Message, TransportError>`, raising a
  PERMANENT error when a required column is missing or fails to decode.
  `message_id`/`content_type`/`metadata` keep tolerant defaults (optional
  or schema-defaulted; they don't make a message unhandleable).
- The Postgres `recv` claims the row/offset before decoding, so the claim
  must still be settled when decoding fails. `QueueReceived`/`LogReceived`
  carry the decode error and surface it via a new
  `ReceivedMessage::decode_error()` (defaults to `None`, so the other 8
  adapters are unchanged).
- The runner checks `decode_error()` first and routes the claimed row
  through the configured `FailurePolicy` — dead-letter by default — exactly
  like a permanent dispatch failure. Queue corrupt rows are dead-lettered
  (deleted, logged), log corrupt rows advance the offset past the poison
  entry (don't get stuck), and the failure is visible instead of silent.

Tests:
- runner unit tests (no DB): a decode-error delivery with an unhandled
  (empty) name is dead-lettered under the default policy (not ack-and-
  ignored), parked under Park, and stops with a permanent error under Stop.
- postgres_transport integration tests (env-gated on DATABASE_URL): a
  corrupt `bus_queue` row leaves the queue while the valid row beside it is
  handled; a corrupt `bus_log` row advances the consumer offset past it
  while the valid following event is handled.

Implements [[tasks/postgres-bus-corrupt-row-handling]]
@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Warning

Review limit reached

@patrickleet, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 58 minutes and 1 second. Learn how PR review limits work.

Your organization has reached its usage spending cap. Adjust your spending cap in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 672d34b8-9e05-4c4a-a94c-59908c18968a

📥 Commits

Reviewing files that changed from the base of the PR and between da838d7 and a616fd4.

📒 Files selected for processing (4)
  • src/bus/postgres_bus.rs
  • src/bus/runner.rs
  • src/bus/source.rs
  • tests/postgres_transport/main.rs
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch hardening/postgres-bus-corrupt-row

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

The prior commit routed undecodable rows through the failure policy, but
that path was unreachable for the most common corruption: a NULL `name`.

Both sources select work by name — the queue claim filters
`WHERE name = ANY($2)` and the log read filters `WHERE name = ANY($1)`.
A row whose `name` column is NULL matches neither, so it is never claimed
(queue) or never read (log). The decode-error handling never fired because
the corrupt row was never even fetched. The queue row therefore sat
undelivered forever (the failing test: queue still had 1 row), and a
corrupt log entry was silently skipped when the offset jumped to a later
healthy entry — dropped with no trace, the exact ack-and-ignore behavior
the hardening set out to prevent.

The missing piece was not settlement identity (the claim already captures
`seq` independently of payload decode); it was *visibility*: an un-routable
row must still be selectable so the runner can settle it.

Fix: both sources also select rows with a NULL `name` (un-routable
poison). Such a row belongs to no consumer, so claiming/reading it to
dead-letter it is correct — `FOR UPDATE SKIP LOCKED` keeps the queue claim
safe under competing listeners, and each log group advances its own offset
past it. The runner then surfaces the decode error and routes it through
the failure policy (dead-letter by default: the queue row is deleted, the
log offset advances past the entry).

Also strengthen the log test: it now places a corrupt entry as the highest
seq, so a consumer that silently skips by name would leave the offset short
of max_seq. Reaching max_seq proves the corrupt entries were settled
through the policy, not skipped. Verified against a real Postgres 18
(docker compose): the improved test fails without the LogSource fix and
passes with it; full postgres_transport (9) and lib (253) suites green.

Refines [[tasks/postgres-bus-corrupt-row-handling]]
@patrickleet patrickleet merged commit 60f2c4f into main Jun 11, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant