diff --git a/prisma/schema/schema.prisma b/prisma/schema/schema.prisma index 2be1739..add68a8 100644 --- a/prisma/schema/schema.prisma +++ b/prisma/schema/schema.prisma @@ -7,7 +7,7 @@ generator client { provider = "prisma-client-js" output = "../../node_modules/.prisma/client" - previewFeatures = ["prismaSchemaFolder"] + previewFeatures = ["prismaSchemaFolder", "metrics"] } datasource db { diff --git a/src/config.schema.ts b/src/config.schema.ts index 3d23ea7..90f7358 100644 --- a/src/config.schema.ts +++ b/src/config.schema.ts @@ -98,6 +98,8 @@ export const envSchema = z .positive() .default(300000), SLOW_QUERY_THRESHOLD_MS: z.coerce.number().int().positive().default(500), + DB_POOL_WAIT_WARN_MS: z.coerce.number().int().positive().default(500), + DB_POOL_WAIT_ERROR_MS: z.coerce.number().int().positive().default(2000), CREATOR_LIST_SLOW_QUERY_THRESHOLD_MS: z.coerce .number() .int() diff --git a/src/modules/wallets/__tests__/wallet-holdings.integration.test.ts b/src/modules/wallets/__tests__/wallet-holdings.integration.test.ts index 3159b9f..13c4dc5 100644 --- a/src/modules/wallets/__tests__/wallet-holdings.integration.test.ts +++ b/src/modules/wallets/__tests__/wallet-holdings.integration.test.ts @@ -13,8 +13,8 @@ import { HoldingEntry } from '../wallet-holdings.schemas'; const VALID_ADDRESS = 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'; const MALFORMED_ADDRESS = 'not-a-stellar-address'; -function makeReq(params: Record = {}): any { - return { params }; +function makeReq(params: Record = {}, query: Record = {}): any { + return { params, query }; } function makeRes(): any { @@ -47,7 +47,7 @@ describe('GET /wallets/:address/holdings', () => { jest.restoreAllMocks(); }); - it('returns 200 with items and total for a wallet with holdings', async () => { + it('returns 200 with items and meta for a wallet with holdings', async () => { const holdings: HoldingEntry[] = [ makeHolding({ creator_id: 'creator-1', creator_handle: 'alice', key_count: '5' }), makeHolding({ creator_id: 'creator-2', creator_handle: 'bob', key_count: '3' }), @@ -62,7 +62,7 @@ describe('GET /wallets/:address/holdings', () => { const body = res.json.mock.calls[0][0]; expect(body.success).toBe(true); expect(body.data.items).toHaveLength(2); - expect(body.data.total).toBe(2); + expect(body.data.meta.total).toBe(2); }); it('each holding includes required fields', async () => { @@ -98,7 +98,7 @@ describe('GET /wallets/:address/holdings', () => { expect(res.status).toHaveBeenCalledWith(200); const body = res.json.mock.calls[0][0]; expect(body.data.items).toEqual([]); - expect(body.data.total).toBe(0); + expect(body.data.meta.total).toBe(0); }); it('returns 400 for a malformed Stellar address', async () => { diff --git a/src/modules/wallets/wallet-holdings-pagination.integration.test.ts b/src/modules/wallets/wallet-holdings-pagination.integration.test.ts new file mode 100644 index 0000000..6bbb884 --- /dev/null +++ b/src/modules/wallets/wallet-holdings-pagination.integration.test.ts @@ -0,0 +1,102 @@ +import supertest from 'supertest'; +import app from '../../app'; +import { prisma } from '../../utils/prisma.utils'; + +const PAGE_SIZE = 20; +const TOTAL_HOLDINGS = 50; +const TEST_WALLET_ADDRESS = 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'; + +describe('GET /api/v1/wallets/:address/holdings pagination', () => { + let creatorIds: string[] = []; + + beforeAll(async () => { + // Create user + const user = await prisma.user.create({ + data: { + id: 'wallet-holdings-pag-test-user', + email: 'wallet-holdings-pag@example.com', + passwordHash: 'dummy-hash', + firstName: 'Wallet', + lastName: 'HoldingsPagTest', + }, + }); + + // Create creators + const creators = await Promise.all( + Array.from({ length: TOTAL_HOLDINGS }).map((_, i) => + prisma.creatorProfile.create({ + data: { + userId: user.id, + handle: `creator-${i}`, + displayName: `Creator ${i}`, + }, + }) + ) + ); + creatorIds = creators.map((c) => c.id); + + // Create key ownerships for the test wallet + await prisma.keyOwnership.createMany({ + data: creatorIds.map((creatorId, i) => ({ + ownerAddress: TEST_WALLET_ADDRESS, + creatorId: creatorId, + balance: TOTAL_HOLDINGS - i, + createdAt: new Date(`2026-06-${String((i % 28) + 1).padStart(2, '0')}T00:00:00.000Z`), + })), + }); + }); + + afterAll(async () => { + // Cleanup + await prisma.keyOwnership.deleteMany({ + where: { ownerAddress: TEST_WALLET_ADDRESS }, + }); + await prisma.creatorProfile.deleteMany({ + where: { id: { in: creatorIds } }, + }); + await prisma.user.delete({ + where: { id: 'wallet-holdings-pag-test-user' }, + }); + await prisma.$disconnect(); + }); + + it('paginates correctly across multiple pages with no duplicates and all items present', async () => { + const allPageItems: string[][] = []; + let offset = 0; + let hasMore = true; + + while (hasMore) { + const res = await supertest(app) + .get(`/api/v1/wallets/${TEST_WALLET_ADDRESS}/holdings?limit=${PAGE_SIZE}&offset=${offset}`); + + expect(res.status).toBe(200); + expect(res.body.success).toBe(true); + + const items = res.body.data.items; + const meta = res.body.data.meta; + + allPageItems.push(items.map((item: any) => item.creator_id)); + hasMore = meta.hasMore; + offset += items.length; + } + + const seenAcrossAllPages = allPageItems.flat(); + expect(new Set(seenAcrossAllPages).size).toBe(TOTAL_HOLDINGS); + expect(seenAcrossAllPages.length).toBe(TOTAL_HOLDINGS); + + // Check no duplicates between pages + const page1 = allPageItems[0]; + const page2 = allPageItems[1]; + const overlap = page1.filter((id) => page2.includes(id)); + expect(overlap.length).toBe(0); + }); + + it('final page returns hasMore: false', async () => { + const res = await supertest(app) + .get(`/api/v1/wallets/${TEST_WALLET_ADDRESS}/holdings?limit=${PAGE_SIZE}&offset=${40}`); + + expect(res.status).toBe(200); + expect(res.body.success).toBe(true); + expect(res.body.data.meta.hasMore).toBe(false); + }); +}); diff --git a/src/modules/wallets/wallet-holdings.controllers.ts b/src/modules/wallets/wallet-holdings.controllers.ts index 4e999aa..e43e095 100644 --- a/src/modules/wallets/wallet-holdings.controllers.ts +++ b/src/modules/wallets/wallet-holdings.controllers.ts @@ -1,7 +1,8 @@ import { Request, Response, NextFunction } from 'express'; -import { WalletHoldingsParamsSchema } from './wallet-holdings.schemas'; +import { WalletHoldingsParamsSchema, WalletHoldingsQuerySchema } from './wallet-holdings.schemas'; import { fetchWalletHoldings } from './wallet-holdings.service'; import { sendSuccess, sendValidationError } from '../../utils/api-response.utils'; +import { buildOffsetPaginationMeta } from '../../utils/pagination.utils'; export async function httpGetWalletHoldings( req: Request, @@ -22,9 +23,31 @@ export async function httpGetWalletHoldings( return; } - const [items, total] = await fetchWalletHoldings(parsedParams.data.address); + const parsedQuery = WalletHoldingsQuerySchema.safeParse(req.query); + if (!parsedQuery.success) { + sendValidationError( + res, + 'Invalid query parameters', + parsedQuery.error.issues.map((issue: { path: (string | number)[]; message: string }) => ({ + field: issue.path.join('.'), + message: issue.message, + })) + ); + return; + } + + const [items, total] = await fetchWalletHoldings( + parsedParams.data.address, + parsedQuery.data + ); + + const meta = buildOffsetPaginationMeta({ + limit: parsedQuery.data.limit, + offset: parsedQuery.data.offset, + total, + }); - sendSuccess(res, { items, total }); + sendSuccess(res, { items, meta }); } catch (error) { next(error); } diff --git a/src/modules/wallets/wallet-holdings.schemas.ts b/src/modules/wallets/wallet-holdings.schemas.ts index 9524d7e..5303185 100644 --- a/src/modules/wallets/wallet-holdings.schemas.ts +++ b/src/modules/wallets/wallet-holdings.schemas.ts @@ -1,11 +1,32 @@ import { z } from 'zod'; import { StellarAddressSchema } from '../wallet/wallet.schemas'; +import { safeIntParam } from '../../utils/query.utils'; +import { MIN_PAGE_SIZE, MAX_PAGE_SIZE } from '../../constants/pagination.constants'; +import { PUBLIC_OFFSET_PAGINATION_DEFAULTS } from '../../utils/public-list-query-defaults'; export const WalletHoldingsParamsSchema = z.object({ address: StellarAddressSchema, }); +export const WalletHoldingsQuerySchema = z + .object({ + limit: safeIntParam({ + defaultValue: PUBLIC_OFFSET_PAGINATION_DEFAULTS.limit, + min: MIN_PAGE_SIZE, + max: MAX_PAGE_SIZE, + label: 'Limit', + }), + offset: safeIntParam({ + defaultValue: PUBLIC_OFFSET_PAGINATION_DEFAULTS.offset, + min: 0, + max: Number.MAX_SAFE_INTEGER, + label: 'Offset', + }), + }) + .strict(); + export type WalletHoldingsParamsType = z.infer; +export type WalletHoldingsQueryType = z.infer; export const HoldingEntrySchema = z.object({ creator_id: z.string(), diff --git a/src/modules/wallets/wallet-holdings.service.ts b/src/modules/wallets/wallet-holdings.service.ts index 16c8cd4..2c4e13a 100644 --- a/src/modules/wallets/wallet-holdings.service.ts +++ b/src/modules/wallets/wallet-holdings.service.ts @@ -1,6 +1,6 @@ import { prisma } from '../../utils/prisma.utils'; import { isValidStellarAddress } from '../wallet/wallet.utils'; -import { HoldingEntry } from './wallet-holdings.schemas'; +import { HoldingEntry, WalletHoldingsQueryType } from './wallet-holdings.schemas'; /** * Fetches all creator key holdings for a given Stellar wallet address. @@ -14,7 +14,8 @@ import { HoldingEntry } from './wallet-holdings.schemas'; * - total_value (null — not calculated server-side; consumers derive it from key_count * current_price) */ export async function fetchWalletHoldings( - address: string + address: string, + query?: WalletHoldingsQueryType ): Promise<[HoldingEntry[], number]> { if (!isValidStellarAddress(address)) { const err = Object.assign( @@ -83,5 +84,10 @@ export async function fetchWalletHoldings( return valB - valA; }); - return [items, total]; + // Apply pagination + const limit = query?.limit ?? 20; + const offset = query?.offset ?? 0; + const paginatedItems = items.slice(offset, offset + limit); + + return [paginatedItems, total]; } diff --git a/src/modules/webhooks/webhook.service.ts b/src/modules/webhooks/webhook.service.ts index 1580c0f..b679129 100644 --- a/src/modules/webhooks/webhook.service.ts +++ b/src/modules/webhooks/webhook.service.ts @@ -121,6 +121,7 @@ async function attemptDelivery( try { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), 5000); + const startTime = Date.now(); const response = await fetch(callbackUrl, { method: 'POST', @@ -129,6 +130,8 @@ async function attemptDelivery( signal: controller.signal, }); + const endTime = Date.now(); + const responseTimeMs = endTime - startTime; clearTimeout(timeout); if (response.ok) { @@ -136,6 +139,18 @@ async function attemptDelivery( where: { webhookId, status: 'PENDING' }, data: { status: 'DELIVERED', retryCount: attempt }, }); + + logger.info( + { + webhook_id: webhookId, + creator_id: payload.creator_id, + event_type: payload.event_type, + response_status: response.status, + response_time_ms: responseTimeMs, + delivered_at: new Date().toISOString(), + }, + 'Webhook delivery succeeded' + ); return; } diff --git a/src/utils/prisma.utils.ts b/src/utils/prisma.utils.ts index ae42602..e9d098b 100644 --- a/src/utils/prisma.utils.ts +++ b/src/utils/prisma.utils.ts @@ -6,7 +6,7 @@ import { logger } from './logger.utils'; // Use global variable to prevent multiple instances in development declare global { - var prisma: any | undefined; + var prisma: any | undefined; } /** @@ -14,23 +14,23 @@ declare global { * resulting structure identifies the query pattern without exposing any values. */ function normalizeArgsForFingerprint(value: unknown, depth = 0): unknown { - if (depth > 8) return '?'; - if (value === null || value === undefined) return value; - if (Array.isArray(value)) { - return value.map((item) => normalizeArgsForFingerprint(item, depth + 1)); - } - if (typeof value === 'object') { - const sorted = Object.keys(value as object).sort(); - const result: Record = {}; - for (const key of sorted) { - result[key] = normalizeArgsForFingerprint( - (value as Record)[key], - depth + 1 - ); - } - return result; - } - return '?'; + if (depth > 8) return '?'; + if (value === null || value === undefined) return value; + if (Array.isArray(value)) { + return value.map((item) => normalizeArgsForFingerprint(item, depth + 1)); + } + if (typeof value === 'object') { + const sorted = Object.keys(value as object).sort(); + const result: Record = {}; + for (const key of sorted) { + result[key] = normalizeArgsForFingerprint( + (value as Record)[key], + depth + 1 + ); + } + return result; + } + return '?'; } /** @@ -38,85 +38,131 @@ function normalizeArgsForFingerprint(value: unknown, depth = 0): unknown { * operation, and arg structure) without including any parameter values. */ function buildQueryFingerprint( - model: string | undefined, - operation: string, - args: unknown + model: string | undefined, + operation: string, + args: unknown ): string { - const normalized = { - model: model ?? 'unknown', - operation, - args: normalizeArgsForFingerprint(args), - }; - return createHash('sha256') - .update(JSON.stringify(normalized)) - .digest('hex') - .slice(0, 16); + const normalized = { + model: model ?? 'unknown', + operation, + args: normalizeArgsForFingerprint(args), + }; + return createHash('sha256') + .update(JSON.stringify(normalized)) + .digest('hex') + .slice(0, 16); } const basePrisma = new PrismaClient({ - log: - envConfig.MODE === 'development' - ? ['query', 'error', 'warn'] - : ['error'], - datasourceUrl: envConfig.DATABASE_URL, + log: + envConfig.MODE === 'development' + ? ['query', 'error', 'warn'] + : ['error'], + datasourceUrl: envConfig.DATABASE_URL, }); -// Extend Prisma with query timeout and slow-query detection +// Track connection pool metrics and log when wait thresholds are exceeded +// We'll use a simple approach: count active queries as a proxy for pool usage +let activeQueries = 0; +const queryStartTimes = new Map(); + +// Extend Prisma with query timeout, slow-query detection, and pool wait tracking export const prisma = basePrisma.$extends({ - query: { - $allOperations({ operation, model, args, query }) { - const timeoutMs = envConfig.DB_QUERY_TIMEOUT_MS; - const slowThresholdMs = envConfig.SLOW_QUERY_THRESHOLD_MS; - const context = requestContextStorage.getStore(); + query: { + $allOperations({ operation, model, args, query }) { + const timeoutMs = envConfig.DB_QUERY_TIMEOUT_MS; + const slowThresholdMs = envConfig.SLOW_QUERY_THRESHOLD_MS; + const poolWarnThreshold = envConfig.DB_POOL_WAIT_WARN_MS; + const poolErrorThreshold = envConfig.DB_POOL_WAIT_ERROR_MS; + const context = requestContextStorage.getStore(); + + const queryId = Symbol(); + const waitStart = Date.now(); + activeQueries++; + + let timeoutId: NodeJS.Timeout; + let timedOut = false; + + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + timedOut = true; + const logContext = { + type: 'database_timeout', + operation, + model, + timeoutMs, + path: context?.path, + method: context?.method, + requestId: context?.requestId, + }; + logger.error(logContext, `Database query timed out after ${timeoutMs}ms`); + reject(new Error(`Database query timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }); - let timeoutId: NodeJS.Timeout; - let timedOut = false; + const start = Date.now(); + const queryPromise = query(args).finally(() => { + clearTimeout(timeoutId); + const waitTime = Date.now() - waitStart; + activeQueries--; + queryStartTimes.delete(queryId); - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - timedOut = true; - const logContext = { - type: 'database_timeout', - operation, - model, - timeoutMs, - path: context?.path, - method: context?.method, - requestId: context?.requestId, - }; - logger.error(logContext, `Database query timed out after ${timeoutMs}ms`); - reject(new Error(`Database query timed out after ${timeoutMs}ms`)); - }, timeoutMs); - }); + // Log if wait time exceeds thresholds + if (waitTime > poolErrorThreshold) { + logger.error( + { + type: 'database_pool_wait_exceeded', + waitTimeMs: waitTime, + poolSize: 10, // Default Prisma pool size + queueDepth: activeQueries, + endpoint: context?.path, + operation, + model, + requestId: context?.requestId, + }, + `Database connection pool wait time exceeded ${poolErrorThreshold}ms` + ); + } else if (waitTime > poolWarnThreshold) { + logger.warn( + { + type: 'database_pool_wait_exceeded', + waitTimeMs: waitTime, + poolSize: 10, // Default Prisma pool size + queueDepth: activeQueries, + endpoint: context?.path, + operation, + model, + requestId: context?.requestId, + }, + `Database connection pool wait time exceeded ${poolWarnThreshold}ms` + ); + } - const start = Date.now(); - const queryPromise = query(args).finally(() => { - clearTimeout(timeoutId); - if (!timedOut) { - const elapsedMs = Date.now() - start; - if (elapsedMs > slowThresholdMs) { - logger.warn( - { - type: 'slow_query', - model, - operation, - fingerprint: buildQueryFingerprint(model, operation, args), - elapsedMs, - thresholdMs: slowThresholdMs, - requestId: context?.requestId, - }, - 'Slow database query detected' - ); - } - } - }); + if (!timedOut) { + const elapsedMs = Date.now() - start; + if (elapsedMs > slowThresholdMs) { + logger.warn( + { + type: 'slow_query', + model, + operation, + fingerprint: buildQueryFingerprint(model, operation, args), + elapsedMs, + thresholdMs: slowThresholdMs, + requestId: context?.requestId, + }, + 'Slow database query detected' + ); + } + } + }); - return Promise.race([queryPromise, timeoutPromise]); - }, - }, + return Promise.race([queryPromise, timeoutPromise]); + }, + }, }); // Prevent multiple instances in development environment if (envConfig.MODE !== 'production') { - global.prisma = prisma; + global.prisma = prisma; } diff --git a/src/utils/string-truncate.utils.test.ts b/src/utils/string-truncate.utils.test.ts index c74885b..58d9a08 100644 --- a/src/utils/string-truncate.utils.test.ts +++ b/src/utils/string-truncate.utils.test.ts @@ -1,4 +1,4 @@ -import { truncateString } from './string-truncate.utils'; +import { truncateString, truncateToBytes } from './string-truncate.utils'; describe('truncateString', () => { it('returns the original string when it is below the limit', () => { @@ -19,3 +19,38 @@ describe('truncateString', () => { ); }); }); + +describe('truncateToBytes', () => { + const encoder = new TextEncoder(); + + it('returns original ASCII string when within limit', () => { + const result = truncateToBytes('hello', 10); + expect(result).toBe('hello'); + expect(encoder.encode(result).length).toBeLessThanOrEqual(10); + }); + + it('truncates ASCII string when over limit', () => { + const result = truncateToBytes('hello world', 5); + expect(result).toBe('hello'); + expect(encoder.encode(result).length).toBe(5); + }); + + it('truncates multi-byte string at character boundary', () => { + const multiByteStr = 'こんにちは世界'; // Each Japanese character is 3 bytes + const result = truncateToBytes(multiByteStr, 10); // Should fit 3 characters (9 bytes) + expect(result).toBe('こんに'); + expect(encoder.encode(result).length).toBe(9); + }); + + it('returns empty string unchanged', () => { + const result = truncateToBytes('', 100); + expect(result).toBe(''); + expect(encoder.encode(result).length).toBe(0); + }); + + it('rejects negative maxBytes', () => { + expect(() => truncateToBytes('hello', -1)).toThrow( + 'maxBytes must be a non-negative finite number' + ); + }); +}); diff --git a/src/utils/string-truncate.utils.ts b/src/utils/string-truncate.utils.ts index f19e69d..1326354 100644 --- a/src/utils/string-truncate.utils.ts +++ b/src/utils/string-truncate.utils.ts @@ -15,3 +15,36 @@ export function truncateString(value: string, maxLength: number): string { return value.slice(0, maxLength); } + +/** + * Truncate a string to a maximum UTF-8 byte length, preserving character boundaries. + */ +export function truncateToBytes(value: string, maxBytes: number): string { + if (!Number.isFinite(maxBytes) || maxBytes < 0) { + throw new RangeError('maxBytes must be a non-negative finite number'); + } + + const encoder = new TextEncoder(); + const bytes = encoder.encode(value); + + if (bytes.length <= maxBytes) { + return value; + } + + let charLength = 0; + let byteLength = 0; + + for (let i = 0; i < value.length; i++) { + const char = value[i]; + const charBytes = encoder.encode(char).length; + + if (byteLength + charBytes > maxBytes) { + break; + } + + charLength++; + byteLength += charBytes; + } + + return value.slice(0, charLength); +}