Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions docs/fraud-signal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Fraud Signal Detector

Background job that analyzes recent predictions, builds an address graph,
clusters suspicious addresses with **Union-Find**, and persists findings to
`fraud_flags` for admin review.

## Architecture

```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ predictions, users │──▢ β”‚ buildGraph (pure) │──▢ β”‚ clusterize (DSU) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ fraud_flags β”‚
β”‚ (idempotent) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ GET /api/admin/ β”‚
β”‚ fraud/flags β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```

* **Graph builder** β€” `src/services/fraudService.ts :: buildGraph`. Pure
function. Creates undirected edges between Stellar addresses for any of:
* `SHARED_FUNDING_SOURCE` β€” same on-chain funder
* `SHARED_TX_HASH` β€” two addresses appearing on the same transaction
* `REPEATED_PATTERN` β€” identical (market, outcome, amount) bet inside the
same 5-minute bucket
* **Clustering** β€” `clusterize` uses a classic Union-Find with path
compression + union-by-rank. Components of size β‰₯ 2 become clusters.
* **Persistence** β€” `DrizzleFraudRepo.upsertFlags` writes one row per
`(cluster, user)` with `ON CONFLICT DO UPDATE`, so re-running the scan
is idempotent and refreshes evidence in-place.
* **Worker** β€” `src/workers/fraudDetector.ts`. `runOnce()` for ad-hoc or
cron runs; `start(intervalMs)` for an in-process timer.
* **Admin endpoints** β€” `src/routes/admin/fraud.ts`:
* `GET /api/admin/fraud/flags?status=open&limit=50` β€” paginated review
* `POST /api/admin/fraud/scan` β€” manual trigger (admin only)

All endpoints require an admin JWT (`role: "admin"`) and are rate-limited
per-token (60 req/min by default).

## Schema

Migration `drizzle/migrations/0011_fraud_flags.sql`:

* Adds nullable `predictions.funding_source TEXT` (+ partial index).
* Creates `fraud_flags` with `(cluster_key, user_id)` unique index,
status enum, evidence JSONB, score, and reviewer audit columns.

## Operational notes

* Correlation IDs flow from the request (or are generated per scan) into
every log line and every persisted `fraud_flags.correlation_id`.
* The worker never throws β€” failures are logged and the next interval
retries. This keeps the in-process scheduler stable.
* `MIN_CLUSTER_SIZE = 2` keeps noise low. Tune via the constant in
`fraudService.ts` if needed.
* Default lookback is **24 h**, capped at 10 000 predictions per scan to
protect memory on large datasets.

## Testing

```
npm test -- tests/fraudService.test.ts
npm test -- tests/fraudDetector.test.ts
npm test -- tests/adminFraud.test.ts
```

The suite covers graph builder edge types, union-find correctness, run
orchestration, worker scheduling & error handling, and the admin route
(auth, validation, happy path).
44 changes: 44 additions & 0 deletions drizzle/migrations/0011_fraud_flags.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Migration: Fraud signal detector
--
-- 1. Adds an optional `funding_source` column to predictions so the graph
-- builder can connect addresses that share an on-chain funder.
-- Nullable + no default β€” fully backwards-compatible with existing rows.
-- 2. Creates the `fraud_flags` table that the background detector writes
-- cluster findings into, with the reason payload and review state.
--
-- All statements are idempotent so re-running the migration is safe.

ALTER TABLE predictions
ADD COLUMN IF NOT EXISTS funding_source text;

CREATE INDEX IF NOT EXISTS predictions_funding_source_idx
ON predictions (funding_source)
WHERE funding_source IS NOT NULL;

CREATE TABLE IF NOT EXISTS fraud_flags (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
cluster_key text NOT NULL,
user_id uuid NOT NULL REFERENCES users(id) ON DELETE CASCADE,
stellar_address text NOT NULL,
reason text NOT NULL,
evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
score integer NOT NULL DEFAULT 0,
status text NOT NULL DEFAULT 'open',
reviewed_by text,
reviewed_at timestamptz,
correlation_id text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
CONSTRAINT fraud_flags_status_check
CHECK (status IN ('open', 'dismissed', 'confirmed'))
);

-- One open flag per (cluster, user) to keep re-runs idempotent.
CREATE UNIQUE INDEX IF NOT EXISTS fraud_flags_cluster_user_unique
ON fraud_flags (cluster_key, user_id);

CREATE INDEX IF NOT EXISTS fraud_flags_status_created_idx
ON fraud_flags (status, created_at DESC);

CREATE INDEX IF NOT EXISTS fraud_flags_address_idx
ON fraud_flags (stellar_address);
52 changes: 52 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,65 @@ export const predictions = pgTable("predictions", {
outcome: text("outcome").notNull(),
amount: text("amount").notNull(),
txHash: text("tx_hash").notNull().default(""),
/**
* Optional on-chain funding source (e.g. the account that originally
* funded this user's wallet). Used by the fraud-signal detector to
* connect addresses that share a funder. Nullable so legacy rows and
* predictions whose funder is unknown remain valid.
*/
fundingSource: text("funding_source"),
status: text("status").notNull().default("pending"),
result: text("result"),
createdAt: timestamp("created_at", { withTimezone: true })
.notNull()
.defaultNow(),
});

/**
* fraud_flags β€” persisted output of the fraud-signal background job.
*
* Each row represents a single (cluster, user) finding. A `cluster_key`
* groups all addresses the union-find algorithm collapsed together,
* `reason` is a short machine code, and `evidence` carries the structured
* payload (graph edges, shared funders, repeated patterns) for admin review.
*
* `(cluster_key, user_id)` is unique so re-running the detector is
* idempotent and never produces duplicates for the same finding.
*/
export const fraudFlags = pgTable(
"fraud_flags",
{
id: uuid("id").primaryKey().defaultRandom(),
clusterKey: text("cluster_key").notNull(),
userId: uuid("user_id")
.notNull()
.references(() => users.id, { onDelete: "cascade" }),
stellarAddress: text("stellar_address").notNull(),
reason: text("reason").notNull(),
evidence: jsonb("evidence").notNull().default({}),
score: integer("score").notNull().default(0),
status: text("status").notNull().default("open"),
reviewedBy: text("reviewed_by"),
reviewedAt: timestamp("reviewed_at", { withTimezone: true }),
correlationId: text("correlation_id"),
createdAt: timestamp("created_at", { withTimezone: true })
.notNull()
.defaultNow(),
updatedAt: timestamp("updated_at", { withTimezone: true })
.notNull()
.defaultNow(),
},
(t) => ({
fraudFlagsStatusCreatedIdx: index("fraud_flags_status_created_idx").on(
t.status,
t.createdAt,
),
fraudFlagsAddressIdx: index("fraud_flags_address_idx").on(t.stellarAddress),
}),
);

export type FraudFlag = typeof fraudFlags.$inferSelect;

export const claims = pgTable("claims", {
id: uuid("id").primaryKey().defaultRandom(),
userId: uuid("user_id")
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createDocsRouter } from "./routes/docs";
import { notificationsRouter } from "./routes/notifications";
import { socialRouter } from "./routes/social";
import { adminAuditRouter } from "./routes/admin/audit";
import { adminFraudRouter } from "./routes/admin/fraud";
import { errorHandler } from "./middleware/errorHandler";
import { requestContextStorage } from "./lib/requestContext";
import { REQUEST_ID_HEADER } from "./lib/http";
Expand Down Expand Up @@ -92,6 +93,7 @@ export function createApp(): express.Express {
app.use("/api/users", socialRouter);
app.use("/api/users", usersRouter);
app.use("/api/admin/audit", adminAuditRouter);
app.use("/api/admin/fraud", adminFraudRouter);

app.get("/metrics", async (req, res) => {
const metricsAuthToken = process.env.METRICS_AUTH_TOKEN;
Expand Down
143 changes: 143 additions & 0 deletions src/routes/admin/fraud.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Admin fraud review endpoint.
*
* GET /api/admin/fraud/flags?status=open&limit=50
* POST /api/admin/fraud/scan (manual trigger)
*
* Both endpoints:
* β€’ require an admin JWT (Bearer token, role: "admin")
* β€’ validate input at the boundary with Zod
* β€’ return the project's standard error envelope on failure
* β€’ echo the request id so the client can correlate logs
*/

import { Router } from "express";
import { rateLimit } from "express-rate-limit";
import { z } from "zod";
import { requireAdmin } from "../../middleware/requireAdmin";
import { REQUEST_ID_HEADER } from "../../lib/http";
import { getRequestId } from "../../lib/requestContext";
import {
DrizzleFraudRepo,
type FraudRepo,
listFraudFlags,
runFraudScan,
} from "../../services/fraudService";

const listQuerySchema = z.object({
status: z.enum(["open", "dismissed", "confirmed"]).optional(),
limit: z
.string()
.regex(/^\d+$/u, { message: "limit must be a positive integer" })
.transform((v) => parseInt(v, 10))
.refine((n) => n >= 1 && n <= 200, {
message: "limit must be between 1 and 200",
})
.optional(),
});

const scanBodySchema = z
.object({
lookbackMs: z.number().int().positive().max(7 * 24 * 60 * 60 * 1000).optional(),
maxPredictions: z.number().int().positive().max(100_000).optional(),
})
.strict();

export interface AdminFraudRouterOptions {
/** Inject a fake repo in tests. */
repo?: FraudRepo;
/** Requests per minute per admin token. Default: 60 */
rateLimitPerMinute?: number;
}

function requestIdOf(req: { id?: unknown }): string {
return (
getRequestId() ??
(typeof req.id === "string" ? req.id : "") ??
""
);
}

export function createAdminFraudRouter(
opts: AdminFraudRouterOptions = {},
): Router {
const router = Router();
const repo = opts.repo ?? new DrizzleFraudRepo();
const limit = opts.rateLimitPerMinute ?? 60;

router.use(
rateLimit({
windowMs: 60_000,
limit,
keyGenerator: (req) =>
(req.headers.authorization as string | undefined) ??
req.ip ??
"unknown",
standardHeaders: "draft-6",
legacyHeaders: false,
message: { error: { code: "rate_limit_exceeded" } },
}),
);

router.use(requireAdmin);

// ── GET /flags ────────────────────────────────────────────────────────────
router.get("/flags", async (req, res, next) => {
try {
const requestId = requestIdOf({ id: (req as { id?: unknown }).id });
const parsed = listQuerySchema.safeParse(req.query);
if (!parsed.success) {
res.setHeader(REQUEST_ID_HEADER, requestId);
res.status(400).json({
error: {
code: "validation_error",
message:
parsed.error.issues[0]?.message ?? "invalid query parameters",
details: parsed.error.issues,
requestId,
},
});
return;
}
const rows = await listFraudFlags(parsed.data, repo);
res.setHeader(REQUEST_ID_HEADER, requestId);
res.json({ data: rows });
} catch (e) {
next(e);
}
});

// ── POST /scan ────────────────────────────────────────────────────────────
router.post("/scan", async (req, res, next) => {
try {
const requestId = requestIdOf({ id: (req as { id?: unknown }).id });
const body = req.body ?? {};
const parsed = scanBodySchema.safeParse(body);
if (!parsed.success) {
res.setHeader(REQUEST_ID_HEADER, requestId);
res.status(400).json({
error: {
code: "validation_error",
message:
parsed.error.issues[0]?.message ?? "invalid request body",
details: parsed.error.issues,
requestId,
},
});
return;
}
const result = await runFraudScan(repo, {
...parsed.data,
correlationId: requestId,
});
res.setHeader(REQUEST_ID_HEADER, requestId);
res.json({ data: result });
} catch (e) {
next(e);
}
});

return router;
}

export const adminFraudRouter = createAdminFraudRouter();
Loading
Loading