Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,20 @@ export class SorobanEventWorker {
const timestamp = Number(decodeU64(body['timestamp']));

await prisma.$transaction(async (tx: Prisma.TransactionClient) => {
// Issue #802: withdrawnAmount is additive, so it must only be mutated when
// this WITHDRAWN event is seen for the first time. Check the dedup guard
// BEFORE touching the financial field — otherwise the admin indexer replay
// (which re-polls already-processed ledgers) re-adds `amount` on every run,
// inflating withdrawnAmount and shrinking the recipient's claimable balance.
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`);
return;
}

const stream = await tx.stream.findUniqueOrThrow({
where: { streamId },
select: { withdrawnAmount: true },
Expand All @@ -632,35 +646,28 @@ export class SorobanEventWorker {
BigInt(stream.withdrawnAmount) + BigInt(amount)
).toString();

// Insert the event first: the unique (transactionHash, eventType) constraint
// makes a concurrent replay fail here and roll back the whole transaction,
// so withdrawnAmount can never be double-applied.
await tx.streamEvent.create({
data: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
});

await tx.stream.update({
where: { streamId },
data: {
withdrawnAmount: newWithdrawnAmount,
lastUpdateTime: timestamp,
},
});

const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
create: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.withdrawn', {
Expand Down
59 changes: 59 additions & 0 deletions backend/tests/soroban-event-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,65 @@ describe('SorobanEventWorker', () => {
);
});

it('applies a tokens_withdrawn event only once under indexer replay (Issue #802)', async () => {
const txHash = 'withdraw-tx-1';
const streamId = 7;

const mockEvent: rpc.Api.EventResponse = {
id: 'withdraw-event-1',
type: 'contract',
ledger: 2000,
ledgerClosedAt: '2024-06-01T00:00:00Z',
txHash,
transactionIndex: 0,
operationIndex: 0,
inSuccessfulContractCall: true,
topic: [
{ switch: () => ({ value: 0 }), sym: () => 'tokens_withdrawn' } as any,
{ switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any,
],
value: {
switch: () => ({ value: 4 }),
map: () => [
{ key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) },
{ key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) },
{ key: () => ({ sym: () => 'timestamp' }), val: () => ({ u64: () => ({ toString: () => '1700000500' }) }) },
] as any,
} as any,
};

// Simulate persistent DB state across both runs of the same event.
let withdrawn = '1000';
let eventExists = false;

const mockTx = {
streamEvent: {
findUnique: vi.fn().mockImplementation(async () => (eventExists ? { id: 'evt-row' } : null)),
create: vi.fn().mockImplementation(async () => { eventExists = true; return { id: 'evt-row' }; }),
},
stream: {
findUniqueOrThrow: vi.fn().mockImplementation(async () => ({ withdrawnAmount: withdrawn })),
update: vi.fn().mockImplementation(async ({ data }: any) => { withdrawn = data.withdrawnAmount; return { streamId }; }),
},
};

(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// First processing applies the withdrawal: 1000 + 500 = 1500.
await (worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1]);
expect(withdrawn).toBe('1500');
expect(mockTx.stream.update).toHaveBeenCalledTimes(1);

// Admin replay re-polls the same ledger and re-emits the identical event.
await (worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1]);
expect(withdrawn).toBe('1500'); // unchanged — no double count
expect(mockTx.stream.update).toHaveBeenCalledTimes(1); // financial field not touched again
expect(mockTx.streamEvent.create).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('Duplicate StreamEvent skipped')
);
});

it('should persist a zero-rate stream_created event without throwing', async () => {
const txHash = 'zero-rate-tx-hash';
const streamId = 77;
Expand Down
Loading