diff --git a/src/lib/db/__tests__/pool.test.ts b/src/lib/db/__tests__/pool.test.ts
index 8c20c8c6..7fb23ca8 100644
--- a/src/lib/db/__tests__/pool.test.ts
+++ b/src/lib/db/__tests__/pool.test.ts
@@ -1,11 +1,12 @@
-import { describe, it, expect, vi, beforeEach } from 'vitest';
-import { dbPool } from '../pool';
+import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
+import { dbPool, query } from '../pool';
 import { Pool } from 'pg';
 
 vi.mock('pg', () => {
   const mPool = {
     on: vi.fn(),
-    query: vi.fn(),
+    query: vi.fn().mockResolvedValue({ rows: [], rowCount: 0 }),
+    connect: vi.fn().mockRejectedValue(new Error('Connection failed')),
     end: vi.fn(),
     totalCount: 5,
     idleCount: 2,
@@ -14,9 +15,28 @@ vi.mock('pg', () => {
   return { Pool: vi.fn(() => mPool) };
 });
 
+/** Returns the value produced by the first recorded call to the mocked `Pool` constructor. */
+function getMockPool() {
+  return vi.mocked(Pool).mock.results[0]!.value;
+}
+
 describe('DatabasePool', () => {
   beforeEach(() => {
     vi.clearAllMocks();
+    vi.useFakeTimers();
+    vi.spyOn(Math, 'random').mockReturnValue(0);
+    (dbPool as any).instance = undefined;
+    (dbPool as any).circuitState = 'CLOSED';
+    (dbPool as any).consecutiveFailures = 0;
+    (dbPool as any).lastFailureTime = 0;
+    (dbPool as any).isReconnecting = false;
+    (dbPool as any).queryQueue = [];
+  });
+
+  afterEach(async () => {
+    await vi.advanceTimersByTimeAsync(0);
+    vi.useRealTimers();
+    vi.restoreAllMocks();
   });
 
   it('should create a singleton instance of Pool', () => {
@@ -27,15 +47,17 @@ describe('DatabasePool', () => {
     expect(pool1).toBe(pool2);
   });
 
-  it('should report metrics correctly', () => {
-    // Initialize pool
+  it('should report metrics including circuit breaker state', () => {
     dbPool.getInstance();
     const metrics = dbPool.getMetrics();
 
-    expect(metrics).toEqual({
+    expect(metrics).toMatchObject({
       totalConnections: 5,
       idleConnections: 2,
       waitingCount: 1,
+      circuitState: 'CLOSED',
+      consecutiveFailures: 0,
+      queuedQueries: 0,
     });
   });
 
@@ -44,4 +66,114 @@ describe('DatabasePool', () => {
     await dbPool.end();
     expect(pool.end).toHaveBeenCalled();
   });
+
+  it('should retry query on transient failure and succeed', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.query
+      .mockRejectedValueOnce(new Error('Connection reset'))
+      .mockResolvedValueOnce({ rows: [{ id: 1 }], rowCount: 1 });
+
+    const promise = query('SELECT 1');
+    await vi.advanceTimersByTimeAsync(600);
+    const result = await promise;
+
+    expect(result.rows).toEqual([{ id: 1 }]);
+    expect(mockPool.query).toHaveBeenCalledTimes(2);
+  });
+
+  it('should fail after exhausting all retry attempts', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.query.mockRejectedValue(new Error('Connection error'));
+
+    const promise = query('SELECT 1');
+    await vi.advanceTimersByTimeAsync(2000);
+
+    await expect(promise).rejects.toThrow('Connection error');
+    expect(mockPool.query).toHaveBeenCalledTimes(3);
+  });
+
+  it('should open circuit breaker after 5 consecutive failures', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.query.mockRejectedValue(new Error('Connection error'));
+
+    for (let i = 0; i < 5; i++) {
+      const promise = query('SELECT 1');
+      await vi.advanceTimersByTimeAsync(2000);
+      await expect(promise).rejects.toThrow('Connection error');
+    }
+
+    await expect(query('SELECT 1')).rejects.toMatchObject({
+      message: 'Database service unavailable',
+      statusCode: 503,
+    });
+  });
+
+  it('should resume normal operation after circuit breaker reset timeout', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.query.mockRejectedValue(new Error('Connection error'));
+
+    for (let i = 0; i < 5; i++) {
+      const promise = query('SELECT 1');
+      await vi.advanceTimersByTimeAsync(2000);
+      await expect(promise).rejects.toThrow('Connection error');
+    }
+
+    (dbPool as any).lastFailureTime = Date.now() - 120_000;
+
+    mockPool.query.mockResolvedValue({ rows: [{ id: 1 }], rowCount: 1 });
+
+    const result = await query('SELECT 1');
+    expect(result.rows).toEqual([{ id: 1 }]);
+  });
+
+  it('should trigger reconnect attempt on pool error event', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.connect.mockResolvedValue({ release: vi.fn() });
+
+    const errorHandler = mockPool.on.mock.calls.find(
+      (call: unknown[]) => call[0] === 'error',
+    )![1] as (err: Error) => void;
+
+    errorHandler(new Error('ECONNREFUSED'));
+
+    await vi.advanceTimersByTimeAsync(100);
+
+    expect(mockPool.connect).toHaveBeenCalled();
+  });
+
+  it('should queue queries during reconnect and process them after success', async () => {
+    dbPool.getInstance();
+    const mockPool = getMockPool();
+
+    mockPool.connect.mockResolvedValue({ release: vi.fn() });
+
+    const errorHandler = mockPool.on.mock.calls.find(
+      (call: unknown[]) => call[0] === 'error',
+    )![1] as (err: Error) => void;
+
+    errorHandler(new Error('ECONNREFUSED'));
+    (dbPool as any).isReconnecting = true;
+
+    mockPool.query.mockResolvedValue({ rows: [{ id: 1 }], rowCount: 1 });
+
+    const queryPromise = query('SELECT 1');
+
+    expect((dbPool as any).queryQueue.length).toBe(1);
+
+    (dbPool as any).isReconnecting = false;
+    (dbPool as any).processQueue();
+
+    const result = await queryPromise;
+    expect(result.rows).toEqual([{ id: 1 }]);
+  });
 });
diff --git a/src/lib/db/pool.ts b/src/lib/db/pool.ts
index db99ecda..f1fc9dd8 100644
--- a/src/lib/db/pool.ts
+++ b/src/lib/db/pool.ts
@@ -1,10 +1,16 @@
-import { Pool, PoolConfig } from 'pg';
+import { Pool, PoolConfig, QueryResult } from 'pg';
 import { logContextStorage } from '@/lib/logging/context';
+import { retryWithBackoff } from '@/utils/errorUtils';
 
 /**
  * Database Connection Pool Management
  * Configures and maintains a singleton PostgreSQL connection pool
  * with integrated monitoring and resource management.
+ *
+ * Features:
+ * - Automatic reconnect on transient connection errors (exponential backoff)
+ * - Circuit breaker: surfaces 503 errors after N consecutive failures
+ * - Query queueing during reconnect windows
  */
 
 const DB_CONFIG: PoolConfig = {
@@ -12,12 +18,31 @@ const DB_CONFIG: PoolConfig = {
   max: parseInt(process.env.DB_POOL_MAX || '20', 10),
   connectionTimeoutMillis: parseInt(process.env.DB_CONNECTION_TIMEOUT || '5000', 10),
   idleTimeoutMillis: parseInt(process.env.DB_IDLE_TIMEOUT || '30000', 10),
-  // Enable SSL in production if needed
   ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false,
 };
 
+/** Breaker states. There is no half-open state: an open circuit closes outright on timeout. */
+type CircuitState = 'CLOSED' | 'OPEN';
+
+/** Failure streak that opens the circuit; also the attempt budget of a reconnect probe. */
+const FAILURE_THRESHOLD = 5;
+/** How long after the last recorded failure an open circuit starts admitting queries again. */
+const RESET_TIMEOUT_MS = 60_000;
+
+/** A query parked during a reconnect, with the callbacks that settle its caller's promise. */
+interface QueuedQuery {
+  execute: () => Promise<QueryResult>;
+  resolve: (value: QueryResult | PromiseLike<QueryResult>) => void;
+  reject: (reason?: unknown) => void;
+}
+
 class DatabasePool {
   private static instance: Pool;
+  private static circuitState: CircuitState = 'CLOSED';
+  private static consecutiveFailures = 0;
+  private static lastFailureTime = 0;
+  private static isReconnecting = false;
+  private static queryQueue: QueuedQuery[] = [];
 
   private constructor() {}
 
@@ -25,26 +50,168 @@ class DatabasePool {
     if (!DatabasePool.instance) {
       DatabasePool.instance = new Pool(DB_CONFIG);
 
-      // Monitoring events
       DatabasePool.instance.on('connect', () => {
         if (process.env.NODE_ENV === 'development') {
           const traceId = logContextStorage.getStore()?.traceId ?? '';
           console.log('[DB Pool] New client connected to database', traceId ? { traceId } : '');
         }
-      });
 
-      DatabasePool.instance.on('acquire', () => {
-        // Track acquisition metrics
+        // A successful connect ends the failure streak and closes the breaker.
+        DatabasePool.consecutiveFailures = 0;
+        if (DatabasePool.circuitState === 'OPEN') {
+          DatabasePool.circuitState = 'CLOSED';
+        }
       });
 
       DatabasePool.instance.on('error', (err) => {
         const traceId = logContextStorage.getStore()?.traceId ?? '';
         console.error('[DB Pool] Unexpected error on idle client', err, traceId ? { traceId } : '');
+
+        DatabasePool.consecutiveFailures++;
+        DatabasePool.lastFailureTime = Date.now();
+
+        if (DatabasePool.consecutiveFailures >= FAILURE_THRESHOLD) {
+          DatabasePool.circuitState = 'OPEN';
+          DatabasePool.rejectQueue(err);
+        } else if (!DatabasePool.isReconnecting) {
+          DatabasePool.isReconnecting = true;
+          // Fire-and-forget: attemptReconnect catches probe failures itself.
+          void DatabasePool.attemptReconnect();
+        }
       });
     }
     return DatabasePool.instance;
   }
 
+  /**
+   * Probe the pool by checking out and releasing one client, retried through
+   * `retryWithBackoff` for up to FAILURE_THRESHOLD attempts.
+   *
+   * On success the failure streak is cleared, the circuit is closed and the queued
+   * queries are replayed. On failure the circuit is opened and the queued queries are
+   * rejected with the probe error. Either way `isReconnecting` is cleared.
+   */
+  private static async attemptReconnect(): Promise<void> {
+    try {
+      await retryWithBackoff(
+        async () => {
+          const client = await DatabasePool.instance.connect();
+          client.release();
+        },
+        { maxAttempts: FAILURE_THRESHOLD, initialDelayMs: 1000, maxDelayMs: 30_000 },
+      );
+
+      DatabasePool.consecutiveFailures = 0;
+      DatabasePool.circuitState = 'CLOSED';
+      DatabasePool.isReconnecting = false;
+      DatabasePool.processQueue();
+    } catch (err) {
+      DatabasePool.isReconnecting = false;
+      DatabasePool.circuitState = 'OPEN';
+      DatabasePool.lastFailureTime = Date.now();
+      DatabasePool.rejectQueue(err);
+    }
+  }
+
+  /**
+   * Report whether queries must currently be refused.
+   *
+   * Not a pure read: once more than RESET_TIMEOUT_MS has elapsed since the last recorded
+   * failure, an open circuit is closed and the failure streak reset before returning `false`.
+   *
+   * @returns `true` while the circuit is open and the reset timeout has not yet elapsed.
+   */
+  public static isCircuitOpen(): boolean {
+    if (DatabasePool.circuitState === 'OPEN') {
+      if (
+        DatabasePool.lastFailureTime > 0 &&
+        Date.now() - DatabasePool.lastFailureTime > RESET_TIMEOUT_MS
+      ) {
+        DatabasePool.circuitState = 'CLOSED';
+        DatabasePool.consecutiveFailures = 0;
+        return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Drain the queue and re-run every parked query, settling each caller's promise with
+   * the outcome of the re-run.
+   */
+  private static processQueue(): void {
+    const queue = DatabasePool.queryQueue.splice(0);
+    for (const item of queue) {
+      item.execute().then(item.resolve).catch(item.reject);
+    }
+  }
+
+  /**
+   * Drain the queue and reject every parked query with `err`.
+   *
+   * @param err - Rejection reason passed to each waiting caller.
+   */
+  private static rejectQueue(err: unknown): void {
+    const queue = DatabasePool.queryQueue.splice(0);
+    for (const item of queue) {
+      item.reject(err);
+    }
+  }
+
+  /**
+   * Run a query through `retryWithBackoff` (up to 3 attempts) behind the circuit breaker.
+   *
+   * While a reconnect is in flight the query is parked in the queue; its promise settles
+   * when the queue is later replayed or rejected.
+   *
+   * NOTE(review): every rejection counts toward the breaker, including query-level errors
+   * such as constraint violations - confirm that this is intended.
+   *
+   * @param text - SQL text handed to `Pool.query`.
+   * @param params - Optional bind parameters.
+   * @returns The query result once an attempt succeeds.
+   * @throws Error carrying `statusCode` 503 while the circuit is open; otherwise whatever
+   *   `retryWithBackoff` rejects with.
+   */
+  public static async queryWithRetry(text: string, params?: unknown[]): Promise<QueryResult> {
+    if (DatabasePool.isCircuitOpen()) {
+      const error = new Error('Database service unavailable');
+      (error as Error & { statusCode: number }).statusCode = 503;
+      throw error;
+    }
+
+    if (DatabasePool.isReconnecting) {
+      return new Promise<QueryResult>((resolve, reject) => {
+        DatabasePool.queryQueue.push({
+          execute: () => DatabasePool.queryWithRetry(text, params),
+          resolve,
+          reject,
+        });
+      });
+    }
+
+    try {
+      const result = await retryWithBackoff(() => DatabasePool.getInstance().query(text, params), {
+        maxAttempts: 3,
+        initialDelayMs: 500,
+        maxDelayMs: 5000,
+      });
+
+      DatabasePool.consecutiveFailures = 0;
+      return result;
+    } catch (error) {
+      DatabasePool.consecutiveFailures++;
+      DatabasePool.lastFailureTime = Date.now();
+
+      if (DatabasePool.consecutiveFailures >= FAILURE_THRESHOLD) {
+        DatabasePool.circuitState = 'OPEN';
+      }
+
+      throw error;
+    }
+  }
+
   /**
    * Get current pool metrics for monitoring
    */
@@ -54,6 +221,9 @@ class DatabasePool {
         totalConnections: 0,
         idleConnections: 0,
         waitingCount: 0,
+        circuitState: 'CLOSED' as CircuitState,
+        consecutiveFailures: 0,
+        queuedQueries: 0,
       };
     }
 
@@ -61,6 +231,9 @@ class DatabasePool {
       totalConnections: DatabasePool.instance.totalCount,
       idleConnections: DatabasePool.instance.idleCount,
       waitingCount: DatabasePool.instance.waitingCount,
+      circuitState: DatabasePool.circuitState,
+      consecutiveFailures: DatabasePool.consecutiveFailures,
+      queuedQueries: DatabasePool.queryQueue.length,
     };
   }
 
@@ -75,10 +248,10 @@ class DatabasePool {
   }
 }
 export const dbPool = DatabasePool;
-export const query = (text: string, params?: any[]) => {
+export const query = (text: string, params?: unknown[]) => {
   const traceId = logContextStorage.getStore()?.traceId ?? '';
   if (traceId && process.env.NODE_ENV === 'development') {
     console.log('[DB Query]', { text: text.slice(0, 100), traceId });
   }
-  return DatabasePool.getInstance().query(text, params);
+  return DatabasePool.queryWithRetry(text, params);
 };