diff --git a/backend/tests/integration/stream-lifecycle.test.ts b/backend/tests/integration/stream-lifecycle.test.ts index ff1a5f1..9bde6ec 100644 --- a/backend/tests/integration/stream-lifecycle.test.ts +++ b/backend/tests/integration/stream-lifecycle.test.ts @@ -557,6 +557,72 @@ describe("Stream Lifecycle Integration Tests", () => { }); }); + describe("StreamEvent @@unique([transactionHash, eventType]) deduplication", () => { + it("rejects a duplicate (transactionHash, eventType) insert at the database level", async () => { + const streamId = 11; + const txHash = "unique-constraint-tx-hash"; + + await testPrisma.stream.create({ + data: { + streamId, + sender: SENDER, + recipient: RECIPIENT, + tokenAddress: TOKEN, + ratePerSecond: "10", + depositedAmount: "86400", + withdrawnAmount: "0", + startTime: 1700000000, + endTime: 1700000000 + 8640, + lastUpdateTime: 1700000000, + isActive: true, + isPaused: false, + }, + }); + + const eventData = { + streamId, + eventType: "CREATED", + transactionHash: txHash, + ledgerSequence: 12345, + timestamp: 1700000000, + }; + + await testPrisma.streamEvent.create({ data: eventData }); + + await expect( + testPrisma.streamEvent.create({ + data: { + ...eventData, + ledgerSequence: 12346, + timestamp: 1700000001, + }, + }), + ).rejects.toMatchObject({ code: "P2002" }); + + const count = await testPrisma.streamEvent.count({ + where: { transactionHash: txHash, eventType: "CREATED" }, + }); + expect(count).toBe(1); + }); + + it("dedupes worker replay of the same event without creating a second row", async () => { + const streamId = 12; + const txHash = "worker-replay-dedup-tx-hash"; + const event = { ...createStreamCreatedEvent(streamId), txHash }; + + await worker.processEvent(event); + await expect(worker.processEvent(event)).resolves.toBeUndefined(); + + const count = await testPrisma.streamEvent.count({ + where: { + transactionHash: event.txHash, + eventType: "CREATED", + }, + }); + expect(count).toBe(1); + }); + }); + describe("SSE client receives broadcast for each stream event", () => { let eventSource: EventSource; diff --git a/backend/tests/stream.repository.test.ts b/backend/tests/stream.repository.test.ts index a6297c0..8699661 100644 --- a/backend/tests/stream.repository.test.ts +++ b/backend/tests/stream.repository.test.ts @@ -23,6 +23,14 @@ describe('Stream Repository', () => { }); }); + it('should update isActive to false for COMPLETED', async () => { + await updateStatus(123, 'COMPLETED'); + expect(prisma.stream.update).toHaveBeenCalledWith({ + where: { streamId: 123 }, + data: { isActive: false }, + }); + }); + it('should update isActive to true for ACTIVE', async () => { await updateStatus(123, 'ACTIVE'); expect(prisma.stream.update).toHaveBeenCalledWith({