diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index e4e65f83..41bd0291 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -623,6 +623,20 @@ export class SorobanEventWorker { const timestamp = Number(decodeU64(body['timestamp'])); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { + // Issue #802: withdrawnAmount is additive, so it must only be mutated when + // this WITHDRAWN event is seen for the first time. Check the dedup guard + // BEFORE touching the financial field — otherwise the admin indexer replay + // (which re-polls already-processed ledgers) re-adds `amount` on every run, + // inflating withdrawnAmount and shrinking the recipient's claimable balance. + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + select: { id: true }, + }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); + return; + } + const stream = await tx.stream.findUniqueOrThrow({ where: { streamId }, select: { withdrawnAmount: true }, @@ -632,6 +646,21 @@ export class SorobanEventWorker { BigInt(stream.withdrawnAmount) + BigInt(amount) ).toString(); + // Insert the event first: the unique (transactionHash, eventType) constraint + // makes a concurrent replay fail here and roll back the whole transaction, + // so withdrawnAmount can never be double-applied. + await tx.streamEvent.create({ + data: { + streamId, + eventType: 'WITHDRAWN', + amount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + }); + await tx.stream.update({ where: { streamId }, data: { @@ -639,28 +668,6 @@ export class SorobanEventWorker { lastUpdateTime: timestamp, }, }); - - const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - select: { id: true }, - }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); - } else { - await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - create: { - streamId, - eventType: 'WITHDRAWN', - amount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ recipient }), - }, - update: {}, - }); - } }); sseService.broadcastToStream(String(streamId), 'stream.withdrawn', { diff --git a/backend/tests/soroban-event-worker.test.ts b/backend/tests/soroban-event-worker.test.ts index 775d8fee..69d0f2df 100644 --- a/backend/tests/soroban-event-worker.test.ts +++ b/backend/tests/soroban-event-worker.test.ts @@ -159,6 +159,65 @@ describe('SorobanEventWorker', () => { ); }); + it('applies a tokens_withdrawn event only once under indexer replay (Issue #802)', async () => { + const txHash = 'withdraw-tx-1'; + const streamId = 7; + + const mockEvent: rpc.Api.EventResponse = { + id: 'withdraw-event-1', + type: 'contract', + ledger: 2000, + ledgerClosedAt: '2024-06-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'tokens_withdrawn' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) }, + { key: () => ({ sym: () => 'timestamp' }), val: () => ({ u64: () => ({ toString: () => '1700000500' }) }) }, + ] as any, + } as any, + }; + + // Simulate persistent DB state across both runs of the same event. + let withdrawn = '1000'; + let eventExists = false; + + const mockTx = { + streamEvent: { + findUnique: vi.fn().mockImplementation(async () => (eventExists ? { id: 'evt-row' } : null)), + create: vi.fn().mockImplementation(async () => { eventExists = true; return { id: 'evt-row' }; }), + }, + stream: { + findUniqueOrThrow: vi.fn().mockImplementation(async () => ({ withdrawnAmount: withdrawn })), + update: vi.fn().mockImplementation(async ({ data }: any) => { withdrawn = data.withdrawnAmount; return { streamId }; }), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // First processing applies the withdrawal: 1000 + 500 = 1500. + await (worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1]); + expect(withdrawn).toBe('1500'); + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); + + // Admin replay re-polls the same ledger and re-emits the identical event. + await (worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1]); + expect(withdrawn).toBe('1500'); // unchanged — no double count + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); // financial field not touched again + expect(mockTx.streamEvent.create).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Duplicate StreamEvent skipped') + ); + }); + it('should persist a zero-rate stream_created event without throwing', async () => { const txHash = 'zero-rate-tx-hash'; const streamId = 77;