Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 137 additions & 6 deletions src/lib/db/__tests__/pool.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { dbPool } from '../pool';
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { dbPool, query } from '../pool';
import { Pool } from 'pg';

vi.mock('pg', () => {
const mPool = {
on: vi.fn(),
query: vi.fn(),
query: vi.fn().mockResolvedValue({ rows: [], rowCount: 0 }),
connect: vi.fn().mockRejectedValue(new Error('Connection failed')),
end: vi.fn(),
totalCount: 5,
idleCount: 2,
Expand All @@ -14,9 +15,27 @@ vi.mock('pg', () => {
return { Pool: vi.fn(() => mPool) };
});

function getMockPool() {
return vi.mocked(Pool).mock.results[0]!.value;
}

describe('DatabasePool', () => {
beforeEach(() => {
vi.clearAllMocks();
vi.useFakeTimers();
vi.spyOn(Math, 'random').mockReturnValue(0);
(dbPool as any).instance = undefined;
(dbPool as any).circuitState = 'CLOSED';
(dbPool as any).consecutiveFailures = 0;
(dbPool as any).lastFailureTime = 0;
(dbPool as any).isReconnecting = false;
(dbPool as any).queryQueue = [];
});

afterEach(async () => {
await vi.advanceTimersByTimeAsync(0);
vi.useRealTimers();
vi.restoreAllMocks();
});

it('should create a singleton instance of Pool', () => {
Expand All @@ -27,15 +46,17 @@ describe('DatabasePool', () => {
expect(pool1).toBe(pool2);
});

it('should report metrics correctly', () => {
// Initialize pool
it('should report metrics including circuit breaker state', () => {
dbPool.getInstance();

const metrics = dbPool.getMetrics();
expect(metrics).toEqual({
expect(metrics).toMatchObject({
totalConnections: 5,
idleConnections: 2,
waitingCount: 1,
circuitState: 'CLOSED',
consecutiveFailures: 0,
queuedQueries: 0,
});
});

Expand All @@ -44,4 +65,114 @@ describe('DatabasePool', () => {
await dbPool.end();
expect(pool.end).toHaveBeenCalled();
});

it('should retry query on transient failure and succeed', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.query
.mockRejectedValueOnce(new Error('Connection reset'))
.mockResolvedValueOnce({ rows: [{ id: 1 }], rowCount: 1 });

const promise = query('SELECT 1');
await vi.advanceTimersByTimeAsync(600);
const result = await promise;

expect(result.rows).toEqual([{ id: 1 }]);
expect(mockPool.query).toHaveBeenCalledTimes(2);
});

it('should fail after exhausting all retry attempts', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.query.mockRejectedValue(new Error('Connection error'));

const promise = query('SELECT 1');
await vi.advanceTimersByTimeAsync(2000);

await expect(promise).rejects.toThrow('Connection error');
expect(mockPool.query).toHaveBeenCalledTimes(3);
});

it('should open circuit breaker after 5 consecutive failures', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.query.mockRejectedValue(new Error('Connection error'));

for (let i = 0; i < 5; i++) {
const promise = query('SELECT 1');
await vi.advanceTimersByTimeAsync(2000);
await expect(promise).rejects.toThrow('Connection error');
}

await expect(query('SELECT 1')).rejects.toMatchObject({
message: 'Database service unavailable',
statusCode: 503,
});
});

it('should resume normal operation after circuit breaker reset timeout', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.query.mockRejectedValue(new Error('Connection error'));

for (let i = 0; i < 5; i++) {
const promise = query('SELECT 1');
await vi.advanceTimersByTimeAsync(2000);
await expect(promise).rejects.toThrow('Connection error');
}

(dbPool as any).lastFailureTime = Date.now() - 120_000;

mockPool.query.mockResolvedValue({ rows: [{ id: 1 }], rowCount: 1 });

const result = await query('SELECT 1');
expect(result.rows).toEqual([{ id: 1 }]);
});

it('should trigger reconnect attempt on pool error event', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.connect.mockResolvedValue({ release: vi.fn() });

const errorHandler = mockPool.on.mock.calls.find(
(call: unknown[]) => call[0] === 'error',
)![1] as (err: Error) => void;

errorHandler(new Error('ECONNREFUSED'));

await vi.advanceTimersByTimeAsync(100);

expect(mockPool.connect).toHaveBeenCalled();
});

it('should queue queries during reconnect and process them after success', async () => {
dbPool.getInstance();
const mockPool = getMockPool();

mockPool.connect.mockResolvedValue({ release: vi.fn() });

const errorHandler = mockPool.on.mock.calls.find(
(call: unknown[]) => call[0] === 'error',
)![1] as (err: Error) => void;

errorHandler(new Error('ECONNREFUSED'));
(dbPool as any).isReconnecting = true;

mockPool.query.mockResolvedValue({ rows: [{ id: 1 }], rowCount: 1 });

const queryPromise = query('SELECT 1');

expect((dbPool as any).queryQueue.length).toBe(1);

(dbPool as any).isReconnecting = false;
(dbPool as any).processQueue();

const result = await queryPromise;
expect(result.rows).toEqual([{ id: 1 }]);
});
});
143 changes: 135 additions & 8 deletions src/lib/db/pool.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,171 @@
import { Pool, PoolConfig } from 'pg';
import { Pool, PoolConfig, QueryResult } from 'pg';
import { logContextStorage } from '@/lib/logging/context';
import { retryWithBackoff } from '@/utils/errorUtils';

/**
* Database Connection Pool Management
* Configures and maintains a singleton PostgreSQL connection pool
* with integrated monitoring and resource management.
*
* Features:
* - Automatic reconnect on transient connection errors (exponential backoff)
* - Circuit breaker: surfaces 503 errors after N consecutive failures
* - Query queueing during reconnect windows
*/

const DB_CONFIG: PoolConfig = {
connectionString: process.env.DATABASE_URL,
max: parseInt(process.env.DB_POOL_MAX || '20', 10),
connectionTimeoutMillis: parseInt(process.env.DB_CONNECTION_TIMEOUT || '5000', 10),
idleTimeoutMillis: parseInt(process.env.DB_IDLE_TIMEOUT || '30000', 10),
// Enable SSL in production if needed
ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false,
};

type CircuitState = 'CLOSED' | 'OPEN';

const FAILURE_THRESHOLD = 5;
const RESET_TIMEOUT_MS = 60_000;

interface QueuedQuery {
execute: () => Promise<QueryResult>;
resolve: (value: QueryResult | PromiseLike<QueryResult>) => void;
reject: (reason?: unknown) => void;
}

class DatabasePool {
private static instance: Pool;
private static circuitState: CircuitState = 'CLOSED';
private static consecutiveFailures = 0;
private static lastFailureTime = 0;
private static isReconnecting = false;
private static queryQueue: QueuedQuery[] = [];

private constructor() {}

public static getInstance(): Pool {
if (!DatabasePool.instance) {
DatabasePool.instance = new Pool(DB_CONFIG);

// Monitoring events
DatabasePool.instance.on('connect', () => {
if (process.env.NODE_ENV === 'development') {
const traceId = logContextStorage.getStore()?.traceId ?? '';
console.log('[DB Pool] New client connected to database', traceId ? { traceId } : '');
}
});

DatabasePool.instance.on('acquire', () => {
// Track acquisition metrics
DatabasePool.consecutiveFailures = 0;
if (DatabasePool.circuitState === 'OPEN') {
DatabasePool.circuitState = 'CLOSED';
}
});

DatabasePool.instance.on('error', (err) => {
const traceId = logContextStorage.getStore()?.traceId ?? '';
console.error('[DB Pool] Unexpected error on idle client', err, traceId ? { traceId } : '');

DatabasePool.consecutiveFailures++;
DatabasePool.lastFailureTime = Date.now();

if (DatabasePool.consecutiveFailures >= FAILURE_THRESHOLD) {
DatabasePool.circuitState = 'OPEN';
DatabasePool.rejectQueue(err);
} else if (!DatabasePool.isReconnecting) {
DatabasePool.isReconnecting = true;
DatabasePool.attemptReconnect();
}
});
}
return DatabasePool.instance;
}

private static async attemptReconnect(): Promise<void> {
try {
await retryWithBackoff(
async () => {
const client = await DatabasePool.instance.connect();
client.release();
},
{ maxAttempts: FAILURE_THRESHOLD, initialDelayMs: 1000, maxDelayMs: 30_000 },
);

DatabasePool.consecutiveFailures = 0;
DatabasePool.circuitState = 'CLOSED';
DatabasePool.isReconnecting = false;
DatabasePool.processQueue();
} catch (err) {
DatabasePool.isReconnecting = false;
DatabasePool.circuitState = 'OPEN';
DatabasePool.lastFailureTime = Date.now();
DatabasePool.rejectQueue(err);
}
}

public static isCircuitOpen(): boolean {
if (DatabasePool.circuitState === 'OPEN') {
if (
DatabasePool.lastFailureTime > 0 &&
Date.now() - DatabasePool.lastFailureTime > RESET_TIMEOUT_MS
) {
DatabasePool.circuitState = 'CLOSED';
DatabasePool.consecutiveFailures = 0;
return false;
}
return true;
}
return false;
}

private static processQueue(): void {
const queue = DatabasePool.queryQueue.splice(0);
for (const item of queue) {
item.execute().then(item.resolve).catch(item.reject);
}
}

private static rejectQueue(err: unknown): void {
const queue = DatabasePool.queryQueue.splice(0);
for (const item of queue) {
item.reject(err);
}
}

public static async queryWithRetry(text: string, params?: unknown[]): Promise<QueryResult> {
if (DatabasePool.isCircuitOpen()) {
const error = new Error('Database service unavailable');
(error as Error & { statusCode: number }).statusCode = 503;
throw error;
}

if (DatabasePool.isReconnecting) {
return new Promise<QueryResult>((resolve, reject) => {
DatabasePool.queryQueue.push({
execute: () => DatabasePool.queryWithRetry(text, params),
resolve,
reject,
});
});
}

try {
const result = await retryWithBackoff(() => DatabasePool.getInstance().query(text, params), {
maxAttempts: 3,
initialDelayMs: 500,
maxDelayMs: 5000,
});

DatabasePool.consecutiveFailures = 0;
return result;
} catch (error) {
DatabasePool.consecutiveFailures++;
DatabasePool.lastFailureTime = Date.now();

if (DatabasePool.consecutiveFailures >= FAILURE_THRESHOLD) {
DatabasePool.circuitState = 'OPEN';
}

throw error;
}
}

/**
* Get current pool metrics for monitoring
*/
Expand All @@ -54,13 +175,19 @@ class DatabasePool {
totalConnections: 0,
idleConnections: 0,
waitingCount: 0,
circuitState: 'CLOSED' as CircuitState,
consecutiveFailures: 0,
queuedQueries: 0,
};
}

return {
totalConnections: DatabasePool.instance.totalCount,
idleConnections: DatabasePool.instance.idleCount,
waitingCount: DatabasePool.instance.waitingCount,
circuitState: DatabasePool.circuitState,
consecutiveFailures: DatabasePool.consecutiveFailures,
queuedQueries: DatabasePool.queryQueue.length,
};
}

Expand All @@ -75,10 +202,10 @@ class DatabasePool {
}

export const dbPool = DatabasePool;
export const query = (text: string, params?: any[]) => {
export const query = (text: string, params?: unknown[]) => {
const traceId = logContextStorage.getStore()?.traceId ?? '';
if (traceId && process.env.NODE_ENV === 'development') {
console.log('[DB Query]', { text: text.slice(0, 100), traceId });
}
return DatabasePool.getInstance().query(text, params);
return DatabasePool.queryWithRetry(text, params);
};
Loading