diff --git a/docs/cache.md b/docs/cache.md new file mode 100644 index 0000000..a8cf601 --- /dev/null +++ b/docs/cache.md @@ -0,0 +1,77 @@ +# Cache Strategy + +## Overview + +Predictify uses Redis for caching market data to improve read performance. This document describes the cache keys, TTLs, and invalidation strategy. + +## Cache Keys + +| Key Pattern | Description | Used By | +|-------------|-------------|---------| +| `markets:all` | List of all markets | `GET /api/markets` | +| `markets:{id}` | Single market detail | `GET /api/markets/:id` | + +## TTLs + +- **`markets:all`**: 60 seconds - Refreshed on any market update to ensure list consistency +- **`markets:{id}`**: 120 seconds - Longer TTL for individual market reads + +## Invalidation Strategy + +Cache entries are invalidated on the following events: + +### Market Update (`PATCH /api/markets/:id`) + +When a market is updated via the admin API, both cache keys are invalidated: + +1. `markets:{id}` - The specific market's cache is removed +2. `markets:all` - The aggregated list cache is removed + +This ensures that subsequent reads return fresh data from the database. + +### Implementation + +```typescript +// src/cache/marketsCache.ts +export async function invalidateMarketCache(marketId: string) { + const keysToDelete = [marketCacheKeys.byId(marketId), marketCacheKeys.all]; + await Promise.all(keysToDelete.map((k) => redisConnection.del(k))); +} +``` + +## Error Handling + +Cache operations are designed to never fail the business operation: + +- If Redis is unavailable, the request continues without caching +- Cache errors are logged with correlation IDs for debugging +- The API remains functional even if cache invalidation fails + +## Security Considerations + +- Cache keys do not contain sensitive data +- Only market IDs and aggregated lists are cached +- No user-specific data is cached +- Cache invalidation requires admin authentication + +## Performance Notes + +- Individual `DEL` operations are O(N) where N is the number of keys +- Invalidations are performed in parallel using `Promise.all` +- Cache misses fall back to database queries seamlessly + +## Monitoring + +Cache operations are logged with correlation IDs: + +```json +{ + "requestId": "uuid", + "marketId": "market-123", + "keys": ["markets:market-123", "markets:all"] +} +``` + +Monitor for: +- `Market cache invalidated` - Successful invalidation +- `Failed to invalidate market cache` - Redis connectivity issues diff --git a/drizzle/migrations/0013_address_aggregates_mv.sql b/drizzle/migrations/0013_address_aggregates_mv.sql new file mode 100644 index 0000000..59a16f0 --- /dev/null +++ b/drizzle/migrations/0013_address_aggregates_mv.sql @@ -0,0 +1,42 @@ +-- Predictions-per-address aggregated materialized view +-- Precomputes per-user prediction statistics for fast leaderboard queries. +-- Refreshed hourly via src/workers/refreshAggregates.ts using CONCURRENTLY +-- to avoid locking reads during refresh. + +CREATE MATERIALIZED VIEW IF NOT EXISTS address_aggregates_mv AS +SELECT + u.id AS user_id, + u.stellar_address, + COUNT(p.id)::bigint AS total_predictions, + SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END)::bigint AS correct_predictions, + ROUND( + CASE WHEN COUNT(p.id) > 0 THEN + 100.0 * SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END) / COUNT(p.id) + ELSE 0 + END, + 2 + ) AS accuracy_percentage, + ROW_NUMBER() OVER ( + ORDER BY + CASE WHEN COUNT(p.id) > 0 THEN + 100.0 * SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END) / COUNT(p.id) + ELSE 0 + END DESC, + COUNT(p.id) DESC + ) AS rank +FROM users u +LEFT JOIN predictions p ON u.id = p.user_id +LEFT JOIN markets m ON p.market_id = m.id AND m.status IN ('resolved', 'disputed') +GROUP BY u.id, u.stellar_address; + +-- Unique index on user_id is required for CONCURRENTLY refresh +CREATE UNIQUE INDEX IF NOT EXISTS idx_address_aggregates_user_id + ON address_aggregates_mv (user_id); + +-- Index for lookups by stellar address +CREATE INDEX IF NOT EXISTS idx_address_aggregates_stellar_address + ON address_aggregates_mv (stellar_address); + +-- Index for ordered leaderboard queries +CREATE INDEX IF NOT EXISTS idx_address_aggregates_rank + ON address_aggregates_mv (rank); diff --git a/src/config/env-schema.ts b/src/config/env-schema.ts index de448af..1fb7127 100644 --- a/src/config/env-schema.ts +++ b/src/config/env-schema.ts @@ -4,7 +4,7 @@ export const envSchema = z.object({ // ── Application ─────────────────────────────────────────── NODE_ENV: z.enum(["development", "test", "production"]).default("development"), PORT: z.coerce.number().int().positive().default(3001), - LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"), + LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]).default("info"), // ── Database ────────────────────────────────────────────── DATABASE_URL: z.string().url(), diff --git a/src/db/index.ts b/src/db/index.ts index 6f8802d..67e7ec7 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -8,3 +8,4 @@ const pool = new Pool({ }); export const db = drizzle(pool, { schema }); +export type Db = typeof db; diff --git a/src/index.ts b/src/index.ts index 7a1da14..60b21dd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,7 @@ import { REQUEST_ID_HEADER } from "./lib/http"; import { register } from "./metrics/registry"; import { connectWithRetry, closeDb } from "./db/client"; import { stopScheduler } from "./services/scheduler"; +import { startRefreshAggregatesWorker } from "./workers/refreshAggregates"; const docsEnabled = env.NODE_ENV !== "production" || process.env.ENABLE_DOCS === "true"; @@ -113,9 +114,11 @@ export function createApp(): express.Express { if (require.main === module) { const app = createApp(); + let refreshWorker: NodeJS.Timeout | undefined; connectWithRetry() .then(() => { + refreshWorker = startRefreshAggregatesWorker(); app.listen(env.PORT, () => { logger.info({ port: env.PORT, env: env.NODE_ENV }, "predictify-backend listening"); logger.info(`Swagger UI available at http://localhost:${env.PORT}/docs`); @@ -133,6 +136,7 @@ if (require.main === module) { process.exit(1); }, 5000).unref(); + if (refreshWorker) clearInterval(refreshWorker); stopScheduler(); await closeDb(); clearTimeout(forceExit); @@ -141,6 +145,7 @@ if (require.main === module) { process.on("SIGINT", () => { logger.info("SIGINT received, shutting down gracefully"); + if (refreshWorker) clearInterval(refreshWorker); stopScheduler(); process.exit(0); }); diff --git a/src/middleware/auth.ts b/src/middleware/auth.ts index a81b650..9d64731 100644 --- a/src/middleware/auth.ts +++ b/src/middleware/auth.ts @@ -36,7 +36,7 @@ export function requireAdmin(req: AuthenticatedRequest, res: Response, next: Nex req.user = { id: stellarAddress, stellarAddress }; next(); - } catch (err) { + } catch { res.status(401).json({ error: { code: "unauthorized" } }); } } diff --git a/src/middleware/rateLimit.ts b/src/middleware/rateLimit.ts index 6b47082..44e56b0 100644 --- a/src/middleware/rateLimit.ts +++ b/src/middleware/rateLimit.ts @@ -26,6 +26,7 @@ import { logger } from "../config/logger"; * that can be consumed by downstream middleware or route handlers. */ declare global { + // eslint-disable-next-line @typescript-eslint/no-namespace namespace Express { interface Request { /** Rate-limit context set by the rateLimit middleware on every request */ diff --git a/src/middleware/requireAdmin.ts b/src/middleware/requireAdmin.ts index f5b08fa..b7db8b5 100644 --- a/src/middleware/requireAdmin.ts +++ b/src/middleware/requireAdmin.ts @@ -17,6 +17,7 @@ import { env } from "../config/env"; // Augment Express Request so downstream handlers can read the admin identity // without casting. declare global { + // eslint-disable-next-line @typescript-eslint/no-namespace namespace Express { interface Request { adminAddress?: string; diff --git a/src/routes/markets.ts b/src/routes/markets.ts index 7f818fe..a7f0750 100644 --- a/src/routes/markets.ts +++ b/src/routes/markets.ts @@ -102,6 +102,7 @@ marketsRouter.patch("/:id", requireAdmin, async (req: AuthenticatedRequest, res, const { question, metadata, expectedVersion } = parsed.data; const adminAddress = req.user!.stellarAddress; + // eslint-disable-next-line @typescript-eslint/no-explicit-any const patch: { question?: string; metadata?: any } = {}; if (question !== undefined) patch.question = question; if (metadata !== undefined) patch.metadata = metadata; @@ -112,6 +113,7 @@ marketsRouter.patch("/:id", requireAdmin, async (req: AuthenticatedRequest, res, if (e instanceof VersionConflictError) { return res.status(409).json({ error: { code: "version_conflict" } }); } + // eslint-disable-next-line @typescript-eslint/no-explicit-any if ((e as any).status === 404) { return res.status(404).json({ error: { code: "not_found" } }); } diff --git a/src/routes/users.ts b/src/routes/users.ts index 2d9e3e9..6fdb490 100644 --- a/src/routes/users.ts +++ b/src/routes/users.ts @@ -37,7 +37,7 @@ usersRouter.get("/:address/predictions", async (req: Request, res: Response, nex try { stellarAddressSchema.parse(address); - } catch (e) { + } catch { return res.status(400).json({ error: { code: "invalid_address" } }); } @@ -69,49 +69,59 @@ usersRouter.get("/:address/predictions", async (req: Request, res: Response, nex } }); -usersRouter.get("/:stellarAddress/profile", async (req: Request, res: Response, next: NextFunction) => { - const reqId = getRequestId() ?? (typeof (req as { id?: unknown }).id === "string" ? (req as { id?: string }).id : undefined); - - const parseResult = stellarAddressSchema.safeParse(req.params.stellarAddress); - if (!parseResult.success) { - logger.warn( - { reqId, stellarAddress: req.params.stellarAddress, issues: parseResult.error.issues }, - "user_profile_validation_failed", - ); - return res.status(400).json({ - error: { - code: "validation_error", - message: parseResult.error.issues[0]?.message ?? "invalid stellar address", - requestId: reqId, - }, - }); - } - - const stellarAddress = parseResult.data; - - try { - logger.debug({ reqId, stellarAddress }, "user_profile_lookup"); - - const profile = await getUserProfile(stellarAddress); - - if (!profile) { - logger.debug({ reqId, stellarAddress }, "user_profile_not_found"); - return res.status(404).json({ +/** + * GET /api/users/:stellarAddress/profile + * + * Public endpoint — no authentication required. + * + * Returns the profile for the user identified by `stellarAddress`. + */ +usersRouter.get( + "/:stellarAddress/profile", + async (req: Request, res: Response, next: NextFunction) => { + const reqId = getRequestId(); + + const parseResult = stellarAddressSchema.safeParse(req.params.stellarAddress); + if (!parseResult.success) { + logger.warn( + { reqId, stellarAddress: req.params.stellarAddress, issues: parseResult.error.issues }, + "user_profile_validation_failed", + ); + return res.status(400).json({ error: { - code: "not_found", - message: "no user found with that stellar address", + code: "validation_error", + message: parseResult.error.issues[0]?.message ?? "invalid stellar address", requestId: reqId, }, }); } - logger.debug( - { reqId, stellarAddress, predictionCount: profile.predictions.length }, - "user_profile_found", - ); + const stellarAddress = parseResult.data; - return res.json({ data: profile }); - } catch (err) { - return next(err); - } -}); + try { + logger.debug({ reqId, stellarAddress }, "user_profile_lookup"); + + const profile = await getUserProfile(stellarAddress); + + if (!profile) { + logger.debug({ reqId, stellarAddress }, "user_profile_not_found"); + return res.status(404).json({ + error: { + code: "not_found", + message: "no user found with that stellar address", + requestId: reqId, + }, + }); + } + + logger.debug( + { reqId, stellarAddress, predictionCount: profile.predictions.length }, + "user_profile_found", + ); + + return res.json({ data: profile }); + } catch (err) { + return next(err); + } + }, +); diff --git a/src/services/addressAggregatesService.ts b/src/services/addressAggregatesService.ts new file mode 100644 index 0000000..b146fb5 --- /dev/null +++ b/src/services/addressAggregatesService.ts @@ -0,0 +1,68 @@ +import { db } from "../db"; +import { sql } from "drizzle-orm"; + +export interface AddressAggregate extends Record { + user_id: string; + stellar_address: string; + total_predictions: number; + correct_predictions: number; + accuracy_percentage: number; + rank: number; +} + +/** + * Refresh the address_aggregates_mv materialized view concurrently. + * Uses CONCURRENTLY to avoid locking reads during refresh. + */ +export async function refreshAddressAggregates(): Promise { + await db.execute(sql`REFRESH MATERIALIZED VIEW CONCURRENTLY address_aggregates_mv`); +} + +/** + * Get paginated address aggregates ordered by rank. + */ +export async function getAddressAggregates( + limit: number = 50, + offset: number = 0 +): Promise { + const result = await db.execute( + sql` + SELECT user_id, stellar_address, total_predictions, correct_predictions, + accuracy_percentage, rank + FROM address_aggregates_mv + ORDER BY rank ASC + LIMIT ${limit} + OFFSET ${offset} + ` + ); + return result.rows; +} + +/** + * Look up a single address aggregate by stellar address. + */ +export async function getAddressAggregate( + stellarAddress: string +): Promise { + const result = await db.execute( + sql` + SELECT user_id, stellar_address, total_predictions, correct_predictions, + accuracy_percentage, rank + FROM address_aggregates_mv + WHERE stellar_address = ${stellarAddress} + LIMIT 1 + ` + ); + return result.rows[0] || null; +} + +/** + * Refresh the view then return paginated results. + */ +export async function getAddressAggregatesWithRefresh( + limit: number = 50, + offset: number = 0 +): Promise { + await refreshAddressAggregates(); + return getAddressAggregates(limit, offset); +} diff --git a/src/services/leaderboardService.ts b/src/services/leaderboardService.ts index 968aa35..a6e204f 100644 --- a/src/services/leaderboardService.ts +++ b/src/services/leaderboardService.ts @@ -1,73 +1,47 @@ -import { db } from "../db"; -import { sql } from "drizzle-orm"; +import { + refreshAddressAggregates, + getAddressAggregates, + getAddressAggregate, + getAddressAggregatesWithRefresh, + type AddressAggregate, +} from "./addressAggregatesService"; -export interface LeaderboardEntry extends Record { - user_id: string; - stellar_address: string; - total_predictions: number; - correct_predictions: number; - accuracy_percentage: number; - rank: number; -} +export type LeaderboardEntry = AddressAggregate; /** - * Refresh the leaderboard materialized view - * This should be called periodically (e.g., via cron or after market resolutions) + * Refresh the address aggregates materialized view. + * Delegates to the shared service which runs CONCURRENTLY. */ export async function refreshLeaderboard(): Promise { - await db.execute(sql`REFRESH MATERIALIZED VIEW CONCURRENTLY leaderboard_mv`); + await refreshAddressAggregates(); } /** - * Get the leaderboard with optional limit and offset - * @param limit - Maximum number of entries to return (default: 50) - * @param offset - Number of entries to skip (default: 0) + * Get the leaderboard with optional limit and offset. + * Reads from address_aggregates_mv. */ export async function getLeaderboard( limit: number = 50, offset: number = 0 ): Promise { - const result = await db.execute( - sql` - SELECT user_id, stellar_address, total_predictions, correct_predictions, - accuracy_percentage, rank - FROM leaderboard_mv - ORDER BY rank ASC - LIMIT ${limit} - OFFSET ${offset} - ` - ); - return result.rows; + return getAddressAggregates(limit, offset); } /** - * Get a specific user's leaderboard entry by stellar address - * @param stellarAddress - The user's Stellar address + * Get a specific user's leaderboard entry by stellar address. */ export async function getUserLeaderboardEntry( stellarAddress: string ): Promise { - const result = await db.execute( - sql` - SELECT user_id, stellar_address, total_predictions, correct_predictions, - accuracy_percentage, rank - FROM leaderboard_mv - WHERE stellar_address = ${stellarAddress} - LIMIT 1 - ` - ); - return result.rows[0] || null; + return getAddressAggregate(stellarAddress); } /** - * Get leaderboard with automatic refresh - * This refreshes the materialized view before returning data - * Use this when you need the most up-to-date data + * Get leaderboard with automatic refresh before returning data. */ export async function getLeaderboardWithRefresh( limit: number = 50, offset: number = 0 ): Promise { - await refreshLeaderboard(); - return getLeaderboard(limit, offset); + return getAddressAggregatesWithRefresh(limit, offset); } diff --git a/src/services/marketService.ts b/src/services/marketService.ts index c42d486..5691360 100644 --- a/src/services/marketService.ts +++ b/src/services/marketService.ts @@ -1,3 +1,4 @@ +import { invalidateMarketCache } from "../cache/marketsCache"; import { db, getDb } from "../db/client"; import { markets, marketAuditLog } from "../db/schema"; import { asc, eq } from "drizzle-orm"; @@ -8,6 +9,7 @@ export interface Market { question: string; status: string; resolutionTime: Date; + // eslint-disable-next-line @typescript-eslint/no-explicit-any metadata: any; indexedLedger: number; archived: boolean; @@ -23,6 +25,7 @@ export class VersionConflictError extends Error { } } +// eslint-disable-next-line @typescript-eslint/no-explicit-any export async function listMarkets(options: { limit?: number; offset?: number } = {}): Promise { const limit = options.limit ?? 50; const offset = options.offset ?? 0; @@ -52,6 +55,7 @@ export async function listMarkets(options: { limit?: number; offset?: number } = return Array.isArray(result) ? result : []; } +// eslint-disable-next-line @typescript-eslint/no-explicit-any export async function getMarketById(id: string): Promise { try { const rows = await getDb() @@ -78,14 +82,17 @@ export async function getMarketById(id: string): Promise { export async function updateMarket( id: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any patch: { question?: string; metadata?: any }, expectedVersion: number, adminAddress: string + // eslint-disable-next-line @typescript-eslint/no-explicit-any ): Promise { const result = await db.transaction(async (tx) => { const existing = await tx.select().from(markets).where(eq(markets.id, id)).limit(1); if (existing.length === 0) { const err = new Error("Market not found"); + // eslint-disable-next-line @typescript-eslint/no-explicit-any (err as any).status = 404; throw err; } @@ -121,11 +128,12 @@ export async function updateMarket( }, }); + // Invalidate related cache entries + await invalidateMarketCache(id); return updated[0]; }); // Structured log event – emitted from service layer after successful commit. - // Includes correlation ID via requestContext (see logging/events.ts). emitMarketEvent(LogEvent.MARKET_UPDATED, { marketId: id, actor: adminAddress, @@ -135,4 +143,3 @@ export async function updateMarket( return result; } - diff --git a/src/services/userService.ts b/src/services/userService.ts index 721f81f..f366c4d 100644 --- a/src/services/userService.ts +++ b/src/services/userService.ts @@ -1,5 +1,5 @@ import { db } from "../db/client"; -import { users, predictions, markets, claims } from "../db/schema"; +import { users, predictions, markets } from "../db/schema"; import { and, eq, desc, lt, count } from "drizzle-orm"; import { Result, ok, err } from "../errors/RouteError"; @@ -49,8 +49,16 @@ export interface CurrentUserProfile { }; } +/** + * Returns the authenticated user's profile (stellarAddress, createdAt) along + * with aggregate counts of their predictions. Two queries run + * in parallel via Promise.all: + * + * 1. users — by PK (UUID), cheap point-lookup + * 2. predictions — COUNT(*) filtered by user_id (FK index) + */ export async function getCurrentUserProfile(userId: string): Promise> { - const [userRow, predCountRow, claimCountRow] = await Promise.all([ + const [userRow, predCountRow] = await Promise.all([ db .select({ stellarAddress: users.stellarAddress, @@ -63,10 +71,6 @@ export async function getCurrentUserProfile(userId: string): Promise { + const start = Date.now(); + try { + await refreshAddressAggregates(); + logger.info( + { durationMs: Date.now() - start }, + "address_aggregates_mv refreshed" + ); + } catch (err) { + logger.error({ err, durationMs: Date.now() - start }, "address_aggregates_mv refresh failed"); + } + }, intervalMs); + + // Allow the timer to be garbage-collected even if the process would otherwise keep waiting + id.unref(); + + logger.info({ intervalMs }, "address_aggregates refresh worker started"); + return id; +} diff --git a/tests/addressAggregatesService.test.ts b/tests/addressAggregatesService.test.ts new file mode 100644 index 0000000..4944fbd --- /dev/null +++ b/tests/addressAggregatesService.test.ts @@ -0,0 +1,102 @@ +import { sql } from "drizzle-orm"; + +jest.mock("../src/db", () => ({ + db: { execute: jest.fn() }, +})); + +import { db } from "../src/db"; +import { + refreshAddressAggregates, + getAddressAggregates, + getAddressAggregate, + getAddressAggregatesWithRefresh, +} from "../src/services/addressAggregatesService"; + +const mockExecute = db.execute as jest.MockedFunction; + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe("addressAggregatesService", () => { + const sampleRows = [ + { + user_id: "u1", + stellar_address: "GAAA", + total_predictions: 10, + correct_predictions: 7, + accuracy_percentage: 70.0, + rank: 1, + }, + { + user_id: "u2", + stellar_address: "GBBB", + total_predictions: 5, + correct_predictions: 2, + accuracy_percentage: 40.0, + rank: 2, + }, + ]; + + describe("refreshAddressAggregates", () => { + it("issues REFRESH MATERIALIZED VIEW CONCURRENTLY", async () => { + mockExecute.mockResolvedValueOnce({ rows: [], fields: [], command: "REFRESH", rowCount: 0, oid: 0 }); + + await refreshAddressAggregates(); + + expect(mockExecute).toHaveBeenCalledTimes(1); + const [query] = mockExecute.mock.calls[0] as [ReturnType]; + expect(query).toBeDefined(); + }); + }); + + describe("getAddressAggregates", () => { + it("returns paginated rows ordered by rank", async () => { + mockExecute.mockResolvedValueOnce({ rows: sampleRows, fields: [], command: "SELECT", rowCount: 2, oid: 0 }); + + const result = await getAddressAggregates(10, 0); + + expect(result).toEqual(sampleRows); + expect(mockExecute).toHaveBeenCalledTimes(1); + }); + + it("uses default limit=50 and offset=0", async () => { + mockExecute.mockResolvedValueOnce({ rows: [], fields: [], command: "SELECT", rowCount: 0, oid: 0 }); + + await getAddressAggregates(); + + expect(mockExecute).toHaveBeenCalledTimes(1); + }); + }); + + describe("getAddressAggregate", () => { + it("returns a single entry for a known address", async () => { + mockExecute.mockResolvedValueOnce({ rows: [sampleRows[0]], fields: [], command: "SELECT", rowCount: 1, oid: 0 }); + + const result = await getAddressAggregate("GAAA"); + + expect(result).toEqual(sampleRows[0]); + }); + + it("returns null when address is not found", async () => { + mockExecute.mockResolvedValueOnce({ rows: [], fields: [], command: "SELECT", rowCount: 0, oid: 0 }); + + const result = await getAddressAggregate("GZZZ"); + + expect(result).toBeNull(); + }); + }); + + describe("getAddressAggregatesWithRefresh", () => { + it("refreshes then returns paginated rows", async () => { + mockExecute + .mockResolvedValueOnce({ rows: [], fields: [], command: "REFRESH", rowCount: 0, oid: 0 }) // refresh + .mockResolvedValueOnce({ rows: sampleRows, fields: [], command: "SELECT", rowCount: 2, oid: 0 }); // query + + const result = await getAddressAggregatesWithRefresh(20, 10); + + expect(mockExecute).toHaveBeenCalledTimes(2); + expect(result).toEqual(sampleRows); + }); + }); +}); diff --git a/tests/leaderboard.test.ts b/tests/leaderboard.test.ts new file mode 100644 index 0000000..547b170 --- /dev/null +++ b/tests/leaderboard.test.ts @@ -0,0 +1,113 @@ +process.env.NODE_ENV = "test"; +process.env.PORT = "3001"; +process.env.LOG_LEVEL = "fatal"; +process.env.DATABASE_URL = "postgres://localhost/test"; +process.env.JWT_SECRET = "leaderboard-test-secret-at-least-32-bytes"; +process.env.JWT_ISSUER = "predictify"; +process.env.JWT_AUDIENCE = "predictify-app"; +process.env.JWT_TTL_SECONDS = "3600"; +process.env.STELLAR_NETWORK = "testnet"; +process.env.SOROBAN_RPC_URL = "https://soroban-testnet.stellar.org"; +process.env.HORIZON_URL = "https://horizon-testnet.stellar.org"; +process.env.PREDICTIFY_CONTRACT_ID = "CABCDEF"; + +jest.mock("../src/db/client", () => ({ db: {} })); +jest.mock("../src/services/addressAggregatesService"); + +import request from "supertest"; +import { createApp } from "../src/index"; +import * as addressAggregatesService from "../src/services/addressAggregatesService"; + +const mockGetAddressAggregates = addressAggregatesService.getAddressAggregates as jest.MockedFunction< + typeof addressAggregatesService.getAddressAggregates +>; +const mockGetAddressAggregatesWithRefresh = addressAggregatesService.getAddressAggregatesWithRefresh as jest.MockedFunction< + typeof addressAggregatesService.getAddressAggregatesWithRefresh +>; +const mockGetAddressAggregate = addressAggregatesService.getAddressAggregate as jest.MockedFunction< + typeof addressAggregatesService.getAddressAggregate +>; + +const app = createApp(); + +beforeEach(() => { + jest.clearAllMocks(); +}); + +const sampleEntry = { + user_id: "u1", + stellar_address: "GAAA", + total_predictions: 10, + correct_predictions: 7, + accuracy_percentage: 70.0, + rank: 1, +}; + +describe("GET /api/leaderboard", () => { + it("returns paginated leaderboard entries", async () => { + mockGetAddressAggregates.mockResolvedValueOnce([sampleEntry]); + + const res = await request(app).get("/api/leaderboard"); + + expect(res.status).toBe(200); + expect(res.body.data).toEqual([sampleEntry]); + expect(res.body.meta).toMatchObject({ + limit: 50, + offset: 0, + count: 1, + refresh: false, + }); + }); + + it("accepts limit and offset query params", async () => { + mockGetAddressAggregates.mockResolvedValueOnce([]); + + const res = await request(app) + .get("/api/leaderboard") + .query({ limit: 10, offset: 20 }); + + expect(res.status).toBe(200); + expect(res.body.meta).toMatchObject({ limit: 10, offset: 20, refresh: false }); + expect(mockGetAddressAggregates).toHaveBeenCalledWith(10, 20); + }); + + it("rejects limit > 100", async () => { + const res = await request(app) + .get("/api/leaderboard") + .query({ limit: 200 }); + + expect(res.status).toBe(400); + }); + + it("uses refresh endpoint when refresh=true", async () => { + mockGetAddressAggregatesWithRefresh.mockResolvedValueOnce([sampleEntry]); + + const res = await request(app) + .get("/api/leaderboard") + .query({ refresh: "true" }); + + expect(res.status).toBe(200); + expect(res.body.meta.refresh).toBe(true); + expect(mockGetAddressAggregatesWithRefresh).toHaveBeenCalled(); + }); +}); + +describe("GET /api/leaderboard/user/:stellarAddress", () => { + it("returns a user's leaderboard entry", async () => { + mockGetAddressAggregate.mockResolvedValueOnce(sampleEntry); + + const res = await request(app).get("/api/leaderboard/user/GAAA"); + + expect(res.status).toBe(200); + expect(res.body.data).toEqual(sampleEntry); + }); + + it("returns 404 for unknown address", async () => { + mockGetAddressAggregate.mockResolvedValueOnce(null); + + const res = await request(app).get("/api/leaderboard/user/GZZZ"); + + expect(res.status).toBe(404); + expect(res.body).toHaveProperty("error"); + }); +}); diff --git a/tests/marketsCache.test.ts b/tests/marketsCache.test.ts new file mode 100644 index 0000000..a1d141f --- /dev/null +++ b/tests/marketsCache.test.ts @@ -0,0 +1,86 @@ +import { marketCacheKeys, invalidateMarketCache } from "../src/cache/marketsCache"; +import { redisConnection } from "../src/queue"; + +jest.mock("../src/queue", () => ({ + redisConnection: { + del: jest.fn(), + }, +})); + +jest.mock("../src/lib/requestContext", () => ({ + getRequestId: jest.fn(() => "test-request-id"), +})); + +describe("marketsCache", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + describe("marketCacheKeys", () => { + it("generates correct 'all' key", () => { + expect(marketCacheKeys.all).toBe("markets:all"); + }); + + it("generates correct 'byId' key", () => { + expect(marketCacheKeys.byId("market-123")).toBe("markets:market-123"); + }); + }); + + describe("invalidateMarketCache", () => { + it("deletes both specific market key and list key", async () => { + (redisConnection.del as jest.Mock).mockResolvedValue(1); + + await invalidateMarketCache("market-123"); + + expect(redisConnection.del).toHaveBeenCalledTimes(2); + expect(redisConnection.del).toHaveBeenCalledWith("markets:market-123"); + expect(redisConnection.del).toHaveBeenCalledWith("markets:all"); + }); + + it("logs success message after invalidation", async () => { + (redisConnection.del as jest.Mock).mockResolvedValue(1); + const loggerModule = require("../src/config/logger"); + const loggerSpy = jest.spyOn(loggerModule.logger, "info"); + + await invalidateMarketCache("market-123"); + + expect(loggerSpy).toHaveBeenCalledWith( + { + requestId: "test-request-id", + marketId: "market-123", + keys: ["markets:market-123", "markets:all"], + }, + "Market cache invalidated" + ); + + loggerSpy.mockRestore(); + }); + + it("handles Redis errors gracefully without throwing", async () => { + (redisConnection.del as jest.Mock).mockRejectedValue(new Error("Redis connection failed")); + const loggerModule = require("../src/config/logger"); + const loggerSpy = jest.spyOn(loggerModule.logger, "error"); + + await expect(invalidateMarketCache("market-123")).resolves.not.toThrow(); + + expect(loggerSpy).toHaveBeenCalledWith( + expect.objectContaining({ + marketId: "market-123", + }), + "Failed to invalidate market cache" + ); + + loggerSpy.mockRestore(); + }); + + it("continues even if one key deletion fails", async () => { + (redisConnection.del as jest.Mock) + .mockRejectedValueOnce(new Error("First key failed")) + .mockResolvedValueOnce(1); + + await expect(invalidateMarketCache("market-123")).resolves.not.toThrow(); + + expect(redisConnection.del).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/tests/refreshAggregates.test.ts b/tests/refreshAggregates.test.ts new file mode 100644 index 0000000..3708dc7 --- /dev/null +++ b/tests/refreshAggregates.test.ts @@ -0,0 +1,66 @@ +jest.mock("../src/db/client", () => ({ db: {} })); +jest.mock("../src/services/addressAggregatesService"); + +import * as addressAggregatesService from "../src/services/addressAggregatesService"; +import { startRefreshAggregatesWorker } from "../src/workers/refreshAggregates"; + +const mockRefresh = addressAggregatesService.refreshAddressAggregates as jest.MockedFunction< + typeof addressAggregatesService.refreshAddressAggregates +>; + +beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers(); +}); + +afterEach(() => { + jest.useRealTimers(); +}); + +describe("startRefreshAggregatesWorker", () => { + it("calls refreshAddressAggregates on each interval tick", async () => { + mockRefresh.mockResolvedValue(); + + const handle = startRefreshAggregatesWorker(60_000); + + // First tick + jest.advanceTimersByTime(60_000); + await Promise.resolve(); // flush microtasks + expect(mockRefresh).toHaveBeenCalledTimes(1); + + // Second tick + jest.advanceTimersByTime(60_000); + await Promise.resolve(); + expect(mockRefresh).toHaveBeenCalledTimes(2); + + clearInterval(handle); + }); + + it("logs error but does not throw when refresh fails", async () => { + mockRefresh.mockRejectedValueOnce(new Error("db down")); + + const handle = startRefreshAggregatesWorker(60_000); + + jest.advanceTimersByTime(60_000); + await Promise.resolve(); + + // Worker should still be alive and callable again + mockRefresh.mockResolvedValue(); + jest.advanceTimersByTime(60_000); + await Promise.resolve(); + expect(mockRefresh).toHaveBeenCalledTimes(2); + + clearInterval(handle); + }); + + it("returns a clearable interval handle", () => { + mockRefresh.mockResolvedValue(); + + const handle = startRefreshAggregatesWorker(60_000); + clearInterval(handle); + + jest.advanceTimersByTime(120_000); + // Should not have been called after clear + expect(mockRefresh).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/requestId.test.ts b/tests/requestId.test.ts index 67dcbf5..c3be0f9 100644 --- a/tests/requestId.test.ts +++ b/tests/requestId.test.ts @@ -211,11 +211,11 @@ describe("fetchWithRequestId", () => { // Stub global fetch to inspect headers without making a real network call. const originalFetch = global.fetch; - global.fetch = jest.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { + global.fetch = jest.fn(async (_input: any, init?: any) => { const headers = new Headers(init?.headers); capturedHeader = headers.get(REQUEST_ID_HEADER); return new Response(JSON.stringify({ ok: true }), { status: 200 }); - }) as typeof fetch; + }) as any; await requestContextStorage.run({ requestId: id }, async () => { await fetchWithRequestId("https://example.com/rpc"); @@ -229,11 +229,11 @@ describe("fetchWithRequestId", () => { let capturedHeader: string | null = "sentinel"; const originalFetch = global.fetch; - global.fetch = jest.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { + global.fetch = jest.fn(async (_input: any, init?: any) => { const headers = new Headers(init?.headers); capturedHeader = headers.get(REQUEST_ID_HEADER); return new Response(null, { status: 200 }); - }) as typeof fetch; + }) as any; await fetchWithRequestId("https://example.com/rpc"); @@ -246,10 +246,10 @@ describe("fetchWithRequestId", () => { let capturedHeaders: Headers | null = null; const originalFetch = global.fetch; - global.fetch = jest.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { + global.fetch = jest.fn(async (_input: any, init?: any) => { capturedHeaders = new Headers(init?.headers); return new Response(null, { status: 200 }); - }) as typeof fetch; + }) as any; await requestContextStorage.run({ requestId: id }, async () => { await fetchWithRequestId("https://example.com/rpc", { diff --git a/tests/users.test.ts b/tests/users.test.ts index 62fafb3..689f756 100644 --- a/tests/users.test.ts +++ b/tests/users.test.ts @@ -1,6 +1,6 @@ import request from "supertest"; import { createApp } from "../src/index"; -import { db } from "../src/db/connection"; +import { db } from "../src/db"; import { users, markets, predictions } from "../src/db/schema"; import { eq } from "drizzle-orm";