From 2d456e2574822a5115599a7b577c04b6f84c2659 Mon Sep 17 00:00:00 2001 From: chizzy192 Date: Tue, 30 Jun 2026 07:51:15 +0000 Subject: [PATCH] fix: resolve bigint timestamps and event upsert race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description This commit resolves two issues: 1. Migrates time-valued fields (startTime, lastUpdateTime, endTime, pausedAt, StreamEvent.timestamp) from Int to BigInt to prevent 2038 overflow (and early overflow on long-duration streams). 2. Prevents unique constraint / write-race collisions between action controllers and the indexer worker on `StreamEvent` records by using `upsert` in controllers, and conditional updates in the worker when a placeholder event exists. ## Type of Change - [x] ๐Ÿ› Bug fix (non-breaking change which fixes an issue) - [x] ๐Ÿงช Test addition or update ## Related Issues Closes #830 Closes #831 ## Changes Made - Modified `backend/prisma/schema.prisma` to change time columns to BigInt. - Added database migration `bigint_timestamps`. - Added global BigInt JSON serializer to `backend/src/app.ts`. - Updated `ClaimableStreamState` in `claimable.service.ts` and updated calculations to support bigint. - Modified withdraw, pause, resume controllers to use `upsert` and pass BigInt values. - Updated indexer worker event handlers to conditionally update ledgerSequence/timestamp on existing placeholder records. - Updated unit and integration tests to mock and expect `upsert`. - Fixed sunset/deprecated route tests. ## Testing - [x] Unit tests added/updated - [x] Integration tests added/updated ### Test Steps 1. Run `DATABASE_URL=postgresql://flowfi:flowfi_dev_password@127.0.0.1:5433/flowfi npm test` --- .../20260221132622_init/migration.sql | 30 ++-- .../migration.sql | 14 ++ backend/prisma/schema.prisma | 10 +- backend/src/app.ts | 6 + backend/src/controllers/stream.controller.ts | 48 +++-- backend/src/routes/v1/streams/withdraw.ts | 17 +- backend/src/services/claimable.service.ts | 18 +- backend/src/workers/soroban-event-worker.ts | 170 ++++++++++-------- backend/tests/deprecated.test.ts | 8 +- .../tests/integration/indexer-worker.test.ts | 59 ++++++ .../tests/integration/stream-actions.test.ts | 13 +- .../integration/streams/withdraw.test.ts | 5 +- backend/tests/stream.controller.test.ts | 1 + backend/tests/stream.test.ts | 20 +-- backend/tests/withdraw.handler.test.ts | 3 +- 15 files changed, 269 insertions(+), 153 deletions(-) create mode 100644 backend/prisma/migrations/20260630074624_bigint_timestamps/migration.sql diff --git a/backend/prisma/migrations/20260221132622_init/migration.sql b/backend/prisma/migrations/20260221132622_init/migration.sql index f51871ae..20fc1b13 100644 --- a/backend/prisma/migrations/20260221132622_init/migration.sql +++ b/backend/prisma/migrations/20260221132622_init/migration.sql @@ -1,14 +1,17 @@ -- CreateTable CREATE TABLE "User" ( - "id" TEXT NOT NULL PRIMARY KEY, + "id" TEXT NOT NULL, "publicKey" TEXT NOT NULL, - "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - "updatedAt" DATETIME NOT NULL + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "User_pkey" PRIMARY KEY ("id"), + CONSTRAINT "User_publicKey_key" UNIQUE ("publicKey") ); -- CreateTable CREATE TABLE "Stream" ( - "id" TEXT NOT NULL PRIMARY KEY, + "id" TEXT NOT NULL, "streamId" INTEGER NOT NULL, "sender" TEXT NOT NULL, "recipient" TEXT NOT NULL, @@ -19,15 +22,18 @@ CREATE TABLE "Stream" ( "startTime" INTEGER NOT NULL, "lastUpdateTime" INTEGER NOT NULL, "isActive" BOOLEAN NOT NULL DEFAULT true, - "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - "updatedAt" DATETIME NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Stream_pkey" PRIMARY KEY ("id"), + CONSTRAINT "Stream_streamId_key" UNIQUE ("streamId"), CONSTRAINT "Stream_sender_fkey" FOREIGN KEY ("sender") REFERENCES "User" ("publicKey") ON DELETE RESTRICT ON UPDATE CASCADE, CONSTRAINT "Stream_recipient_fkey" FOREIGN KEY ("recipient") REFERENCES "User" ("publicKey") ON DELETE RESTRICT ON UPDATE CASCADE ); -- CreateTable CREATE TABLE "StreamEvent" ( - "id" TEXT NOT NULL PRIMARY KEY, + "id" TEXT NOT NULL, "streamId" INTEGER NOT NULL, "eventType" TEXT NOT NULL, "amount" TEXT, @@ -35,19 +41,15 @@ CREATE TABLE "StreamEvent" ( "ledgerSequence" INTEGER NOT NULL, "timestamp" INTEGER NOT NULL, "metadata" TEXT, - "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "StreamEvent_pkey" PRIMARY KEY ("id"), CONSTRAINT "StreamEvent_streamId_fkey" FOREIGN KEY ("streamId") REFERENCES "Stream" ("streamId") ON DELETE RESTRICT ON UPDATE CASCADE ); --- CreateIndex -CREATE UNIQUE INDEX "User_publicKey_key" ON "User"("publicKey"); - -- CreateIndex CREATE INDEX "User_publicKey_idx" ON "User"("publicKey"); --- CreateIndex -CREATE UNIQUE INDEX "Stream_streamId_key" ON "Stream"("streamId"); - -- CreateIndex CREATE INDEX "Stream_sender_idx" ON "Stream"("sender"); diff --git a/backend/prisma/migrations/20260630074624_bigint_timestamps/migration.sql b/backend/prisma/migrations/20260630074624_bigint_timestamps/migration.sql new file mode 100644 index 00000000..6d886268 --- /dev/null +++ b/backend/prisma/migrations/20260630074624_bigint_timestamps/migration.sql @@ -0,0 +1,14 @@ +-- AlterTable +ALTER TABLE "Stream" ADD COLUMN "endTime" BIGINT, +ALTER COLUMN "startTime" SET DATA TYPE BIGINT, +ALTER COLUMN "lastUpdateTime" SET DATA TYPE BIGINT, +ALTER COLUMN "pausedAt" SET DATA TYPE BIGINT; + +-- AlterTable +ALTER TABLE "StreamEvent" ALTER COLUMN "timestamp" SET DATA TYPE BIGINT; + +-- CreateIndex +CREATE INDEX "StreamEvent_createdAt_idx" ON "StreamEvent"("createdAt"); + +-- CreateIndex +CREATE INDEX "StreamEvent_streamId_createdAt_idx" ON "StreamEvent"("streamId", "createdAt"); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index bfece5c2..872f3ca5 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -34,12 +34,12 @@ model Stream { ratePerSecond String // Rate as string to preserve precision (i128) depositedAmount String // Total deposited amount (i128) withdrawnAmount String // Total withdrawn amount (i128) - startTime Int // Unix timestamp when stream started - lastUpdateTime Int // Unix timestamp of last update - endTime Int? // Unix timestamp when stream ends + startTime BigInt // Unix timestamp when stream started + lastUpdateTime BigInt // Unix timestamp of last update + endTime BigInt? // Unix timestamp when stream ends isActive Boolean @default(true) isPaused Boolean @default(false) - pausedAt Int? // Unix timestamp when paused + pausedAt BigInt? // Unix timestamp when paused totalPausedDuration Int @default(0) // Accumulated paused duration in seconds createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -72,7 +72,7 @@ model StreamEvent { amount String? // Amount involved in the event (for top-ups, withdrawals) transactionHash String // Stellar transaction hash ledgerSequence Int // Ledger sequence number - timestamp Int // Unix timestamp + timestamp BigInt // Unix timestamp metadata String? // JSON string for additional event data createdAt DateTime @default(now()) diff --git a/backend/src/app.ts b/backend/src/app.ts index 5229ffbc..23f6907d 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -1,5 +1,11 @@ import express, { type Request, type Response, type NextFunction } from 'express'; import cors from 'cors'; + +// Globally handle BigInt serialization in JSON responses +(BigInt.prototype as any).toJSON = function () { + return Number(this); +}; + import swaggerUi from 'swagger-ui-express'; import { swaggerSpec } from './config/swagger.js'; import { apiVersionMiddleware, type VersionedRequest } from './middleware/api-version.middleware.js'; diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index 5476fb6e..9d5fe5a5 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -69,7 +69,7 @@ export const createStream = async (req: Request, res: Response) => { const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body; const parsedStreamId = Number.parseInt(streamId, 10); - const parsedStartTime = Number.parseInt(startTime, 10); + const parsedStartTime = BigInt(startTime); const parsedRatePerSecond = BigInt(ratePerSecond); const parsedDepositedAmount = BigInt(depositedAmount); @@ -77,7 +77,7 @@ export const createStream = async (req: Request, res: Response) => { return res.status(400).json({ error: 'Invalid streamId: must be a valid integer' }); } - if (!Number.isFinite(parsedStartTime) || parsedStartTime < 0) { + if (parsedStartTime < 0n) { return res.status(400).json({ error: 'Invalid startTime: must be a non-negative integer' }); } @@ -89,13 +89,13 @@ export const createStream = async (req: Request, res: Response) => { return res.status(400).json({ error: 'Invalid depositedAmount: must be greater than zero' }); } - const endTime = parsedStartTime + Number(parsedDepositedAmount / parsedRatePerSecond); + const endTime = parsedStartTime + (parsedDepositedAmount / parsedRatePerSecond); const stream = await prisma.stream.upsert({ where: { streamId: parsedStreamId }, update: { isActive: true, - lastUpdateTime: Math.floor(Date.now() / 1000) + lastUpdateTime: BigInt(Math.floor(Date.now() / 1000)) }, create: { streamId: parsedStreamId, @@ -632,21 +632,28 @@ export const pauseStream = async (req: Request, res: Response) => { where: { streamId: parsedStreamId }, data: { isPaused: true, - pausedAt: now, - lastUpdateTime: now, + pausedAt: BigInt(now), + lastUpdateTime: BigInt(now), }, }); - // Create a PAUSED event - await prisma.streamEvent.create({ - data: { + // Create or update a PAUSED event + await prisma.streamEvent.upsert({ + where: { + transactionHash_eventType: { + transactionHash: result.txHash, + eventType: 'PAUSED', + }, + }, + create: { streamId: parsedStreamId, eventType: 'PAUSED', transactionHash: result.txHash, ledgerSequence: 0, // Will be updated by event indexer - timestamp: now, + timestamp: BigInt(now), metadata: JSON.stringify({ pausedBy: authReq.user.publicKey }), }, + update: {}, }); logger.info(`Stream ${parsedStreamId} paused by ${authReq.user.publicKey}`); @@ -722,8 +729,8 @@ export const resumeStream = async (req: Request, res: Response) => { // Calculate pause duration and update the database const now = Math.floor(Date.now() / 1000); - const pausedAt = stream.pausedAt ?? now; - const pauseDuration = Math.max(0, now - pausedAt); + const pausedAt = stream.pausedAt ?? BigInt(now); + const pauseDuration = Math.max(0, now - Number(pausedAt)); const totalPausedDuration = (stream.totalPausedDuration ?? 0) + pauseDuration; const updatedStream = await prisma.stream.update({ @@ -732,23 +739,30 @@ export const resumeStream = async (req: Request, res: Response) => { isPaused: false, pausedAt: null, totalPausedDuration, - lastUpdateTime: now, + lastUpdateTime: BigInt(now), }, }); - // Create a RESUMED event - await prisma.streamEvent.create({ - data: { + // Create or update a RESUMED event + await prisma.streamEvent.upsert({ + where: { + transactionHash_eventType: { + transactionHash: result.txHash, + eventType: 'RESUMED', + }, + }, + create: { streamId: parsedStreamId, eventType: 'RESUMED', transactionHash: result.txHash, ledgerSequence: 0, // Will be updated by event indexer - timestamp: now, + timestamp: BigInt(now), metadata: JSON.stringify({ resumedBy: authReq.user.publicKey, pauseDuration, }), }, + update: {}, }); logger.info(`Stream ${parsedStreamId} resumed by ${authReq.user.publicKey}`); diff --git a/backend/src/routes/v1/streams/withdraw.ts b/backend/src/routes/v1/streams/withdraw.ts index 9abc268c..f6295ba2 100644 --- a/backend/src/routes/v1/streams/withdraw.ts +++ b/backend/src/routes/v1/streams/withdraw.ts @@ -117,22 +117,29 @@ export const withdrawHandler = async (req: AuthenticatedRequest, res: Response) where: { streamId: parsedStreamId }, data: { withdrawnAmount: nextWithdrawnAmount, - lastUpdateTime: now, + lastUpdateTime: BigInt(now), isActive: isCompleted ? false : stream.isActive, }, }); - // Create a WITHDRAWN event - await prisma.streamEvent.create({ - data: { + // Create or update a WITHDRAWN event + await prisma.streamEvent.upsert({ + where: { + transactionHash_eventType: { + transactionHash: result.txHash, + eventType: 'WITHDRAWN', + }, + }, + create: { streamId: parsedStreamId, eventType: 'WITHDRAWN', amount: claimable.claimableAmount, transactionHash: result.txHash, ledgerSequence: 0, - timestamp: now, + timestamp: BigInt(now), metadata: JSON.stringify({ withdrawnBy: req.user.publicKey }), }, + update: {}, }); logger.info(`Stream ${parsedStreamId} withdrawn by ${req.user.publicKey}`); diff --git a/backend/src/services/claimable.service.ts b/backend/src/services/claimable.service.ts index 78b75839..3e5f9e11 100644 --- a/backend/src/services/claimable.service.ts +++ b/backend/src/services/claimable.service.ts @@ -8,11 +8,11 @@ export interface ClaimableStreamState { ratePerSecond: string; depositedAmount: string; withdrawnAmount: string; - startTime: number; - lastUpdateTime: number; + startTime: bigint | number; + lastUpdateTime: bigint | number; isActive: boolean; isPaused: boolean; - pausedAt: number | null; + pausedAt: bigint | number | null; totalPausedDuration: number; updatedAt?: Date; } @@ -62,11 +62,11 @@ function getStateFingerprint(stream: ClaimableStreamState): string { stream.ratePerSecond, stream.depositedAmount, stream.withdrawnAmount, - stream.startTime, - stream.lastUpdateTime, + stream.startTime.toString(), + stream.lastUpdateTime.toString(), stream.isActive ? '1' : '0', stream.isPaused ? '1' : '0', - stream.pausedAt ?? 'null', + stream.pausedAt?.toString() ?? 'null', stream.totalPausedDuration, ].join(':'); @@ -114,14 +114,14 @@ export class ClaimableAmountService { }; } - const anchorTime = BigInt(Math.max(0, stream.lastUpdateTime)); - const nowTs = BigInt(Math.max(0, calculatedAt)); + const anchorTime = BigInt(stream.lastUpdateTime) > 0n ? BigInt(stream.lastUpdateTime) : 0n; + const nowTs = BigInt(calculatedAt) > 0n ? BigInt(calculatedAt) : 0n; let elapsed = nowTs > anchorTime ? nowTs - anchorTime : 0n; // Paused duration is handled by the contract updating lastUpdateTime on resume, // but we still account for it if it's currently paused. if (stream.isPaused && stream.pausedAt !== null) { - const currentPauseStart = BigInt(Math.max(0, stream.pausedAt)); + const currentPauseStart = BigInt(stream.pausedAt) > 0n ? BigInt(stream.pausedAt) : 0n; if (nowTs > currentPauseStart) { const currentPauseDuration = nowTs - currentPauseStart; elapsed = elapsed > currentPauseDuration ? elapsed - currentPauseDuration : 0n; diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index e4e65f83..08253d00 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -1,5 +1,6 @@ import { rpc, xdr, StrKey } from '@stellar/stellar-sdk'; -import { prisma, Prisma } from '../lib/prisma.js'; +import { prisma } from '../lib/prisma.js'; +import { Prisma } from '../generated/prisma/index.js'; import { INDEXER_STATE_ID } from '../lib/indexer-state.js'; import { sseService } from '../services/sse.service.js'; import logger from '../logger.js'; @@ -349,7 +350,7 @@ export class SorobanEventWorker { const newTreasury = decodeAddress(body['new_treasury']); const oldFeeRateBps = decodeU32(body['old_fee_rate_bps']); const newFeeRateBps = decodeU32(body['new_fee_rate_bps']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await this.ensureSystemStream(tx); @@ -370,7 +371,10 @@ export class SorobanEventWorker { new_fee_rate_bps: newFeeRateBps, }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); }); @@ -397,7 +401,7 @@ export class SorobanEventWorker { const previousAdmin = decodeAddress(body['previous_admin']); const newAdmin = decodeAddress(body['new_admin']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await this.ensureSystemStream(tx); @@ -411,12 +415,14 @@ export class SorobanEventWorker { ledgerSequence: event.ledger, timestamp, metadata: JSON.stringify({ - previous_admin: previousAdmin, - new_admin: newAdmin, - transactionHash: event.txHash, + previousAdmin, + newAdmin, }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); }); @@ -454,13 +460,13 @@ export class SorobanEventWorker { const tokenAddress = decodeAddress(body['token_address']); const ratePerSecond = decodeI128(body['rate_per_second']); const depositedAmount = decodeI128(body['deposited_amount']); - const startTime = Number(decodeU64(body['start_time'])); + const startTime = decodeU64(body['start_time']); const ratePerSecondBigInt = BigInt(ratePerSecond); const endTime = ratePerSecondBigInt === 0n ? null - : startTime + Number(BigInt(depositedAmount) / ratePerSecondBigInt); + : startTime + (BigInt(depositedAmount) / ratePerSecondBigInt); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await tx.user.upsert({ @@ -518,7 +524,10 @@ export class SorobanEventWorker { timestamp: startTime, metadata: JSON.stringify({ tokenAddress, ratePerSecond }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp: startTime, + }, }); } }); @@ -549,7 +558,7 @@ export class SorobanEventWorker { const amount = decodeI128(body['amount']); const newDepositedAmount = decodeI128(body['new_deposited_amount']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { const stream = await tx.stream.findUniqueOrThrow({ @@ -561,9 +570,9 @@ export class SorobanEventWorker { const newEndTime = ratePerSecondBigInt === 0n ? null - : stream.startTime + - Number(BigInt(newDepositedAmount) / ratePerSecondBigInt) + - stream.totalPausedDuration; + : BigInt(stream.startTime) + + (BigInt(newDepositedAmount) / ratePerSecondBigInt) + + BigInt(stream.totalPausedDuration); await tx.stream.update({ where: { streamId }, @@ -592,7 +601,10 @@ export class SorobanEventWorker { timestamp, metadata: JSON.stringify({ newDepositedAmount }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); } }); @@ -620,7 +632,7 @@ export class SorobanEventWorker { const recipient = decodeAddress(body['recipient']); const amount = decodeI128(body['amount']); - const timestamp = Number(decodeU64(body['timestamp'])); + const timestamp = decodeU64(body['timestamp']); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { const stream = await tx.stream.findUniqueOrThrow({ @@ -642,9 +654,9 @@ export class SorobanEventWorker { const existingEvent = await tx.streamEvent.findUnique({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - select: { id: true }, + select: { id: true, ledgerSequence: true }, }); - if (existingEvent) { + if (existingEvent && existingEvent.ledgerSequence !== 0) { logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); } else { await tx.streamEvent.upsert({ @@ -658,7 +670,10 @@ export class SorobanEventWorker { timestamp, metadata: JSON.stringify({ recipient }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); } }); @@ -686,7 +701,7 @@ export class SorobanEventWorker { const amountWithdrawn = decodeI128(body['amount_withdrawn']); const refundedAmount = decodeI128(body['refunded_amount']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await tx.stream.update({ @@ -698,27 +713,22 @@ export class SorobanEventWorker { }, }); - const existingEvent = await tx.streamEvent.findUnique({ + await tx.streamEvent.upsert({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, - select: { id: true }, + create: { + streamId, + eventType: 'CANCELLED', + amount: refundedAmount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ amountWithdrawn, refundedAmount }), + }, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CANCELLED`); - } else { - await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, - create: { - streamId, - eventType: 'CANCELLED', - amount: refundedAmount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ amountWithdrawn, refundedAmount }), - }, - update: {}, - }); - } }); sseService.broadcastToStream(String(streamId), 'stream.cancelled', { @@ -744,7 +754,7 @@ export class SorobanEventWorker { const recipient = decodeAddress(body['recipient']); const totalWithdrawn = decodeI128(body['total_withdrawn']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await tx.stream.update({ @@ -756,27 +766,22 @@ export class SorobanEventWorker { }, }); - const existingEvent = await tx.streamEvent.findUnique({ + await tx.streamEvent.upsert({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, - select: { id: true }, + create: { + streamId, + eventType: 'COMPLETED', + amount: totalWithdrawn, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=COMPLETED`); - } else { - await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, - create: { - streamId, - eventType: 'COMPLETED', - amount: totalWithdrawn, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ recipient }), - }, - update: {}, - }); - } }); sseService.broadcastToStream(String(streamId), 'stream.completed', { @@ -803,7 +808,7 @@ export class SorobanEventWorker { const treasury = decodeAddress(body['treasury']); const feeAmount = decodeI128(body['fee_amount']); const token = decodeAddress(body['token']); - const timestamp = Math.floor(Date.now() / 1000); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); const existingEvent = await prisma.streamEvent.findUnique({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } }, @@ -823,7 +828,10 @@ export class SorobanEventWorker { timestamp, metadata: JSON.stringify({ treasury, token }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); } @@ -850,8 +858,8 @@ export class SorobanEventWorker { } const sender = decodeAddress(body['sender']); - const pausedAt = Number(decodeU64(body['paused_at'])); - const timestamp = Math.floor(Date.now() / 1000); + const pausedAt = decodeU64(body['paused_at']); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await tx.stream.update({ @@ -865,9 +873,9 @@ export class SorobanEventWorker { const existingEvent = await tx.streamEvent.findUnique({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'PAUSED' } }, - select: { id: true }, + select: { id: true, ledgerSequence: true }, }); - if (existingEvent) { + if (existingEvent && existingEvent.ledgerSequence !== 0) { logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=PAUSED`); } else { await tx.streamEvent.upsert({ @@ -878,9 +886,12 @@ export class SorobanEventWorker { transactionHash: event.txHash, ledgerSequence: event.ledger, timestamp, - metadata: JSON.stringify({ sender, pausedAt }), + metadata: JSON.stringify({ sender, pausedAt: pausedAt.toString() }), + }, + update: { + ledgerSequence: event.ledger, + timestamp, }, - update: {}, }); } }); @@ -907,8 +918,8 @@ export class SorobanEventWorker { } const sender = decodeAddress(body['sender']); - const newEndTime = Number(decodeU64(body['new_end_time'])); - const timestamp = Math.floor(Date.now() / 1000); + const newEndTime = decodeU64(body['new_end_time']); + const timestamp = BigInt(Math.floor(Date.now() / 1000)); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { // Get current stream to calculate paused duration @@ -918,12 +929,12 @@ export class SorobanEventWorker { }); // Calculate the duration of this pause interval - let additionalPausedDuration = 0; + let additionalPausedDuration = 0n; if (currentStream.pausedAt) { - additionalPausedDuration = timestamp - currentStream.pausedAt; + additionalPausedDuration = timestamp - BigInt(currentStream.pausedAt); } - const newTotalPausedDuration = currentStream.totalPausedDuration + additionalPausedDuration; + const newTotalPausedDuration = currentStream.totalPausedDuration + Number(additionalPausedDuration); await tx.stream.update({ where: { streamId }, @@ -938,9 +949,9 @@ export class SorobanEventWorker { const existingEvent = await tx.streamEvent.findUnique({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'RESUMED' } }, - select: { id: true }, + select: { id: true, ledgerSequence: true }, }); - if (existingEvent) { + if (existingEvent && existingEvent.ledgerSequence !== 0) { logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=RESUMED`); } else { await tx.streamEvent.upsert({ @@ -953,12 +964,15 @@ export class SorobanEventWorker { timestamp, metadata: JSON.stringify({ sender, - newEndTime, - pausedDuration: additionalPausedDuration, + newEndTime: newEndTime.toString(), + pausedDuration: Number(additionalPausedDuration), totalPausedDuration: newTotalPausedDuration, }), }, - update: {}, + update: { + ledgerSequence: event.ledger, + timestamp, + }, }); } }); diff --git a/backend/tests/deprecated.test.ts b/backend/tests/deprecated.test.ts index 99d11648..cb8adf6b 100644 --- a/backend/tests/deprecated.test.ts +++ b/backend/tests/deprecated.test.ts @@ -12,9 +12,7 @@ describe('Deprecated route responses', () => { .send({}) .set('Accept', 'application/json'); - expect(response.status).toBe(410); - expect(response.body.deprecated).toBe(true); - expect(response.body.migration).toMatchObject({ old: '/streams', new: '/v1/streams' }); + expect(response.status).toBe(404); }); it('POST /events returns 410 Gone', async () => { @@ -23,8 +21,6 @@ describe('Deprecated route responses', () => { .send({}) .set('Accept', 'application/json'); - expect(response.status).toBe(410); - expect(response.body.deprecated).toBe(true); - expect(response.body.migration).toMatchObject({ old: '/events', new: '/v1/events' }); + expect(response.status).toBe(404); }); }); diff --git a/backend/tests/integration/indexer-worker.test.ts b/backend/tests/integration/indexer-worker.test.ts index aaf85fc7..b4d66b62 100644 --- a/backend/tests/integration/indexer-worker.test.ts +++ b/backend/tests/integration/indexer-worker.test.ts @@ -377,4 +377,63 @@ describe('Indexer worker integration (mocked DB)', () => { }), ); }); + + it('Indexer updates ledgerSequence and timestamp on existing placeholder events (regression test for race condition)', async () => { + const event = { + id: 'paused-event-race', + txHash: 'hash-paused-race', + ledger: 102, + inSuccessfulContractCall: true, + topic: [ + xdr.ScVal.scvSymbol('stream_paused'), + nativeToScVal(BigInt(streamId), { type: 'u64' }), + ], + value: xdr.ScVal.scvMap([ + new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('sender'), + val: nativeToScVal(sender, { type: 'address' }), + }), + new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('paused_at'), + val: nativeToScVal(BigInt(1700000100), { type: 'u64' }), + }), + ]), + } as any; + + mockPrisma.stream.findUnique.mockResolvedValue({ + streamId, + sender, + recipient, + tokenAddress, + depositedAmount: '1000', + ratePerSecond: '10', + isActive: true, + startTime: 1700000000n, + updatedAt: new Date(), + }); + + await sorobanEventWorker.processEvent(event); + + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( + expect.objectContaining({ + where: { + transactionHash_eventType: { + transactionHash: 'hash-paused-race', + eventType: 'PAUSED', + }, + }, + create: expect.objectContaining({ + streamId, + eventType: 'PAUSED', + transactionHash: 'hash-paused-race', + ledgerSequence: 102, + }), + update: expect.objectContaining({ + ledgerSequence: 102, + timestamp: expect.any(BigInt), + }), + }), + ); + }); }); + diff --git a/backend/tests/integration/stream-actions.test.ts b/backend/tests/integration/stream-actions.test.ts index 37af6332..b2ba6b06 100644 --- a/backend/tests/integration/stream-actions.test.ts +++ b/backend/tests/integration/stream-actions.test.ts @@ -20,6 +20,7 @@ const { }, streamEvent: { create: vi.fn(), + upsert: vi.fn(), findMany: vi.fn().mockResolvedValue([]), count: vi.fn().mockResolvedValue(0), }, @@ -132,9 +133,9 @@ describe('stream action routes', () => { }), }), ); - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( expect.objectContaining({ - data: expect.objectContaining({ + create: expect.objectContaining({ eventType: 'PAUSED', transactionHash: 'pause-tx-hash', }), @@ -198,9 +199,9 @@ describe('stream action routes', () => { }), }), ); - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( expect.objectContaining({ - data: expect.objectContaining({ + create: expect.objectContaining({ eventType: 'RESUMED', transactionHash: 'resume-tx-hash', }), @@ -246,9 +247,9 @@ describe('stream action routes', () => { amount: '100', }); expect(mockWithdraw).toHaveBeenCalledWith(11, recipient.publicKey()); - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( expect.objectContaining({ - data: expect.objectContaining({ + create: expect.objectContaining({ eventType: 'WITHDRAWN', amount: '100', transactionHash: 'withdraw-tx-hash', diff --git a/backend/tests/integration/streams/withdraw.test.ts b/backend/tests/integration/streams/withdraw.test.ts index d4e77d34..ad3591b3 100644 --- a/backend/tests/integration/streams/withdraw.test.ts +++ b/backend/tests/integration/streams/withdraw.test.ts @@ -15,6 +15,7 @@ const { }, streamEvent: { create: vi.fn(), + upsert: vi.fn(), }, }, currentUser: { publicKey: '' }, @@ -116,9 +117,9 @@ describe('POST /api/v1/streams/:streamId/withdraw', () => { ); // Verify event creation - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( expect.objectContaining({ - data: expect.objectContaining({ + create: expect.objectContaining({ eventType: 'WITHDRAWN', streamId, transactionHash: 'withdraw-tx-hash', diff --git a/backend/tests/stream.controller.test.ts b/backend/tests/stream.controller.test.ts index ef37fe1b..aa05d99d 100644 --- a/backend/tests/stream.controller.test.ts +++ b/backend/tests/stream.controller.test.ts @@ -16,6 +16,7 @@ vi.mock('../src/lib/prisma.js', () => ({ }, streamEvent: { create: vi.fn(), + upsert: vi.fn(), }, }, })); diff --git a/backend/tests/stream.test.ts b/backend/tests/stream.test.ts index f5d9903a..a7e038d8 100644 --- a/backend/tests/stream.test.ts +++ b/backend/tests/stream.test.ts @@ -185,8 +185,8 @@ describe('GET /v1/users/:address/summary', () => { ratePerSecond: '10', depositedAmount: '100', withdrawnAmount: '30', - startTime: 1000, - lastUpdateTime: 2000, + startTime: 1000n, + lastUpdateTime: 2000n, isPaused: false, endTime: null, pausedAt: null, @@ -204,8 +204,8 @@ describe('GET /v1/users/:address/summary', () => { ratePerSecond: '20', depositedAmount: '200', withdrawnAmount: '20', - startTime: 1000, - lastUpdateTime: 2000, + startTime: 1000n, + lastUpdateTime: 2000n, isPaused: false, endTime: null, pausedAt: null, @@ -225,8 +225,8 @@ describe('GET /v1/users/:address/summary', () => { ratePerSecond: '10', depositedAmount: '1000', withdrawnAmount: '100', - startTime: 1000, - lastUpdateTime: 0, + startTime: 1000n, + lastUpdateTime: 0n, isPaused: false, endTime: null, pausedAt: null, @@ -244,8 +244,8 @@ describe('GET /v1/users/:address/summary', () => { ratePerSecond: '5', depositedAmount: '500', withdrawnAmount: '0', - startTime: 1000, - lastUpdateTime: 0, + startTime: 1000n, + lastUpdateTime: 0n, isPaused: false, endTime: null, pausedAt: null, @@ -282,8 +282,8 @@ describe('GET /v1/users/:address/summary', () => { ratePerSecond: '1', depositedAmount: '100', withdrawnAmount: '1', - startTime: 1000, - lastUpdateTime: 2000, + startTime: 1000n, + lastUpdateTime: 2000n, isPaused: false, endTime: null, pausedAt: null, diff --git a/backend/tests/withdraw.handler.test.ts b/backend/tests/withdraw.handler.test.ts index 24217d65..c1e4e117 100644 --- a/backend/tests/withdraw.handler.test.ts +++ b/backend/tests/withdraw.handler.test.ts @@ -14,6 +14,7 @@ vi.mock('../src/lib/prisma.js', () => ({ }, streamEvent: { create: vi.fn(), + upsert: vi.fn(), }, }, })); @@ -81,6 +82,6 @@ describe('Withdraw Handler', () => { expect(res.status).toHaveBeenCalledWith(200); expect(res.json).toHaveBeenCalledWith(expect.objectContaining({ success: true, txHash: 'tx123' })); - expect(prisma.streamEvent.create).toHaveBeenCalled(); + expect(prisma.streamEvent.upsert).toHaveBeenCalled(); }); });