132 changes: 90 additions & 42 deletions .github/skills/event-driven-architecture/SKILL.md
@@ -221,47 +221,61 @@ See [examples/event-consumer.ts](./examples/event-consumer.ts).

### 5. Error Handling & Retries

RabbitMQ retries failed messages automatically:
Use **header-based retry with ceiling** — track `x-retry-count` in message headers, use `sendToQueue` back to the same queue (not the exchange), and discard after `maxRetries` with full error logging.

```typescript
async handleEvent(message: any): Promise<void> {
try {
// Process event
await this.processEvent(message);
} catch (error) {
// Check if error is retry-worthy
if (this.shouldRetry(error)) {
console.error('Retriable error:', error);
throw error; // RabbitMQ will retry
} else {
// Non-retriable error (bad data, etc.)
console.error('Non-retriable error:', error);
// Log to monitoring, don't throw
// Message will be acknowledged and won't retry
}
}
}

private shouldRetry(error: any): boolean {
// Retry on network errors, 5xx responses
if (error.code === 'ECONNREFUSED') return true;
if (error.statusCode >= 500) return true;
**Key rules:**

// Don't retry on validation errors, 4xx responses
if (error.statusCode >= 400 && error.statusCode < 500) return false;
- ❌ **NEVER** `nack(msg, false, true)` — immediate requeue with no delay or ceiling causes tight retry loops (thousands of DB calls/sec on persistent errors; can take down the database)
- ❌ **NEVER** republish to the exchange on retry — that re-routes to ALL consumers for that routing key, causing duplicate processing in other services
- ✅ Use `channel.sendToQueue(this.queue, ...)` to send directly to the same queue
- ✅ Track retry count in `x-retry-count` header; once it reaches `maxRetries` (5 retries, 6 attempts in total), `ack` + discard

return false;
```typescript
} catch (error) {
this.logger.error({ err: error }, 'Error handling event');
const retryCount = (msg.properties.headers?.['x-retry-count'] as number) ?? 0;
const maxRetries = 5;
if (retryCount >= maxRetries) {
// Permanently failed — ack to remove from queue, log full context
this.logger.error(
{ err: error, retry_count: retryCount, event_content: msg.content.toString() },
'Message permanently failed after max retries — discarding'
);
this.channel?.ack(msg);
} else {
const delay = Math.min(1000 * Math.pow(2, retryCount), 30000); // 1s, 2s, 4s, 8s, 16s → max 30s
this.logger.warn(
{ err: error, retry_count: retryCount + 1, max_retries: maxRetries, delay_ms: delay },
'Event processing failed, scheduling retry'
);
setTimeout(() => {
if (!this.channel) return;
// sendToQueue to THIS queue (not exchange) — avoids re-routing to other consumers
this.channel.sendToQueue(this.queue, msg.content, {
persistent: msg.properties.deliveryMode === 2,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount + 1,
'x-first-failed-at': msg.properties.headers?.['x-first-failed-at'] ?? new Date().toISOString(),
'x-last-error': (error as Error).message?.substring(0, 200),
}
});
this.channel.ack(msg); // Ack original — new message is now in queue
}, delay);
}
}
```

**Retry Configuration**:
**Retry behaviour:**

- Max retries: 3
- Retry delay: Exponential backoff (1s, 2s, 4s)
- Dead-letter queue: After max retries
- Monitoring: Alert on dead-letter queue growth
- Attempt 1: immediate (first delivery)
- Attempt 2: 1s delay
- Attempt 3: 2s delay
- Attempt 4: 4s delay
- Attempt 5: 8s delay
- Attempt 6: 16s delay → discard if this attempt also fails

See [examples/error-handling.ts](./examples/error-handling.ts) and [references/retry-strategies.md](./references/retry-strategies.md).
On service restart: `x-retry-count` headers are preserved in republished messages, so the ceiling still applies across restarts.
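
The schedule listed above follows directly from the `Math.min(1000 * Math.pow(2, retryCount), 30000)` expression and the `retryCount >= maxRetries` check in the snippet. As a minimal sketch, assuming the same constants and header keys, the decision can be pulled out into pure functions that are easy to unit test. The names `backoffDelayMs`, `planRetry`, `RetryHeaders`, and `RetryPlan` are hypothetical and do not appear in the PR.

```typescript
// Hypothetical helpers: only the header keys and constants come from the diff.
type RetryHeaders = Record<string, unknown>;

type RetryPlan =
  | { action: "discard"; retryCount: number }
  | { action: "retry"; delayMs: number; headers: RetryHeaders };

const MAX_RETRIES = 5;
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Exponential backoff, capped at MAX_DELAY_MS.
function backoffDelayMs(retryCount: number): number {
  return Math.min(BASE_DELAY_MS * Math.pow(2, retryCount), MAX_DELAY_MS);
}

// Decide what to do with a failed message based on its current headers.
function planRetry(
  headers: RetryHeaders | undefined,
  errorMessage: string,
  now: Date = new Date(),
): RetryPlan {
  const raw = headers?.["x-retry-count"];
  // Treat a missing or non-numeric header as "never retried".
  const retryCount = typeof raw === "number" ? raw : 0;

  if (retryCount >= MAX_RETRIES) {
    return { action: "discard", retryCount };
  }

  return {
    action: "retry",
    delayMs: backoffDelayMs(retryCount),
    headers: {
      ...headers,
      "x-retry-count": retryCount + 1,
      // Keep the timestamp of the first failure if one is already recorded.
      "x-first-failed-at": headers?.["x-first-failed-at"] ?? now.toISOString(),
      "x-last-error": errorMessage.substring(0, 200),
    },
  };
}

// Delays applied after failed attempts 1 to 5.
console.log([0, 1, 2, 3, 4].map(backoffDelayMs)); // [1000, 2000, 4000, 8000, 16000]
```

With `maxRetries` set to 5 the 30s cap is never reached; it only matters if the ceiling is raised to 6 or more.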

### 6. Event Versioning

@@ -458,16 +472,50 @@ export class JobQueueClient {
this.channel?.ack(msg);
} catch (error) {
console.error("Consumer error:", error);
// ⚠️ NEVER use nack(msg, false, true) here — that requeuees immediately and
// ⚠️ NEVER use nack(msg, false, true) — that requeues immediately and
// causes a tight retry loop (thousands of DB calls/sec on persistent errors).
// Always delay before requeueing to prevent storms.
setTimeout(() => {
try {
this.channel?.nack(msg, false, true);
} catch {
/* channel may have closed; RabbitMQ will requeue on reconnect */
}
}, 5000);
//
// Use header-based retry with ceiling: track x-retry-count in message headers,
// sendToQueue back to the SAME queue (not the exchange — republishing to the
// exchange re-routes to ALL consumers and causes duplicate processing).
// After maxRetries, ack + discard with full error logging.
const retryCount =
(msg.properties.headers?.["x-retry-count"] as number) ?? 0;
const maxRetries = 5;
if (retryCount >= maxRetries) {
console.error(
{
err: error,
retry_count: retryCount,
event_content: msg.content.toString(),
},
"Message permanently failed after max retries — discarding",
);
this.channel?.ack(msg);
} else {
const delay = Math.min(
1000 * Math.pow(2, retryCount),
30000,
); // 1s, 2s, 4s, 8s, 16s → max 30s
setTimeout(() => {
if (!this.channel) return;
this.channel.sendToQueue(queue, msg.content, {
persistent: msg.properties.deliveryMode === 2,
headers: {
...msg.properties.headers,
"x-retry-count": retryCount + 1,
"x-first-failed-at":
msg.properties.headers?.[
"x-first-failed-at"
] ?? new Date().toISOString(),
"x-last-error": (
error as Error
).message?.substring(0, 200),
},
});
this.channel.ack(msg);
}, delay);
}
}
});
}
4 changes: 2 additions & 2 deletions apps/portal/src/app/portal/candidates/page.tsx
@@ -42,7 +42,7 @@ export default function CandidatesPage() {
const [showAddModal, setShowAddModal] = useState(false);

// Scope management (persisted to localStorage)
const [scope, setScopeState] = useState<CandidateScope>("mine");
const [scope, setScopeState] = useState<CandidateScope>("all");
const [scopeLoaded, setScopeLoaded] = useState(false);

useEffect(() => {
@@ -119,7 +119,7 @@ export default function CandidatesPage() {
updateItem,
} = useStandardList<Candidate, CandidateFilters>({
endpoint: "/candidates/views/enriched",
defaultFilters: { scope: scopeLoaded ? scope : "mine" },
defaultFilters: { scope: scopeLoaded ? scope : "all" },
defaultSortBy: "created_at",
defaultSortOrder: "desc",
defaultLimit: 25,
@@ -128,7 +128,7 @@ export function ControlsBar({
statusLeft={
<>
<BaselScopeToggle
value={filters.job_owner_filter || "assigned"}
value={filters.job_owner_filter || "all"}
onChange={(v) => onFilterChange("job_owner_filter", v as "all" | "assigned" | "saved")}
options={[
{ value: "assigned", label: "Mine" },
2 changes: 1 addition & 1 deletion apps/portal/src/app/portal/roles/page.tsx
@@ -116,7 +116,7 @@ export default function RolesPage() {
setSortOrder,
} = useStandardList<Job, UnifiedJobFilters>({
endpoint: boardEndpoint,
defaultFilters: { status: undefined, job_owner_filter: isRecruiter ? "assigned" : undefined },
defaultFilters: { status: undefined, job_owner_filter: isRecruiter ? "all" : undefined },
defaultSortBy: "created_at",
defaultSortOrder: "desc",
defaultLimit: 10,