Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions docs/cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Cache Strategy

## Overview

Predictify uses Redis for caching market data to improve read performance. This document describes the cache keys, TTLs, and invalidation strategy.

## Cache Keys

| Key Pattern | Description | Used By |
|-------------|-------------|---------|
| `markets:all` | List of all markets | `GET /api/markets` |
| `markets:{id}` | Single market detail | `GET /api/markets/:id` |

## TTLs

- **`markets:all`**: 60 seconds - Refreshed on any market update to ensure list consistency
- **`markets:{id}`**: 120 seconds - Longer TTL for individual market reads

## Invalidation Strategy

Cache entries are invalidated on the following events:

### Market Update (`PATCH /api/markets/:id`)

When a market is updated via the admin API, both cache keys are invalidated:

1. `markets:{id}` - The specific market's cache is removed
2. `markets:all` - The aggregated list cache is removed

This ensures that subsequent reads return fresh data from the database.

### Implementation

```typescript
// src/cache/marketsCache.ts
export async function invalidateMarketCache(marketId: string) {
const keysToDelete = [marketCacheKeys.byId(marketId), marketCacheKeys.all];
await Promise.all(keysToDelete.map((k) => redisConnection.del(k)));
}
```

## Error Handling

Cache operations are designed to never fail the business operation:

- If Redis is unavailable, the request continues without caching
- Cache errors are logged with correlation IDs for debugging
- The API remains functional even if cache invalidation fails

## Security Considerations

- Cache keys do not contain sensitive data
- Only market IDs and aggregated lists are cached
- No user-specific data is cached
- Cache invalidation requires admin authentication

## Performance Notes

- Individual `DEL` operations are O(N) where N is the number of keys
- Invalidations are performed in parallel using `Promise.all`
- Cache misses fall back to database queries seamlessly

## Monitoring

Cache operations are logged with correlation IDs:

```json
{
"requestId": "uuid",
"marketId": "market-123",
"keys": ["markets:market-123", "markets:all"]
}
```

Monitor for:
- `Market cache invalidated` - Successful invalidation
- `Failed to invalidate market cache` - Redis connectivity issues
42 changes: 42 additions & 0 deletions drizzle/migrations/0013_address_aggregates_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- Predictions-per-address aggregated materialized view
-- Precomputes per-user prediction statistics for fast leaderboard queries.
-- Refreshed hourly via src/workers/refreshAggregates.ts using CONCURRENTLY
-- to avoid locking reads during refresh.

CREATE MATERIALIZED VIEW IF NOT EXISTS address_aggregates_mv AS
SELECT
u.id AS user_id,
u.stellar_address,
COUNT(p.id)::bigint AS total_predictions,
SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END)::bigint AS correct_predictions,
ROUND(
CASE WHEN COUNT(p.id) > 0 THEN
100.0 * SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END) / COUNT(p.id)
ELSE 0
END,
2
) AS accuracy_percentage,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN COUNT(p.id) > 0 THEN
100.0 * SUM(CASE WHEN p.outcome = m.resolution_outcome THEN 1 ELSE 0 END) / COUNT(p.id)
ELSE 0
END DESC,
COUNT(p.id) DESC
) AS rank
FROM users u
LEFT JOIN predictions p ON u.id = p.user_id
LEFT JOIN markets m ON p.market_id = m.id AND m.status IN ('resolved', 'disputed')
GROUP BY u.id, u.stellar_address;

-- Unique index on user_id is required for CONCURRENTLY refresh
CREATE UNIQUE INDEX IF NOT EXISTS idx_address_aggregates_user_id
ON address_aggregates_mv (user_id);

-- Index for lookups by stellar address
CREATE INDEX IF NOT EXISTS idx_address_aggregates_stellar_address
ON address_aggregates_mv (stellar_address);

-- Index for ordered leaderboard queries
CREATE INDEX IF NOT EXISTS idx_address_aggregates_rank
ON address_aggregates_mv (rank);
2 changes: 1 addition & 1 deletion src/config/env-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export const envSchema = z.object({
// ── Application ───────────────────────────────────────────
NODE_ENV: z.enum(["development", "test", "production"]).default("development"),
PORT: z.coerce.number().int().positive().default(3001),
LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),
LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]).default("info"),

// ── Database ──────────────────────────────────────────────
DATABASE_URL: z.string().url(),
Expand Down
1 change: 1 addition & 0 deletions src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ const pool = new Pool({
});

export const db = drizzle(pool, { schema });
export type Db = typeof db;
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { REQUEST_ID_HEADER } from "./lib/http";
import { register } from "./metrics/registry";
import { connectWithRetry, closeDb } from "./db/client";
import { stopScheduler } from "./services/scheduler";
import { startRefreshAggregatesWorker } from "./workers/refreshAggregates";

const docsEnabled = env.NODE_ENV !== "production" || process.env.ENABLE_DOCS === "true";

Expand Down Expand Up @@ -113,9 +114,11 @@ export function createApp(): express.Express {

if (require.main === module) {
const app = createApp();
let refreshWorker: NodeJS.Timeout | undefined;

connectWithRetry()
.then(() => {
refreshWorker = startRefreshAggregatesWorker();
app.listen(env.PORT, () => {
logger.info({ port: env.PORT, env: env.NODE_ENV }, "predictify-backend listening");
logger.info(`Swagger UI available at http://localhost:${env.PORT}/docs`);
Expand All @@ -133,6 +136,7 @@ if (require.main === module) {
process.exit(1);
}, 5000).unref();

if (refreshWorker) clearInterval(refreshWorker);
stopScheduler();
await closeDb();
clearTimeout(forceExit);
Expand All @@ -141,6 +145,7 @@ if (require.main === module) {

process.on("SIGINT", () => {
logger.info("SIGINT received, shutting down gracefully");
if (refreshWorker) clearInterval(refreshWorker);
stopScheduler();
process.exit(0);
});
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function requireAdmin(req: AuthenticatedRequest, res: Response, next: Nex

req.user = { id: stellarAddress, stellarAddress };
next();
} catch (err) {
} catch {
res.status(401).json({ error: { code: "unauthorized" } });
}
}
Expand Down
1 change: 1 addition & 0 deletions src/middleware/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { logger } from "../config/logger";
* that can be consumed by downstream middleware or route handlers.
*/
declare global {
// eslint-disable-next-line @typescript-eslint/no-namespace
namespace Express {
interface Request {
/** Rate-limit context set by the rateLimit middleware on every request */
Expand Down
1 change: 1 addition & 0 deletions src/middleware/requireAdmin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { env } from "../config/env";
// Augment Express Request so downstream handlers can read the admin identity
// without casting.
declare global {
// eslint-disable-next-line @typescript-eslint/no-namespace
namespace Express {
interface Request {
adminAddress?: string;
Expand Down
2 changes: 2 additions & 0 deletions src/routes/markets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ marketsRouter.patch("/:id", requireAdmin, async (req: AuthenticatedRequest, res,
const { question, metadata, expectedVersion } = parsed.data;
const adminAddress = req.user!.stellarAddress;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const patch: { question?: string; metadata?: any } = {};
if (question !== undefined) patch.question = question;
if (metadata !== undefined) patch.metadata = metadata;
Expand All @@ -112,6 +113,7 @@ marketsRouter.patch("/:id", requireAdmin, async (req: AuthenticatedRequest, res,
if (e instanceof VersionConflictError) {
return res.status(409).json({ error: { code: "version_conflict" } });
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).status === 404) {
return res.status(404).json({ error: { code: "not_found" } });
}
Expand Down
90 changes: 50 additions & 40 deletions src/routes/users.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ usersRouter.get("/:address/predictions", async (req: Request, res: Response, nex

try {
stellarAddressSchema.parse(address);
} catch (e) {
} catch {
return res.status(400).json({ error: { code: "invalid_address" } });
}

Expand Down Expand Up @@ -69,49 +69,59 @@ usersRouter.get("/:address/predictions", async (req: Request, res: Response, nex
}
});

usersRouter.get("/:stellarAddress/profile", async (req: Request, res: Response, next: NextFunction) => {
const reqId = getRequestId() ?? (typeof (req as { id?: unknown }).id === "string" ? (req as { id?: string }).id : undefined);

const parseResult = stellarAddressSchema.safeParse(req.params.stellarAddress);
if (!parseResult.success) {
logger.warn(
{ reqId, stellarAddress: req.params.stellarAddress, issues: parseResult.error.issues },
"user_profile_validation_failed",
);
return res.status(400).json({
error: {
code: "validation_error",
message: parseResult.error.issues[0]?.message ?? "invalid stellar address",
requestId: reqId,
},
});
}

const stellarAddress = parseResult.data;

try {
logger.debug({ reqId, stellarAddress }, "user_profile_lookup");

const profile = await getUserProfile(stellarAddress);

if (!profile) {
logger.debug({ reqId, stellarAddress }, "user_profile_not_found");
return res.status(404).json({
/**
* GET /api/users/:stellarAddress/profile
*
* Public endpoint — no authentication required.
*
* Returns the profile for the user identified by `stellarAddress`.
*/
usersRouter.get(
"/:stellarAddress/profile",
async (req: Request, res: Response, next: NextFunction) => {
const reqId = getRequestId();

const parseResult = stellarAddressSchema.safeParse(req.params.stellarAddress);
if (!parseResult.success) {
logger.warn(
{ reqId, stellarAddress: req.params.stellarAddress, issues: parseResult.error.issues },
"user_profile_validation_failed",
);
return res.status(400).json({
error: {
code: "not_found",
message: "no user found with that stellar address",
code: "validation_error",
message: parseResult.error.issues[0]?.message ?? "invalid stellar address",
requestId: reqId,
},
});
}

logger.debug(
{ reqId, stellarAddress, predictionCount: profile.predictions.length },
"user_profile_found",
);
const stellarAddress = parseResult.data;

return res.json({ data: profile });
} catch (err) {
return next(err);
}
});
try {
logger.debug({ reqId, stellarAddress }, "user_profile_lookup");

const profile = await getUserProfile(stellarAddress);

if (!profile) {
logger.debug({ reqId, stellarAddress }, "user_profile_not_found");
return res.status(404).json({
error: {
code: "not_found",
message: "no user found with that stellar address",
requestId: reqId,
},
});
}

logger.debug(
{ reqId, stellarAddress, predictionCount: profile.predictions.length },
"user_profile_found",
);

return res.json({ data: profile });
} catch (err) {
return next(err);
}
},
);
68 changes: 68 additions & 0 deletions src/services/addressAggregatesService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { db } from "../db";
import { sql } from "drizzle-orm";

export interface AddressAggregate extends Record<string, unknown> {
user_id: string;
stellar_address: string;
total_predictions: number;
correct_predictions: number;
accuracy_percentage: number;
rank: number;
}

/**
* Refresh the address_aggregates_mv materialized view concurrently.
* Uses CONCURRENTLY to avoid locking reads during refresh.
*/
export async function refreshAddressAggregates(): Promise<void> {
await db.execute(sql`REFRESH MATERIALIZED VIEW CONCURRENTLY address_aggregates_mv`);
}

/**
* Get paginated address aggregates ordered by rank.
*/
export async function getAddressAggregates(
limit: number = 50,
offset: number = 0
): Promise<AddressAggregate[]> {
const result = await db.execute<AddressAggregate>(
sql`
SELECT user_id, stellar_address, total_predictions, correct_predictions,
accuracy_percentage, rank
FROM address_aggregates_mv
ORDER BY rank ASC
LIMIT ${limit}
OFFSET ${offset}
`
);
return result.rows;
}

/**
* Look up a single address aggregate by stellar address.
*/
export async function getAddressAggregate(
stellarAddress: string
): Promise<AddressAggregate | null> {
const result = await db.execute<AddressAggregate>(
sql`
SELECT user_id, stellar_address, total_predictions, correct_predictions,
accuracy_percentage, rank
FROM address_aggregates_mv
WHERE stellar_address = ${stellarAddress}
LIMIT 1
`
);
return result.rows[0] || null;
}

/**
* Refresh the view then return paginated results.
*/
export async function getAddressAggregatesWithRefresh(
limit: number = 50,
offset: number = 0
): Promise<AddressAggregate[]> {
await refreshAddressAggregates();
return getAddressAggregates(limit, offset);
}
Loading
Loading