Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,42 @@
"dependencies": {
"@aws-sdk/client-s3": "^3.1057.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.55.0",
"@opentelemetry/instrumentation-express": "^0.45.0",
"@opentelemetry/instrumentation-http": "^0.55.0",
"@opentelemetry/resources": "^1.28.0",
"@opentelemetry/sdk-node": "^0.55.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.219.0",
"@opentelemetry/instrumentation-express": "^0.67.0",
"@opentelemetry/instrumentation-http": "^0.219.0",
"@opentelemetry/resources": "^2.8.0",
"@opentelemetry/sdk-node": "^0.219.0",
"@opentelemetry/semantic-conventions": "^1.28.0",
"@stellar/stellar-sdk": "^14.0.0",
"better-sqlite3": "^11.0.0",
"@stellar/stellar-sdk": "^16.0.1",
"better-sqlite3": "^12.11.1",
"compression": "^1.8.0",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"express": "^4.21.0",
"dotenv": "^17.4.2",
"express": "^5.2.1",
"helmet": "^8.0.0",
"ioredis": "^5.4.0",
"multer": "^2.1.1",
"pg": "^8.13.0",
"pino": "^9.0.0",
"pino": "^10.3.1",
"supertest": "^7.0.0",
"swagger-ui-express": "^5.0.1",
"validator": "^13.12.0",
"web-push": "^3.6.7",
"ws": "^8.18.0",
"zod": "^3.23.0"
"zod": "^4.4.3"
},
"devDependencies": {
"@readme/openapi-parser": "^2.6.0",
"@redocly/cli": "^1.34.5",
"@readme/openapi-parser": "^6.1.3",
"@redocly/cli": "^2.35.1",
"@types/cors": "^2.8.0",
"@types/express": "^4.17.0",
"@types/node": "^20.0.0",
"@types/supertest": "^6.0.0",
"ajv": "^8.17.0",
"ajv-formats": "^3.0.1",
"js-yaml": "^4.1.0",
"js-yaml": "^5.1.0",
"nodemon": "^3.1.0",
"redoc-cli": "^0.13.21",
"typescript": "^5.0.0"
"typescript": "^6.0.3"
}
}
2 changes: 2 additions & 0 deletions backend/src/dal/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { createSqliteAllowlistRepository } from './sqliteAllowlistRepository.js'
import { SqliteOrganizationRepository } from './sqliteOrganizationRepository.js';
import { createSqliteOrgMemberRepository } from './sqliteOrgMemberRepository.js';
import { createSqliteUsageRepository } from './sqliteUsageRepository.js';
import { createSqliteFeatureFlagRepository } from './sqliteFeatureFlagRepository.js';

import { runPgMigrations } from './pg/migrate.js';
import { createPgCampaignRepository } from './pg/pgCampaignRepository.js';
Expand Down Expand Up @@ -92,6 +93,7 @@ export async function createDal({
organizations: new SqliteOrganizationRepository(db),
orgMembers: createSqliteOrgMemberRepository({ db }),
usage: createSqliteUsageRepository({ db }),
featureFlags: createSqliteFeatureFlagRepository({ db }),
db,
pgPool,
};
Expand Down
2 changes: 2 additions & 0 deletions backend/src/dal/sqliteCampaignRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ export function createSqliteCampaignRepository({
if (status && status !== 'all') {
where.push('campaigns.status = ?');
params.push(status);
} else if (!status) {
where.push("campaigns.status = 'published'");
}

if (category) {
Expand Down
55 changes: 55 additions & 0 deletions backend/src/dal/sqliteFeatureFlagRepository.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// @ts-check

/**
* @param {{ id: number, flag_key: string, enabled: number, targeting: string, description: string|null, created_at: string, updated_at: string }} row
*/
function rowToFlag(row) {
return {
id: row.id,
flagKey: row.flag_key,
enabled: row.enabled === 1,
targeting: JSON.parse(row.targeting || '{}'),
description: row.description ?? null,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}

/**
* @param {{ db: import('better-sqlite3').Database }} deps
*/
export function createSqliteFeatureFlagRepository({ db }) {
function upsert({ flagKey, enabled = false, targeting = {}, description = null }) {
const now = new Date().toISOString();
const targetingJson = JSON.stringify(targeting);
const enabledInt = enabled ? 1 : 0;

db.prepare(
`INSERT INTO feature_flags (flag_key, enabled, targeting, description, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(flag_key) DO UPDATE SET
enabled = excluded.enabled,
targeting = excluded.targeting,
description = excluded.description,
updated_at = excluded.updated_at`,
).run(flagKey, enabledInt, targetingJson, description, now, now);

return rowToFlag(db.prepare('SELECT * FROM feature_flags WHERE flag_key = ?').get(flagKey));
}

function getByKey(flagKey) {
const row = db.prepare('SELECT * FROM feature_flags WHERE flag_key = ?').get(flagKey);
return row ? rowToFlag(row) : null;
}

function list() {
return db.prepare('SELECT * FROM feature_flags ORDER BY flag_key').all().map(rowToFlag);
}

function remove(flagKey) {
const result = db.prepare('DELETE FROM feature_flags WHERE flag_key = ?').run(flagKey);
return result.changes > 0;
}

return { upsert, getByKey, list, remove };
}
18 changes: 18 additions & 0 deletions backend/src/db/migrations/016_feature_flags.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// @ts-check
export const version = 16;
export const description = 'feature flags store';

/** @param {import('better-sqlite3').Database} db */
export function up(db) {
db.exec(`
CREATE TABLE IF NOT EXISTS feature_flags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flag_key TEXT NOT NULL UNIQUE,
enabled INTEGER NOT NULL DEFAULT 0,
targeting TEXT NOT NULL DEFAULT '{}',
description TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);
}
20 changes: 10 additions & 10 deletions backend/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ import { createAuditLogService } from './services/auditLogService.js';
import { createWebPushService } from './services/webPushService.js';
import { createOrganizationRoutes } from './routes/organizations.js';
import { createUsageMeteringService } from './services/usageMeteringService.js';
import { createFeatureFlagRoutes } from './routes/featureFlags.js';
import { createFeatureFlagService } from './services/featureFlagService.js';
import { createUsageMeteringMiddleware } from './middleware/usageMetering.js';
import { requestTimeout } from './middleware/timeout.js';
import { PoolSaturatedError } from './rpcPool.js';
import { initializeWebSocket, getWebSocketServer } from './websocket/index.js';
import { requireScope } from './middleware/rbac.js';

const DEFAULT_PORT = 3001;
Expand Down Expand Up @@ -1658,6 +1661,8 @@ export async function createApp(options = {}) {
app.put(`${prefix}/campaigns/:id/archive`, rateLimiter, requireApiKey, archiveCampaign);
app.delete(`${prefix}/campaigns/:id`, rateLimiter, requireApiKey, deleteCampaign);
app.put(`${prefix}/campaigns/:id`, rateLimiter, ...guard, updateCampaign);
app.put(`${prefix}/campaigns/:id/publish`, rateLimiter, ...guard, publishCampaign);
app.put(`${prefix}/campaigns/:id/archive`, rateLimiter, ...guard, archiveCampaign);
app.delete(`${prefix}/campaigns/:id`, rateLimiter, ...guard, deleteCampaign);

app.post(`${prefix}/admin/api-keys`, rateLimiter, requireMasterKey, createApiKeyHandler);
Expand Down Expand Up @@ -1903,6 +1908,11 @@ export async function createApp(options = {}) {
const organizationRouter = createOrganizationRoutes(dal);
app.use(`${prefix}/organizations`, rateLimiter, requireApiKey, organizationRouter);
app.use(prefix, rateLimiter, ...guard, pushRouter);

// Feature flag system routes (Issue #625)
const featureFlagService = createFeatureFlagService({ featureFlagRepository: dal.featureFlags });
const featureFlagRouter = createFeatureFlagRoutes({ featureFlagService });
app.use(`${prefix}/feature-flags`, rateLimiter, featureFlagRouter);
}

registerApiRoutes(API_V1_PREFIX);
Expand Down Expand Up @@ -1967,12 +1977,6 @@ export async function startServer(options = {}) {
}

// ── Graceful shutdown (issue #650) ─────────────────────────────────────────
// On SIGTERM / SIGINT:
// 1. Stop accepting new connections (server.close).
// 2. Allow in-flight HTTP requests to finish for up to SHUTDOWN_GRACE_MS.
// 3. Send "Connection: close / will-reconnect" hint to open SSE/WS streams.
// 4. Flush OTel spans.
// 5. Exit 0 once everything is drained (or force-exit after the grace window).
const SHUTDOWN_GRACE_MS = normalizePositiveInteger(process.env.SHUTDOWN_GRACE_MS, 15_000);

let shuttingDown = false;
Expand All @@ -1982,21 +1986,17 @@ export async function startServer(options = {}) {
shuttingDown = true;
log.info({ signal, graceMs: SHUTDOWN_GRACE_MS }, 'graceful shutdown started');

// Force exit after the grace window so a stuck handler never blocks a deploy.
const forceTimer = setTimeout(() => {
log.error('graceful shutdown timed out — forcing exit');
process.exit(1);
}, SHUTDOWN_GRACE_MS);
if (typeof forceTimer.unref === 'function') forceTimer.unref();

// Stop accepting new connections; drain in-flight HTTP requests.
await new Promise((resolve) => server.close(resolve));

// Flush usage counters to DB before exiting.
stopUsageFlush();
await usageMeteringService.flushToDb().catch((err) => log.warn({ err }, 'usage flush warning'));

// Flush OTel exporter.
await shutdownTracing().catch((err) => log.warn({ err }, 'OTel shutdown warning'));

log.info('graceful shutdown complete');
Expand Down
90 changes: 90 additions & 0 deletions backend/src/routes/batchPayout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// @ts-check
import { Router } from 'express';
import { randomUUID } from 'node:crypto';
import { BatchPayoutError } from '../services/batchPayoutService.js';

/**
* Admin API for batch payout operations.
*
* Routes:
* POST /api/v1/batch-payouts – register + launch a batch
* GET /api/v1/batch-payouts – list all batches
* GET /api/v1/batch-payouts/:batchId – get batch status + per-recipient detail
*
* @param {{
* batchPayoutService: ReturnType<import('../services/batchPayoutService.js').createBatchPayoutService>,
* requireApiKey: import('express').RequestHandler | import('express').RequestHandler[],
* log: Pick<Console, 'info' | 'warn' | 'error'>,
* }} deps
* @returns {import('express').Router}
*/
export function createBatchPayoutRouter({ batchPayoutService, requireApiKey, log }) {
const router = Router();
const auth = Array.isArray(requireApiKey) ? requireApiKey : [requireApiKey];

// ── POST /api/v1/batch-payouts ─────────────────────────────────────────────
// Register a new batch and kick off async execution.
// Idempotent: if batchId is supplied and already exists, returns the existing
// record without re-launching. Omit batchId to auto-generate one.

router.post('/batch-payouts', ...auth, (req, res) => {
const { campaignId, recipients, batchId, failMode } = req.body ?? {};

if (!Array.isArray(recipients) || recipients.length === 0) {
return res.status(400).json({ error: 'recipients must be a non-empty array', code: 'VALIDATION_ERROR' });
}
if (recipients.length > 10_000) {
return res.status(400).json({ error: 'Maximum 10 000 recipients per batch', code: 'BATCH_TOO_LARGE' });
}

const id = typeof batchId === 'string' && batchId ? batchId : randomUUID();

let batch;
try {
batch = batchPayoutService.registerBatch({ batchId: id, recipients, campaignId: campaignId ?? '' });
} catch (err) {
if (err instanceof BatchPayoutError) {
return res.status(400).json({ error: err.message, code: err.code });
}
log.error(err, 'batch_payout:register_error');
return res.status(500).json({ error: 'Internal server error', code: 'INTERNAL_ERROR' });
}

// Fire-and-forget — progress is visible via GET /batch-payouts/:batchId
batchPayoutService.executeBatch(id).catch((err) => {
if (err instanceof BatchPayoutError && err.code === 'ALREADY_RUNNING') return;
log.error({ err, batchId: id }, 'batch_payout:execution_error');
});

return res.status(202).json({
batchId: id,
status: batch.status,
totalRecipients: batch.totalRecipients,
});
});

// ── GET /api/v1/batch-payouts ──────────────────────────────────────────────

router.get('/batch-payouts', ...auth, (_req, res) => {
const batches = batchPayoutService.listBatches().map(summaryView);
return res.json({ batches, count: batches.length });
});

// ── GET /api/v1/batch-payouts/:batchId ────────────────────────────────────

router.get('/batch-payouts/:batchId', ...auth, (req, res) => {
const batch = batchPayoutService.getBatch(req.params.batchId);
if (!batch) {
return res.status(404).json({ error: 'Batch not found', code: 'NOT_FOUND' });
}
return res.json({ batch });
});

return router;
}

/** Strip the recipient list for the summary view to keep list responses small. */
function summaryView(batch) {
const { recipients: _r, ...rest } = batch;
return rest;
}
50 changes: 50 additions & 0 deletions backend/src/routes/featureFlags.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// @ts-check
import express from 'express';

/**
* @param {{ featureFlagService: ReturnType<import('../services/featureFlagService.js').createFeatureFlagService> }} deps
*/
export function createFeatureFlagRoutes({ featureFlagService }) {
const router = express.Router();

// Hydrate client with all flags (evaluated values not stored rules)
router.get('/', (_req, res) => {
const flags = featureFlagService.getAllFlags();
// Return a simple key→enabled map safe to expose to the client
const map = Object.fromEntries(flags.map((f) => [f.flagKey, f.enabled]));
res.json({ flags: map });
});

// Evaluate a single flag for a given context
router.get('/:key', (req, res) => {
const { key } = req.params;
const userId = typeof req.query.userId === 'string' ? req.query.userId : undefined;
const orgId = typeof req.query.orgId === 'string' ? req.query.orgId : undefined;
const enabled = featureFlagService.isEnabled(key, { userId, orgId });
res.json({ flagKey: key, enabled });
});

// Admin: create or update a flag
router.post('/', (req, res) => {
const { flagKey, enabled, targeting, description } = req.body ?? {};
if (!flagKey || typeof flagKey !== 'string') {
return res.status(400).json({ error: 'flagKey is required' });
}
const flag = featureFlagService.setFlag({
flagKey,
enabled: Boolean(enabled),
targeting: targeting ?? {},
description: description ?? null,
});
res.status(201).json(flag);
});

// Admin: delete a flag (kill-switch removal)
router.delete('/:key', (req, res) => {
const deleted = featureFlagService.deleteFlag(req.params.key);
if (!deleted) return res.status(404).json({ error: 'Flag not found' });
res.status(204).end();
});

return router;
}
Loading
Loading