diff --git a/backend/package.json b/backend/package.json index 14c53c5e..43fa701e 100644 --- a/backend/package.json +++ b/backend/package.json @@ -21,42 +21,42 @@ "dependencies": { "@aws-sdk/client-s3": "^3.1057.0", "@opentelemetry/api": "^1.9.0", - "@opentelemetry/exporter-trace-otlp-http": "^0.55.0", - "@opentelemetry/instrumentation-express": "^0.45.0", - "@opentelemetry/instrumentation-http": "^0.55.0", - "@opentelemetry/resources": "^1.28.0", - "@opentelemetry/sdk-node": "^0.55.0", + "@opentelemetry/exporter-trace-otlp-http": "^0.219.0", + "@opentelemetry/instrumentation-express": "^0.67.0", + "@opentelemetry/instrumentation-http": "^0.219.0", + "@opentelemetry/resources": "^2.8.0", + "@opentelemetry/sdk-node": "^0.219.0", "@opentelemetry/semantic-conventions": "^1.28.0", - "@stellar/stellar-sdk": "^14.0.0", - "better-sqlite3": "^11.0.0", + "@stellar/stellar-sdk": "^16.0.1", + "better-sqlite3": "^12.11.1", "compression": "^1.8.0", "cors": "^2.8.5", - "dotenv": "^16.4.5", - "express": "^4.21.0", + "dotenv": "^17.4.2", + "express": "^5.2.1", "helmet": "^8.0.0", "ioredis": "^5.4.0", "multer": "^2.1.1", "pg": "^8.13.0", - "pino": "^9.0.0", + "pino": "^10.3.1", "supertest": "^7.0.0", "swagger-ui-express": "^5.0.1", "validator": "^13.12.0", "web-push": "^3.6.7", "ws": "^8.18.0", - "zod": "^3.23.0" + "zod": "^4.4.3" }, "devDependencies": { - "@readme/openapi-parser": "^2.6.0", - "@redocly/cli": "^1.34.5", + "@readme/openapi-parser": "^6.1.3", + "@redocly/cli": "^2.35.1", "@types/cors": "^2.8.0", "@types/express": "^4.17.0", "@types/node": "^20.0.0", "@types/supertest": "^6.0.0", "ajv": "^8.17.0", "ajv-formats": "^3.0.1", - "js-yaml": "^4.1.0", + "js-yaml": "^5.1.0", "nodemon": "^3.1.0", "redoc-cli": "^0.13.21", - "typescript": "^5.0.0" + "typescript": "^6.0.3" } } diff --git a/backend/src/dal/index.js b/backend/src/dal/index.js index 67e49715..ac296ded 100644 --- a/backend/src/dal/index.js +++ b/backend/src/dal/index.js @@ -20,6 +20,7 @@ import { 
/**
 * Decodes the JSON text held in the `targeting` column.
 *
 * A missing/empty column value and a stored JSON `null` both normalise to `{}`
 * so callers never receive `null`. Malformed JSON still throws (SyntaxError)
 * rather than being silently replaced.
 *
 * @param {string|null|undefined} raw - Raw column value.
 * @returns {any} The parsed targeting rules.
 */
function parseTargeting(raw) {
  return JSON.parse(raw || '{}') ?? {};
}

/**
 * Maps a `feature_flags` table row to the camelCase flag object used by callers.
 *
 * @param {{ id: number, flag_key: string, enabled: number, targeting: string, description: string|null, created_at: string, updated_at: string }} row
 * @returns {{ id: number, flagKey: string, enabled: boolean, targeting: any, description: string|null, createdAt: string, updatedAt: string }}
 */
function rowToFlag(row) {
  return {
    id: row.id,
    flagKey: row.flag_key,
    // SQLite has no boolean type; the column stores 0/1.
    enabled: row.enabled === 1,
    targeting: parseTargeting(row.targeting),
    description: row.description ?? null,
    createdAt: row.created_at,
    updatedAt: row.updated_at,
  };
}

/**
 * Creates the SQLite-backed feature flag repository.
 *
 * @param {{ db: import('better-sqlite3').Database }} deps
 * @returns {{ upsert: Function, getByKey: Function, list: Function, remove: Function }}
 */
export function createSqliteFeatureFlagRepository({ db }) {
  /** Fetches the raw row for `flagKey`, or `undefined` when there is none. */
  function selectRow(flagKey) {
    return db.prepare('SELECT * FROM feature_flags WHERE flag_key = ?').get(flagKey);
  }

  /**
   * Inserts a flag, or overwrites enabled/targeting/description/updated_at of
   * the existing row with the same key (created_at is left untouched).
   *
   * @param {{ flagKey: string, enabled?: boolean, targeting?: object|null, description?: string|null }} flag
   * @returns {ReturnType<typeof rowToFlag>} The flag as stored after the write.
   */
  function upsert({ flagKey, enabled = false, targeting = {}, description = null }) {
    const now = new Date().toISOString();
    // The `= {}` default only covers `undefined`; an explicit `null` would be
    // persisted as the JSON literal `null`, so normalise it here as well.
    const targetingJson = JSON.stringify(targeting ?? {});
    const enabledInt = enabled ? 1 : 0;

    db.prepare(
      `INSERT INTO feature_flags (flag_key, enabled, targeting, description, created_at, updated_at)
       VALUES (?, ?, ?, ?, ?, ?)
       ON CONFLICT(flag_key) DO UPDATE SET
         enabled = excluded.enabled,
         targeting = excluded.targeting,
         description = excluded.description,
         updated_at = excluded.updated_at`,
    ).run(flagKey, enabledInt, targetingJson, description, now, now);

    return rowToFlag(selectRow(flagKey));
  }

  /**
   * @param {string} flagKey
   * @returns {ReturnType<typeof rowToFlag> | null} The flag, or `null` when the key is unknown.
   */
  function getByKey(flagKey) {
    const row = selectRow(flagKey);
    return row ? rowToFlag(row) : null;
  }

  /** @returns {ReturnType<typeof rowToFlag>[]} All flags ordered by key. */
  function list() {
    return db
      .prepare('SELECT * FROM feature_flags ORDER BY flag_key')
      .all()
      .map((row) => rowToFlag(row));
  }

  /**
   * @param {string} flagKey
   * @returns {boolean} `true` when a row was deleted.
   */
  function remove(flagKey) {
    const result = db.prepare('DELETE FROM feature_flags WHERE flag_key = ?').run(flagKey);
    return result.changes > 0;
  }

  return { upsert, getByKey, list, remove };
}
requireScope } from './middleware/rbac.js'; const DEFAULT_PORT = 3001; @@ -1658,6 +1661,8 @@ export async function createApp(options = {}) { app.put(`${prefix}/campaigns/:id/archive`, rateLimiter, requireApiKey, archiveCampaign); app.delete(`${prefix}/campaigns/:id`, rateLimiter, requireApiKey, deleteCampaign); app.put(`${prefix}/campaigns/:id`, rateLimiter, ...guard, updateCampaign); + app.put(`${prefix}/campaigns/:id/publish`, rateLimiter, ...guard, publishCampaign); + app.put(`${prefix}/campaigns/:id/archive`, rateLimiter, ...guard, archiveCampaign); app.delete(`${prefix}/campaigns/:id`, rateLimiter, ...guard, deleteCampaign); app.post(`${prefix}/admin/api-keys`, rateLimiter, requireMasterKey, createApiKeyHandler); @@ -1903,6 +1908,11 @@ export async function createApp(options = {}) { const organizationRouter = createOrganizationRoutes(dal); app.use(`${prefix}/organizations`, rateLimiter, requireApiKey, organizationRouter); app.use(prefix, rateLimiter, ...guard, pushRouter); + + // Feature flag system routes (Issue #625) + const featureFlagService = createFeatureFlagService({ featureFlagRepository: dal.featureFlags }); + const featureFlagRouter = createFeatureFlagRoutes({ featureFlagService }); + app.use(`${prefix}/feature-flags`, rateLimiter, featureFlagRouter); } registerApiRoutes(API_V1_PREFIX); @@ -1967,12 +1977,6 @@ export async function startServer(options = {}) { } // ── Graceful shutdown (issue #650) ───────────────────────────────────────── - // On SIGTERM / SIGINT: - // 1. Stop accepting new connections (server.close). - // 2. Allow in-flight HTTP requests to finish for up to SHUTDOWN_GRACE_MS. - // 3. Send "Connection: close / will-reconnect" hint to open SSE/WS streams. - // 4. Flush OTel spans. - // 5. Exit 0 once everything is drained (or force-exit after the grace window). 
const SHUTDOWN_GRACE_MS = normalizePositiveInteger(process.env.SHUTDOWN_GRACE_MS, 15_000); let shuttingDown = false; @@ -1982,21 +1986,17 @@ export async function startServer(options = {}) { shuttingDown = true; log.info({ signal, graceMs: SHUTDOWN_GRACE_MS }, 'graceful shutdown started'); - // Force exit after the grace window so a stuck handler never blocks a deploy. const forceTimer = setTimeout(() => { log.error('graceful shutdown timed out — forcing exit'); process.exit(1); }, SHUTDOWN_GRACE_MS); if (typeof forceTimer.unref === 'function') forceTimer.unref(); - // Stop accepting new connections; drain in-flight HTTP requests. await new Promise((resolve) => server.close(resolve)); - // Flush usage counters to DB before exiting. stopUsageFlush(); await usageMeteringService.flushToDb().catch((err) => log.warn({ err }, 'usage flush warning')); - // Flush OTel exporter. await shutdownTracing().catch((err) => log.warn({ err }, 'OTel shutdown warning')); log.info('graceful shutdown complete'); diff --git a/backend/src/routes/batchPayout.js b/backend/src/routes/batchPayout.js new file mode 100644 index 00000000..658bf571 --- /dev/null +++ b/backend/src/routes/batchPayout.js @@ -0,0 +1,90 @@ +// @ts-check +import { Router } from 'express'; +import { randomUUID } from 'node:crypto'; +import { BatchPayoutError } from '../services/batchPayoutService.js'; + +/** + * Admin API for batch payout operations. 
/**
 * Builds the admin router for batch payout operations.
 *
 * Routes:
 *   POST /api/v1/batch-payouts           – register + launch a batch
 *   GET  /api/v1/batch-payouts           – list all batches (summary view)
 *   GET  /api/v1/batch-payouts/:batchId  – get batch status + per-recipient detail
 *
 * @param {object} deps
 * @param {object} deps.batchPayoutService - Service exposing registerBatch,
 *   executeBatch, listBatches and getBatch.
 * @param {import('express').RequestHandler | import('express').RequestHandler[]} deps.requireApiKey
 *   Auth middleware (single handler or list) applied to every route.
 * @param {{ error: Function }} deps.log - Logger; only `error` is used here.
 * @returns {import('express').Router}
 */
export function createBatchPayoutRouter({ batchPayoutService, requireApiKey, log }) {
  const router = Router();
  const auth = Array.isArray(requireApiKey) ? requireApiKey : [requireApiKey];

  // ── POST /api/v1/batch-payouts ─────────────────────────────────────────────
  // Registers a batch and kicks off async execution. Omit batchId to
  // auto-generate one. If batchId already exists the stored record is returned
  // unchanged; executeBatch is still invoked, which skips completed batches,
  // rejects with ALREADY_RUNNING (ignored below) for running ones, and
  // otherwise resumes the recipients that are still pending.
  // A `failMode` field in the body is not read here; failMode is an option of
  // createBatchPayoutService.

  router.post('/batch-payouts', ...auth, (req, res) => {
    const { campaignId, recipients, batchId } = req.body ?? {};

    if (!Array.isArray(recipients) || recipients.length === 0) {
      return res.status(400).json({ error: 'recipients must be a non-empty array', code: 'VALIDATION_ERROR' });
    }
    if (recipients.length > 10_000) {
      return res.status(400).json({ error: 'Maximum 10 000 recipients per batch', code: 'BATCH_TOO_LARGE' });
    }
    // Reject non-object entries up front: registerBatch dereferences each entry,
    // so a `null` would surface as a TypeError and be answered with a 500.
    if (recipients.some((entry) => entry === null || typeof entry !== 'object')) {
      return res.status(400).json({ error: 'each recipient must be an object', code: 'VALIDATION_ERROR' });
    }

    const id = typeof batchId === 'string' && batchId ? batchId : randomUUID();

    let batch;
    try {
      batch = batchPayoutService.registerBatch({ batchId: id, recipients, campaignId: campaignId ?? '' });
    } catch (err) {
      if (err instanceof BatchPayoutError) {
        return res.status(400).json({ error: err.message, code: err.code });
      }
      log.error(err, 'batch_payout:register_error');
      return res.status(500).json({ error: 'Internal server error', code: 'INTERNAL_ERROR' });
    }

    // Fire-and-forget — progress is visible via GET /batch-payouts/:batchId
    batchPayoutService.executeBatch(id).catch((err) => {
      if (err instanceof BatchPayoutError && err.code === 'ALREADY_RUNNING') return;
      log.error({ err, batchId: id }, 'batch_payout:execution_error');
    });

    return res.status(202).json({
      batchId: id,
      status: batch.status,
      totalRecipients: batch.totalRecipients,
    });
  });

  // ── GET /api/v1/batch-payouts ──────────────────────────────────────────────

  router.get('/batch-payouts', ...auth, (_req, res) => {
    const batches = batchPayoutService.listBatches().map(summaryView);
    return res.json({ batches, count: batches.length });
  });

  // ── GET /api/v1/batch-payouts/:batchId ────────────────────────────────────

  router.get('/batch-payouts/:batchId', ...auth, (req, res) => {
    const batch = batchPayoutService.getBatch(req.params.batchId);
    if (!batch) {
      return res.status(404).json({ error: 'Batch not found', code: 'NOT_FOUND' });
    }
    return res.json({ batch });
  });

  return router;
}
/**
 * Builds the feature flag router.
 *
 * Routes (relative to the mount point):
 *   GET    /      – key→enabled map of every flag
 *   GET    /:key  – evaluate one flag for an optional userId/orgId context
 *   POST   /      – create or update a flag
 *   DELETE /:key  – delete a flag
 *
 * @param {object} deps
 * @param {object} deps.featureFlagService - Service exposing getAllFlags,
 *   isEnabled, setFlag and deleteFlag.
 * @param {import('express').RequestHandler | import('express').RequestHandler[]} [deps.requireAdmin]
 *   Middleware applied to the write routes (POST, DELETE). Defaults to none, so
 *   callers must pass an auth guard here (or mount the router behind one) to
 *   protect flag mutation.
 * @returns {import('express').Router}
 */
export function createFeatureFlagRoutes({ featureFlagService, requireAdmin = [] }) {
  const router = express.Router();
  const adminGuard = Array.isArray(requireAdmin) ? requireAdmin : [requireAdmin];

  // Hydrate client with all flags (evaluated values, not stored rules)
  router.get('/', (_req, res) => {
    const flags = featureFlagService.getAllFlags();
    // Return a simple key→enabled map safe to expose to the client
    const map = Object.fromEntries(flags.map((f) => [f.flagKey, f.enabled]));
    res.json({ flags: map });
  });

  // Evaluate a single flag for a given context
  router.get('/:key', (req, res) => {
    const { key } = req.params;
    const userId = typeof req.query.userId === 'string' ? req.query.userId : undefined;
    const orgId = typeof req.query.orgId === 'string' ? req.query.orgId : undefined;
    const enabled = featureFlagService.isEnabled(key, { userId, orgId });
    res.json({ flagKey: key, enabled });
  });

  // Admin: create or update a flag
  router.post('/', ...adminGuard, (req, res) => {
    const body = req.body ?? {};
    const { flagKey } = body;
    // Absent and null both mean "use the default".
    const enabled = body.enabled ?? false;
    const targeting = body.targeting ?? {};
    const description = body.description ?? null;

    if (!flagKey || typeof flagKey !== 'string') {
      return res.status(400).json({ error: 'flagKey is required' });
    }
    // Boolean("false") is true, so coercing here could switch a flag ON when
    // the client meant OFF. Only a real boolean is accepted.
    if (typeof enabled !== 'boolean') {
      return res.status(400).json({ error: 'enabled must be a boolean' });
    }
    if (typeof targeting !== 'object' || Array.isArray(targeting)) {
      return res.status(400).json({ error: 'targeting must be an object' });
    }
    if (description !== null && typeof description !== 'string') {
      return res.status(400).json({ error: 'description must be a string' });
    }

    const flag = featureFlagService.setFlag({ flagKey, enabled, targeting, description });
    return res.status(201).json(flag);
  });

  // Admin: delete a flag (kill-switch removal)
  router.delete('/:key', ...adminGuard, (req, res) => {
    const deleted = featureFlagService.deleteFlag(req.params.key);
    if (!deleted) return res.status(404).json({ error: 'Flag not found' });
    return res.status(204).end();
  });

  return router;
}
* - * The pool tracks in-flight calls via acquire()/release() so saturation - * metrics (in_use / idle / waiting) are always current. When the concurrency - * cap is reached and an acquire() caller waits longer than acquireTimeoutMs, - * a PoolSaturatedError is thrown instead of hanging indefinitely. + * The bulkhead (concurrency cap + acquire queue) coexists with the circuit + * breaker: `acquire()` already enforces the bulkhead; `reportOutcome()` drives + * breaker transitions so callers feed back real success/failure/latency data. * * @param {string[]} urls - * @param {{ backoffMs?: number, maxConcurrent?: number, acquireTimeoutMs?: number }} [options] + * @param {{ + * backoffMs?: number, + * maxConcurrent?: number, + * acquireTimeoutMs?: number, + * circuitBreaker?: { + * windowSize?: number, + * errorThreshold?: number, + * latencyThresholdMs?: number, + * openDurationMs?: number, + * } + * }} [options] */ export function createRpcPool( urls, @@ -32,16 +61,30 @@ export function createRpcPool( backoffMs = DEFAULT_BACKOFF_MS, maxConcurrent = DEFAULT_MAX_CONCURRENT, acquireTimeoutMs = DEFAULT_ACQUIRE_TIMEOUT_MS, + circuitBreaker: cbOpts = {}, } = {}, ) { if (!Array.isArray(urls) || urls.length === 0) { throw new Error('RPC pool requires at least one URL'); } + const { + windowSize = DEFAULT_CB_WINDOW_SIZE, + errorThreshold = DEFAULT_CB_ERROR_THRESHOLD, + latencyThresholdMs = DEFAULT_CB_LATENCY_MS, + openDurationMs = DEFAULT_CB_OPEN_DURATION_MS, + } = cbOpts; + const endpoints = urls.map((url) => ({ url, healthy: true, unhealthySince: /** @type {number|null} */ (null), + // Circuit breaker state + breakerState: BREAKER_CLOSED, + openSince: /** @type {number|null} */ (null), + halfOpenInFlight: false, + /** @type {{ isError: boolean }[]} */ + window: [], })); let rrIndex = 0; @@ -57,12 +100,31 @@ export function createRpcPool( ep.healthy = true; ep.unhealthySince = null; } + // Advance open → half_open once the cooldown has elapsed. 
+ if ( + ep.breakerState === BREAKER_OPEN && + ep.openSince !== null && + now - ep.openSince >= openDurationMs + ) { + ep.breakerState = BREAKER_HALF_OPEN; + ep.openSince = null; + ep.halfOpenInFlight = false; + } } } + function _isAvailable(ep) { + return ( + ep.healthy && + ep.breakerState !== BREAKER_OPEN && + !(ep.breakerState === BREAKER_HALF_OPEN && ep.halfOpenInFlight) + ); + } + /** - * Returns the next healthy URL via round-robin. - * Falls back to the first URL when all endpoints are unhealthy. + * Returns the next available URL via round-robin, skipping endpoints whose + * circuit breaker is open or whose half-open probe is already in flight. + * Falls back to the first URL when no endpoint is available. * * @returns {string} */ @@ -70,12 +132,16 @@ export function createRpcPool( _recoverStale(); for (let i = 0; i < endpoints.length; i++) { const idx = (rrIndex + i) % endpoints.length; - if (endpoints[idx].healthy) { + const ep = endpoints[idx]; + if (_isAvailable(ep)) { rrIndex = (idx + 1) % endpoints.length; - return endpoints[idx].url; + if (ep.breakerState === BREAKER_HALF_OPEN) { + ep.halfOpenInFlight = true; + } + return ep.url; } } - // All unhealthy: fall back to first + // All endpoints unavailable: fall back to first (fail-open safety valve). return endpoints[0].url; } @@ -124,6 +190,54 @@ export function createRpcPool( if (next) next(); } + /** + * Feed back the outcome of an RPC call to drive circuit-breaker transitions. + * + * Slow calls (latencyMs > latencyThresholdMs) count as errors even when + * success=true so a degraded-but-up RPC trips the breaker without hard + * failures. Call this inside the same try/finally as acquire()/release(). + * + * @param {string} url - The endpoint URL returned by acquire() / getHealthyRpcUrl(). 
+ * @param {{ success: boolean, latencyMs?: number }} outcome + */ + function reportOutcome(url, { success, latencyMs = 0 }) { + const ep = endpoints.find((e) => e.url === url); + if (!ep) return; + + const isError = !success || latencyMs > latencyThresholdMs; + + if (ep.breakerState === BREAKER_HALF_OPEN) { + ep.halfOpenInFlight = false; + if (!isError) { + // Probe succeeded — close the breaker and clear the window. + ep.breakerState = BREAKER_CLOSED; + ep.openSince = null; + ep.window = []; + } else { + // Probe failed — reopen immediately. + ep.breakerState = BREAKER_OPEN; + ep.openSince = Date.now(); + } + return; + } + + if (ep.breakerState !== BREAKER_CLOSED) return; + + // Maintain a fixed-size sliding window of recent outcomes. + ep.window.push({ isError }); + if (ep.window.length > windowSize) ep.window.shift(); + + // Trip the breaker once the window is full and error rate meets threshold. + if (ep.window.length >= windowSize) { + const errorCount = ep.window.filter((e) => e.isError).length; + if (errorCount / ep.window.length >= errorThreshold) { + ep.breakerState = BREAKER_OPEN; + ep.openSince = Date.now(); + ep.window = []; + } + } + } + /** * Marks an endpoint as unhealthy and starts its backoff timer. * @@ -138,7 +252,7 @@ export function createRpcPool( } /** - * Marks an endpoint as healthy, clearing any backoff state. + * Marks an endpoint as healthy, clearing any backoff and circuit-breaker state. * * @param {string} url */ @@ -147,25 +261,44 @@ export function createRpcPool( if (ep) { ep.healthy = true; ep.unhealthySince = null; + ep.breakerState = BREAKER_CLOSED; + ep.openSince = null; + ep.window = []; + ep.halfOpenInFlight = false; } } /** * Returns pool status for health endpoint exposure. 
* - * Includes saturation counters: + * Saturation counters: * - in_use: slots currently occupied by active callers * - idle: slots available immediately * - waiting: callers queued pending a slot * - * @returns {{ healthy: number, unhealthy: number, urls: { url: string, healthy: boolean }[], in_use: number, idle: number, waiting: number, max: number }} + * Each url entry includes `breakerState` ('closed' | 'open' | 'half_open') + * so the health route can surface circuit-breaker degradation to operators. + * + * @returns {{ + * healthy: number, + * unhealthy: number, + * urls: { url: string, healthy: boolean, breakerState: string }[], + * in_use: number, + * idle: number, + * waiting: number, + * max: number + * }} */ function getStatus() { _recoverStale(); return { healthy: endpoints.filter((ep) => ep.healthy).length, unhealthy: endpoints.filter((ep) => !ep.healthy).length, - urls: endpoints.map((ep) => ({ url: ep.url, healthy: ep.healthy })), + urls: endpoints.map((ep) => ({ + url: ep.url, + healthy: ep.healthy, + breakerState: ep.breakerState, + })), in_use: _inUse, idle: Math.max(0, maxConcurrent - _inUse), waiting: _waiters.length, @@ -188,6 +321,7 @@ export function createRpcPool( release, markUnhealthy, markHealthy, + reportOutcome, getStatus, getUrls, PoolSaturatedError, diff --git a/backend/src/rpcPool.test.js b/backend/src/rpcPool.test.js index dc2b900f..cde9608e 100644 --- a/backend/src/rpcPool.test.js +++ b/backend/src/rpcPool.test.js @@ -68,3 +68,146 @@ test('getUrls returns all configured URLs in order', () => { const pool = createRpcPool(urls); assert.deepEqual(pool.getUrls(), urls); }); + +// --------------------------------------------------------------------------- +// Circuit breaker tests (#569) +// --------------------------------------------------------------------------- + +test('circuit: breaker starts closed for all endpoints', () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 4, 
errorThreshold: 0.5 }, + }); + const { urls } = pool.getStatus(); + assert.equal(urls[0].breakerState, 'closed'); + assert.equal(urls[1].breakerState, 'closed'); +}); + +test('circuit: breaker opens after error rate meets threshold', () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 4, errorThreshold: 0.5 }, + }); + // 4/4 errors fills window at >= 50% threshold — breaker should trip. + for (let i = 0; i < 4; i++) { + pool.reportOutcome('https://a.com', { success: false }); + } + const ep = pool.getStatus().urls.find((u) => u.url === 'https://a.com'); + assert.equal(ep.breakerState, 'open'); +}); + +test('circuit: open endpoint is skipped in round-robin', () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + assert.equal( + pool.getStatus().urls.find((u) => u.url === 'https://a.com').breakerState, + 'open', + ); + // All requests should be routed to b.com while a.com's breaker is open. 
+ for (let i = 0; i < 5; i++) { + assert.equal(pool.getHealthyRpcUrl(), 'https://b.com'); + } +}); + +test('circuit: open breaker transitions to half_open after cooldown', async () => { + const pool = createRpcPool(['https://a.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5, openDurationMs: 20 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + assert.equal(pool.getStatus().urls[0].breakerState, 'open'); + + await new Promise((resolve) => setTimeout(resolve, 30)); + + assert.equal(pool.getStatus().urls[0].breakerState, 'half_open'); +}); + +test('circuit: successful probe closes the breaker', async () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5, openDurationMs: 20 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + + await new Promise((resolve) => setTimeout(resolve, 30)); + pool.getStatus(); // trigger OPEN → HALF_OPEN transition + + const probeUrl = pool.getHealthyRpcUrl(); + assert.equal(probeUrl, 'https://a.com', 'half-open endpoint should be selected for probe'); + + pool.reportOutcome('https://a.com', { success: true, latencyMs: 100 }); + assert.equal( + pool.getStatus().urls.find((u) => u.url === 'https://a.com').breakerState, + 'closed', + ); +}); + +test('circuit: failed probe in half_open re-opens the breaker', async () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5, openDurationMs: 20 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + + await new Promise((resolve) => setTimeout(resolve, 30)); + pool.getStatus(); // trigger OPEN → HALF_OPEN transition + pool.getHealthyRpcUrl(); // consume probe slot + + pool.reportOutcome('https://a.com', { success: 
false }); + assert.equal( + pool.getStatus().urls.find((u) => u.url === 'https://a.com').breakerState, + 'open', + 'failed probe should reopen the breaker', + ); +}); + +test('circuit: slow calls above latency threshold count as errors', () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5, latencyThresholdMs: 500 }, + }); + pool.reportOutcome('https://a.com', { success: true, latencyMs: 600 }); + pool.reportOutcome('https://a.com', { success: true, latencyMs: 700 }); + assert.equal( + pool.getStatus().urls.find((u) => u.url === 'https://a.com').breakerState, + 'open', + 'two slow calls should trip the breaker', + ); +}); + +test('circuit: getStatus exposes breakerState per endpoint', () => { + const pool = createRpcPool(['https://a.com', 'https://b.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + const { urls } = pool.getStatus(); + assert.equal(urls.find((u) => u.url === 'https://a.com').breakerState, 'open'); + assert.equal(urls.find((u) => u.url === 'https://b.com').breakerState, 'closed'); +}); + +test('circuit: markHealthy resets a tripped breaker to closed', () => { + const pool = createRpcPool(['https://a.com'], { + circuitBreaker: { windowSize: 2, errorThreshold: 0.5 }, + }); + pool.reportOutcome('https://a.com', { success: false }); + pool.reportOutcome('https://a.com', { success: false }); + assert.equal(pool.getStatus().urls[0].breakerState, 'open'); + + pool.markHealthy('https://a.com'); + assert.equal(pool.getStatus().urls[0].breakerState, 'closed'); +}); + +test('circuit: window fills gradually — no trip before windowSize reached', () => { + const pool = createRpcPool(['https://a.com'], { + circuitBreaker: { windowSize: 5, errorThreshold: 0.5 }, + }); + // 4 errors but window isn't full yet (windowSize=5) — should stay closed. 
+ for (let i = 0; i < 4; i++) { + pool.reportOutcome('https://a.com', { success: false }); + } + assert.equal(pool.getStatus().urls[0].breakerState, 'closed'); + // Fifth error fills the window at 100% — now trips. + pool.reportOutcome('https://a.com', { success: false }); + assert.equal(pool.getStatus().urls[0].breakerState, 'open'); +}); diff --git a/backend/src/services/batchPayoutService.js b/backend/src/services/batchPayoutService.js new file mode 100644 index 00000000..fbe6d418 --- /dev/null +++ b/backend/src/services/batchPayoutService.js @@ -0,0 +1,343 @@ +// @ts-check +// Batch payout transaction builder — packs up to k credit ops per Soroban tx, +// adapts k on resource-limit failures, checkpoints per chunk, and enforces +// idempotency so retried batches cannot double-pay. + +const MIN_OPS_PER_CHUNK = 1; + +export const RECIPIENT_STATUS = /** @type {const} */ ({ + PENDING: 'pending', + SUCCESS: 'success', + FAILED: 'failed', +}); + +export const BATCH_STATUS = /** @type {const} */ ({ + PENDING: 'pending', + RUNNING: 'running', + COMPLETED: 'completed', + PARTIAL: 'partial', + FAILED: 'failed', +}); + +export class BatchPayoutError extends Error { + /** @param {string} message @param {string} code */ + constructor(message, code) { + super(message); + this.name = 'BatchPayoutError'; + this.code = code; + } +} + +/** + * @typedef {{ address: string; amount: string }} PayoutOp + * @typedef {{ address: string; amount: string; status: string; txHash: string | null; errorMessage: string | null }} RecipientRecord + * @typedef {{ + * id: string; + * campaignId: string; + * status: string; + * recipients: RecipientRecord[]; + * totalRecipients: number; + * successCount: number; + * failCount: number; + * chunksCompleted: number; + * createdAt: string; + * updatedAt: string; + * completedAt: string | null; + * }} BatchRecord + */ + +/** + * Creates an in-memory batch store. + * Swap for a persistent implementation (SQLite/Postgres) in production. 
/**
 * Creates an in-memory batch store keyed by batch id.
 *
 * Every record crossing the store boundary — in either direction — is
 * deep-copied via a JSON round trip, so callers can never mutate stored state
 * through a reference they hold. Records are therefore expected to be plain
 * JSON data.
 *
 * Swap for a persistent implementation (SQLite/Postgres) in production.
 *
 * @returns {{
 *   saveBatch: (record: BatchRecord) => void,
 *   getBatch: (batchId: string) => (BatchRecord | undefined),
 *   updateBatch: (batchId: string, updates: object) => void,
 *   listBatches: () => BatchRecord[],
 * }}
 */
export function createInMemoryBatchStore() {
  // batch id → stored BatchRecord
  const batches = new Map();

  /** Deep-copies a plain-data record. */
  const clone = (value) => JSON.parse(JSON.stringify(value));

  return {
    /**
     * Inserts or replaces the record stored under `record.id`.
     * @param {BatchRecord} record
     */
    saveBatch(record) {
      batches.set(record.id, clone(record));
    },

    /**
     * @param {string} batchId
     * @returns {BatchRecord | undefined} A private copy, or `undefined` when unknown.
     */
    getBatch(batchId) {
      const stored = batches.get(batchId);
      return stored ? clone(stored) : undefined;
    },

    /**
     * Shallow-merges `updates` into the stored record and refreshes `updatedAt`.
     *
     * @param {string} batchId
     * @param {object} updates - Partial BatchRecord.
     * @throws {BatchPayoutError} code `NOT_FOUND` when the batch does not exist.
     */
    updateBatch(batchId, updates) {
      const existing = batches.get(batchId);
      if (!existing) throw new BatchPayoutError(`Batch ${batchId} not found`, 'NOT_FOUND');
      // Clone after merging: without this the store would keep the caller's
      // nested objects (e.g. the recipients array) by reference, unlike
      // saveBatch/getBatch which both copy.
      batches.set(
        batchId,
        clone({
          ...existing,
          ...updates,
          updatedAt: new Date().toISOString(),
        }),
      );
    },

    /** @returns {BatchRecord[]} Private copies of all batches, newest `createdAt` first. */
    listBatches() {
      return Array.from(batches.values(), (stored) => clone(stored)).sort((a, b) =>
        b.createdAt.localeCompare(a.createdAt),
      );
    },
  };
}
/**
 * Creates the batch payout service.
 *
 * The service registers batches in `store`, then executes them chunk by chunk:
 * each chunk is first passed to `simulateChunk`, and only submitted through
 * `submitChunk` when the simulation succeeds. Progress is written back to the
 * store after every chunk.
 *
 * @param {{
 *   simulateChunk: (ops: PayoutOp[]) => Promise<{ success: boolean; resourceLimitExceeded: boolean; errorMessage?: string }>,
 *   submitChunk: (ops: PayoutOp[]) => Promise<{ success: boolean; txHash?: string; recipientErrors?: Record<string, string>; errorMessage?: string }>,
 *   store: ReturnType<typeof createInMemoryBatchStore>,
 *   maxOpsPerChunk?: number,
 *   failMode?: 'continue' | 'abort',
 *   log?: Pick<Console, 'info' | 'warn'>,
 * }} deps
 *   - `maxOpsPerChunk`: upper bound for ops per chunk (default 50); values that
 *     are not a whole number >= 1 are clamped to a usable chunk size.
 *   - `failMode`: 'continue' keeps processing after a failed chunk, 'abort'
 *     stops at the first failed chunk and marks the batch FAILED.
 */
export function createBatchPayoutService({
  simulateChunk,
  submitChunk,
  store,
  maxOpsPerChunk = 50,
  failMode = 'continue',
  log = console,
}) {
  // A zero, negative, fractional or NaN limit would produce empty chunks, and
  // executeBatch would then never advance. Clamp to a whole number >= 1.
  const chunkLimit = Math.max(MIN_OPS_PER_CHUNK, Math.floor(maxOpsPerChunk) || MIN_OPS_PER_CHUNK);

  /** Reduce queue entries to the `{ address, amount }` shape the collaborators receive. */
  const toOps = (entries) => entries.map(({ address, amount }) => ({ address, amount }));

  /** Mark every entry as FAILED with the same reason. */
  const markFailed = (entries, reason) => {
    for (const entry of entries) {
      entry.status = RECIPIENT_STATUS.FAILED;
      entry.errorMessage = reason;
    }
  };

  /** Derive success/fail totals from recipient state. */
  const countOutcomes = (recipients) => ({
    successCount: recipients.filter((r) => r.status === RECIPIENT_STATUS.SUCCESS).length,
    failCount: recipients.filter((r) => r.status === RECIPIENT_STATUS.FAILED).length,
  });

  /** Render any thrown value as a log-friendly string. */
  const describeError = (err) => (err instanceof Error ? err.message : String(err));

  /**
   * Register a new batch. Returns immediately; call executeBatch(id) to run.
   * Idempotent: re-submitting a batchId that already exists returns the
   * existing record without modifying it — prevents duplicate submissions.
   *
   * @param {{ batchId: string; recipients: Array<{address: string; amount: string}>; campaignId: string }} params
   * @returns {BatchRecord}
   * @throws {BatchPayoutError} code 'INVALID_INPUT' when batchId is missing,
   *   recipients is empty/not an array, or a recipient lacks a non-empty
   *   string address or amount.
   */
  function registerBatch({ batchId, recipients, campaignId }) {
    if (!batchId || typeof batchId !== 'string') {
      throw new BatchPayoutError('batchId is required', 'INVALID_INPUT');
    }
    if (!Array.isArray(recipients) || recipients.length === 0) {
      throw new BatchPayoutError('recipients must be a non-empty array', 'INVALID_INPUT');
    }
    for (const r of recipients) {
      if (!r.address || typeof r.address !== 'string') {
        throw new BatchPayoutError('each recipient must have a string address', 'INVALID_INPUT');
      }
      if (!r.amount || typeof r.amount !== 'string') {
        throw new BatchPayoutError('each recipient must have a string amount', 'INVALID_INPUT');
      }
    }

    const existing = store.getBatch(batchId);
    if (existing) return existing; // idempotency

    const now = new Date().toISOString();
    /** @type {BatchRecord} */
    const record = {
      id: batchId,
      campaignId: campaignId ?? '',
      status: BATCH_STATUS.PENDING,
      recipients: recipients.map((r) => ({
        address: r.address,
        amount: r.amount,
        status: RECIPIENT_STATUS.PENDING,
        txHash: null,
        errorMessage: null,
      })),
      totalRecipients: recipients.length,
      successCount: 0,
      failCount: 0,
      chunksCompleted: 0,
      createdAt: now,
      updatedAt: now,
      completedAt: null,
    };

    store.saveBatch(record);
    return store.getBatch(batchId);
  }

  /**
   * Execute a registered batch. Safe to call after a partial failure —
   * only recipients that are still pending are processed (checkpoint recovery).
   *
   * If `simulateChunk` or `submitChunk` throws, progress is checkpointed, the
   * batch is marked FAILED (so it is not left stuck in RUNNING) and the
   * original error is rethrown. Recipients of a chunk whose submission threw
   * are marked FAILED because their on-chain outcome is unknown; they are not
   * retried automatically, which avoids paying them twice.
   *
   * @param {string} batchId
   * @returns {Promise<BatchRecord>}
   * @throws {BatchPayoutError} code 'NOT_FOUND' for an unknown batch,
   *   'ALREADY_RUNNING' when the batch is currently executing.
   */
  async function executeBatch(batchId) {
    const batch = store.getBatch(batchId);
    if (!batch) throw new BatchPayoutError(`Batch ${batchId} not found`, 'NOT_FOUND');

    if (batch.status === BATCH_STATUS.COMPLETED) {
      log.info(`batch:skip_completed id=${batchId}`);
      return batch;
    }
    if (batch.status === BATCH_STATUS.RUNNING) {
      throw new BatchPayoutError(`Batch ${batchId} is already running`, 'ALREADY_RUNNING');
    }

    store.updateBatch(batchId, { status: BATCH_STATUS.RUNNING });

    // Resume: only recipients still pending are queued. Each queued entry
    // remembers its position in batch.recipients so checkpoints can be merged
    // back by index; merging by address would conflate two payouts that go to
    // the same address.
    const queue = batch.recipients
      .map((r, index) => ({ ...r, index }))
      .filter((r) => r.status === RECIPIENT_STATUS.PENDING);

    let chunksCompleted = batch.chunksCompleted;
    let k = Math.min(chunkLimit, queue.length || MIN_OPS_PER_CHUNK);
    let head = 0;
    let aborted = false;

    try {
      while (head < queue.length && !aborted) {
        const chunk = queue.slice(head, head + k);

        log.info(`batch:chunk_start id=${batchId} head=${head} ops=${chunk.length} k=${k}`);

        const simResult = await simulateChunk(toOps(chunk));

        if (!simResult.success) {
          if (simResult.resourceLimitExceeded && k > MIN_OPS_PER_CHUNK) {
            // Halve chunk size and retry the same head position
            k = Math.max(MIN_OPS_PER_CHUNK, Math.floor(k / 2));
            log.warn(`batch:adaptive_k id=${batchId} reduced_to=${k}`);
            continue;
          }

          // Simulation failed for a non-resource reason, or k is already minimum
          markFailed(chunk, simResult.errorMessage || 'simulation failed');
          head += chunk.length;
          // Persist before a possible abort so the failure reason is not lost.
          checkpoint(batchId, queue, chunksCompleted);
          aborted = failMode === 'abort';
          continue;
        }

        // Simulation passed — submit the chunk
        let submitResult;
        try {
          submitResult = await submitChunk(toOps(chunk));
        } catch (err) {
          // Outcome unknown: record as failed rather than leave pending, so a
          // later resume cannot silently pay these recipients a second time.
          markFailed(chunk, `submission threw: ${describeError(err)}`);
          throw err;
        }

        if (submitResult.success) {
          for (const r of chunk) {
            r.status = RECIPIENT_STATUS.SUCCESS;
            r.txHash = submitResult.txHash ?? null;
          }
          // Apply any per-recipient errors reported by the contract (partial ops failure).
          // NOTE(review): errors are keyed by address, so when one chunk holds
          // several ops for the same address only the first is marked — confirm
          // what submitChunk reports in that case.
          if (submitResult.recipientErrors) {
            for (const [addr, errMsg] of Object.entries(submitResult.recipientErrors)) {
              const failed = chunk.find((c) => c.address === addr);
              if (failed) {
                failed.status = RECIPIENT_STATUS.FAILED;
                failed.errorMessage = errMsg;
              }
            }
          }
          chunksCompleted += 1;
        } else {
          markFailed(chunk, submitResult.errorMessage || 'submission failed');
          if (failMode === 'abort') {
            aborted = true;
          } else {
            chunksCompleted += 1;
          }
        }

        head += chunk.length;
        checkpoint(batchId, queue, chunksCompleted);
      }
    } catch (err) {
      log.warn(`batch:execution_error id=${batchId} error=${describeError(err)}`);
      try {
        checkpoint(batchId, queue, chunksCompleted);
        finalize(batchId, chunksCompleted, true);
      } catch (cleanupErr) {
        // Best effort only: the original failure is the one callers must see.
        log.warn(`batch:cleanup_failed id=${batchId} error=${describeError(cleanupErr)}`);
      }
      throw err;
    }

    return finalize(batchId, chunksCompleted, aborted);
  }

  /**
   * Write the terminal status and totals. Totals are recomputed from recipient
   * state so that resumed batches (where a previously-failed recipient was
   * reset to pending and retried) report accurate counts.
   *
   * @param {string} batchId
   * @param {number} chunksCompleted
   * @param {boolean} aborted When true the batch ends as FAILED; otherwise
   *   PARTIAL if any recipient failed, else COMPLETED.
   * @returns {BatchRecord}
   */
  function finalize(batchId, chunksCompleted, aborted) {
    const totals = countOutcomes(store.getBatch(batchId).recipients);

    let status = BATCH_STATUS.COMPLETED;
    if (aborted) {
      status = BATCH_STATUS.FAILED;
    } else if (totals.failCount > 0) {
      status = BATCH_STATUS.PARTIAL;
    }

    store.updateBatch(batchId, {
      status,
      ...totals,
      chunksCompleted,
      completedAt: new Date().toISOString(),
    });

    return store.getBatch(batchId);
  }

  /**
   * Flush the current in-memory queue state back to the store.
   * Called after every chunk so an interrupted batch can be resumed.
   *
   * @param {string} batchId
   * @param {Array<RecipientRecord & { index: number }>} queue Entries tagged
   *   with their position in the stored `recipients` array.
   * @param {number} chunksCompleted
   */
  function checkpoint(batchId, queue, chunksCompleted) {
    const stored = store.getBatch(batchId);
    if (!stored) return;

    const byIndex = new Map(queue.map((entry) => [entry.index, entry]));
    const recipients = stored.recipients.map((r, index) => {
      const updated = byIndex.get(index);
      return updated
        ? { ...r, status: updated.status, txHash: updated.txHash, errorMessage: updated.errorMessage }
        : r;
    });

    store.updateBatch(batchId, { recipients, ...countOutcomes(recipients), chunksCompleted });
  }

  /** @param {string} batchId @returns {BatchRecord | undefined} */
  function getBatch(batchId) {
    return store.getBatch(batchId);
  }

  /** @returns {BatchRecord[]} */
  function listBatches() {
    return store.listBatches();
  }

  return { registerBatch, executeBatch, getBatch, listBatches };
}
/**
 * Split an array into consecutive chunks of at most `size` elements.
 *
 * `size` is normalised to a whole number >= 1: fractional values are rounded
 * down, and zero, negative or NaN values fall back to 1. Without this, a NaN
 * size would return a single empty chunk and a fractional size could return
 * chunks longer than requested.
 *
 * @template T
 * @param {T[]} arr Source array; it is not modified.
 * @param {number} size Maximum number of elements per chunk.
 * @returns {T[][]} The chunks in original order; an empty array for empty input.
 */
export function chunkArray(arr, size) {
  const whole = Math.floor(size);
  // `NaN >= 1` is false, so NaN falls back to 1 together with 0 and negatives.
  const step = whole >= 1 ? whole : 1;

  const out = [];
  for (let start = 0; start < arr.length; start += step) {
    out.push(arr.slice(start, start + step));
  }
  return out;
}
undefined for unknown batchId', () => { + const store = createInMemoryBatchStore(); + assert.equal(store.getBatch('nope'), undefined); +}); + +test('updateBatch throws for unknown batchId', () => { + const store = createInMemoryBatchStore(); + assert.throws(() => store.updateBatch('ghost', { status: BATCH_STATUS.RUNNING }), BatchPayoutError); +}); + +// ── registerBatch ───────────────────────────────────────────────────────────── + +function makeService(overrides = {}) { + const silentLog = { info() {}, warn() {}, error() {} }; + return createBatchPayoutService({ + simulateChunk: async () => ({ success: true, resourceLimitExceeded: false }), + submitChunk: async () => ({ success: true, txHash: 'TX_HASH' }), + store: createInMemoryBatchStore(), + maxOpsPerChunk: 3, + failMode: 'continue', + log: silentLog, + ...overrides, + }); +} + +const RECIPIENTS = [ + { address: 'GAAA', amount: '100' }, + { address: 'GBBB', amount: '200' }, + { address: 'GCCC', amount: '300' }, +]; + +test('registerBatch creates a batch in PENDING state', () => { + const svc = makeService(); + const batch = svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + assert.equal(batch.id, 'b1'); + assert.equal(batch.status, BATCH_STATUS.PENDING); + assert.equal(batch.totalRecipients, 3); + assert.equal(batch.successCount, 0); + assert.equal(batch.failCount, 0); +}); + +test('registerBatch is idempotent — second call returns the same record', () => { + const svc = makeService(); + const first = svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + const second = svc.registerBatch({ + batchId: 'b1', + recipients: [{ address: 'GDDD', amount: '99' }], + campaignId: 'c2', + }); + assert.deepEqual(first, second); + assert.equal(second.totalRecipients, 3); // original recipients preserved +}); + +test('registerBatch rejects empty recipients array', () => { + const svc = makeService(); + assert.throws( + () => svc.registerBatch({ batchId: 'b1', 
recipients: [], campaignId: 'c1' }), + BatchPayoutError, + ); +}); + +test('registerBatch rejects a recipient missing amount', () => { + const svc = makeService(); + assert.throws( + () => + svc.registerBatch({ + batchId: 'b1', + recipients: [{ address: 'GAAA', amount: '' }], + campaignId: 'c1', + }), + BatchPayoutError, + ); +}); + +// ── executeBatch — happy path ───────────────────────────────────────────────── + +test('executeBatch marks all recipients SUCCESS when everything passes', async () => { + const svc = makeService(); + svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + assert.equal(result.status, BATCH_STATUS.COMPLETED); + assert.equal(result.successCount, 3); + assert.equal(result.failCount, 0); + for (const r of result.recipients) { + assert.equal(r.status, RECIPIENT_STATUS.SUCCESS); + assert.equal(r.txHash, 'TX_HASH'); + } +}); + +test('executeBatch uses the correct number of chunks (ceil(N/k))', async () => { + let chunkCallCount = 0; + const svc = makeService({ + simulateChunk: async () => ({ success: true, resourceLimitExceeded: false }), + submitChunk: async () => { + chunkCallCount += 1; + return { success: true, txHash: 'TX' }; + }, + maxOpsPerChunk: 2, + }); + svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); // 3 recipients, k=2 → 2 chunks + await svc.executeBatch('b1'); + assert.equal(chunkCallCount, 2); +}); + +test('executeBatch skips already-completed batch without re-running', async () => { + let callCount = 0; + const svc = makeService({ + submitChunk: async () => { + callCount += 1; + return { success: true, txHash: 'TX' }; + }, + }); + svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + await svc.executeBatch('b1'); + const firstCallCount = callCount; + await svc.executeBatch('b1'); // second call — should be a no-op + assert.equal(callCount, firstCallCount); +}); + +test('executeBatch rejects 
if batch does not exist', async () => { + const svc = makeService(); + await assert.rejects(() => svc.executeBatch('ghost'), BatchPayoutError); +}); + +test('executeBatch rejects concurrent execution of the same batch', async () => { + let resolveSubmit; + const submitDone = new Promise((r) => { resolveSubmit = r; }); + + const svc = makeService({ + submitChunk: async () => { + await submitDone; + return { success: true, txHash: 'TX' }; + }, + }); + svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + const first = svc.executeBatch('b1'); + await assert.rejects(() => svc.executeBatch('b1'), BatchPayoutError); + resolveSubmit(); + await first; +}); + +// ── executeBatch — adaptive k ───────────────────────────────────────────────── + +test('executeBatch halves k on resourceLimitExceeded and retries', async () => { + const simulateCalls = []; + let firstCall = true; + + const svc = makeService({ + simulateChunk: async (ops) => { + simulateCalls.push(ops.length); + if (firstCall && ops.length > 1) { + firstCall = false; + return { success: false, resourceLimitExceeded: true }; + } + return { success: true, resourceLimitExceeded: false }; + }, + maxOpsPerChunk: 4, + }); + + const recipients = [ + { address: 'GA', amount: '1' }, + { address: 'GB', amount: '2' }, + { address: 'GC', amount: '3' }, + { address: 'GD', amount: '4' }, + ]; + svc.registerBatch({ batchId: 'b1', recipients, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + // First simulate call was 4 ops (failed), second was 2 (halved) + assert.equal(simulateCalls[0], 4); + assert.equal(simulateCalls[1], 2); + assert.equal(result.successCount, 4); + assert.equal(result.status, BATCH_STATUS.COMPLETED); +}); + +test('executeBatch fails ops when resource limit hits k=1 (cannot halve further)', async () => { + const svc = makeService({ + simulateChunk: async () => ({ success: false, resourceLimitExceeded: true, errorMessage: 'too big' }), + maxOpsPerChunk: 1, + }); 
+ const recipients = [{ address: 'GA', amount: '1' }, { address: 'GB', amount: '2' }]; + svc.registerBatch({ batchId: 'b1', recipients, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + assert.equal(result.failCount, 2); + assert.equal(result.status, BATCH_STATUS.PARTIAL); + for (const r of result.recipients) { + assert.equal(r.status, RECIPIENT_STATUS.FAILED); + } +}); + +// ── executeBatch — fail modes ───────────────────────────────────────────────── + +test('continue mode: marks failed chunk and processes the rest', async () => { + let callIndex = 0; + const svc = makeService({ + simulateChunk: async () => ({ success: true, resourceLimitExceeded: false }), + submitChunk: async () => { + callIndex += 1; + if (callIndex === 1) return { success: false, errorMessage: 'rpc error' }; + return { success: true, txHash: 'TX2' }; + }, + maxOpsPerChunk: 1, + failMode: 'continue', + }); + const recipients = [{ address: 'GA', amount: '1' }, { address: 'GB', amount: '2' }]; + svc.registerBatch({ batchId: 'b1', recipients, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + assert.equal(result.failCount, 1); + assert.equal(result.successCount, 1); + assert.equal(result.status, BATCH_STATUS.PARTIAL); +}); + +test('abort mode: stops after first submission failure', async () => { + let submitCount = 0; + const svc = makeService({ + simulateChunk: async () => ({ success: true, resourceLimitExceeded: false }), + submitChunk: async () => { + submitCount += 1; + return { success: false, errorMessage: 'rpc error' }; + }, + maxOpsPerChunk: 1, + failMode: 'abort', + }); + const recipients = [ + { address: 'GA', amount: '1' }, + { address: 'GB', amount: '2' }, + { address: 'GC', amount: '3' }, + ]; + svc.registerBatch({ batchId: 'b1', recipients, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + assert.equal(submitCount, 1); // stopped after first failure + assert.equal(result.status, BATCH_STATUS.FAILED); +}); + 
+test('abort mode: stops after first simulation failure', async () => { + let simCount = 0; + const svc = makeService({ + simulateChunk: async () => { + simCount += 1; + return { success: false, resourceLimitExceeded: false, errorMessage: 'sim error' }; + }, + maxOpsPerChunk: 1, + failMode: 'abort', + }); + const recipients = [{ address: 'GA', amount: '1' }, { address: 'GB', amount: '2' }]; + svc.registerBatch({ batchId: 'b1', recipients, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + assert.equal(simCount, 1); + assert.equal(result.status, BATCH_STATUS.FAILED); +}); + +// ── executeBatch — per-recipient errors ─────────────────────────────────────── + +test('per-recipient errors from submitChunk are recorded correctly', async () => { + const svc = makeService({ + submitChunk: async (ops) => ({ + success: true, + txHash: 'TX1', + recipientErrors: { [ops[0].address]: 'account frozen' }, + }), + maxOpsPerChunk: 3, + }); + svc.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + const result = await svc.executeBatch('b1'); + + const frozen = result.recipients.find((r) => r.address === 'GAAA'); + assert.equal(frozen.status, RECIPIENT_STATUS.FAILED); + assert.equal(frozen.errorMessage, 'account frozen'); + assert.equal(result.failCount, 1); + assert.equal(result.successCount, 2); + assert.equal(result.status, BATCH_STATUS.PARTIAL); +}); + +// ── executeBatch — checkpointing / resume ───────────────────────────────────── + +test('executeBatch resumes from first pending recipient after partial failure', async () => { + const store = createInMemoryBatchStore(); + const silentLog = { info() {}, warn() {}, error() {} }; + let submitCount = 0; + + function makeSvc(failFirst) { + return createBatchPayoutService({ + simulateChunk: async () => ({ success: true, resourceLimitExceeded: false }), + submitChunk: async () => { + submitCount += 1; + if (failFirst && submitCount === 1) return { success: false, errorMessage: 'transient' 
}; + return { success: true, txHash: 'TX_OK' }; + }, + store, + maxOpsPerChunk: 1, + failMode: 'continue', + log: silentLog, + }); + } + + // First run: first op fails + const svc1 = makeSvc(true); + svc1.registerBatch({ batchId: 'b1', recipients: RECIPIENTS, campaignId: 'c1' }); + const partial = await svc1.executeBatch('b1'); + assert.equal(partial.status, BATCH_STATUS.PARTIAL); + assert.equal(partial.failCount, 1); + assert.equal(partial.successCount, 2); + + // Reset submit counter; second run should not re-submit already-succeeded recipients + submitCount = 0; + const svc2 = makeSvc(false); + // Force the failed recipient back to pending so it can be retried + const stored = store.getBatch('b1'); + stored.recipients[0].status = RECIPIENT_STATUS.PENDING; + stored.recipients[0].errorMessage = null; + store.updateBatch('b1', { recipients: stored.recipients, status: BATCH_STATUS.PENDING }); + + const final = await svc2.executeBatch('b1'); + assert.equal(submitCount, 1); // only the previously-failed recipient was resubmitted + assert.equal(final.successCount, 3); + assert.equal(final.status, BATCH_STATUS.COMPLETED); +}); + +// ── getBatch / listBatches ──────────────────────────────────────────────────── + +test('getBatch returns undefined for unknown batchId', () => { + const svc = makeService(); + assert.equal(svc.getBatch('ghost'), undefined); +}); + +test('listBatches returns all registered batches in reverse-creation order', async () => { + const svc = makeService(); + svc.registerBatch({ batchId: 'b1', recipients: [{ address: 'GA', amount: '1' }], campaignId: 'c1' }); + // Small delay so createdAt strings differ + await new Promise((r) => setTimeout(r, 2)); + svc.registerBatch({ batchId: 'b2', recipients: [{ address: 'GB', amount: '2' }], campaignId: 'c1' }); + + const list = svc.listBatches(); + assert.equal(list.length, 2); + assert.equal(list[0].id, 'b2'); // most recent first +}); diff --git a/backend/src/services/featureFlagService.js 
// @ts-check

/**
 * Evaluates percentage-based rollout using a deterministic hash of flagKey + userId.
 *
 * The hash is a 32-bit rolling hash over the UTF-16 code units of
 * `${flagKey}:${userId}` (multiply by 31, add the code unit, wrap with
 * `>>> 0`), reduced modulo 100 to a bucket in 0–99. A user is in the rollout
 * when their bucket is below `percentage`.
 *
 * @param {string} flagKey
 * @param {string} userId
 * @param {number} percentage 0–100; values <= 0 exclude everyone, >= 100 include everyone.
 * @returns {boolean}
 */
function inPercentageRollout(flagKey, userId, percentage) {
  if (percentage <= 0) return false;
  if (percentage >= 100) return true;

  const seed = `${flagKey}:${userId}`;
  let hash = 0;
  for (let i = 0; i < seed.length; i++) {
    hash = (hash * 31 + seed.charCodeAt(i)) >>> 0;
  }
  return hash % 100 < percentage;
}

/**
 * Creates the feature flag service on top of a flag repository.
 *
 * @param {{
 *   featureFlagRepository: ReturnType<typeof import('../dal/sqliteFeatureFlagRepository.js').createSqliteFeatureFlagRepository>,
 *   log?: Pick<Console, 'warn'>,
 * }} deps
 *   - `log`: receives a warning whenever a read falls back to its safe
 *     default because the repository threw (defaults to `console`).
 */
export function createFeatureFlagService({ featureFlagRepository, log = console }) {
  /**
   * Report a swallowed read failure. Never throws, so the safe-default
   * fallbacks below keep working even with a partial logger.
   * @param {string} event
   * @param {unknown} err
   */
  function reportFailure(event, err) {
    const detail = err instanceof Error ? err.message : String(err);
    log?.warn?.(`${event} error=${detail}`);
  }

  /**
   * Evaluate whether a flag is enabled for the given context.
   * Falls back to `false` (safe default) if the store throws.
   *
   * Evaluation order: unknown or disabled flag → off; `killSwitch === true`
   * → off; non-empty `allowedOrgs` requires a matching `orgId`; non-empty
   * `allowedUsers` requires a matching `userId`; a numeric `percentage`
   * requires a `userId` and defers to the deterministic rollout bucket;
   * otherwise the flag is on.
   *
   * @param {string} flagKey
   * @param {{ userId?: string, orgId?: string }} [context]
   * @returns {boolean}
   */
  function isEnabled(flagKey, context = {}) {
    try {
      const flag = featureFlagRepository.getByKey(flagKey);
      if (!flag || !flag.enabled) return false;

      const { targeting } = flag;

      // Kill-switch: if killSwitch is explicitly set, it overrides everything
      if (targeting.killSwitch === true) return false;

      const { userId, orgId } = context;

      // Org targeting: only enabled for specific orgs
      if (Array.isArray(targeting.allowedOrgs) && targeting.allowedOrgs.length > 0) {
        if (!orgId || !targeting.allowedOrgs.includes(orgId)) return false;
      }

      // User targeting: only enabled for specific users
      if (Array.isArray(targeting.allowedUsers) && targeting.allowedUsers.length > 0) {
        if (!userId || !targeting.allowedUsers.includes(userId)) return false;
      }

      // Percentage rollout: requires a userId for deterministic assignment
      if (typeof targeting.percentage === 'number') {
        if (!userId) return false;
        return inPercentageRollout(flagKey, userId, targeting.percentage);
      }

      return true;
    } catch (err) {
      // Store unavailable (or flag data malformed) → safe default is off,
      // but leave a trace so the outage is visible.
      reportFailure(`featureFlag:eval_failed key=${flagKey}`, err);
      return false;
    }
  }

  /**
   * Returns all flags for client hydration.
   * Falls back to empty array if the store throws.
   */
  function getAllFlags() {
    try {
      return featureFlagRepository.list();
    } catch (err) {
      reportFailure('featureFlag:list_failed', err);
      return [];
    }
  }

  /**
   * Create or update a flag (admin operation). Repository errors propagate
   * to the caller.
   * @param {{ flagKey: string, enabled?: boolean, targeting?: object, description?: string|null }} params
   */
  function setFlag({ flagKey, enabled = false, targeting = {}, description = null }) {
    return featureFlagRepository.upsert({ flagKey, enabled, targeting, description });
  }

  /**
   * Remove a flag entirely. Repository errors propagate to the caller.
   * @param {string} flagKey
   */
  function deleteFlag(flagKey) {
    return featureFlagRepository.remove(flagKey);
  }

  return { isEnabled, getAllFlags, setFlag, deleteFlag };
}
enabled: true, targeting: { allowedOrgs: ['org-1'] } }); + expect(service.isEnabled('org-flag', { orgId: 'org-1' })).toBe(true); + }); + + it('org targeting: blocks non-matching org', () => { + service.setFlag({ flagKey: 'org-flag', enabled: true, targeting: { allowedOrgs: ['org-1'] } }); + expect(service.isEnabled('org-flag', { orgId: 'org-2' })).toBe(false); + }); + + it('org targeting: blocks missing org', () => { + service.setFlag({ flagKey: 'org-flag', enabled: true, targeting: { allowedOrgs: ['org-1'] } }); + expect(service.isEnabled('org-flag')).toBe(false); + }); + + it('user targeting: allows matching user', () => { + service.setFlag({ flagKey: 'user-flag', enabled: true, targeting: { allowedUsers: ['user-42'] } }); + expect(service.isEnabled('user-flag', { userId: 'user-42' })).toBe(true); + }); + + it('user targeting: blocks non-matching user', () => { + service.setFlag({ flagKey: 'user-flag', enabled: true, targeting: { allowedUsers: ['user-42'] } }); + expect(service.isEnabled('user-flag', { userId: 'user-99' })).toBe(false); + }); + + it('percentage rollout: 100% enables all users', () => { + service.setFlag({ flagKey: 'pct-flag', enabled: true, targeting: { percentage: 100 } }); + expect(service.isEnabled('pct-flag', { userId: 'any-user' })).toBe(true); + }); + + it('percentage rollout: 0% disables all users', () => { + service.setFlag({ flagKey: 'pct-flag', enabled: true, targeting: { percentage: 0 } }); + expect(service.isEnabled('pct-flag', { userId: 'any-user' })).toBe(false); + }); + + it('percentage rollout: requires userId', () => { + service.setFlag({ flagKey: 'pct-flag', enabled: true, targeting: { percentage: 100 } }); + expect(service.isEnabled('pct-flag')).toBe(false); + }); + + it('percentage rollout: deterministic — same user always gets same result', () => { + service.setFlag({ flagKey: 'stable-flag', enabled: true, targeting: { percentage: 50 } }); + const first = service.isEnabled('stable-flag', { userId: 'test-user' }); + 
expect(service.isEnabled('stable-flag', { userId: 'test-user' })).toBe(first); + }); + + it('getAllFlags returns empty array on empty store', () => { + expect(service.getAllFlags()).toEqual([]); + }); + + it('getAllFlags returns all created flags', () => { + service.setFlag({ flagKey: 'a', enabled: true }); + service.setFlag({ flagKey: 'b', enabled: false }); + const flags = service.getAllFlags(); + expect(flags).toHaveLength(2); + expect(flags.map((f) => f.flagKey).sort()).toEqual(['a', 'b']); + }); + + it('deleteFlag removes the flag', () => { + service.setFlag({ flagKey: 'temp', enabled: true }); + expect(service.deleteFlag('temp')).toBe(true); + expect(service.isEnabled('temp')).toBe(false); + }); + + it('deleteFlag returns false for non-existent flag', () => { + expect(service.deleteFlag('ghost')).toBe(false); + }); + + it('safe default: returns false when store throws', () => { + const brokenRepo = { + getByKey: () => { throw new Error('db down'); }, + list: () => { throw new Error('db down'); }, + upsert: () => { throw new Error('db down'); }, + remove: () => { throw new Error('db down'); }, + }; + const brokenService = createFeatureFlagService({ featureFlagRepository: brokenRepo }); + expect(brokenService.isEnabled('any-flag')).toBe(false); + expect(brokenService.getAllFlags()).toEqual([]); + }); +}); diff --git a/frontend/src/hooks/useFeatureFlag.js b/frontend/src/hooks/useFeatureFlag.js new file mode 100644 index 00000000..32d47773 --- /dev/null +++ b/frontend/src/hooks/useFeatureFlag.js @@ -0,0 +1,14 @@ +import { useFeatureFlagContext } from '../lib/FeatureFlagContext'; + +/** + * Returns whether a named feature flag is enabled for the current session. + * Reads from the flag map hydrated by FeatureFlagProvider on app start. + * Defaults to `false` when the flag is unknown or the store was unreachable. 
import { createContext, createElement, useContext, useEffect, useState } from 'react';
import { apiUrl } from '../config';

const FeatureFlagContext = createContext({});

/**
 * Fetches all feature flags from the server on mount and provides the
 * resulting flag map to the React tree. Falls back to an empty map if the
 * request fails so the app keeps working when the flag store is unavailable.
 *
 * @param {{ children?: import('react').ReactNode }} props
 */
export function FeatureFlagProvider({ children }) {
  const [flags, setFlags] = useState({});

  useEffect(() => {
    // Guards against calling setFlags after the provider has unmounted.
    let cancelled = false;
    fetch(apiUrl('/feature-flags'))
      .then((res) => (res.ok ? res.json() : Promise.reject(res)))
      .then((data) => {
        if (!cancelled) setFlags(data.flags ?? {});
      })
      .catch(() => {
        // Safe default: no flags enabled if store is unreachable
      });
    return () => {
      cancelled = true;
    };
  }, []);

  // Render the context provider around the children. Returning the bare
  // `{ children }` object is not a renderable React node and never exposes
  // `flags` to useFeatureFlagContext() consumers.
  return createElement(FeatureFlagContext.Provider, { value: flags }, children);
}

/**
 * Returns the flag map supplied by the nearest FeatureFlagProvider, or the
 * context default (an empty object) when no provider is mounted above.
 */
export function useFeatureFlagContext() {
  return useContext(FeatureFlagContext);
}