Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1973,3 +1973,113 @@ model FeeChangeLog {
@@index([createdAt])
@@map("fee_change_logs")
}

// ─── Issue #447: Smart Contract Event Indexer ─────────────────────────────────

enum IndexedEventChain {
stellar
evm
}

model IndexedEvent {
id String @id @default(uuid())
dedupKey String @unique @map("dedup_key") // chain:txHash[:logIndex]
chain IndexedEventChain
contractAddress String @map("contract_address")
eventType String @map("event_type")
blockNumber BigInt @map("block_number")
txHash String @map("tx_hash")
timestamp DateTime
payload Json
confirmations Int @default(0)
retentionUntil DateTime @map("retention_until")
createdAt DateTime @default(now()) @map("created_at")

@@index([chain, contractAddress, eventType])
@@index([chain, contractAddress, timestamp])
@@index([eventType])
@@index([timestamp])
@@index([retentionUntil])
@@map("indexed_events")
}

model WsEventSubscription {
id String @id @default(uuid())
sessionId String @map("session_id")
contractAddress String? @map("contract_address")
eventType String? @map("event_type")
chain String?
createdAt DateTime @default(now()) @map("created_at")
expiresAt DateTime @map("expires_at")

@@index([sessionId])
@@index([expiresAt])
@@map("ws_event_subscriptions")
}

// ─── Issue #446: AI Payment Routing Metrics ────────────────────────────────────

model RoutingDecision {
id String @id @default(uuid())
tenantId String? @map("tenant_id")
requestId String @unique @map("request_id")
selectedChain String @map("selected_chain")
fallbackChains String[] @map("fallback_chains")
scoreStellar Float? @map("score_stellar")
scoreEvm Float? @map("score_evm")
featureSnapshot Json @map("feature_snapshot") // gas prices, latencies, success rates at decision time
rationale String?
latencyMs Int? @map("latency_ms")
isManualOverride Boolean @default(false) @map("is_manual_override")
overrideBy String? @map("override_by")
abVariant String? @map("ab_variant") // "static" | "ai"
outcome String? // "success" | "failed" | null (pending)
createdAt DateTime @default(now()) @map("created_at")

@@index([tenantId, createdAt])
@@index([selectedChain, createdAt])
@@index([abVariant])
@@map("routing_decisions")
}

model ChainPerformanceMetric {
id String @id @default(uuid())
chain String
sampleAt DateTime @default(now()) @map("sample_at")
avgGasPrice Float? @map("avg_gas_price")
avgConfirmTimeMs Float? @map("avg_confirm_time_ms")
successRate Float? @map("success_rate") // 0–1
p50LatencyMs Float? @map("p50_latency_ms")
p99LatencyMs Float? @map("p99_latency_ms")
sampleSize Int @default(0) @map("sample_size")

@@index([chain, sampleAt])
@@index([sampleAt])
@@map("chain_performance_metrics")
}

// ─── Issue #668: PII Audit ────────────────────────────────────────────────────

enum PiiClassificationLevel {
strict
standard
permissive
}

model PiiAuditLog {
id String @id @default(uuid())
endpoint String
method String @default("GET")
fieldPath String @map("field_path") // JSON path of detected PII
piiType String @map("pii_type") // email | phone | ssn | crypto_address | api_key
action String @default("redacted") // redacted | masked | allowed
level PiiClassificationLevel @default(standard)
tenantId String? @map("tenant_id")
requestId String? @map("request_id")
createdAt DateTime @default(now()) @map("created_at")

@@index([endpoint, createdAt])
@@index([piiType])
@@index([tenantId, createdAt])
@@map("pii_audit_logs")
}
15 changes: 15 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ import { liquidityProtectionRouter } from './routes/liquidity-protection.js';
import { bulkPaymentsRouter } from './routes/bulk-payments.js';
import { feesRouter } from './routes/fees.js';
import { apiUsageTracker, checkQuota } from './middleware/api-usage-tracker.js';
import indexerRouter from './routes/indexer.js';
import aiRoutingRouter from './routes/ai-routing.js';
import piiRouter from './routes/pii.js';
import { piiRedactionMiddleware } from './middleware/pii-redaction.js';

// Validate environment variables at startup
validateEnv();
Expand Down Expand Up @@ -217,6 +221,8 @@ app.use('/webhooks', webhookHandlersRouter);
app.use(express.json());
app.use(express.text({ type: ['text/csv', 'text/plain'] }));
app.use('/api', openApiValidator({ validateResponses: process.env.OPENAPI_VALIDATE_RESPONSES === 'true' }));
// Redact PII from all outgoing JSON API responses — Issue #668
app.use('/api', piiRedactionMiddleware);

app.use(
compressionMiddleware({
Expand Down Expand Up @@ -414,6 +420,15 @@ app.use('/api/v1/payments/bulk', bulkPaymentsRouter);
// Dynamic fee calculation engine with tiered pricing — Issue #468
app.use('/api/v1/fees', feesRouter);

// Smart contract event indexer with real-time WebSocket streams — Issue #447
app.use('/api/v1/indexer', indexerRouter);

// AI-powered payment routing engine — Issue #446
app.use('/api/v1/routing/ai', aiRoutingRouter);

// PII classification and redaction audit — Issue #668
app.use('/api/v1/pii', piiRouter);

// Sandbox environment for testing (with relaxed rate limits)
const sandboxRouter = createSandboxRouter(getSandboxManager(), getMockPaymentProcessor(), getTestDataSeeder());
app.use('/api/v1/sandbox', sandboxRateLimiter, sandboxRouter);
Expand Down
75 changes: 75 additions & 0 deletions backend/src/middleware/pii-redaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* PII Redaction Middleware (#668)
*
* Express middleware that intercepts outgoing JSON responses and log records,
* runs them through the PiiClassifier, redacts detected PII, and writes an
* audit entry to the PiiAuditLog table.
*/
import type { Request, Response, NextFunction } from 'express';
import { piiClassifier, type DetectedPii } from '../services/pii/pii-classifier.js';
import { prisma } from '../lib/prisma.js';

// ─── Response redaction middleware ────────────────────────────────────────────

export function piiRedactionMiddleware(req: Request, res: Response, next: NextFunction): void {
const originalJson = res.json.bind(res) as typeof res.json;

res.json = function (body: unknown): Response {
try {
const { detections, redacted } = piiClassifier.classify(body);
if (detections.length > 0) {
void persistAudit(detections, req);
return originalJson(redacted);
}
} catch {
// Never block the response on classifier failure
}
return originalJson(body);
};

next();
}

async function persistAudit(detections: DetectedPii[], req: Request): Promise<void> {
const tenantId = (req as Request & { tenantId?: string }).tenantId;
const requestId = (req as Request & { id?: string }).id;

await prisma.piiAuditLog.createMany({
data: detections.map((d) => ({
endpoint: req.path,
method: req.method,
fieldPath: d.path,
piiType: d.type,
action: 'redacted',
level: 'standard',
tenantId: tenantId ?? null,
requestId: requestId ?? null,
})),
skipDuplicates: true,
}).catch(() => { /* non-fatal */ });
}

// ─── Log redaction helper (for Pino / Winston formatters) ────────────────────

/**
* Pass this as a `redact` transform in your logger config.
* Compatible with pino's `redact` option when used as a custom serializer.
*/
export function redactLogRecord(record: Record<string, unknown>): Record<string, unknown> {
try {
const { redacted } = piiClassifier.classify(record);
return redacted;
} catch {
return record;
}
}

/**
* Pino-compatible serializer that strips PII from any object field.
*/
export const piiLogSerializer = {
// Applied to any field named "body", "payload", "data", "req", "res"
body: redactLogRecord,
payload: redactLogRecord,
data: redactLogRecord,
};
121 changes: 121 additions & 0 deletions backend/src/queues/routing-evaluation.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Routing evaluation queue (#446)
*
* Periodically samples chain performance (gas prices, latency, success rate)
* and writes them to:
* 1. Redis sorted sets – for sub-millisecond read by the AI router
* 2. ChainPerformanceMetric Prisma model – for historical analysis
*/
import { Queue, Worker, type Job } from 'bullmq';
import { prisma } from '../../lib/prisma.js';
import type { ChainFeatures } from '../routing/ai-router.js';

const QUEUE_NAME = 'routing-evaluation';
const REDIS_KEY_PREFIX = 'chain:perf:';

// ─── Mock collectors – replace with real RPC/API calls in production ──────────

async function collectStellarMetrics(): Promise<ChainFeatures> {
// TODO: query Stellar Horizon / Soroban RPC for real fee stats
return {
chain: 'stellar',
avgGasPrice: 0.00001,
avgConfirmTimeMs: 5_000,
successRate: 0.98,
p99LatencyMs: 6_000,
};
}

async function collectEvmMetrics(): Promise<ChainFeatures> {
// TODO: query EVM node eth_gasPrice + filter recent txs for success rate
return {
chain: 'evm',
avgGasPrice: 30,
avgConfirmTimeMs: 15_000,
successRate: 0.94,
p99LatencyMs: 20_000,
};
}

// ─── Queue & Worker ───────────────────────────────────────────────────────────

export interface RoutingEvalJobData {
sampleId: string;
}

let _queue: Queue<RoutingEvalJobData> | null = null;
let _worker: Worker<RoutingEvalJobData> | null = null;

type RedisClient = {
zadd(key: string, score: number, member: string): Promise<unknown>;
zremrangebyscore(key: string, min: number | string, max: number | string): Promise<unknown>;
};

export function startRoutingEvalQueue(
connection: { host: string; port: number },
redisClient?: RedisClient,
): void {
_queue = new Queue<RoutingEvalJobData>(QUEUE_NAME, { connection });

_worker = new Worker<RoutingEvalJobData>(
QUEUE_NAME,
async (_job: Job<RoutingEvalJobData>) => {
const collectors = [collectStellarMetrics, collectEvmMetrics];
const results = await Promise.allSettled(collectors.map((fn) => fn()));

for (const result of results) {
if (result.status !== 'fulfilled') continue;
const metrics = result.value;
const now = new Date();

// Persist to DB
await prisma.chainPerformanceMetric.create({
data: {
chain: metrics.chain,
sampleAt: now,
avgGasPrice: metrics.avgGasPrice,
avgConfirmTimeMs: metrics.avgConfirmTimeMs,
successRate: metrics.successRate,
p99LatencyMs: metrics.p99LatencyMs,
sampleSize: 1,
},
}).catch(() => { /* non-fatal */ });

// Write to Redis sorted set (score = timestamp for TTL pruning)
if (redisClient) {
const key = `${REDIS_KEY_PREFIX}${metrics.chain}`;
const member = JSON.stringify(metrics);
const nowMs = now.getTime();
await redisClient.zadd(key, nowMs, member).catch(() => {});
// Prune samples older than 1 hour
await redisClient.zremrangebyscore(key, '-inf', nowMs - 3_600_000).catch(() => {});
}
}
},
{ connection, concurrency: 1 },
);

_worker.on('failed', (job, err) => {
console.error(`[routing-eval] job ${job?.id} failed:`, err);
});
}

/** Schedule recurring eval jobs (call once on server startup) */
export async function scheduleRoutingEvalJobs(intervalMs = 60_000): Promise<void> {
if (!_queue) throw new Error('startRoutingEvalQueue must be called first');
await _queue.upsertJobScheduler(
'routing-eval-periodic',
{ every: intervalMs },
{ name: 'routing-eval', data: { sampleId: 'periodic' } },
);
}

export function stopRoutingEvalQueue(): Promise<void> {
return Promise.all([
_worker?.close(),
_queue?.close(),
]).then(() => {
_queue = null;
_worker = null;
});
}
Loading