Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 81 additions & 10 deletions src/bus/postgres_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,47 @@ fn kind_from_str(value: &str) -> MessageKind {
}
}

fn message_from_row(row: &sqlx::postgres::PgRow) -> Message {
/// Reconstruct a [`Message`] from a claimed `bus_queue`/`bus_log` row.
///
/// A row that fails to decode a **required** column (`name`, `kind`, `payload`)
/// is corrupt and can never be handled: returning a placeholder `Message` with an
/// empty name would route it to the runner's ack-and-ignore path, silently
/// deleting the row (queue) or advancing the offset past it (log) with no trace.
/// Instead this returns a **permanent** [`TransportError`] so the caller surfaces
/// it as a decode failure and the runner routes the claimed row through the
/// failure policy (dead-letter by default) like any other permanent failure.
///
/// `message_id`, `content_type`, and `metadata` keep tolerant defaults: they are
/// optional or have schema defaults, so a missing/garbled value there does not
/// make the message unhandleable.
fn message_from_row(row: &sqlx::postgres::PgRow) -> Result<Message, TransportError> {
fn decode_err(column: &str, err: sqlx::Error) -> TransportError {
TransportError::permanent(format!(
"postgres bus corrupt row: required column '{column}' failed to decode: {err}"
))
}

let name: String = row.try_get("name").map_err(|err| decode_err("name", err))?;
let kind: String = row.try_get("kind").map_err(|err| decode_err("kind", err))?;
let payload: Vec<u8> = row
.try_get("payload")
.map_err(|err| decode_err("payload", err))?;

let metadata_json: String = row.try_get("metadata").unwrap_or_else(|_| "[]".to_string());
let metadata =
serde_json::from_str::<Vec<(String, String)>>(&metadata_json).unwrap_or_default();
Message {
Ok(Message {
id: row
.try_get::<Option<String>, _>("message_id")
.unwrap_or(None),
name: row.try_get("name").unwrap_or_default(),
kind: kind_from_str(&row.try_get::<String, _>("kind").unwrap_or_default()),
payload: row.try_get("payload").unwrap_or_default(),
name,
kind: kind_from_str(&kind),
payload,
content_type: row
.try_get("content_type")
.unwrap_or_else(|_| "application/json".to_string()),
metadata,
}
})
}

/// Postgres [`Bus`] + [`BusConsumer`]. Cheap to clone (the pool is an `Arc`).
Expand Down Expand Up @@ -280,12 +305,20 @@ impl AsyncMessageSource for QueueSource {
type Received = QueueReceived;

async fn recv(&mut self) -> Result<Option<Self::Received>, TransportError> {
// Claim the next row whose `name` matches a subscribed command, OR whose
// `name` is NULL. A NULL name is un-routable corruption: it belongs to no
// consumer, so `name = ANY($2)` would never match it and it would sit in
// the queue forever, never claimed, never settled. Claiming it here lets
// the runner route it through the failure policy (dead-letter by default,
// which deletes the row) instead of leaving a poison row that blocks the
// queue from draining. `FOR UPDATE SKIP LOCKED` keeps the claim safe under
// competing listeners — only one settles the orphaned row.
let row = sqlx::query(
"UPDATE bus_queue SET locked_until = now() + ($1 * interval '1 second'), \
attempts = attempts + 1 \
WHERE seq = ( \
SELECT seq FROM bus_queue \
WHERE name = ANY($2) AND available_at <= now() \
WHERE (name = ANY($2) OR name IS NULL) AND available_at <= now() \
AND (locked_until IS NULL OR locked_until < now()) \
ORDER BY seq FOR UPDATE SKIP LOCKED LIMIT 1 \
) \
Expand All @@ -299,21 +332,41 @@ impl AsyncMessageSource for QueueSource {

Ok(row.map(|row| {
let seq: i64 = row.try_get("seq").unwrap_or_default();
let (message, decode_error) = decode_or_placeholder(&row);
QueueReceived {
pool: self.pool.clone(),
seq,
message: message_from_row(&row),
message,
decode_error,
}
}))
}
}

/// Decode a claimed row into the `(message, decode_error)` pair stored on a
/// received handle.
///
/// A row that decodes cleanly yields its message and `None`. A row that fails
/// yields the error alongside a stand-in [`Message`], because
/// [`ReceivedMessage::message`] always needs something to return. The stand-in
/// has an empty name, kind `Event`, and an empty payload; it is not meant to be
/// dispatched — the runner checks `decode_error()` first and sends the claim
/// through the failure policy. The error, not the stand-in, carries the
/// diagnostic.
fn decode_or_placeholder(row: &sqlx::postgres::PgRow) -> (Message, Option<TransportError>) {
    message_from_row(row)
        .map(|decoded| (decoded, None))
        .unwrap_or_else(|failure| {
            // Built lazily: only the failure path constructs the stand-in.
            let stand_in = Message::new("", MessageKind::Event, Vec::new());
            (stand_in, Some(failure))
        })
}

/// A claimed `bus_queue` row: `ack` deletes it (done); `nack` makes it available
/// again (redelivery); `dead_letter`/`park` delete it (stop redelivery).
pub struct QueueReceived {
// Pool handle cloned from the source when the row is claimed in `recv`;
// presumably what the settle operations run against — confirm in `delete`.
pool: PgPool,
// `seq` of the claimed row. `recv` reads it with `unwrap_or_default()`, so a
// failed decode of the `seq` column yields 0 rather than an error.
// NOTE(review): a settle would then target seq 0 — confirm that is harmless.
seq: i64,
// The decoded message, or an empty-named placeholder when decoding failed.
message: Message,
// Set when a required column failed to decode; the runner checks this before
// dispatching and routes the claim through the failure policy instead.
decode_error: Option<TransportError>,
}

impl QueueReceived {
Expand All @@ -332,6 +385,10 @@ impl ReceivedMessage for QueueReceived {
&self.message
}

// Reports the decode failure captured when this queue row was claimed, if any.
// `None` means `message()` holds the genuinely decoded message; `Some` means it
// holds an empty-named placeholder that should not be dispatched.
fn decode_error(&self) -> Option<&TransportError> {
self.decode_error.as_ref()
}

async fn ack(self) -> Result<(), TransportError> {
self.delete().await
}
Expand Down Expand Up @@ -366,9 +423,16 @@ impl AsyncMessageSource for LogSource {
type Received = LogReceived;

async fn recv(&mut self) -> Result<Option<Self::Received>, TransportError> {
// Read the next entry past this consumer's offset whose `name` matches a
// subscribed event, OR whose `name` is NULL. A NULL name is un-routable
// corruption: `name = ANY($1)` would skip past it silently when the offset
// jumps to a later healthy entry, dropping the poison record with no trace.
// Surfacing it lets the runner route it through the failure policy
// (dead-letter by default, which advances the offset past it) so the
// corrupt entry is settled, not silently skipped.
let row = sqlx::query(
"SELECT seq, name, message_id, kind, payload, content_type, metadata FROM bus_log \
WHERE name = ANY($1) \
WHERE (name = ANY($1) OR name IS NULL) \
AND seq > COALESCE((SELECT last_seq FROM bus_offset WHERE consumer = $2), 0) \
ORDER BY seq LIMIT 1",
)
Expand All @@ -380,11 +444,13 @@ impl AsyncMessageSource for LogSource {

Ok(row.map(|row| {
let seq: i64 = row.try_get("seq").unwrap_or_default();
let (message, decode_error) = decode_or_placeholder(&row);
LogReceived {
pool: self.pool.clone(),
consumer: self.consumer.clone(),
seq,
message: message_from_row(&row),
message,
decode_error,
}
}))
}
Expand All @@ -398,6 +464,7 @@ pub struct LogReceived {
consumer: String,
seq: i64,
message: Message,
decode_error: Option<TransportError>,
}

impl LogReceived {
Expand All @@ -421,6 +488,10 @@ impl ReceivedMessage for LogReceived {
&self.message
}

// Reports the decode failure captured when this log entry was read, if any.
// `None` means `message()` holds the genuinely decoded message; `Some` means it
// holds an empty-named placeholder that should not be dispatched.
fn decode_error(&self) -> Option<&TransportError> {
self.decode_error.as_ref()
}

async fn ack(self) -> Result<(), TransportError> {
self.advance_offset().await
}
Expand Down
99 changes: 99 additions & 0 deletions src/bus/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,33 @@ where
I: Send,
{
while let Some(received) = source.recv().await? {
// A delivery the transport could not decode is a permanent failure: it
// carries no valid message to dispatch, and it must NOT be treated as an
// empty message (which would route to ack-and-ignore below and silently
// drop a corrupt row). Route it through the failure policy directly, the
// same as a permanent dispatch failure, so it is dead-lettered/parked.
if let Some(error) = received.decode_error() {
match options.failure_policy.resolve(error) {
FailureAction::Nack => {
let reason = error.to_string();
received.nack(&reason).await?
}
FailureAction::DeadLetter => {
let reason = error.to_string();
received.dead_letter(&reason).await?
}
FailureAction::Park => {
let reason = error.to_string();
received.park(&reason).await?
}
FailureAction::LogAndAck => {
eprintln!("[bus::runner] dropping undecodable message after permanent failure: {error}");
received.ack().await?
}
FailureAction::Stop => return Err(TransportError::permanent(error.to_string())),
}
continue;
}
// No handler for this message: intentionally ignore (ack) rather than
// dead-letter, so unrelated fan-out events don't pile into the DLQ.
if !router.handles(received.message().kind, received.message().name()) {
Expand Down Expand Up @@ -161,6 +188,7 @@ mod tests {
message: Message,
recorder: Arc<Recorder>,
settle_ok: bool,
decode_error: Option<TransportError>,
}

impl FakeReceived {
Expand All @@ -178,6 +206,9 @@ mod tests {
fn message(&self) -> &Message {
&self.message
}
// Test double: returns the decode failure injected by `FakeSource` (when its
// `decode_error` flag is set), or `None` for a normal delivery.
fn decode_error(&self) -> Option<&TransportError> {
self.decode_error.as_ref()
}
async fn ack(self) -> Result<(), TransportError> {
self.settle(Event::Ack)
}
Expand All @@ -197,6 +228,9 @@ mod tests {
recorder: Arc<Recorder>,
settle_ok: bool,
recv_error: bool,
// When set, every received message reports this as a decode failure,
// modeling a transport that claims a row/offset before decoding it.
decode_error: bool,
}

impl AsyncMessageSource for FakeSource {
Expand All @@ -205,10 +239,13 @@ mod tests {
if self.recv_error {
return Err(TransportError::retryable("recv failed"));
}
let decode_error = self.decode_error;
Ok(self.queue.pop_front().map(|message| FakeReceived {
message,
recorder: self.recorder.clone(),
settle_ok: self.settle_ok,
decode_error: decode_error
.then(|| TransportError::permanent("corrupt row: name failed to decode")),
}))
}
}
Expand Down Expand Up @@ -277,6 +314,7 @@ mod tests {
recorder: recorder.clone(),
settle_ok,
recv_error,
decode_error: false,
};
let outcome = block_on(run_source(svc, source, options));
RunResult {
Expand Down Expand Up @@ -484,6 +522,66 @@ mod tests {
);
}

/// Drive `run_source` over `messages` using a source that flags every delivery
/// as a decode failure, and return the run outcome plus the recorded events.
///
/// Settling always succeeds and `recv` never errors, so the only failure mode
/// under test is the decode error itself.
fn run_decode_error<I>(messages: Vec<Message>, options: RunOptions<I>) -> RunResult
where
    I: Send,
{
    let recorder = Recorder::new();
    let service = router(&recorder);
    let corrupt_source = FakeSource {
        decode_error: true,
        recv_error: false,
        settle_ok: true,
        recorder: recorder.clone(),
        queue: messages.into_iter().collect(),
    };
    RunResult {
        // Evaluated first, so the event snapshot below reflects the finished run.
        outcome: block_on(run_source(service, corrupt_source, options)),
        events: recorder.events(),
    }
}

#[test]
fn corrupt_row_dead_letters_under_default_policy_not_acked_and_ignored() {
    // An empty name has no handler, so without the decode-error check this
    // delivery would take the ack-and-ignore branch and vanish silently. The
    // default failure policy must dead-letter it instead.
    let result = run_decode_error(vec![event_message("", None)], RunOptions::idempotent());

    assert!(result.outcome.is_ok());
    assert_eq!(result.events.len(), 1);

    let settled = &result.events[0];
    if let Event::DeadLetter(reason) = settled {
        assert!(reason.contains("corrupt row"));
    } else {
        panic!("expected dead-letter, got {settled:?}");
    }

    // Neither dispatched to a handler nor plainly acknowledged.
    assert!(result
        .events
        .iter()
        .all(|event| !matches!(event, Event::Handled(_))));
    assert!(!result.events.contains(&Event::Ack));
}

#[test]
fn corrupt_row_parks_under_park_policy() {
    // Under `FailurePolicy::Park` a decode failure must be parked — and parking
    // must be the only thing that happens to the delivery.
    let result = run_decode_error(
        vec![event_message("", None)],
        RunOptions::idempotent().with_failure_policy(FailurePolicy::Park),
    );
    assert!(result.outcome.is_ok());
    // Checking only the first event would let an extra settlement or a stray
    // handler invocation slip through; pin the count like the dead-letter test.
    assert_eq!(
        result.events.len(),
        1,
        "a corrupt row must be settled exactly once"
    );
    assert!(matches!(result.events.first(), Some(Event::Park(_))));
}

#[test]
fn corrupt_row_stops_under_stop_policy_with_permanent_error() {
    // `FailurePolicy::Stop` aborts the run with a permanent error and leaves the
    // corrupt delivery unsettled.
    let stop_options = RunOptions::idempotent().with_failure_policy(FailurePolicy::Stop);
    let result = run_decode_error(vec![event_message("", None)], stop_options);

    let err = result
        .outcome
        .expect_err("stop policy surfaces the decode error");
    assert!(err.is_permanent());
    assert!(
        result.events.is_empty(),
        "stop does not settle the corrupt row"
    );
}

#[test]
fn run_source_future_is_send() {
// Guards the documented multi-threaded-executor contract for the common
Expand All @@ -496,6 +594,7 @@ mod tests {
recorder,
settle_ok: true,
recv_error: false,
decode_error: false,
};
let future = run_source(svc, source, RunOptions::idempotent());
assert_send(&future);
Expand Down
14 changes: 14 additions & 0 deletions src/bus/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ pub trait ReceivedMessage: Send {
/// The canonical message to dispatch.
fn message(&self) -> &Message;

/// A permanent decode failure for this delivery, if the transport could not
/// reconstruct the message from its stored representation.
///
/// Defaults to `None`: most adapters either decode successfully or fail the
/// whole `recv`. An adapter that can claim a row/offset *before* decoding it
/// (so the claim must be settled even when decoding fails) returns the
/// classified error here. The runner treats `Some(err)` as a permanent
/// failure routed through the [`FailurePolicy`](super::FailurePolicy) — the
/// same path as a permanent dispatch failure — so a corrupt row is
/// dead-lettered/parked rather than ack-and-ignored as an empty message.
///
/// When this returns `Some`, the runner settles the delivery according to the
/// policy and moves on without dispatching [`message`](Self::message), so an
/// adapter may return any placeholder from `message()` in that case.
fn decode_error(&self) -> Option<&TransportError> {
None
}

/// Acknowledge successful handling. The transport removes the message.
///
/// The runner calls this only after consumer execution has succeeded (and,
Expand Down
Loading
Loading