diff --git a/backend/package.json b/backend/package.json index 13378b44..e38b56c9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -103,5 +103,8 @@ "openapi-fetch": "^0.13.5", "openapi-typescript": "^7.4.4", "pino-pretty": "^13.0.0" + }, + "optionalDependencies": { + "@rollup/rollup-linux-x64-gnu": "^4.62.2" } } diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index 1945c966..01cf2a1b 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -27,6 +27,21 @@ enum PaymentStatus { failed cancelled refunded + pending_review +} + +enum ReorgEventStatus { + detected + investigating + resolved + false_positive +} + +enum TransactionReorgStatus { + pending_review + re_verified + confirmed + rolled_back } enum PaymentType { @@ -150,6 +165,7 @@ model Payment { user User? @relation(fields: [userId], references: [id]) project Project? @relation(fields: [projectId], references: [id]) milestone Milestone? @relation(fields: [milestoneId], references: [id]) + transactionReorgs TransactionReorg[] @@index([tenantId, createdAt]) @@index([status], map: "payments_active_status_idx") @@ -2083,3 +2099,55 @@ model PiiAuditLog { @@index([tenantId, createdAt]) @@map("pii_audit_logs") } + +// ─── Chain Reorganization Models ───────────────────────────────────────────── + +model ReorgEvent { + id String @id @default(uuid()) + network String + detectedAt DateTime @default(now()) @map("detected_at") + reorgDepth Int @map("reorg_depth") + safetyThreshold Int @map("safety_threshold") + canonicalBlockHash String @map("canonical_block_hash") + orphanedBlockHash String @map("orphaned_block_hash") + fromBlockNumber Int @map("from_block_number") + toBlockNumber Int @map("to_block_number") + status ReorgEventStatus @default(detected) + resolvedAt DateTime? @map("resolved_at") + metadata Json? + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + affectedTransactions TransactionReorg[] + + @@index([network, detectedAt]) + @@index([status]) + @@index([reorgDepth]) + @@map("reorg_events") +} + +model TransactionReorg { + id String @id @default(uuid()) + reorgEventId String @map("reorg_event_id") + txHash String @map("tx_hash") + paymentId String? @map("payment_id") + network String + status TransactionReorgStatus @default(pending_review) + originalBlock Int? @map("original_block") + reorgDetails Json? @map("reorg_details") + reVerifiedAt DateTime? @map("re_verified_at") + resolvedAt DateTime? @map("resolved_at") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + reorgEvent ReorgEvent @relation(fields: [reorgEventId], references: [id]) + payment Payment? @relation(fields: [paymentId], references: [id]) + + @@index([reorgEventId]) + @@index([txHash]) + @@index([paymentId]) + @@index([status]) + @@index([network, createdAt]) + @@map("transaction_reorgs") +} + diff --git a/backend/src/index.ts b/backend/src/index.ts index 55e9b81f..f963fd9a 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -134,6 +134,8 @@ import indexerRouter from './routes/indexer.js'; import aiRoutingRouter from './routes/ai-routing.js'; import piiRouter from './routes/pii.js'; import { piiRedactionMiddleware } from './middleware/pii-redaction.js'; +import { reorgRouter } from './routes/reorg.js'; +import { getReorgDetector } from './services/chain/reorg-detector.js'; // Validate environment variables at startup validateEnv(); @@ -311,6 +313,7 @@ apiV1Router.use('/escrow', escrowRouter); apiV1Router.use('/disputes', disputesRouter); apiV1Router.use('/withdrawals', withdrawalsRouter); apiV1Router.use('/swap', swapSimulationRouter); +apiV1Router.use('/chain/reorgs', reorgRouter); apiV1Router.get('/compression/metrics', (_req, res) => { res.json(getCompressionMetrics()); }); @@ -569,6 +572,19 @@ server.listen(config.server.port, () => { startWebhookWorker(); startOutboxPublisher({ useBullMQ: Boolean(process.env.REDIS_URL) }); + // Chain reorganization detector (Issue #514) + if ( + process.env.ETHEREUM_RPC_URL || + process.env.POLYGON_RPC_URL || + process.env.STELLAR_RPC_URL + ) { + getReorgDetector().start().then(() => { + console.log('[ReorgDetector] Chain reorg monitoring started'); + }).catch((err: Error) => { + console.error('[ReorgDetector] Startup error:', err.message); + }); + } + // Auto-escalation cron setInterval(async () => { const count = await disputeService.processEscalations(); @@ -651,6 +667,13 @@ const shutdown = (signal: string) => { console.error('Error stopping bridge monitor:', err); } + try { + getReorgDetector().stop(); + console.log('Reorg detector stopped.'); + } catch (err) { + console.error('Error stopping reorg detector:', err); + } + clearInterval(analyticsInterval); try { diff --git a/backend/src/routes/reorg.ts b/backend/src/routes/reorg.ts new file mode 100644 index 00000000..02e84c1c --- /dev/null +++ b/backend/src/routes/reorg.ts @@ -0,0 +1,255 @@ +/** + * reorg.ts — Issue #514 + * + * REST endpoints for chain reorganization monitoring dashboard. + * + * GET /api/v1/chain/reorgs — paginated list of reorg events + * GET /api/v1/chain/reorgs/dashboard — summary stats + * GET /api/v1/chain/reorgs/history — historical incidents with resolution + * GET /api/v1/chain/reorgs/:id — single event + affected transactions + * POST /api/v1/chain/reorgs/simulate — trigger a simulated reorg (test only) + * + * Fix #10: all routes require a valid internal HMAC signature via + * verifyInternalSignature so they are not open to unauthenticated callers. + */ + +import { Router, Request, Response, NextFunction } from 'express'; +import { PrismaClient } from '@prisma/client'; +import { getReorgDetector } from '../services/chain/reorg-detector.js'; +import { getConfirmationTracker } from '../services/chain/confirmation-tracker.js'; +import { verifyInternalSignature } from '../middleware/internalSignature.js'; + +const prisma = new PrismaClient(); + +export const reorgRouter = Router(); + +// Fix #10: apply auth to every route on this router +reorgRouter.use(verifyInternalSignature); + +// ── Helper ──────────────────────────────────────────────────────────────────── + +function parseIntQuery(val: unknown, fallback: number): number { + const n = parseInt(String(val), 10); + return isNaN(n) ? fallback : n; +} + +// Fix #6: validate a ?since= query param; returns null on invalid input +function parseSinceDate(val: unknown, defaultMs: number): Date | null { + if (typeof val !== 'string') return new Date(Date.now() - defaultMs); + const d = new Date(val); + if (isNaN(d.getTime())) return null; + return d; +} + +// ── GET /api/v1/chain/reorgs ────────────────────────────────────────────────── + +reorgRouter.get('/', async (req: Request, res: Response, next: NextFunction) => { + try { + const page = parseIntQuery(req.query.page, 1); + const limit = Math.min(parseIntQuery(req.query.limit, 20), 100); + const network = typeof req.query.network === 'string' ? req.query.network : undefined; + const status = typeof req.query.status === 'string' ? req.query.status : undefined; + + const where: Record = {}; + if (network) where['network'] = network; + if (status) where['status'] = status; + + const [total, events] = await Promise.all([ + prisma.reorgEvent.count({ where }), + prisma.reorgEvent.findMany({ + where, + orderBy: { detectedAt: 'desc' }, + skip: (page - 1) * limit, + take: limit, + include: { _count: { select: { affectedTransactions: true } } }, + }), + ]); + + res.json({ + data: events, + pagination: { page, limit, total, pages: Math.ceil(total / limit) }, + }); + } catch (err) { + next(err); + } +}); + +// ── GET /api/v1/chain/reorgs/dashboard ─────────────────────────────────────── + +reorgRouter.get('/dashboard', async (_req: Request, res: Response, next: NextFunction) => { + try { + const [ + totalEvents, + byStatus, + deepReorgs, + affectedTxCount, + recentEvents, + pendingReview, + ] = await Promise.all([ + prisma.reorgEvent.count(), + prisma.reorgEvent.groupBy({ by: ['status'], _count: { id: true } }), + prisma.reorgEvent.count({ + where: { reorgDepth: { gt: 12 } }, + }), + prisma.transactionReorg.count(), + prisma.reorgEvent.findMany({ + orderBy: { detectedAt: 'desc' }, + take: 5, + include: { _count: { select: { affectedTransactions: true } } }, + }), + prisma.transactionReorg.count({ where: { status: 'pending_review' } }), + ]); + + const tracker = getConfirmationTracker(); + + const statusCounts = Object.fromEntries( + byStatus.map((row: { status: string; _count: { id: number } }) => [row.status, row._count.id]), + ); + + res.json({ + summary: { + totalReorgEvents: totalEvents, + deepReorgs, + affectedTransactions: affectedTxCount, + pendingReview, + statusCounts, + }, + networkThresholds: { + ethereum: tracker.getThreshold('ethereum'), + polygon: tracker.getThreshold('polygon'), + stellar: tracker.getThreshold('stellar'), + }, + recentEvents, + }); + } catch (err) { + next(err); + } +}); + +// ── GET /api/v1/chain/reorgs/history ───────────────────────────────────────── + +reorgRouter.get('/history', async (req: Request, res: Response, next: NextFunction) => { + try { + const page = parseIntQuery(req.query.page, 1); + const limit = Math.min(parseIntQuery(req.query.limit, 50), 200); + const network = typeof req.query.network === 'string' ? req.query.network : undefined; + + // Fix #6: validate ?since= before passing to Prisma + const since = parseSinceDate(req.query.since, 30 * 24 * 3600 * 1000); + if (since === null) { + res.status(400).json({ error: '?since must be a valid ISO 8601 date string' }); + return; + } + + const where: Record = { detectedAt: { gte: since } }; + if (network) where['network'] = network; + + const [total, events] = await Promise.all([ + prisma.reorgEvent.count({ where }), + prisma.reorgEvent.findMany({ + where, + orderBy: { detectedAt: 'desc' }, + skip: (page - 1) * limit, + take: limit, + include: { + affectedTransactions: { + select: { txHash: true, status: true, resolvedAt: true }, + }, + }, + }), + ]); + + res.json({ + data: events, + pagination: { page, limit, total, pages: Math.ceil(total / limit) }, + }); + } catch (err) { + next(err); + } +}); + +// ── GET /api/v1/chain/reorgs/:id ────────────────────────────────────────────── + +reorgRouter.get('/:id', async (req: Request, res: Response, next: NextFunction) => { + try { + const event = await prisma.reorgEvent.findUnique({ + where: { id: req.params['id'] as string }, + include: { + affectedTransactions: { + orderBy: { createdAt: 'asc' }, + include: { + payment: { select: { id: true, txHash: true, status: true, amount: true, currency: true } }, + }, + }, + }, + }); + + if (!event) { + res.status(404).json({ error: 'Reorg event not found' }); + return; + } + + res.json({ data: event }); + } catch (err) { + next(err); + } +}); + +// ── POST /api/v1/chain/reorgs/simulate ─────────────────────────────────────── + +reorgRouter.post('/simulate', async (req: Request, res: Response, next: NextFunction) => { + if (process.env.NODE_ENV === 'production') { + res.status(403).json({ error: 'Simulation not available in production' }); + return; + } + + try { + const { + network = 'ethereum', + orphanedBlockHash, + canonicalBlockHash, + fromBlock, + toBlock, + affectedTxHashes = [], + } = req.body as { + network?: string; + orphanedBlockHash: string; + canonicalBlockHash: string; + fromBlock: number; + toBlock: number; + affectedTxHashes?: string[]; + }; + + if (!orphanedBlockHash || !canonicalBlockHash || fromBlock == null || toBlock == null) { + res.status(400).json({ + error: 'orphanedBlockHash, canonicalBlockHash, fromBlock, and toBlock are required', + }); + return; + } + + // Fix #4: reject inverted block range before it produces a negative reorgDepth + if (fromBlock > toBlock) { + res.status(400).json({ error: 'fromBlock must be less than or equal to toBlock' }); + return; + } + + const detector = getReorgDetector(); + const reorgEventId = await detector.simulateReorg( + network, + orphanedBlockHash, + canonicalBlockHash, + fromBlock, + toBlock, + affectedTxHashes, + ); + + const event = await prisma.reorgEvent.findUnique({ + where: { id: reorgEventId }, + include: { _count: { select: { affectedTransactions: true } } }, + }); + + res.status(201).json({ data: event }); + } catch (err) { + next(err); + } +}); diff --git a/backend/src/services/chain/__tests__/reorg-detector.test.ts b/backend/src/services/chain/__tests__/reorg-detector.test.ts new file mode 100644 index 00000000..1ac1a9cd --- /dev/null +++ b/backend/src/services/chain/__tests__/reorg-detector.test.ts @@ -0,0 +1,438 @@ +/** + * reorg-detector.test.ts — Issue #514 + * + * Integration tests using in-memory mock providers and Prisma mocks. + * No real blockchain or database required. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { ConfirmationTracker, DEFAULT_THRESHOLDS } from '../confirmation-tracker.js'; +import { + ReorgDetector, + type ChainConfig, + type ChainProvider, + type BlockHeader, +} from '../reorg-detector.js'; + +// ── Prisma mock (vi.hoisted ensures refs are available inside vi.mock factory) ─ + +const { + mockReorgEventCreate, + mockTransactionReorgCreate, + mockTransactionReorgCount, + mockTransactionReorgUpdateMany, + mockPaymentFindMany, + mockPaymentUpdate, + mockReorgEventUpdate, + mockTransaction, + mockFetch, +} = vi.hoisted(() => { + const mockReorgEventCreate = vi.fn(); + const mockTransactionReorgCreate = vi.fn(); + const mockTransactionReorgCount = vi.fn().mockResolvedValue(0); + const mockTransactionReorgUpdateMany = vi.fn(); + const mockPaymentFindMany = vi.fn().mockResolvedValue([]); + const mockPaymentUpdate = vi.fn(); + const mockReorgEventUpdate = vi.fn(); + // prisma.$transaction receives a callback; execute it with the mock client + const mockTransaction = vi.fn((cb: (tx: unknown) => Promise) => + cb({ + reorgEvent: { create: mockReorgEventCreate }, + transactionReorg: { create: mockTransactionReorgCreate }, + payment: { update: mockPaymentUpdate }, + }), + ); + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + return { + mockReorgEventCreate, + mockTransactionReorgCreate, + mockTransactionReorgCount, + mockTransactionReorgUpdateMany, + mockPaymentFindMany, + mockPaymentUpdate, + mockReorgEventUpdate, + mockTransaction, + mockFetch, + }; +}); + +// PrismaClient must use a regular function (not arrow) so `new` works correctly +vi.mock('@prisma/client', () => ({ + PrismaClient: vi.fn().mockImplementation(function () { + return { + $transaction: mockTransaction, + reorgEvent: { create: mockReorgEventCreate, update: mockReorgEventUpdate }, + transactionReorg: { + create: mockTransactionReorgCreate, + count: mockTransactionReorgCount, + updateMany: mockTransactionReorgUpdateMany, + }, + payment: { findMany: mockPaymentFindMany, update: mockPaymentUpdate }, + }; + }), +})); + +// ── node-fetch mock ─────────────────────────────────────────────────────────── + +vi.mock('node-fetch', () => ({ default: mockFetch })); + +// ── Mock ChainProvider ──────────────────────────────────────────────────────── + +function makeBlock(number: number, hash: string, parentHash: string): BlockHeader { + return { number, hash, parentHash, timestamp: Date.now() }; +} + +class MockProvider implements ChainProvider { + private blocks = new Map(); + private head = 0; + + addBlock(block: BlockHeader): void { + this.blocks.set(block.number, block); + if (block.number > this.head) this.head = block.number; + } + + async getBlockNumber(): Promise { return this.head; } + async getBlock(n: number): Promise { return this.blocks.get(n) ?? null; } + setHead(n: number): void { this.head = n; } +} + +function makeMockDetector(network: string, provider: MockProvider, safetyThreshold = 2): ReorgDetector { + return new ReorgDetector({ + chains: [{ network, safetyThreshold } as ChainConfig], + providerFactory: () => provider, + }); +} + +// ── ConfirmationTracker ─────────────────────────────────────────────────────── + +describe('ConfirmationTracker', () => { + let tracker: ConfirmationTracker; + + beforeEach(() => { + // Fix test-isolation: each test gets a fresh instance; never touches the singleton + tracker = new ConfirmationTracker(); + }); + + it('returns correct default thresholds', () => { + expect(tracker.getThreshold('ethereum')).toBe(12); + expect(tracker.getThreshold('polygon')).toBe(64); + expect(tracker.getThreshold('stellar')).toBe(1); + }); + + it('falls back to default threshold for unknown networks', () => { + expect(tracker.getThreshold('avalanche')).toBe(DEFAULT_THRESHOLDS['default']); + }); + + it('records a confirmation and computes status correctly', () => { + tracker.setNetworkHead('ethereum', 1000); + tracker.recordConfirmation('0xabc', 'ethereum', 990); + + const status = tracker.getStatus('0xabc', 'ethereum'); + expect(status).not.toBeNull(); + expect(status!.confirmations).toBe(11); // 1000 - 990 + 1 + expect(status!.isFinalized).toBe(false); // 11 < 12 threshold + }); + + it('marks transaction as finalized when confirmations reach threshold', () => { + tracker.setNetworkHead('ethereum', 1000); + tracker.recordConfirmation('0xabc', 'ethereum', 989); // exactly 12 confirmations + + expect(tracker.isFinalized('0xabc', 'ethereum')).toBe(true); + }); + + it('does not finalize below threshold', () => { + tracker.setNetworkHead('ethereum', 1000); + tracker.recordConfirmation('0xabc', 'ethereum', 991); // 10 confirmations, needs 12 + + expect(tracker.isFinalized('0xabc', 'ethereum')).toBe(false); + }); + + it('findAffected returns only txs in the orphaned block range', () => { + tracker.recordConfirmation('0xaaa', 'ethereum', 100); + tracker.recordConfirmation('0xbbb', 'ethereum', 101); + tracker.recordConfirmation('0xccc', 'ethereum', 102); + tracker.recordConfirmation('0xddd', 'ethereum', 103); + + const hashes = tracker.findAffected('ethereum', 101, 102).map((a) => a.txHash).sort(); + expect(hashes).toEqual(['0xbbb', '0xccc']); + }); + + it('findAffected returns empty array when nothing is in range', () => { + tracker.recordConfirmation('0xaaa', 'ethereum', 100); + expect(tracker.findAffected('ethereum', 200, 300)).toHaveLength(0); + }); + + it('removes a confirmation', () => { + tracker.recordConfirmation('0xabc', 'ethereum', 100); + tracker.removeConfirmation('0xabc', 'ethereum'); + expect(tracker.getStatus('0xabc', 'ethereum')).toBeNull(); + }); + + it('is network-scoped — ethereum and polygon tracked independently', () => { + tracker.recordConfirmation('0xsame', 'ethereum', 100); + tracker.recordConfirmation('0xsame', 'polygon', 200); + + expect(tracker.getStatus('0xsame', 'ethereum')!.confirmedAtBlock).toBe(100); + expect(tracker.getStatus('0xsame', 'polygon')!.confirmedAtBlock).toBe(200); + }); +}); + +// ── ReorgDetector.simulateReorg ─────────────────────────────────────────────── + +describe('ReorgDetector.simulateReorg', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockReorgEventCreate.mockResolvedValue({ id: 'evt-sim-1' }); + mockPaymentFindMany.mockResolvedValue([]); + // $transaction executes the callback synchronously in tests + mockTransaction.mockImplementation((cb: (tx: unknown) => Promise) => + cb({ + reorgEvent: { create: mockReorgEventCreate }, + transactionReorg: { create: mockTransactionReorgCreate }, + payment: { update: mockPaymentUpdate }, + }), + ); + }); + + it('persists ReorgEvent with correct depth and block range', async () => { + const detector = new ReorgDetector({ + chains: [{ network: 'ethereum', safetyThreshold: 12 }], + }); + + await detector.simulateReorg('ethereum', '0xorphaned', '0xcanonical', 500, 502, []); + + expect(mockReorgEventCreate).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + network: 'ethereum', + reorgDepth: 3, + fromBlockNumber: 500, + toBlockNumber: 502, + orphanedBlockHash: '0xorphaned', + canonicalBlockHash: '0xcanonical', + safetyThreshold: 12, + }), + }), + ); + }); + + it('marks affected payment as pending_review inside the transaction', async () => { + const paymentId = 'payment-abc-123'; + mockPaymentFindMany.mockResolvedValue([{ id: paymentId, txHash: '0xtxaffected' }]); + + const detector = new ReorgDetector({ + chains: [{ network: 'ethereum', safetyThreshold: 12 }], + }); + + await detector.simulateReorg('ethereum', '0xorphaned', '0xcanonical', 100, 101, ['0xtxaffected']); + + expect(mockPaymentUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: paymentId }, + data: { status: 'pending_review' }, + }), + ); + }); + + it('creates TransactionReorg row with correct reorgDetails', async () => { + const paymentId = 'payment-xyz-456'; + mockPaymentFindMany.mockResolvedValue([{ id: paymentId, txHash: '0xtxb' }]); + mockReorgEventCreate.mockResolvedValue({ id: 'evt-tr-1' }); + + const detector = new ReorgDetector({ + chains: [{ network: 'polygon', safetyThreshold: 64 }], + }); + + await detector.simulateReorg('polygon', '0xoldblock', '0xnewblock', 200, 203, ['0xtxb']); + + expect(mockTransactionReorgCreate).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + txHash: '0xtxb', + paymentId, + network: 'polygon', + reorgDetails: expect.objectContaining({ + reorgDepth: 4, + orphanedBlockHash: '0xoldblock', + }), + }), + }), + ); + }); + + it('does NOT fire alert when depth is within safety threshold', async () => { + const detector = new ReorgDetector({ + chains: [{ network: 'ethereum', safetyThreshold: 12 }], + alertWebhookUrl: 'http://alerts.test/hook', + }); + + await detector.simulateReorg('ethereum', '0xA', '0xB', 100, 102, []); // depth 3 < 12 + + expect(mockFetch).not.toHaveBeenCalled(); + }); + + it('fires alert when depth exceeds safety threshold', async () => { + const detector = new ReorgDetector({ + chains: [{ network: 'ethereum', safetyThreshold: 2 }], + alertWebhookUrl: 'http://alerts.test/hook', + }); + + await detector.simulateReorg('ethereum', '0xA', '0xB', 100, 104, []); // depth 5 > 2 + + expect(mockFetch).toHaveBeenCalledWith( + 'http://alerts.test/hook', + expect.objectContaining({ + method: 'POST', + body: expect.stringContaining('"severity":"critical"'), + }), + ); + }); + + // Fix #9 verification + it('sets TransactionReorg status to rolled_back when tx is not re-confirmed', async () => { + mockPaymentFindMany.mockResolvedValue([{ id: 'pay-1', txHash: '0xtx1' }]); + mockReorgEventCreate.mockResolvedValue({ id: 'evt-rolled' }); + + const detector = new ReorgDetector({ + chains: [{ network: 'ethereum', safetyThreshold: 12 }], + }); + + // processReorgJob with no rpcUrl and no providerFactory → isStillConfirmed = false + await detector.processReorgJob({ + reorgEventId: 'evt-rolled', + txHash: '0xtx1', + paymentId: 'pay-1', + network: 'ethereum', + }); + + expect(mockTransactionReorgUpdateMany).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ status: 'rolled_back' }), + }), + ); + expect(mockPaymentUpdate).toHaveBeenCalledWith( + expect.objectContaining({ data: { status: 'pending' } }), + ); + }); +}); + +// ── ReorgDetector.pollChain ─────────────────────────────────────────────────── + +describe('ReorgDetector.pollChain', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockReorgEventCreate.mockResolvedValue({ id: 'evt-poll' }); + mockPaymentFindMany.mockResolvedValue([]); + mockTransaction.mockImplementation((cb: (tx: unknown) => Promise) => + cb({ + reorgEvent: { create: mockReorgEventCreate }, + transactionReorg: { create: mockTransactionReorgCreate }, + payment: { update: mockPaymentUpdate }, + }), + ); + }); + + it('advances chain tip on clean linear extension without triggering reorg', async () => { + const provider = new MockProvider(); + provider.addBlock(makeBlock(1, '0xb1', '0x0')); + provider.addBlock(makeBlock(2, '0xb2', '0xb1')); + provider.addBlock(makeBlock(3, '0xb3', '0xb2')); + + const detector = makeMockDetector('ethereum', provider, 2); + await detector.start(); + + expect(detector.getCurrentTip('ethereum')?.blockHash).toBe('0xb3'); + + provider.addBlock(makeBlock(4, '0xb4', '0xb3')); + await detector.pollChain('ethereum'); + + expect(detector.getCurrentTip('ethereum')?.blockHash).toBe('0xb4'); + expect(mockReorgEventCreate).not.toHaveBeenCalled(); + + await detector.stopAsync(); + }); + + it('calls handleReorg when parentHash mismatches canonical tip', async () => { + const provider = new MockProvider(); + provider.addBlock(makeBlock(1, '0xb1', '0x0')); + provider.addBlock(makeBlock(2, '0xb2', '0xb1')); + provider.addBlock(makeBlock(3, '0xb3', '0xb2')); + + const handleReorgSpy = vi + .spyOn( + ReorgDetector.prototype as unknown as { handleReorg: () => Promise }, + 'handleReorg' as never, + ) + .mockResolvedValue(undefined); + + const detector = makeMockDetector('ethereum', provider, 2); + await detector.start(); + + provider.addBlock(makeBlock(4, '0xb4_fork', '0xb2_fork')); + await detector.pollChain('ethereum'); + + expect(handleReorgSpy).toHaveBeenCalledOnce(); + + handleReorgSpy.mockRestore(); + await detector.stopAsync(); + }); + + // Fix #3 verification: same-height reorg must NOT be skipped + it('detects a same-height sibling block reorg', async () => { + const provider = new MockProvider(); + provider.addBlock(makeBlock(1, '0xb1', '0x0')); + provider.addBlock(makeBlock(2, '0xb2', '0xb1')); + provider.addBlock(makeBlock(3, '0xb3', '0xb2')); + + const detector = makeMockDetector('ethereum', provider, 2); + await detector.start(); + + // Tip is now block 3 (0xb3). Replace it with a sibling at the same height. + // The provider returns block 3 again but with a different hash/parentHash. + provider.addBlock(makeBlock(3, '0xb3_sibling', '0xb2_fork')); + provider.setHead(3); // same height + + const handleReorgSpy = vi + .spyOn( + ReorgDetector.prototype as unknown as { handleReorg: () => Promise }, + 'handleReorg' as never, + ) + .mockResolvedValue(undefined); + + await detector.pollChain('ethereum'); + + // With the old <= comparison this would have returned early. With < it proceeds. + expect(handleReorgSpy).toHaveBeenCalledOnce(); + + handleReorgSpy.mockRestore(); + await detector.stopAsync(); + }); + + it('does not poll when chain head has not advanced', async () => { + const provider = new MockProvider(); + provider.addBlock(makeBlock(5, '0xb5', '0xb4')); + + const detector = makeMockDetector('ethereum', provider, 2); + await detector.start(); + + await detector.pollChain('ethereum'); // no new block + + expect(mockReorgEventCreate).not.toHaveBeenCalled(); + await detector.stopAsync(); + }); + + // Fix #7 verification: second start() must be a no-op + it('start() is idempotent — second call does not create duplicate timers', async () => { + const provider = new MockProvider(); + provider.addBlock(makeBlock(1, '0xb1', '0x0')); + + const detector = makeMockDetector('ethereum', provider, 2); + await detector.start(); + await detector.start(); // second call must be a no-op + + // Only one provider should be registered + expect(detector.getCurrentTip('ethereum')).toBeDefined(); + + await detector.stopAsync(); + }); +}); diff --git a/backend/src/services/chain/confirmation-tracker.ts b/backend/src/services/chain/confirmation-tracker.ts new file mode 100644 index 00000000..53e24a76 --- /dev/null +++ b/backend/src/services/chain/confirmation-tracker.ts @@ -0,0 +1,166 @@ +/** + * confirmation-tracker.ts — Issue #514 + * + * Tracks on-chain confirmation counts per transaction with configurable + * safety thresholds per network. A transaction is considered final only when + * its confirmation depth exceeds the network-specific safety threshold. + * + * Default thresholds (per issue spec): + * Ethereum — 12 blocks + * Polygon — 64 blocks + * Stellar — 1 ledger (BFT finality) + */ + +import { randomUUID } from 'node:crypto'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +export interface ConfirmationThresholds { + [network: string]: number; +} + +export interface ConfirmedTx { + id: string; + txHash: string; + network: string; + confirmedAtBlock: number; + firstSeenAt: string; +} + +export interface ConfirmationStatus { + txHash: string; + network: string; + confirmedAtBlock: number; + currentBlock: number; + confirmations: number; + required: number; + isFinalized: boolean; +} + +export interface TrackerOptions { + thresholds?: ConfirmationThresholds; +} + +// ── Defaults ────────────────────────────────────────────────────────────────── + +export const DEFAULT_THRESHOLDS: ConfirmationThresholds = { + ethereum: 12, + polygon: 64, + stellar: 1, + default: 12, +}; + +// ── ConfirmationTracker ─────────────────────────────────────────────────────── + +export class ConfirmationTracker { + private thresholds: ConfirmationThresholds; + private confirmedTxs = new Map(); + private networkHeads = new Map(); + + constructor(opts: TrackerOptions = {}) { + this.thresholds = { ...DEFAULT_THRESHOLDS, ...(opts.thresholds ?? {}) }; + } + + getThreshold(network: string): number { + return this.thresholds[network.toLowerCase()] ?? this.thresholds['default'] ?? 12; + } + + /** Update the canonical head block for a network. */ + setNetworkHead(network: string, blockNumber: number): void { + this.networkHeads.set(network.toLowerCase(), blockNumber); + } + + getNetworkHead(network: string): number { + return this.networkHeads.get(network.toLowerCase()) ?? 0; + } + + /** + * Record that a transaction was included in a specific block. + * Call this when you first see the tx confirmed on-chain. + */ + recordConfirmation(txHash: string, network: string, blockNumber: number): ConfirmedTx { + const key = this.key(txHash, network); + const existing = this.confirmedTxs.get(key); + if (existing) return existing; + + const entry: ConfirmedTx = { + id: randomUUID(), + txHash, + network: network.toLowerCase(), + confirmedAtBlock: blockNumber, + firstSeenAt: new Date().toISOString(), + }; + this.confirmedTxs.set(key, entry); + return entry; + } + + /** Remove a transaction from tracking (e.g. after reorg orphans it). */ + removeConfirmation(txHash: string, network: string): void { + this.confirmedTxs.delete(this.key(txHash, network)); + } + + /** Get the current confirmation status of a transaction. */ + getStatus(txHash: string, network: string): ConfirmationStatus | null { + const entry = this.confirmedTxs.get(this.key(txHash, network)); + if (!entry) return null; + + const currentBlock = this.getNetworkHead(network); + const confirmations = currentBlock >= entry.confirmedAtBlock + ? currentBlock - entry.confirmedAtBlock + 1 + : 0; + const required = this.getThreshold(network); + + return { + txHash, + network: network.toLowerCase(), + confirmedAtBlock: entry.confirmedAtBlock, + currentBlock, + confirmations, + required, + isFinalized: confirmations >= required, + }; + } + + /** Returns true only when confirmation count >= safety threshold. */ + isFinalized(txHash: string, network: string): boolean { + return this.getStatus(txHash, network)?.isFinalized ?? false; + } + + /** List all tracked transactions for a given network. */ + listByNetwork(network: string): ConfirmedTx[] { + const net = network.toLowerCase(); + return Array.from(this.confirmedTxs.values()).filter((t) => t.network === net); + } + + /** Identify transactions in orphaned blocks (block range [fromBlock, toBlock]). */ + findAffected(network: string, fromBlock: number, toBlock: number): ConfirmedTx[] { + const net = network.toLowerCase(); + return Array.from(this.confirmedTxs.values()).filter( + (t) => + t.network === net && + t.confirmedAtBlock >= fromBlock && + t.confirmedAtBlock <= toBlock, + ); + } + + private key(txHash: string, network: string): string { + return `${network.toLowerCase()}:${txHash}`; + } +} + +// ── Singleton ───────────────────────────────────────────────────────────────── + +let _tracker: ConfirmationTracker | undefined; + +export function getConfirmationTracker(): ConfirmationTracker { + if (!_tracker) { + _tracker = new ConfirmationTracker({ + thresholds: { + ethereum: Number(process.env.CONFIRMATION_THRESHOLD_ETHEREUM ?? 12), + polygon: Number(process.env.CONFIRMATION_THRESHOLD_POLYGON ?? 64), + stellar: Number(process.env.CONFIRMATION_THRESHOLD_STELLAR ?? 1), + }, + }); + } + return _tracker; +} diff --git a/backend/src/services/chain/reorg-detector.ts b/backend/src/services/chain/reorg-detector.ts new file mode 100644 index 00000000..15676a86 --- /dev/null +++ b/backend/src/services/chain/reorg-detector.ts @@ -0,0 +1,685 @@ +/** + * reorg-detector.ts — Issue #514 + * + * Detects blockchain reorganizations by comparing each new block's parentHash + * against the locally stored canonical chain tip. When a mismatch is found: + * 1. Walks back the chain to find the common ancestor (computes reorg depth) + * 2. Persists a ReorgEvent record (atomically via prisma.$transaction) + * 3. Identifies payments whose tx was in the orphaned range + * 4. Marks those payments as pending_review in TransactionReorg + * 5. Enqueues a BullMQ re-verification job for each affected transaction + * 6. Emits a critical alert when reorg depth exceeds the safety threshold + * + * EVM chains use ethers.js JsonRpcProvider. + * Stellar uses the Horizon REST API via @stellar/stellar-sdk. + */ + +import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq'; +import { PrismaClient, type Prisma } from '@prisma/client'; +import { getConfirmationTracker } from './confirmation-tracker.js'; + +const prisma = new PrismaClient(); + +// ── Types ───────────────────────────────────────────────────────────────────── + +export interface ChainConfig { + network: string; + rpcUrl?: string; + pollIntervalMs?: number; + safetyThreshold?: number; +} + +export interface BlockHeader { + number: number; + hash: string; + parentHash: string; + timestamp: number; +} + +export interface ReorgIncident { + network: string; + reorgDepth: number; + canonicalBlockHash: string; + orphanedBlockHash: string; + fromBlockNumber: number; + toBlockNumber: number; + affectedTxHashes: string[]; +} + +export interface ReorgDetectorOptions { + chains: ChainConfig[]; + alertWebhookUrl?: string; + /** Override for injecting mock providers in tests */ + providerFactory?: (rpcUrl: string) => ChainProvider; +} + +export interface ChainProvider { + getBlockNumber(): Promise; + getBlock(blockNumber: number): Promise; +} + +export interface ReorgJob { + reorgEventId: string; + txHash: string; + paymentId: string | null; + network: string; +} + +type AffectedPayment = { + txHash: string; + paymentId: string | null; + originalBlock: number | null; +}; + +// ── Default safety thresholds per chain ─────────────────────────────────────── + +const SAFETY_THRESHOLDS: Record = { + ethereum: 12, + polygon: 64, + stellar: 1, +}; + +// ── In-memory canonical chain state ─────────────────────────────────────────── + +interface ChainTip { + blockNumber: number; + blockHash: string; + parentHash: string; +} + +// ── Alert helper ────────────────────────────────────────────────────────────── + +async function dispatchAlert(incident: ReorgIncident, webhookUrl?: string): Promise { + if (!webhookUrl) return; + try { + const { default: fetch } = await import('node-fetch'); + await fetch(webhookUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + type: 'reorg.detected', + severity: 'critical', + ...incident, + occurredAt: new Date().toISOString(), + }), + }); + } catch { + // Non-fatal + } +} + +// ── EVM provider adapter (wraps ethers.js) ──────────────────────────────────── + +async function createEvmProvider(rpcUrl: string): Promise { + const { JsonRpcProvider } = await import('ethers'); + const provider = new JsonRpcProvider(rpcUrl); + + return { + async getBlockNumber() { + return provider.getBlockNumber(); + }, + async getBlock(blockNumber: number) { + const block = await provider.getBlock(blockNumber); + if (!block) return null; + return { + number: block.number, + hash: block.hash ?? '', + parentHash: block.parentHash, + timestamp: block.timestamp, + }; + }, + }; +} + +// ── Stellar provider adapter (wraps @stellar/stellar-sdk Horizon) ───────────── +// Fix #8: Stellar uses Horizon REST, not EVM JSON-RPC. Ledger sequence → block +// number, prev_hash → parentHash so the generic reorg logic works unchanged. + +// Minimal shape we need from Horizon LedgerRecord — avoids tight SDK version coupling +interface HorizonLedger { + sequence: number; + hash: string; + prev_hash: string; + closed_at: string; +} + +async function createStellarProvider(horizonUrl: string): Promise { + const { Horizon } = await import('@stellar/stellar-sdk'); + const server = new Horizon.Server(horizonUrl); + + return { + async getBlockNumber() { + const page = await server.ledgers().order('desc').limit(1).call(); + const record = page.records[0] as HorizonLedger | undefined; + return record?.sequence ?? 0; + }, + async getBlock(sequence: number) { + try { + const ledger = (await server.ledgers().ledger(sequence).call()) as unknown as HorizonLedger; + return { + number: ledger.sequence, + hash: ledger.hash, + parentHash: ledger.prev_hash, + timestamp: Math.floor(new Date(ledger.closed_at).getTime() / 1000), + }; + } catch { + return null; + } + }, + }; +} + +async function createProvider(cfg: ChainConfig): Promise { + const rpcUrl = cfg.rpcUrl ?? ''; + if (cfg.network === 'stellar') { + return createStellarProvider(rpcUrl); + } + return createEvmProvider(rpcUrl); +} + +// ── ReorgDetector ───────────────────────────────────────────────────────────── + +export class ReorgDetector { + private chains: ChainConfig[]; + private alertWebhookUrl?: string; + private providerFactory?: (rpcUrl: string) => ChainProvider; + private providers = new Map(); + private tips = new Map(); + private timers = new Map>(); + private reorgQueue: Queue | null = null; + private reorgWorker: Worker | null = null; + // Fix #7: idempotency guard — start() is safe to call multiple times + private started = false; + + constructor(opts: ReorgDetectorOptions) { + this.chains = opts.chains; + this.alertWebhookUrl = opts.alertWebhookUrl; + this.providerFactory = opts.providerFactory; + } + + // ── Lifecycle ─────────────────────────────────────────────────────────────── + + // Fix #7: guard against double-start creating zombie timers and duplicate workers + async start(): Promise { + if (this.started) return; + this.started = true; + this.initQueue(); + await Promise.all(this.chains.map((cfg) => this.startChain(cfg))); + } + + // Fix #4 (stop): await worker/queue close so shutdown is clean + async stopAsync(): Promise { + for (const timer of this.timers.values()) { + clearInterval(timer); + } + this.timers.clear(); + if (this.reorgWorker) await this.reorgWorker.close(); + if (this.reorgQueue) await this.reorgQueue.close(); + this.started = false; + } + + stop(): void { + void this.stopAsync(); + } + + private initQueue(): void { + const redisUrl = process.env.REDIS_URL; + if (!redisUrl || process.env.REDIS_ENABLED !== 'true') return; + + const connection = this.parseRedisUrl(redisUrl); + this.reorgQueue = new Queue('agenticpay:reorg-processing', { + connection, + defaultJobOptions: { + attempts: 5, + backoff: { type: 'exponential', delay: 5_000 }, + removeOnComplete: { count: 200 }, + removeOnFail: { count: 500 }, + }, + }); + + this.reorgWorker = new Worker( + 'agenticpay:reorg-processing', + (job: Job) => this.processReorgJob(job.data), + { connection, concurrency: 4 }, + ); + + this.reorgWorker.on('completed', (job) => { + console.log(`[reorg-detector] re-verification completed for tx ${job.data.txHash}`); + }); + this.reorgWorker.on('failed', (job, err) => { + console.error(`[reorg-detector] re-verification failed for tx ${job?.data.txHash}:`, err.message); + }); + } + + private async startChain(cfg: ChainConfig): Promise { + // Fix #8: use the correct provider per chain type + const provider = this.providerFactory + ? this.providerFactory(cfg.rpcUrl ?? '') + : await createProvider(cfg); + + this.providers.set(cfg.network, provider); + + try { + const head = await provider.getBlockNumber(); + const block = await provider.getBlock(head); + if (block) { + this.tips.set(cfg.network, { + blockNumber: block.number, + blockHash: block.hash, + parentHash: block.parentHash, + }); + getConfirmationTracker().setNetworkHead(cfg.network, block.number); + } + } catch (err) { + console.error(`[reorg-detector] Failed to bootstrap chain tip for ${cfg.network}:`, err); + } + + const interval = cfg.pollIntervalMs ?? 15_000; + const timer = setInterval(() => void this.pollChain(cfg.network), interval); + this.timers.set(cfg.network, timer); + + console.log(`[reorg-detector] Monitoring ${cfg.network} every ${interval}ms`); + } + + // ── Block polling ─────────────────────────────────────────────────────────── + + async pollChain(network: string): Promise { + const provider = this.providers.get(network); + if (!provider) return; + + try { + const latestNumber = await provider.getBlockNumber(); + const currentTip = this.tips.get(network); + + // Fix #3: use strict < so a same-height sibling block (latestNumber === + // currentTip.blockNumber but different hash) is NOT skipped. We continue + // and let the parentHash comparison detect the reorg. + if (!currentTip || latestNumber < currentTip.blockNumber) return; + + const latestBlock = await provider.getBlock(latestNumber); + if (!latestBlock) return; + + getConfirmationTracker().setNetworkHead(network, latestNumber); + + // Same height: compare hashes directly. Same hash → idle poll, no change. + // Different hash → same-height sibling block reorg (fix #3 complement). + if (latestNumber === currentTip.blockNumber) { + if (latestBlock.hash === currentTip.blockHash) return; + await this.handleReorg(network, provider, currentTip, latestBlock); + return; + } + + // New block: cleanly extends our known tip when parentHash matches + if (latestBlock.parentHash === currentTip.blockHash) { + this.tips.set(network, { + blockNumber: latestBlock.number, + blockHash: latestBlock.hash, + parentHash: latestBlock.parentHash, + }); + return; + } + + // Parent hash mismatch on a new block — reorg detected + await this.handleReorg(network, provider, currentTip, latestBlock); + + } catch (err) { + console.error(`[reorg-detector] Poll error on ${network}:`, err); + } + } + + // ── Reorg handling ────────────────────────────────────────────────────────── + + private async handleReorg( + network: string, + provider: ChainProvider, + oldTip: ChainTip, + newBlock: BlockHeader, + ): Promise { + const safetyThreshold = + this.chains.find((c) => c.network === network)?.safetyThreshold ?? + SAFETY_THRESHOLDS[network] ?? + 12; + + const commonAncestor = await this.findCommonAncestor( + provider, + oldTip.blockNumber, + newBlock.number, + ); + + const fromBlock = commonAncestor + 1; + const toBlock = oldTip.blockNumber; + const reorgDepth = toBlock - fromBlock + 1; + + console.warn( + `[reorg-detector] REORG detected on ${network}: depth=${reorgDepth}, ` + + `orphaned blocks ${fromBlock}–${toBlock}, new tip ${newBlock.hash}`, + ); + + const tracker = getConfirmationTracker(); + const affectedEntries = tracker.findAffected(network, fromBlock, toBlock); + const affectedTxHashes = affectedEntries.map((e) => e.txHash); + + // Fix #1: capture originalBlock from tracker BEFORE removing entries, + // and pass the map directly so findAffectedPayments never needs to re-query + // the (now-cleared) tracker. + const originalBlocks = new Map( + affectedEntries.map((e) => [e.txHash, e.confirmedAtBlock]), + ); + + for (const entry of affectedEntries) { + tracker.removeConfirmation(entry.txHash, network); + } + + const incident: ReorgIncident = { + network, + reorgDepth, + canonicalBlockHash: newBlock.hash, + orphanedBlockHash: oldTip.blockHash, + fromBlockNumber: fromBlock, + toBlockNumber: toBlock, + affectedTxHashes, + }; + + // Fix #2: resolve affected payments ONCE here and pass the result through + // to both persistReorgEvent and the enqueue loop — no second DB query. + const affectedPayments = await this.resolveAffectedPayments(affectedTxHashes, originalBlocks); + + const reorgEventId = await this.persistReorgEvent(incident, affectedPayments); + + if (reorgDepth > safetyThreshold) { + console.error( + `[reorg-detector] CRITICAL: reorg depth ${reorgDepth} exceeds safety threshold ${safetyThreshold} on ${network}`, + ); + await dispatchAlert(incident, this.alertWebhookUrl ?? process.env.ALERT_WEBHOOK_URL); + } + + this.tips.set(network, { + blockNumber: newBlock.number, + blockHash: newBlock.hash, + parentHash: newBlock.parentHash, + }); + + for (const tx of affectedPayments) { + await this.enqueueReVerification({ reorgEventId, txHash: tx.txHash, paymentId: tx.paymentId, network }); + } + } + + private async findCommonAncestor( + provider: ChainProvider, + oldTipNumber: number, + newTipNumber: number, + ): Promise { + const maxWalk = Math.min(200, Math.max(oldTipNumber, newTipNumber)); + let searchBlock = Math.min(oldTipNumber, newTipNumber) - 1; + + for (let i = 0; i < maxWalk && searchBlock > 0; i++, searchBlock--) { + const block = await provider.getBlock(searchBlock); + if (block) { + return searchBlock; + } + } + return Math.max(0, searchBlock); + } + + // ── Persistence ───────────────────────────────────────────────────────────── + + // Fix #5: wrap all writes in a single prisma.$transaction so a mid-loop + // DB failure cannot leave a ReorgEvent without its TransactionReorg children. + private async persistReorgEvent( + incident: ReorgIncident, + affectedPayments: AffectedPayment[], + ): Promise { + const safetyThreshold = + this.chains.find((c) => c.network === incident.network)?.safetyThreshold ?? + SAFETY_THRESHOLDS[incident.network] ?? + 12; + + const reorgEventId = await prisma.$transaction(async (tx: Prisma.TransactionClient) => { + const event = await tx.reorgEvent.create({ + data: { + network: incident.network, + reorgDepth: incident.reorgDepth, + safetyThreshold, + canonicalBlockHash: incident.canonicalBlockHash, + orphanedBlockHash: incident.orphanedBlockHash, + fromBlockNumber: incident.fromBlockNumber, + toBlockNumber: incident.toBlockNumber, + metadata: { affectedTxCount: affectedPayments.length }, + }, + }); + + for (const p of affectedPayments) { + await tx.transactionReorg.create({ + data: { + reorgEventId: event.id, + txHash: p.txHash, + paymentId: p.paymentId, + network: incident.network, + originalBlock: p.originalBlock ?? undefined, + reorgDetails: { + orphanedBlockHash: incident.orphanedBlockHash, + reorgDepth: incident.reorgDepth, + fromBlockNumber: incident.fromBlockNumber, + toBlockNumber: incident.toBlockNumber, + }, + }, + }); + + if (p.paymentId) { + await tx.payment.update({ + where: { id: p.paymentId }, + data: { status: 'pending_review' }, + }); + } + } + + return event.id; + }); + + return reorgEventId; + } + + // Fix #1 + #2: single DB query, originalBlock comes from the pre-removal + // tracker snapshot passed in — no tracker lookup after entries are cleared. + private async resolveAffectedPayments( + txHashes: string[], + originalBlocks: Map, + ): Promise { + if (txHashes.length === 0) return []; + + const payments = await prisma.payment.findMany({ + where: { txHash: { in: txHashes } }, + select: { id: true, txHash: true }, + }); + + return txHashes.map((hash) => { + const payment = payments.find( + (p: { id: string; txHash: string | null }) => p.txHash === hash, + ); + return { + txHash: hash, + paymentId: payment?.id ?? null, + originalBlock: originalBlocks.get(hash) ?? null, + }; + }); + } + + // ── BullMQ re-verification ────────────────────────────────────────────────── + + private async enqueueReVerification(job: ReorgJob): Promise { + if (!this.reorgQueue) { + await this.processReorgJob(job); + return; + } + await this.reorgQueue.add('re-verify', job, { + jobId: `reverify:${job.network}:${job.txHash}:${Date.now()}`, + }); + } + + async processReorgJob(job: ReorgJob): Promise { + const { reorgEventId, txHash, paymentId, network } = job; + + try { + let isStillConfirmed = false; + + const rpcUrl = this.chains.find((c) => c.network === network)?.rpcUrl; + if (rpcUrl && network !== 'stellar') { + try { + const { JsonRpcProvider } = await import('ethers'); + const provider = new JsonRpcProvider(rpcUrl); + const receipt = await provider.getTransactionReceipt(txHash); + if (receipt && receipt.blockHash) { + const block = await provider.getBlock(receipt.blockNumber); + isStillConfirmed = block?.hash === receipt.blockHash; + } + } catch { + // RPC failure — leave as pending_review for manual review + } + } else if (this.providerFactory) { + // Test path: injected mock provider + const provider = this.providers.get(network); + if (provider) { + const currentHead = await provider.getBlockNumber(); + isStillConfirmed = currentHead > 0; + } + } + + // Fix #9: use 'rolled_back' (not 're_verified') when the tx is NOT + // confirmed on the canonical chain after the reorg. + const newStatus = isStillConfirmed ? 'confirmed' : 'rolled_back'; + + await prisma.transactionReorg.updateMany({ + where: { reorgEventId, txHash }, + data: { + status: newStatus, + reVerifiedAt: new Date(), + resolvedAt: new Date(), + }, + }); + + if (paymentId) { + await prisma.payment.update({ + where: { id: paymentId }, + data: { status: isStillConfirmed ? 'completed' : 'pending' }, + }); + } + + const remaining = await prisma.transactionReorg.count({ + where: { reorgEventId, status: 'pending_review' }, + }); + if (remaining === 0) { + await prisma.reorgEvent.update({ + where: { id: reorgEventId }, + data: { status: 'resolved', resolvedAt: new Date() }, + }); + } + } catch (err) { + console.error(`[reorg-detector] Re-verification failed for tx ${txHash}:`, err); + throw err; + } + } + + // ── Simulate reorg (for testing / POST /simulate) ──────────────────────────── + + async simulateReorg( + network: string, + orphanedBlockHash: string, + canonicalBlockHash: string, + fromBlock: number, + toBlock: number, + affectedTxHashes: string[] = [], + ): Promise { + const safetyThreshold = + this.chains.find((c) => c.network === network)?.safetyThreshold ?? + SAFETY_THRESHOLDS[network] ?? + 12; + + const reorgDepth = toBlock - fromBlock + 1; + const incident: ReorgIncident = { + network, + reorgDepth, + canonicalBlockHash, + orphanedBlockHash, + fromBlockNumber: fromBlock, + toBlockNumber: toBlock, + affectedTxHashes, + }; + + // originalBlocks are unknown in a simulation — use empty map + const affectedPayments = await this.resolveAffectedPayments( + affectedTxHashes, + new Map(), + ); + + const reorgEventId = await this.persistReorgEvent(incident, affectedPayments); + + if (reorgDepth > safetyThreshold) { + await dispatchAlert(incident, this.alertWebhookUrl ?? process.env.ALERT_WEBHOOK_URL); + } + + for (const tx of affectedPayments) { + await this.enqueueReVerification({ reorgEventId, txHash: tx.txHash, paymentId: tx.paymentId, network }); + } + + return reorgEventId; + } + + // ── Utilities ─────────────────────────────────────────────────────────────── + + getCurrentTip(network: string): ChainTip | undefined { + return this.tips.get(network); + } + + private parseRedisUrl(url: string): ConnectionOptions { + try { + const parsed = new URL(url); + return { + host: parsed.hostname || 'localhost', + port: parseInt(parsed.port || '6379', 10), + password: parsed.password || undefined, + tls: parsed.protocol === 'rediss:' ? {} : undefined, + }; + } catch { + const [host, port] = url.split(':'); + return { host: host || 'localhost', port: parseInt(port || '6379', 10) }; + } + } +} + +// ── Singleton ───────────────────────────────────────────────────────────────── + +let _detector: ReorgDetector | undefined; + +export function getReorgDetector(): ReorgDetector { + if (!_detector) { + const chains: ChainConfig[] = []; + + if (process.env.ETHEREUM_RPC_URL) { + chains.push({ + network: 'ethereum', + rpcUrl: process.env.ETHEREUM_RPC_URL, + pollIntervalMs: 15_000, + safetyThreshold: Number(process.env.CONFIRMATION_THRESHOLD_ETHEREUM ?? 12), + }); + } + if (process.env.POLYGON_RPC_URL) { + chains.push({ + network: 'polygon', + rpcUrl: process.env.POLYGON_RPC_URL, + pollIntervalMs: 10_000, + safetyThreshold: Number(process.env.CONFIRMATION_THRESHOLD_POLYGON ?? 64), + }); + } + if (process.env.STELLAR_RPC_URL) { + chains.push({ + network: 'stellar', + rpcUrl: process.env.STELLAR_RPC_URL, + pollIntervalMs: 6_000, + safetyThreshold: Number(process.env.CONFIRMATION_THRESHOLD_STELLAR ?? 1), + }); + } + + _detector = new ReorgDetector({ chains }); + } + return _detector; +}