From 6611affcc796a6e5bccc706104297555c347474c Mon Sep 17 00:00:00 2001 From: veloura-dev Date: Sun, 28 Jun 2026 21:31:53 +0000 Subject: [PATCH] #136 Add fraud-signal detector that flags coordinated betting from the same Stellar address graph FIXED --- docs/fraud-signal.md | 76 ++++ drizzle/migrations/0011_fraud_flags.sql | 44 ++ src/db/schema.ts | 52 +++ src/index.ts | 2 + src/routes/admin/fraud.ts | 143 +++++++ src/services/fraudService.ts | 545 ++++++++++++++++++++++++ src/workers/fraudDetector.ts | 105 +++++ tests/adminFraud.test.ts | 264 ++++++++++++ tests/fraudDetector.test.ts | 82 ++++ tests/fraudService.test.ts | 368 ++++++++++++++++ 10 files changed, 1681 insertions(+) create mode 100644 docs/fraud-signal.md create mode 100644 drizzle/migrations/0011_fraud_flags.sql create mode 100644 src/routes/admin/fraud.ts create mode 100644 src/services/fraudService.ts create mode 100644 src/workers/fraudDetector.ts create mode 100644 tests/adminFraud.test.ts create mode 100644 tests/fraudDetector.test.ts create mode 100644 tests/fraudService.test.ts diff --git a/docs/fraud-signal.md b/docs/fraud-signal.md new file mode 100644 index 0000000..16e4e24 --- /dev/null +++ b/docs/fraud-signal.md @@ -0,0 +1,76 @@ +# Fraud Signal Detector + +Background job that analyzes recent predictions, builds an address graph, +clusters suspicious addresses with **Union-Find**, and persists findings to +`fraud_flags` for admin review. + +## Architecture + +``` +┌─────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ +│ predictions, users │──▶ │ buildGraph (pure) │──▶ │ clusterize (DSU) │ +└─────────────────────┘ └──────────────────────┘ └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ fraud_flags │ + │ (idempotent) │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ GET /api/admin/ │ + │ fraud/flags │ + └──────────────────┘ +``` + +* **Graph builder** — `src/services/fraudService.ts :: buildGraph`. Pure + function. Creates undirected edges between Stellar addresses for any of: + * `SHARED_FUNDING_SOURCE` — same on-chain funder + * `SHARED_TX_HASH` — two addresses appearing on the same transaction + * `REPEATED_PATTERN` — identical (market, outcome, amount) bet inside the + same 5-minute bucket +* **Clustering** — `clusterize` uses a classic Union-Find with path + compression + union-by-rank. Components of size ≥ 2 become clusters. +* **Persistence** — `DrizzleFraudRepo.upsertFlags` writes one row per + `(cluster, user)` with `ON CONFLICT DO UPDATE`, so re-running the scan + is idempotent and refreshes evidence in-place. +* **Worker** — `src/workers/fraudDetector.ts`. `runOnce()` for ad-hoc or + cron runs; `start(intervalMs)` for an in-process timer. +* **Admin endpoints** — `src/routes/admin/fraud.ts`: + * `GET /api/admin/fraud/flags?status=open&limit=50` — paginated review + * `POST /api/admin/fraud/scan` — manual trigger (admin only) + +All endpoints require an admin JWT (`role: "admin"`) and are rate-limited +per-token (60 req/min by default). + +## Schema + +Migration `drizzle/migrations/0011_fraud_flags.sql`: + +* Adds nullable `predictions.funding_source TEXT` (+ partial index). +* Creates `fraud_flags` with `(cluster_key, user_id)` unique index, + status enum, evidence JSONB, score, and reviewer audit columns. + +## Operational notes + +* Correlation IDs flow from the request (or are generated per scan) into + every log line and every persisted `fraud_flags.correlation_id`. +* The worker never throws — failures are logged and the next interval + retries. This keeps the in-process scheduler stable. +* `MIN_CLUSTER_SIZE = 2` keeps noise low. Tune via the constant in + `fraudService.ts` if needed. +* Default lookback is **24 h**, capped at 10 000 predictions per scan to + protect memory on large datasets. + +## Testing + +``` +npm test -- tests/fraudService.test.ts +npm test -- tests/fraudDetector.test.ts +npm test -- tests/adminFraud.test.ts +``` + +The suite covers graph builder edge types, union-find correctness, run +orchestration, worker scheduling & error handling, and the admin route +(auth, validation, happy path). diff --git a/drizzle/migrations/0011_fraud_flags.sql b/drizzle/migrations/0011_fraud_flags.sql new file mode 100644 index 0000000..3cb1e96 --- /dev/null +++ b/drizzle/migrations/0011_fraud_flags.sql @@ -0,0 +1,44 @@ +-- Migration: Fraud signal detector +-- +-- 1. Adds an optional `funding_source` column to predictions so the graph +-- builder can connect addresses that share an on-chain funder. +-- Nullable + no default — fully backwards-compatible with existing rows. +-- 2. Creates the `fraud_flags` table that the background detector writes +-- cluster findings into, with the reason payload and review state. +-- +-- All statements are idempotent so re-running the migration is safe. + +ALTER TABLE predictions + ADD COLUMN IF NOT EXISTS funding_source text; + +CREATE INDEX IF NOT EXISTS predictions_funding_source_idx + ON predictions (funding_source) + WHERE funding_source IS NOT NULL; + +CREATE TABLE IF NOT EXISTS fraud_flags ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + cluster_key text NOT NULL, + user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE, + stellar_address text NOT NULL, + reason text NOT NULL, + evidence jsonb NOT NULL DEFAULT '{}'::jsonb, + score integer NOT NULL DEFAULT 0, + status text NOT NULL DEFAULT 'open', + reviewed_by text, + reviewed_at timestamptz, + correlation_id text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT fraud_flags_status_check + CHECK (status IN ('open', 'dismissed', 'confirmed')) +); + +-- One open flag per (cluster, user) to keep re-runs idempotent. +CREATE UNIQUE INDEX IF NOT EXISTS fraud_flags_cluster_user_unique + ON fraud_flags (cluster_key, user_id); + +CREATE INDEX IF NOT EXISTS fraud_flags_status_created_idx + ON fraud_flags (status, created_at DESC); + +CREATE INDEX IF NOT EXISTS fraud_flags_address_idx + ON fraud_flags (stellar_address); diff --git a/src/db/schema.ts b/src/db/schema.ts index c1314b7..29d36ff 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -141,6 +141,13 @@ export const predictions = pgTable("predictions", { outcome: text("outcome").notNull(), amount: text("amount").notNull(), txHash: text("tx_hash").notNull().default(""), + /** + * Optional on-chain funding source (e.g. the account that originally + * funded this user's wallet). Used by the fraud-signal detector to + * connect addresses that share a funder. Nullable so legacy rows and + * predictions whose funder is unknown remain valid. + */ + fundingSource: text("funding_source"), status: text("status").notNull().default("pending"), result: text("result"), createdAt: timestamp("created_at", { withTimezone: true }) @@ -148,6 +155,51 @@ export const predictions = pgTable("predictions", { .defaultNow(), }); +/** + * fraud_flags — persisted output of the fraud-signal background job. + * + * Each row represents a single (cluster, user) finding. A `cluster_key` + * groups all addresses the union-find algorithm collapsed together, + * `reason` is a short machine code, and `evidence` carries the structured + * payload (graph edges, shared funders, repeated patterns) for admin review. + * + * `(cluster_key, user_id)` is unique so re-running the detector is + * idempotent and never produces duplicates for the same finding. + */ +export const fraudFlags = pgTable( + "fraud_flags", + { + id: uuid("id").primaryKey().defaultRandom(), + clusterKey: text("cluster_key").notNull(), + userId: uuid("user_id") + .notNull() + .references(() => users.id, { onDelete: "cascade" }), + stellarAddress: text("stellar_address").notNull(), + reason: text("reason").notNull(), + evidence: jsonb("evidence").notNull().default({}), + score: integer("score").notNull().default(0), + status: text("status").notNull().default("open"), + reviewedBy: text("reviewed_by"), + reviewedAt: timestamp("reviewed_at", { withTimezone: true }), + correlationId: text("correlation_id"), + createdAt: timestamp("created_at", { withTimezone: true }) + .notNull() + .defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (t) => ({ + fraudFlagsStatusCreatedIdx: index("fraud_flags_status_created_idx").on( + t.status, + t.createdAt, + ), + fraudFlagsAddressIdx: index("fraud_flags_address_idx").on(t.stellarAddress), + }), +); + +export type FraudFlag = typeof fraudFlags.$inferSelect; + export const claims = pgTable("claims", { id: uuid("id").primaryKey().defaultRandom(), userId: uuid("user_id") diff --git a/src/index.ts b/src/index.ts index 7a1da14..128b66f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,6 +16,7 @@ import { createDocsRouter } from "./routes/docs"; import { notificationsRouter } from "./routes/notifications"; import { socialRouter } from "./routes/social"; import { adminAuditRouter } from "./routes/admin/audit"; +import { adminFraudRouter } from "./routes/admin/fraud"; import { errorHandler } from "./middleware/errorHandler"; import { requestContextStorage } from "./lib/requestContext"; import { REQUEST_ID_HEADER } from "./lib/http"; @@ -92,6 +93,7 @@ export function createApp(): express.Express { app.use("/api/users", socialRouter); app.use("/api/users", usersRouter); app.use("/api/admin/audit", adminAuditRouter); + app.use("/api/admin/fraud", adminFraudRouter); app.get("/metrics", async (req, res) => { const metricsAuthToken = process.env.METRICS_AUTH_TOKEN; diff --git a/src/routes/admin/fraud.ts b/src/routes/admin/fraud.ts new file mode 100644 index 0000000..ecc2224 --- /dev/null +++ b/src/routes/admin/fraud.ts @@ -0,0 +1,143 @@ +/** + * Admin fraud review endpoint. + * + * GET /api/admin/fraud/flags?status=open&limit=50 + * POST /api/admin/fraud/scan (manual trigger) + * + * Both endpoints: + * • require an admin JWT (Bearer token, role: "admin") + * • validate input at the boundary with Zod + * • return the project's standard error envelope on failure + * • echo the request id so the client can correlate logs + */ + +import { Router } from "express"; +import { rateLimit } from "express-rate-limit"; +import { z } from "zod"; +import { requireAdmin } from "../../middleware/requireAdmin"; +import { REQUEST_ID_HEADER } from "../../lib/http"; +import { getRequestId } from "../../lib/requestContext"; +import { + DrizzleFraudRepo, + type FraudRepo, + listFraudFlags, + runFraudScan, +} from "../../services/fraudService"; + +const listQuerySchema = z.object({ + status: z.enum(["open", "dismissed", "confirmed"]).optional(), + limit: z + .string() + .regex(/^\d+$/u, { message: "limit must be a positive integer" }) + .transform((v) => parseInt(v, 10)) + .refine((n) => n >= 1 && n <= 200, { + message: "limit must be between 1 and 200", + }) + .optional(), +}); + +const scanBodySchema = z + .object({ + lookbackMs: z.number().int().positive().max(7 * 24 * 60 * 60 * 1000).optional(), + maxPredictions: z.number().int().positive().max(100_000).optional(), + }) + .strict(); + +export interface AdminFraudRouterOptions { + /** Inject a fake repo in tests. */ + repo?: FraudRepo; + /** Requests per minute per admin token. Default: 60 */ + rateLimitPerMinute?: number; +} + +function requestIdOf(req: { id?: unknown }): string { + return ( + getRequestId() ?? + (typeof req.id === "string" ? req.id : "") ?? + "" + ); +} + +export function createAdminFraudRouter( + opts: AdminFraudRouterOptions = {}, +): Router { + const router = Router(); + const repo = opts.repo ?? new DrizzleFraudRepo(); + const limit = opts.rateLimitPerMinute ?? 60; + + router.use( + rateLimit({ + windowMs: 60_000, + limit, + keyGenerator: (req) => + (req.headers.authorization as string | undefined) ?? + req.ip ?? + "unknown", + standardHeaders: "draft-6", + legacyHeaders: false, + message: { error: { code: "rate_limit_exceeded" } }, + }), + ); + + router.use(requireAdmin); + + // ── GET /flags ──────────────────────────────────────────────────────────── + router.get("/flags", async (req, res, next) => { + try { + const requestId = requestIdOf({ id: (req as { id?: unknown }).id }); + const parsed = listQuerySchema.safeParse(req.query); + if (!parsed.success) { + res.setHeader(REQUEST_ID_HEADER, requestId); + res.status(400).json({ + error: { + code: "validation_error", + message: + parsed.error.issues[0]?.message ?? "invalid query parameters", + details: parsed.error.issues, + requestId, + }, + }); + return; + } + const rows = await listFraudFlags(parsed.data, repo); + res.setHeader(REQUEST_ID_HEADER, requestId); + res.json({ data: rows }); + } catch (e) { + next(e); + } + }); + + // ── POST /scan ──────────────────────────────────────────────────────────── + router.post("/scan", async (req, res, next) => { + try { + const requestId = requestIdOf({ id: (req as { id?: unknown }).id }); + const body = req.body ?? {}; + const parsed = scanBodySchema.safeParse(body); + if (!parsed.success) { + res.setHeader(REQUEST_ID_HEADER, requestId); + res.status(400).json({ + error: { + code: "validation_error", + message: + parsed.error.issues[0]?.message ?? "invalid request body", + details: parsed.error.issues, + requestId, + }, + }); + return; + } + const result = await runFraudScan(repo, { + ...parsed.data, + correlationId: requestId, + }); + res.setHeader(REQUEST_ID_HEADER, requestId); + res.json({ data: result }); + } catch (e) { + next(e); + } + }); + + return router; +} + +export const adminFraudRouter = createAdminFraudRouter(); diff --git a/src/services/fraudService.ts b/src/services/fraudService.ts new file mode 100644 index 0000000..e1a2e7c --- /dev/null +++ b/src/services/fraudService.ts @@ -0,0 +1,545 @@ +/** + * fraudService.ts — Address-graph fraud-signal detector for GrantFox. + * + * Responsibilities + * ──────────────── + * 1. Pull recent predictions (+ joined users) from the database. + * 2. Build an undirected graph between Stellar addresses whose edges + * represent suspicious overlap: + * • SHARED_FUNDING_SOURCE — two addresses funded by the same wallet + * • SHARED_TX_HASH — distinct addresses appearing on the same + * on-chain transaction (highly unusual) + * • REPEATED_PATTERN — two addresses repeatedly placing the + * same (market, outcome, amount) bet inside + * a short time window — likely sybil + * 3. Run a weighted Union-Find / DSU to collapse connected components. + * 4. Persist any component of size ≥ MIN_CLUSTER_SIZE as `fraud_flags` + * rows (one per address in the cluster), idempotently. Each row + * carries the human-readable `reason` and the structured `evidence`. + * 5. Expose `listFlags` for the admin review endpoint. + * + * Boundaries + * ────────── + * • Pure functions (`buildGraph`, `UnionFind`, `clusterize`) have **no** + * I/O and are fully unit-tested. + * • All DB access funnels through the `FraudRepo` interface so the + * worker / route / tests can inject in-memory fakes. + * • Inputs from the admin route are validated by Zod at the HTTP + * boundary (see src/routes/admin/fraud.ts). + * + * Logging + * ─────── + * Every public entry point emits structured logs with the active + * `correlationId` (from AsyncLocalStorage) so the run can be traced + * across the worker → service → repo boundary. + */ + +import { and, desc, eq, gte, sql } from "drizzle-orm"; +import { db as defaultDb } from "../db"; +import { fraudFlags, predictions, users } from "../db/schema"; +import { logger } from "../config/logger"; +import { getRequestId } from "../lib/requestContext"; + +// ────────────────────────────────────────────────────────────────────────────── +// Constants & types +// ────────────────────────────────────────────────────────────────────────────── + +/** Minimum component size that warrants flagging. Singletons are ignored. */ +export const MIN_CLUSTER_SIZE = 2; + +/** Default lookback when the worker is called without an explicit window. */ +export const DEFAULT_LOOKBACK_MS = 24 * 60 * 60 * 1000; // 24h + +/** Default max patterns to consider per scan — guards memory on huge runs. */ +export const DEFAULT_MAX_PREDICTIONS = 10_000; + +/** Repeated-pattern time bucket: bets within this many ms collide. */ +export const PATTERN_BUCKET_MS = 5 * 60 * 1000; // 5 minutes + +export type EdgeReason = + | "SHARED_FUNDING_SOURCE" + | "SHARED_TX_HASH" + | "REPEATED_PATTERN"; + +export interface PredictionRow { + predictionId: string; + userId: string; + stellarAddress: string; + marketId: string; + outcome: string; + amount: string; + txHash: string; + fundingSource: string | null; + createdAt: Date; +} + +export interface GraphEdge { + a: string; // stellar address (lexicographically smaller) + b: string; // stellar address (lexicographically larger) + reason: EdgeReason; + /** Extra context, e.g. the shared funder or the matching pattern key. */ + detail: string; +} + +export interface AddressGraph { + /** Distinct stellar addresses seen in the input. */ + nodes: Set; + /** Deduplicated edges (same (a, b, reason, detail) collapsed). */ + edges: GraphEdge[]; +} + +export interface Cluster { + /** Stable deterministic id derived from the sorted member addresses. */ + key: string; + /** Member addresses, sorted ascending. */ + members: string[]; + /** Edges that participated in forming this cluster. */ + edges: GraphEdge[]; + /** Aggregate severity score — sum of edge weights. */ + score: number; +} + +export interface FlagWriteInput { + clusterKey: string; + stellarAddress: string; + userId: string; + reason: string; + score: number; + evidence: Record; + correlationId?: string | null; +} + +export interface FraudFlagDTO { + id: string; + clusterKey: string; + userId: string; + stellarAddress: string; + reason: string; + evidence: unknown; + score: number; + status: string; + reviewedBy: string | null; + reviewedAt: Date | null; + correlationId: string | null; + createdAt: Date; + updatedAt: Date; +} + +export interface ListFlagsFilters { + status?: "open" | "dismissed" | "confirmed"; + limit?: number; +} + +export interface FraudRepo { + loadRecentPredictions(opts: { + since: Date; + limit: number; + }): Promise; + upsertFlags(rows: FlagWriteInput[]): Promise; + listFlags(filters: ListFlagsFilters): Promise; +} + +// Edge weights — feed into both the cluster `score` and the per-row score. +const EDGE_WEIGHT: Record = { + SHARED_FUNDING_SOURCE: 5, + SHARED_TX_HASH: 8, + REPEATED_PATTERN: 3, +}; + +// ────────────────────────────────────────────────────────────────────────────── +// Pure: Union-Find / Disjoint-Set Union +// ────────────────────────────────────────────────────────────────────────────── + +/** + * Classic union-find with path compression + union-by-rank. O(α(n)) per op. + * `find()` returns the representative; `union()` returns true iff the call + * actually merged two previously-disjoint sets (useful for edge counters). + */ +export class UnionFind { + private parent = new Map(); + private rank = new Map(); + + add(x: T): void { + if (!this.parent.has(x)) { + this.parent.set(x, x); + this.rank.set(x, 0); + } + } + + find(x: T): T { + this.add(x); + let root = x; + // climb to the root + while (this.parent.get(root) !== root) { + root = this.parent.get(root) as T; + } + // path compression + let cur = x; + while (this.parent.get(cur) !== root) { + const next = this.parent.get(cur) as T; + this.parent.set(cur, root); + cur = next; + } + return root; + } + + union(a: T, b: T): boolean { + const ra = this.find(a); + const rb = this.find(b); + if (ra === rb) return false; + const rankA = this.rank.get(ra) ?? 0; + const rankB = this.rank.get(rb) ?? 0; + if (rankA < rankB) { + this.parent.set(ra, rb); + } else if (rankA > rankB) { + this.parent.set(rb, ra); + } else { + this.parent.set(rb, ra); + this.rank.set(ra, rankA + 1); + } + return true; + } + + /** Returns components as Map. */ + components(): Map { + const out = new Map(); + for (const node of this.parent.keys()) { + const root = this.find(node); + const bucket = out.get(root); + if (bucket) bucket.push(node); + else out.set(root, [node]); + } + return out; + } +} + +// ────────────────────────────────────────────────────────────────────────────── +// Pure: graph builder +// ────────────────────────────────────────────────────────────────────────────── + +function orderedPair(x: string, y: string): [string, string] { + return x < y ? [x, y] : [y, x]; +} + +/** Stable cluster key — sorted member addresses joined by `|`. */ +export function makeClusterKey(members: string[]): string { + return [...members].sort().join("|"); +} + +/** + * Build an undirected suspicion graph from a flat list of predictions. + * + * The function is **pure** — no DB, no logging, no clock — which makes it + * trivial to unit-test deterministically. + */ +export function buildGraph(rows: PredictionRow[]): AddressGraph { + const nodes = new Set(); + const edgeMap = new Map(); // key → edge (dedup) + + // Bucket helpers + const byFunder = new Map>(); + const byTxHash = new Map>(); + const byPattern = new Map>(); + + for (const r of rows) { + if (!r.stellarAddress) continue; + nodes.add(r.stellarAddress); + + if (r.fundingSource) { + const set = byFunder.get(r.fundingSource) ?? new Set(); + set.add(r.stellarAddress); + byFunder.set(r.fundingSource, set); + } + + if (r.txHash && r.txHash.length > 0) { + const set = byTxHash.get(r.txHash) ?? new Set(); + set.add(r.stellarAddress); + byTxHash.set(r.txHash, set); + } + + // Repeated pattern key: same market, outcome, amount, time-bucket + const bucket = Math.floor(r.createdAt.getTime() / PATTERN_BUCKET_MS); + const patternKey = `${r.marketId}|${r.outcome}|${r.amount}|${bucket}`; + const pSet = byPattern.get(patternKey) ?? new Set(); + pSet.add(r.stellarAddress); + byPattern.set(patternKey, pSet); + } + + const addEdge = ( + a: string, + b: string, + reason: EdgeReason, + detail: string, + ): void => { + if (a === b) return; + const [x, y] = orderedPair(a, b); + const key = `${reason}::${detail}::${x}::${y}`; + if (edgeMap.has(key)) return; + edgeMap.set(key, { a: x, b: y, reason, detail }); + }; + + // Funder edges + for (const [funder, addrs] of byFunder) { + if (addrs.size < 2) continue; + const list = [...addrs]; + for (let i = 0; i < list.length; i++) { + for (let j = i + 1; j < list.length; j++) { + addEdge(list[i], list[j], "SHARED_FUNDING_SOURCE", funder); + } + } + } + + // Shared tx_hash edges + for (const [tx, addrs] of byTxHash) { + if (addrs.size < 2) continue; + const list = [...addrs]; + for (let i = 0; i < list.length; i++) { + for (let j = i + 1; j < list.length; j++) { + addEdge(list[i], list[j], "SHARED_TX_HASH", tx); + } + } + } + + // Repeated pattern edges + for (const [pat, addrs] of byPattern) { + if (addrs.size < 2) continue; + const list = [...addrs]; + for (let i = 0; i < list.length; i++) { + for (let j = i + 1; j < list.length; j++) { + addEdge(list[i], list[j], "REPEATED_PATTERN", pat); + } + } + } + + return { nodes, edges: [...edgeMap.values()] }; +} + +/** + * Collapse the graph into clusters using union-find. + * Singleton components (size < MIN_CLUSTER_SIZE) are excluded. + */ +export function clusterize(graph: AddressGraph): Cluster[] { + const uf = new UnionFind(); + for (const n of graph.nodes) uf.add(n); + for (const e of graph.edges) uf.union(e.a, e.b); + + // Group edges per root so each cluster carries its own evidence + const edgesPerRoot = new Map(); + for (const e of graph.edges) { + const root = uf.find(e.a); + const bucket = edgesPerRoot.get(root) ?? []; + bucket.push(e); + edgesPerRoot.set(root, bucket); + } + + const out: Cluster[] = []; + for (const [root, members] of uf.components()) { + if (members.length < MIN_CLUSTER_SIZE) continue; + const sorted = [...members].sort(); + const edges = edgesPerRoot.get(root) ?? []; + const score = edges.reduce((s, e) => s + EDGE_WEIGHT[e.reason], 0); + out.push({ + key: makeClusterKey(sorted), + members: sorted, + edges, + score, + }); + } + return out; +} + +// ────────────────────────────────────────────────────────────────────────────── +// Orchestration — scan + persist +// ────────────────────────────────────────────────────────────────────────────── + +export interface RunScanOptions { + lookbackMs?: number; + maxPredictions?: number; + /** Override "now" — used by tests. */ + now?: () => Date; + correlationId?: string | null; +} + +export interface RunScanResult { + scanned: number; + edges: number; + clusters: number; + flagsWritten: number; + correlationId: string | null; +} + +/** + * End-to-end: load → build graph → cluster → persist. + * Idempotent thanks to the `(cluster_key, user_id)` unique index. + */ +export async function runFraudScan( + repo: FraudRepo, + opts: RunScanOptions = {}, +): Promise { + const now = (opts.now ?? (() => new Date()))(); + const lookbackMs = opts.lookbackMs ?? DEFAULT_LOOKBACK_MS; + const limit = opts.maxPredictions ?? DEFAULT_MAX_PREDICTIONS; + const correlationId = opts.correlationId ?? getRequestId() ?? null; + + if (!Number.isFinite(lookbackMs) || lookbackMs <= 0) { + throw new Error("lookbackMs must be a positive finite number"); + } + if (!Number.isInteger(limit) || limit <= 0) { + throw new Error("maxPredictions must be a positive integer"); + } + + const since = new Date(now.getTime() - lookbackMs); + + logger.info( + { correlationId, since: since.toISOString(), limit }, + "fraud_scan: start", + ); + + const rows = await repo.loadRecentPredictions({ since, limit }); + const graph = buildGraph(rows); + const clusters = clusterize(graph); + + // Build address → userId map (last-write wins; addresses are 1:1 with users + // in this codebase, so this is safe). + const addrToUser = new Map(); + for (const r of rows) { + if (r.stellarAddress) addrToUser.set(r.stellarAddress, r.userId); + } + + const flagRows: FlagWriteInput[] = []; + for (const c of clusters) { + const reasonsByEdge = c.edges.reduce>((acc, e) => { + acc[e.reason] = (acc[e.reason] ?? 0) + 1; + return acc; + }, {}); + const reasonCode = + Object.entries(reasonsByEdge).sort((a, b) => b[1] - a[1])[0]?.[0] ?? + "ADDRESS_CLUSTER"; + + const evidence = { + clusterSize: c.members.length, + members: c.members, + edges: c.edges, + edgeReasonCounts: reasonsByEdge, + }; + + for (const addr of c.members) { + const userId = addrToUser.get(addr); + if (!userId) { + // Address present in graph but missing user mapping — skip safely. + logger.warn( + { correlationId, addr, clusterKey: c.key }, + "fraud_scan: skipping address with no user mapping", + ); + continue; + } + flagRows.push({ + clusterKey: c.key, + stellarAddress: addr, + userId, + reason: reasonCode, + score: c.score, + evidence, + correlationId, + }); + } + } + + const written = flagRows.length > 0 ? await repo.upsertFlags(flagRows) : 0; + + const result: RunScanResult = { + scanned: rows.length, + edges: graph.edges.length, + clusters: clusters.length, + flagsWritten: written, + correlationId, + }; + logger.info({ ...result }, "fraud_scan: complete"); + return result; +} + +// ────────────────────────────────────────────────────────────────────────────── +// Drizzle-backed repository (production wiring) +// ────────────────────────────────────────────────────────────────────────────── + +export class DrizzleFraudRepo implements FraudRepo { + // Use `any` to remain compatible with the codebase's drizzle helper typing + // (other services here do the same — see DrizzleMarketResolutionRepo). + constructor(private readonly db: any = defaultDb) {} + + async loadRecentPredictions(opts: { + since: Date; + limit: number; + }): Promise { + const rows = await this.db + .select({ + predictionId: predictions.id, + userId: predictions.userId, + stellarAddress: users.stellarAddress, + marketId: predictions.marketId, + outcome: predictions.outcome, + amount: predictions.amount, + txHash: predictions.txHash, + fundingSource: predictions.fundingSource, + createdAt: predictions.createdAt, + }) + .from(predictions) + .innerJoin(users, eq(users.id, predictions.userId)) + .where(gte(predictions.createdAt, opts.since)) + .orderBy(desc(predictions.createdAt)) + .limit(opts.limit); + return rows as PredictionRow[]; + } + + async upsertFlags(rows: FlagWriteInput[]): Promise { + if (rows.length === 0) return 0; + // ON CONFLICT (cluster_key, user_id) DO UPDATE — keep latest evidence + // and score, never decrement reviewer state. + const values = rows.map((r) => ({ + clusterKey: r.clusterKey, + userId: r.userId, + stellarAddress: r.stellarAddress, + reason: r.reason, + score: r.score, + evidence: r.evidence, + correlationId: r.correlationId ?? null, + })); + const result = await this.db + .insert(fraudFlags) + .values(values) + .onConflictDoUpdate({ + target: [fraudFlags.clusterKey, fraudFlags.userId], + set: { + reason: sql`excluded.reason`, + score: sql`excluded.score`, + evidence: sql`excluded.evidence`, + correlationId: sql`excluded.correlation_id`, + updatedAt: sql`now()`, + }, + }) + .returning({ id: fraudFlags.id }); + return Array.isArray(result) ? result.length : rows.length; + } + + async listFlags(filters: ListFlagsFilters): Promise { + const limit = Math.max(1, Math.min(filters.limit ?? 50, 200)); + const conds = []; + if (filters.status) conds.push(eq(fraudFlags.status, filters.status)); + const where = conds.length > 0 ? and(...conds) : undefined; + const rows = await this.db + .select() + .from(fraudFlags) + .where(where) + .orderBy(desc(fraudFlags.createdAt)) + .limit(limit); + return rows as FraudFlagDTO[]; + } +} + +/** Convenience: list flags for the admin endpoint. */ +export async function listFraudFlags( + filters: ListFlagsFilters, + repo: FraudRepo = new DrizzleFraudRepo(), +): Promise { + return repo.listFlags(filters); +} diff --git a/src/workers/fraudDetector.ts b/src/workers/fraudDetector.ts new file mode 100644 index 0000000..072113a --- /dev/null +++ b/src/workers/fraudDetector.ts @@ -0,0 +1,105 @@ +/** + * fraudDetector.ts — background worker that periodically scans recent + * predictions for sybil / collusion clusters and persists `fraud_flags`. + * + * Designed to be invoked from: + * • a cron-style scheduler (every N minutes) + * • the existing in-process scheduler (`src/services/scheduler.ts`) + * • or one-off CLI runs (`node dist/workers/fraudDetector.js`) + * + * The worker itself is intentionally tiny — all logic lives in + * `fraudService.ts` so it can be unit-tested without spinning up a job + * runtime. A correlation id is generated per run so every log line and + * persisted flag can be traced. + */ + +import { randomUUID } from "crypto"; +import { logger } from "../config/logger"; +import { + DrizzleFraudRepo, + type FraudRepo, + type RunScanOptions, + type RunScanResult, + runFraudScan, +} from "../services/fraudService"; + +export class FraudDetectorWorker { + private readonly repo: FraudRepo; + private timer: NodeJS.Timeout | null = null; + + constructor(repo: FraudRepo = new DrizzleFraudRepo()) { + this.repo = repo; + } + + /** Run a single scan. Errors are caught and logged — the worker never throws. */ + async runOnce(opts: RunScanOptions = {}): Promise { + const correlationId = opts.correlationId ?? randomUUID(); + const merged: RunScanOptions = { ...opts }; + merged.correlationId = correlationId; + try { + const result = await runFraudScan(this.repo, merged); + logger.info({ ...result }, "fraud_detector: run complete"); + return result; + } catch (err) { + logger.error( + { correlationId, err }, + "fraud_detector: run failed", + ); + return null; + } + } + + /** + * Start a recurring scan. Returns a stop handle. + * `intervalMs` defaults to 15 minutes; non-positive disables scheduling. + */ + start(intervalMs = 15 * 60 * 1000, opts: RunScanOptions = {}): () => void { + if (this.timer) { + logger.warn("fraud_detector: already running, ignoring start()"); + return () => this.stop(); + } + if (!Number.isFinite(intervalMs) || intervalMs <= 0) { + logger.warn( + { intervalMs }, + "fraud_detector: invalid interval, not starting", + ); + return () => undefined; + } + + // Kick off immediately, then on interval. + void this.runOnce(opts); + this.timer = setInterval(() => { + void this.runOnce(opts); + }, intervalMs); + if (typeof this.timer.unref === "function") this.timer.unref(); + logger.info({ intervalMs }, "fraud_detector: started"); + return () => this.stop(); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + logger.info("fraud_detector: stopped"); + } + } +} + +/** Singleton for production wiring. */ +export const fraudDetectorWorker = new FraudDetectorWorker(); + +// Allow `node dist/workers/fraudDetector.js` for ad-hoc runs. +if (require.main === module) { + fraudDetectorWorker + .runOnce() + .then((res) => { + // eslint-disable-next-line no-console + console.log("fraud_scan", res); + process.exit(0); + }) + .catch((err) => { + // eslint-disable-next-line no-console + console.error(err); + process.exit(1); + }); +} diff --git a/tests/adminFraud.test.ts b/tests/adminFraud.test.ts new file mode 100644 index 0000000..5fc2f82 --- /dev/null +++ b/tests/adminFraud.test.ts @@ -0,0 +1,264 @@ +import express from "express"; +import jwt from "jsonwebtoken"; +import request from "supertest"; +import { createAdminFraudRouter } from "../src/routes/admin/fraud"; +import { errorHandler } from "../src/middleware/errorHandler"; +import type { + FraudFlagDTO, + FraudRepo, + FlagWriteInput, + PredictionRow, +} from "../src/services/fraudService"; + +const SECRET = process.env.JWT_SECRET!; +const ISSUER = process.env.JWT_ISSUER ?? "predictify"; +const AUDIENCE = process.env.JWT_AUDIENCE ?? "predictify-app"; +const ADMIN_ADDR = + "GADMIN7777777777777777777777777777777777777777777777777777"; + +function signAdmin(): string { + return jwt.sign({ sub: ADMIN_ADDR, role: "admin" }, SECRET, { + issuer: ISSUER, + audience: AUDIENCE, + expiresIn: "1h", + }); +} + +class FakeRepo implements FraudRepo { + rows: PredictionRow[] = []; + written: FlagWriteInput[] = []; + flags: FraudFlagDTO[] = []; + async loadRecentPredictions(): Promise { + return this.rows; + } + async upsertFlags(rows: FlagWriteInput[]): Promise { + this.written.push(...rows); + return rows.length; + } + async listFlags(filters: { + status?: "open" | "dismissed" | "confirmed"; + limit?: number; + }): Promise { + const arr = filters.status + ? this.flags.filter((f) => f.status === filters.status) + : this.flags; + return arr.slice(0, filters.limit ?? 50); + } +} + +function makeApp(repo: FraudRepo): express.Express { + const app = express(); + app.use(express.json()); + app.use((req, _res, next) => { + (req as express.Request & { id?: string }).id = + (req.headers["x-request-id"] as string | undefined) ?? "req-id-test"; + next(); + }); + app.use( + "/api/admin/fraud", + createAdminFraudRouter({ repo, rateLimitPerMinute: 1000 }), + ); + app.use(errorHandler); + return app; +} + +describe("admin fraud routes", () => { + describe("auth", () => { + it("GET /flags returns 403 without an admin token", async () => { + const res = await request(makeApp(new FakeRepo())).get( + "/api/admin/fraud/flags", + ); + expect(res.status).toBe(403); + expect(res.body).toEqual({ error: { code: "forbidden" } }); + }); + + it("POST /scan returns 403 without an admin token", async () => { + const res = await request(makeApp(new FakeRepo())) + .post("/api/admin/fraud/scan") + .send({}); + expect(res.status).toBe(403); + }); + + it("GET /flags returns 403 with a non-admin token", async () => { + const userToken = jwt.sign( + { sub: "GUSER", role: "user" }, + SECRET, + { issuer: ISSUER, audience: AUDIENCE, expiresIn: "1h" }, + ); + const res = await request(makeApp(new FakeRepo())) + .get("/api/admin/fraud/flags") + .set("Authorization", `Bearer ${userToken}`); + expect(res.status).toBe(403); + }); + }); + + describe("GET /flags", () => { + it("returns the listed flags as an admin", async () => { + const repo = new FakeRepo(); + repo.flags = [ + { + id: "id-1", + clusterKey: "GA|GB", + userId: "u-a", + stellarAddress: "GA", + reason: "SHARED_FUNDING_SOURCE", + evidence: { foo: "bar" }, + score: 5, + status: "open", + reviewedBy: null, + reviewedAt: null, + correlationId: "cid", + createdAt: new Date("2026-06-01T00:00:00Z"), + updatedAt: new Date("2026-06-01T00:00:00Z"), + }, + ]; + const res = await request(makeApp(repo)) + .get("/api/admin/fraud/flags") + .set("Authorization", `Bearer ${signAdmin()}`); + expect(res.status).toBe(200); + expect(res.body.data).toHaveLength(1); + expect(res.body.data[0]).toMatchObject({ + clusterKey: "GA|GB", + reason: "SHARED_FUNDING_SOURCE", + status: "open", + }); + expect(res.headers["x-request-id"]).toBe("req-id-test"); + }); + + it("rejects invalid status enum", async () => { + const res = await request(makeApp(new FakeRepo())) + .get("/api/admin/fraud/flags?status=nope") + .set("Authorization", `Bearer ${signAdmin()}`); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe("validation_error"); + }); + + it("rejects out-of-range limit", async () => { + const res = await request(makeApp(new FakeRepo())) + .get("/api/admin/fraud/flags?limit=9999") + .set("Authorization", `Bearer ${signAdmin()}`); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe("validation_error"); + }); + + it("rejects non-numeric limit", async () => { + const res = await request(makeApp(new FakeRepo())) + .get("/api/admin/fraud/flags?limit=abc") + .set("Authorization", `Bearer ${signAdmin()}`); + expect(res.status).toBe(400); + }); + + it("forwards the status filter to the repo", async () => { + const repo = new FakeRepo(); + repo.flags = [ + { + id: "1", + clusterKey: "k", + userId: "u", + stellarAddress: "G", + reason: "r", + evidence: {}, + score: 0, + status: "confirmed", + reviewedBy: null, + reviewedAt: null, + correlationId: null, + createdAt: new Date(), + updatedAt: new Date(), + }, + { + id: "2", + clusterKey: "k", + userId: "u", + stellarAddress: "G", + reason: "r", + evidence: {}, + score: 0, + status: "open", + reviewedBy: null, + reviewedAt: null, + correlationId: null, + createdAt: new Date(), + updatedAt: new Date(), + }, + ]; + const res = await request(makeApp(repo)) + .get("/api/admin/fraud/flags?status=confirmed") + .set("Authorization", `Bearer ${signAdmin()}`); + expect(res.status).toBe(200); + expect(res.body.data).toHaveLength(1); + expect(res.body.data[0].status).toBe("confirmed"); + }); + }); + + describe("POST /scan", () => { + it("runs a scan with no body and returns the summary", async () => { + const repo = new FakeRepo(); + const res = await request(makeApp(repo)) + .post("/api/admin/fraud/scan") + .set("Authorization", `Bearer ${signAdmin()}`) + .send({}); + expect(res.status).toBe(200); + expect(res.body.data).toMatchObject({ + scanned: 0, + edges: 0, + clusters: 0, + flagsWritten: 0, + }); + expect(res.body.data.correlationId).toBe("req-id-test"); + }); + + it("rejects unknown body keys (strict schema)", async () => { + const res = await request(makeApp(new FakeRepo())) + .post("/api/admin/fraud/scan") + .set("Authorization", `Bearer ${signAdmin()}`) + .send({ wat: 1 }); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe("validation_error"); + }); + + it("rejects negative lookbackMs", async () => { + const res = await request(makeApp(new FakeRepo())) + .post("/api/admin/fraud/scan") + .set("Authorization", `Bearer ${signAdmin()}`) + .send({ lookbackMs: -10 }); + expect(res.status).toBe(400); + }); + + it("persists flags when the repo returns suspicious rows", async () => { + const repo = new FakeRepo(); + repo.rows = [ + { + predictionId: "p1", + userId: "u-a", + stellarAddress: "GA", + marketId: "m", + outcome: "yes", + amount: "100", + txHash: "", + fundingSource: "GF", + createdAt: new Date(), + }, + { + predictionId: "p2", + userId: "u-b", + stellarAddress: "GB", + marketId: "m", + outcome: "yes", + amount: "100", + txHash: "", + fundingSource: "GF", + createdAt: new Date(), + }, + ]; + const res = await request(makeApp(repo)) + .post("/api/admin/fraud/scan") + .set("Authorization", `Bearer ${signAdmin()}`) + .send({ lookbackMs: 60_000 }); + expect(res.status).toBe(200); + expect(res.body.data.clusters).toBe(1); + expect(res.body.data.flagsWritten).toBe(2); + expect(repo.written).toHaveLength(2); + }); + }); +}); diff --git a/tests/fraudDetector.test.ts b/tests/fraudDetector.test.ts new file mode 100644 index 0000000..63fed0c --- /dev/null +++ b/tests/fraudDetector.test.ts @@ -0,0 +1,82 @@ +import { FraudDetectorWorker } from "../src/workers/fraudDetector"; +import type { + FlagWriteInput, + FraudFlagDTO, + FraudRepo, + PredictionRow, +} from "../src/services/fraudService"; + +class FakeRepo implements FraudRepo { + rows: PredictionRow[] = []; + written: FlagWriteInput[] = []; + shouldThrow = false; + async loadRecentPredictions(): Promise { + if (this.shouldThrow) throw new Error("boom"); + return this.rows; + } + async upsertFlags(rows: FlagWriteInput[]): Promise { + this.written.push(...rows); + return rows.length; + } + async listFlags(): Promise { + return []; + } +} + +describe("FraudDetectorWorker", () => { + it("runOnce returns the scan result for a happy path", async () => { + const repo = new FakeRepo(); + const worker = new FraudDetectorWorker(repo); + const res = await worker.runOnce({ correlationId: "abc" }); + expect(res).not.toBeNull(); + expect(res!.correlationId).toBe("abc"); + expect(res!.scanned).toBe(0); + }); + + it("runOnce swallows errors and returns null instead of throwing", async () => { + const repo = new FakeRepo(); + repo.shouldThrow = true; + const worker = new FraudDetectorWorker(repo); + const res = await worker.runOnce(); + expect(res).toBeNull(); + }); + + it("start() refuses non-positive intervals", () => { + const worker = new FraudDetectorWorker(new FakeRepo()); + const stop = worker.start(0); + expect(typeof stop).toBe("function"); + stop(); + }); + + it("start() invokes the scan and stop() halts the timer", async () => { + jest.useFakeTimers(); + const repo = new FakeRepo(); + const worker = new FraudDetectorWorker(repo); + const spy = jest.spyOn(worker, "runOnce"); + const stop = worker.start(1000); + + // start() schedules an immediate run via void this.runOnce(); flush microtasks + await Promise.resolve(); + expect(spy).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(1000); + await Promise.resolve(); + expect(spy).toHaveBeenCalledTimes(2); + + stop(); + jest.advanceTimersByTime(5000); + await Promise.resolve(); + expect(spy).toHaveBeenCalledTimes(2); + jest.useRealTimers(); + }); + + it("start() called twice does not stack timers", () => { + jest.useFakeTimers(); + const worker = new FraudDetectorWorker(new FakeRepo()); + const stop1 = worker.start(1000); + const stop2 = worker.start(1000); // should warn + no-op + expect(typeof stop2).toBe("function"); + stop1(); + jest.useRealTimers(); + }); +}); diff --git a/tests/fraudService.test.ts b/tests/fraudService.test.ts new file mode 100644 index 0000000..fb8effe --- /dev/null +++ b/tests/fraudService.test.ts @@ -0,0 +1,368 @@ +import { + UnionFind, + buildGraph, + clusterize, + makeClusterKey, + runFraudScan, + type FraudRepo, + type PredictionRow, + type FlagWriteInput, + PATTERN_BUCKET_MS, +} from "../src/services/fraudService"; + +// ──────────────────────────────────────────────────────────────────────────── +// Helpers +// ──────────────────────────────────────────────────────────────────────────── + +let _rowCounter = 0; +function mkRow(overrides: Partial = {}): PredictionRow { + _rowCounter += 1; + // Defaults are picked so that, in the absence of overrides, no two rows + // share a market/outcome/amount/time bucket — keeps each test focused on + // the specific edge type it is asserting. + return { + predictionId: overrides.predictionId ?? `p-${_rowCounter}`, + userId: overrides.userId ?? `u-${overrides.stellarAddress ?? `x${_rowCounter}`}`, + stellarAddress: overrides.stellarAddress ?? `GAAA${_rowCounter}`, + marketId: overrides.marketId ?? `m-${_rowCounter}`, + outcome: overrides.outcome ?? "yes", + amount: overrides.amount ?? `${100 + _rowCounter}`, + txHash: overrides.txHash ?? "", + fundingSource: overrides.fundingSource ?? null, + createdAt: + overrides.createdAt ?? + new Date(2026, 5, 1, 12, 0, _rowCounter * 60, 0), + }; +} + +class FakeRepo implements FraudRepo { + rows: PredictionRow[] = []; + written: FlagWriteInput[] = []; + listed: ReturnType extends Promise + ? R + : never = []; + + async loadRecentPredictions(): Promise { + return this.rows; + } + async upsertFlags(rows: FlagWriteInput[]): Promise { + this.written.push(...rows); + return rows.length; + } + async listFlags(): Promise { + return this.listed as any[]; + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// UnionFind +// ──────────────────────────────────────────────────────────────────────────── + +describe("UnionFind", () => { + it("treats unseen nodes as their own root", () => { + const uf = new UnionFind(); + expect(uf.find("a")).toBe("a"); + }); + + it("unions two singletons and reports a single component", () => { + const uf = new UnionFind(); + expect(uf.union("a", "b")).toBe(true); + expect(uf.find("a")).toBe(uf.find("b")); + expect(uf.union("a", "b")).toBe(false); // already merged + }); + + it("collapses a transitive chain into one component", () => { + const uf = new UnionFind(); + uf.union("a", "b"); + uf.union("b", "c"); + uf.union("c", "d"); + const comps = uf.components(); + expect(comps.size).toBe(1); + expect([...comps.values()][0].sort()).toEqual(["a", "b", "c", "d"]); + }); + + it("keeps disjoint sets disjoint", () => { + const uf = new UnionFind(); + uf.union("a", "b"); + uf.union("c", "d"); + expect(uf.components().size).toBe(2); + }); + + it("path-compresses on find without changing semantics", () => { + const uf = new UnionFind(); + for (const n of ["a", "b", "c", "d", "e"]) uf.add(n); + uf.union("a", "b"); + uf.union("b", "c"); + uf.union("c", "d"); + uf.union("d", "e"); + // Call find many times — must remain stable. + const root = uf.find("a"); + for (const n of ["a", "b", "c", "d", "e"]) { + expect(uf.find(n)).toBe(root); + } + }); +}); + +// ──────────────────────────────────────────────────────────────────────────── +// makeClusterKey +// ──────────────────────────────────────────────────────────────────────────── + +describe("makeClusterKey", () => { + it("is order-independent", () => { + expect(makeClusterKey(["GBBB", "GAAA"])).toBe( + makeClusterKey(["GAAA", "GBBB"]), + ); + }); +}); + +// ──────────────────────────────────────────────────────────────────────────── +// buildGraph +// ──────────────────────────────────────────────────────────────────────────── + +describe("buildGraph", () => { + it("returns an empty graph for empty input", () => { + const g = buildGraph([]); + expect(g.nodes.size).toBe(0); + expect(g.edges).toEqual([]); + }); + + it("ignores rows with no stellar address", () => { + const g = buildGraph([mkRow({ stellarAddress: "" })]); + expect(g.nodes.size).toBe(0); + }); + + it("creates a SHARED_FUNDING_SOURCE edge between two addresses funded by the same wallet", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", fundingSource: "GF" }), + ]); + expect(g.edges).toHaveLength(1); + expect(g.edges[0].reason).toBe("SHARED_FUNDING_SOURCE"); + expect(g.edges[0].detail).toBe("GF"); + expect([g.edges[0].a, g.edges[0].b].sort()).toEqual(["GA", "GB"]); + }); + + it("does not create funding edges when funder is null", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: null }), + mkRow({ stellarAddress: "GB", fundingSource: null }), + ]); + expect(g.edges).toHaveLength(0); + }); + + it("creates SHARED_TX_HASH edges", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", txHash: "tx-1" }), + mkRow({ stellarAddress: "GB", txHash: "tx-1" }), + ]); + expect(g.edges.some((e) => e.reason === "SHARED_TX_HASH")).toBe(true); + }); + + it("does NOT create tx-hash edges for empty tx strings", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", txHash: "" }), + mkRow({ stellarAddress: "GB", txHash: "" }), + ]); + expect(g.edges.some((e) => e.reason === "SHARED_TX_HASH")).toBe(false); + }); + + it("buckets near-simultaneous identical bets as REPEATED_PATTERN", () => { + const base = new Date("2026-06-01T12:00:00Z").getTime(); + const g = buildGraph([ + mkRow({ + stellarAddress: "GA", + marketId: "m", + outcome: "yes", + amount: "10", + createdAt: new Date(base), + }), + mkRow({ + stellarAddress: "GB", + marketId: "m", + outcome: "yes", + amount: "10", + createdAt: new Date(base + 1000), // same bucket + }), + ]); + expect(g.edges.some((e) => e.reason === "REPEATED_PATTERN")).toBe(true); + }); + + it("does NOT bucket identical bets that fall into different time buckets", () => { + const base = new Date("2026-06-01T12:00:00Z").getTime(); + const g = buildGraph([ + mkRow({ + stellarAddress: "GA", + marketId: "m", + outcome: "yes", + amount: "10", + createdAt: new Date(base), + }), + mkRow({ + stellarAddress: "GB", + marketId: "m", + outcome: "yes", + amount: "10", + createdAt: new Date(base + PATTERN_BUCKET_MS * 3), + }), + ]); + expect(g.edges.some((e) => e.reason === "REPEATED_PATTERN")).toBe(false); + }); + + it("never creates self-loops", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + ]); + expect(g.edges).toHaveLength(0); + }); + + it("dedupes identical edges from multiple prediction pairs", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", fundingSource: "GF" }), + ]); + const funderEdges = g.edges.filter( + (e) => e.reason === "SHARED_FUNDING_SOURCE", + ); + expect(funderEdges).toHaveLength(1); + }); +}); + +// ──────────────────────────────────────────────────────────────────────────── +// clusterize +// ──────────────────────────────────────────────────────────────────────────── + +describe("clusterize", () => { + it("ignores singleton components", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + ]); + expect(clusterize(g)).toEqual([]); + }); + + it("groups addresses linked transitively through different reasons", () => { + // GA ↔ GB via funder, GB ↔ GC via tx hash → cluster {GA, GB, GC} + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", txHash: "tx-1" }), + mkRow({ stellarAddress: "GC", txHash: "tx-1" }), + ]); + const clusters = clusterize(g); + expect(clusters).toHaveLength(1); + expect(clusters[0].members).toEqual(["GA", "GB", "GC"]); + expect(clusters[0].score).toBeGreaterThan(0); + expect(clusters[0].key).toBe("GA|GB|GC"); + }); + + it("returns multiple clusters when address groups are disjoint", () => { + const g = buildGraph([ + mkRow({ stellarAddress: "GA", fundingSource: "GF1" }), + mkRow({ stellarAddress: "GB", fundingSource: "GF1" }), + mkRow({ stellarAddress: "GC", fundingSource: "GF2" }), + mkRow({ stellarAddress: "GD", fundingSource: "GF2" }), + ]); + const clusters = clusterize(g); + expect(clusters).toHaveLength(2); + }); +}); + +// ──────────────────────────────────────────────────────────────────────────── +// runFraudScan (orchestration) +// ──────────────────────────────────────────────────────────────────────────── + +describe("runFraudScan", () => { + it("validates lookbackMs", async () => { + const repo = new FakeRepo(); + await expect( + runFraudScan(repo, { lookbackMs: -1 }), + ).rejects.toThrow(/lookbackMs/); + }); + + it("validates maxPredictions", async () => { + const repo = new FakeRepo(); + await expect( + runFraudScan(repo, { maxPredictions: 0 }), + ).rejects.toThrow(/maxPredictions/); + }); + + it("returns zero counts when there is no data", async () => { + const repo = new FakeRepo(); + const res = await runFraudScan(repo, { correlationId: "cid-1" }); + expect(res).toMatchObject({ + scanned: 0, + edges: 0, + clusters: 0, + flagsWritten: 0, + correlationId: "cid-1", + }); + expect(repo.written).toEqual([]); + }); + + it("persists one flag per address in each cluster, idempotently in shape", async () => { + const repo = new FakeRepo(); + repo.rows = [ + mkRow({ + stellarAddress: "GA", + userId: "u-a", + fundingSource: "GF", + }), + mkRow({ + stellarAddress: "GB", + userId: "u-b", + fundingSource: "GF", + }), + ]; + + const res = await runFraudScan(repo, { correlationId: "cid-2" }); + + expect(res.scanned).toBe(2); + expect(res.clusters).toBe(1); + expect(res.flagsWritten).toBe(2); + + expect(repo.written).toHaveLength(2); + const addrs = repo.written.map((w) => w.stellarAddress).sort(); + expect(addrs).toEqual(["GA", "GB"]); + + for (const w of repo.written) { + expect(w.clusterKey).toBe("GA|GB"); + expect(w.reason).toBe("SHARED_FUNDING_SOURCE"); + expect(w.correlationId).toBe("cid-2"); + expect(w.score).toBeGreaterThan(0); + const ev = w.evidence as Record; + expect(ev.clusterSize).toBe(2); + expect(ev.members).toEqual(["GA", "GB"]); + expect(Array.isArray(ev.edges)).toBe(true); + } + }); + + it("skips an address gracefully when its userId cannot be resolved", async () => { + const repo = new FakeRepo(); + // Inject a row with empty userId — should not crash, just skip + repo.rows = [ + mkRow({ stellarAddress: "GA", userId: "u-a", fundingSource: "GF" }), + mkRow({ stellarAddress: "GB", userId: "", fundingSource: "GF" }), + ]; + const res = await runFraudScan(repo); + // Cluster forms (size 2) but only GA can be persisted + expect(res.clusters).toBe(1); + expect(res.flagsWritten).toBe(1); + expect(repo.written[0].stellarAddress).toBe("GA"); + }); + + it("uses the supplied clock to compute the since cutoff", async () => { + const repo = new FakeRepo(); + const loadSpy = jest.spyOn(repo, "loadRecentPredictions"); + const fixedNow = new Date("2026-06-15T00:00:00Z"); + await runFraudScan(repo, { + now: () => fixedNow, + lookbackMs: 60_000, + }); + expect(loadSpy).toHaveBeenCalledWith({ + since: new Date(fixedNow.getTime() - 60_000), + limit: expect.any(Number), + }); + }); +});