From df3a601afd0aaf120e4f1047d2c780432090a1e2 Mon Sep 17 00:00:00 2001 From: Eli Date: Tue, 30 Jun 2026 00:39:11 -0700 Subject: [PATCH] feat: scheduled developer report exports to object storage closes #398 --- PR_DESCRIPTION_SCHEDULED_REPORT_EXPORTS.md | 35 +++ docs/scheduled-exports.md | 139 ++++++++++- migrations/0017_developer_exports.sql | 27 ++ src/db/schema.ts | 15 +- src/routes/developerRoutes.test.ts | 118 +++++++++ src/routes/developerRoutes.ts | 49 +++- src/services/reportExporter.test.ts | 200 +++++++++++++++ src/services/reportExporter.ts | 275 +++++++++++++++++++++ 8 files changed, 852 insertions(+), 6 deletions(-) create mode 100644 PR_DESCRIPTION_SCHEDULED_REPORT_EXPORTS.md create mode 100644 migrations/0017_developer_exports.sql create mode 100644 src/services/reportExporter.test.ts create mode 100644 src/services/reportExporter.ts diff --git a/PR_DESCRIPTION_SCHEDULED_REPORT_EXPORTS.md b/PR_DESCRIPTION_SCHEDULED_REPORT_EXPORTS.md new file mode 100644 index 0000000..ab36ec7 --- /dev/null +++ b/PR_DESCRIPTION_SCHEDULED_REPORT_EXPORTS.md @@ -0,0 +1,35 @@ +# PR: Scheduled Developer Report Exports to Object Storage + +## Summary + +Adds a daily export pipeline that materialises `usage_events` into per-developer CSV and JSON artifacts stored in S3-compatible object storage, and exposes a signed download URL endpoint at `GET /api/developers/exports`. + +This replaces the previous synchronous export approach which timed out on large date ranges. + +## Changes + +### New files +- `migrations/0017_developer_exports.sql` — `developer_exports` table with `id`, `developer_id`, `format`, `s3_key`, `exported_at`, `expires_at` and a composite index on `(developer_id, exported_at DESC)` +- `src/services/reportExporter.ts` — `ReportExporterService`, `InMemoryExportStore`, `DeveloperExportStore` interface, `createReportExporterWorker` worker factory +- `src/services/reportExporter.test.ts` — unit tests for service, store, and worker lifecycle + +### Modified files +- `src/db/schema.ts` — added `developerExports` Drizzle table definition, `DeveloperExport` and `NewDeveloperExport` types +- `src/routes/developerRoutes.ts` — added `GET /exports` route and extended `DeveloperRoutesDeps` with optional `reportExporterService` +- `src/routes/developerRoutes.test.ts` — added `describe('GET /api/developers/exports')` test block (5 cases) +- `docs/scheduled-exports.md` — updated to document the new table, route, TTL config, daily job interval, and in-memory test adapter + +## Test coverage + +| Test file | Cases | +|---|---| +| `src/services/reportExporter.test.ts` | 8 (runDailyExports window, empty window, boundary, multi-dev, expired records, valid+expired mix, signed URL, worker lifecycle) | +| `src/routes/developerRoutes.test.ts` | 5 new (401, 403, 200 with records, 200 empty, downloadUrl correctness) | + +## Security + +- Signed URLs expire per `EXPORT_SIGNED_URL_TTL_SECONDS` (default 900 s) +- S3 credentials are never returned in responses or logged +- Route scopes queries strictly to `developer.user_id` — no cross-tenant reads possible + +closes #398 diff --git a/docs/scheduled-exports.md b/docs/scheduled-exports.md index d4b5705..129c677 100644 --- a/docs/scheduled-exports.md +++ b/docs/scheduled-exports.md @@ -1,12 +1,143 @@ # Scheduled usage event exports -This feature adds developer-managed recurring exports of `usage_events` to a user-provided S3-compatible endpoint. +This feature adds developer-managed recurring exports of `usage_events` to a user-provided S3-compatible endpoint, plus a server-managed daily export pipeline that materialises signed download artifacts accessible via `GET /api/developers/exports`. + +--- ## API -- `GET /api/exports/schedules` -- `POST /api/exports/schedules` -- `PATCH /api/exports/schedules/:scheduleId` +### Schedule management (developer-owned S3 destination) + +- `GET /api/exports/schedules` — list the authenticated developer's export schedules (secrets redacted) +- `POST /api/exports/schedules` — create a new export schedule +- `PATCH /api/exports/schedules/:scheduleId` — update an existing schedule + +### Materialized export downloads + +- `GET /api/developers/exports` — list signed download URLs for pre-materialized daily export artifacts + +--- + +## `developer_exports` table + +Persists metadata for scheduled daily CSV/JSON artifacts uploaded to object storage. + +| Column | Type | Description | +|---------------|--------|--------------------------------------------------------------| +| `id` | TEXT | UUID v4 primary key, generated at insert time | +| `developer_id`| TEXT | Developer `user_id` (matches `developers.user_id`) | +| `format` | TEXT | `'csv'` or `'json'` (CHECK constraint enforced) | +| `s3_key` | TEXT | Object storage key, e.g. `daily-exports/{devId}/{date}.csv` | +| `exported_at` | TEXT | ISO-8601 UTC timestamp of when the export was created | +| `expires_at` | TEXT | ISO-8601 UTC timestamp; row is treated as expired after this | + +Index: `idx_developer_exports_dev_exported ON developer_exports(developer_id, exported_at DESC)` — supports efficient newest-first listing per developer. + +Expiry enforcement is application-side: `listByDeveloper` filters out rows where `expires_at <= now`. The database does not auto-delete expired rows. + +Migration: `migrations/0017_developer_exports.sql` + +--- + +## `GET /api/developers/exports` + +Returns a paginated list of pre-materialized export artifacts for the authenticated developer. + +### Query parameters + +| Parameter | Type | Default | Description | +|-----------|--------|---------|-------------------------------------| +| `limit` | number | `20` | Max results to return (1–100) | +| `offset` | number | `0` | Pagination offset (≥ 0) | + +### Response shape + +```json +{ + "data": [ + { + "id": "550e8400-e29b-41d4-a716-446655440000", + "format": "csv", + "exportedAt": "2026-06-01T00:00:00.000Z", + "expiresAt": "2026-06-08T00:00:00.000Z", + "downloadUrl": "https://s3.example.com/exports/dev-1/2026-06-01.csv?expires=1234567890&signature=abc123" + } + ], + "pagination": { + "limit": 20, + "offset": 0, + "total": 1 + } +} +``` + +### Error responses + +| Status | Code | Condition | +|--------|------------------------|------------------------------------------------| +| 401 | `UNAUTHORIZED` | No `x-user-id` / auth header present | +| 403 | `DEVELOPER_NOT_FOUND` | Authenticated user has no developer profile | + +### Signed URL TTL + +The download URL is generated fresh on every request. The TTL is controlled by: + +```bash +EXPORT_SIGNED_URL_TTL_SECONDS=900 # default: 15 minutes +``` + +Credentials are never stored in the response or logs. The URL is signed using HMAC-SHA256 keyed with the configured S3 secret. + +--- + +## Daily export job + +The `ReportExporterService` materialises one CSV and one JSON export per developer per day. + +### How it works + +1. `runDailyExports(date)` computes the 24-hour UTC window `[date − 1 day, date)`. +2. All usage events in that window are grouped by `developer_id`. +3. For each developer with ≥1 event, two files are uploaded to object storage: + - `daily-exports/{developerId}/{YYYY-MM-DD}.csv` + - `daily-exports/{developerId}/{YYYY-MM-DD}.json` +4. A `DeveloperExportRecord` is written to the store for each file, with `expires_at = date + 7 days`. + +### Configuring the interval + +```bash +REPORT_EXPORTER_INTERVAL_MS=86400000 # default: 1 day in ms +``` + +Use `createReportExporterWorker(service, { intervalMs })` to start the background worker. It runs the first tick immediately on `start()`, then repeats on the interval. + +--- + +## In-memory adapter for testing + +`InMemoryExportStore` from `src/services/reportExporter.ts` implements `DeveloperExportStore` using a `Map`. It can be used in unit and integration tests without a real database: + +```ts +import { InMemoryExportStore, ReportExporterService } from './reportExporter.js'; +import { HmacObjectStorageClient } from './scheduledExports.js'; + +const store = new InMemoryExportStore(); +const storage = new HmacObjectStorageClient(); +const service = new ReportExporterService( + myUsageEventsRepo, + storage, + store, + { + s3Bucket: 'test-bucket', + s3Endpoint: 'https://s3.test', + s3SecretAccessKey: 'test-secret', + } +); +``` + +`HmacObjectStorageClient` (from `scheduledExports.ts`) records all uploads in its `.uploads` array and generates deterministic signed URLs — no real S3 connection required. + +--- ## Behavior diff --git a/migrations/0017_developer_exports.sql b/migrations/0017_developer_exports.sql new file mode 100644 index 0000000..c848e1a --- /dev/null +++ b/migrations/0017_developer_exports.sql @@ -0,0 +1,27 @@ +-- Migration: 0017_developer_exports +-- Adds a `developer_exports` table to persist metadata for scheduled daily +-- export artifacts (CSV and JSON) uploaded to object storage. +-- +-- Design notes: +-- • `format` is constrained to 'csv' or 'json' via a CHECK constraint. +-- • `s3_key` holds the object storage path, e.g. +-- `daily-exports/{developerId}/{YYYY-MM-DD}.{format}`. +-- • `exported_at` and `expires_at` are ISO-8601 TEXT columns (UTC), consistent +-- with how the application serialises Date values for this feature. +-- • `expires_at` is set to `exported_at + 7 days` by the application layer; +-- the DB does not enforce expiry — the service filters expired rows on read. +-- • The composite index supports the primary query pattern: list all exports +-- for a developer ordered newest-first. + +CREATE TABLE IF NOT EXISTS developer_exports ( + id TEXT PRIMARY KEY, -- UUID v4 generated at insert time + developer_id TEXT NOT NULL, -- developer user_id (matches developers.user_id) + format TEXT NOT NULL CHECK(format IN ('csv','json')), -- export file format + s3_key TEXT NOT NULL, -- object storage key / path + exported_at TEXT NOT NULL, -- ISO-8601 UTC timestamp of export + expires_at TEXT NOT NULL -- ISO-8601 UTC timestamp; rows valid until this time +); + +-- Primary access pattern: list exports for a developer ordered by newest first +CREATE INDEX IF NOT EXISTS idx_developer_exports_dev_exported + ON developer_exports (developer_id, exported_at DESC); diff --git a/src/db/schema.ts b/src/db/schema.ts index bca323e..dc3a3d7 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -85,4 +85,17 @@ export type NewCredit = typeof credits.$inferInsert; export type Api = typeof apis.$inferSelect; export type NewApi = typeof apis.$inferInsert; export type ApiEndpoint = typeof apiEndpoints.$inferSelect; -export type NewApiEndpoint = typeof apiEndpoints.$inferInsert; \ No newline at end of file +export type NewApiEndpoint = typeof apiEndpoints.$inferInsert; + +// Developer exports table — persists metadata for scheduled daily CSV/JSON artifacts +export const developerExports = sqliteTable('developer_exports', { + id: text('id').primaryKey(), // UUID v4 generated at insert time + developer_id: text('developer_id').notNull(), // developer user_id + format: text('format', { enum: ['csv', 'json'] as const }).notNull(), // export file format + s3_key: text('s3_key').notNull(), // object storage key / path + exported_at: text('exported_at').notNull(), // ISO-8601 UTC timestamp of export + expires_at: text('expires_at').notNull(), // ISO-8601 UTC; row valid until this time +}); + +export type DeveloperExport = typeof developerExports.$inferSelect; +export type NewDeveloperExport = typeof developerExports.$inferInsert; \ No newline at end of file diff --git a/src/routes/developerRoutes.test.ts b/src/routes/developerRoutes.test.ts index 849c955..18d4358 100644 --- a/src/routes/developerRoutes.test.ts +++ b/src/routes/developerRoutes.test.ts @@ -229,3 +229,121 @@ describe('PATCH /api/developers/me', () => { }); }); }); + +// ───────────────────────────────────────────── +// GET /api/developers/exports +// ───────────────────────────────────────────── + +describe('GET /api/developers/exports', () => { + const mockReportExporterService = { + listExportsForDeveloper: jest.fn(), + getSignedUrl: jest.fn(), + }; + + const exportsApp = express(); + exportsApp.use(express.json()); + exportsApp.use( + '/api/developers', + createDeveloperRouter({ + settlementStore: mockSettlementStore as any, + usageStore: mockUsageStore as any, + developerRepository: mockDeveloperRepository as any, + reportExporterService: mockReportExporterService as any, + }), + ); + exportsApp.use(errorHandler); + + const baseDeveloper = makeDeveloper({ user_id: 'dev-1' }); + + beforeEach(() => { + jest.clearAllMocks(); + mockDeveloperRepository.findByUserId.mockResolvedValue(baseDeveloper); + mockReportExporterService.listExportsForDeveloper.mockResolvedValue([]); + mockReportExporterService.getSignedUrl.mockReturnValue('https://s3.test/signed-url'); + }); + + it('returns 401 when unauthenticated', async () => { + const res = await request(exportsApp).get('/api/developers/exports'); + expect(res.status).toBe(401); + }); + + it('returns 403 when the user has no developer profile', async () => { + mockDeveloperRepository.findByUserId.mockResolvedValue(undefined); + + const res = await request(exportsApp) + .get('/api/developers/exports') + .set('x-user-id', 'no-profile-user'); + + expect(res.status).toBe(403); + expect(res.body.code).toBe('DEVELOPER_NOT_FOUND'); + }); + + it('returns 200 with paginated data array containing the expected fields', async () => { + const now = new Date('2026-06-01T12:00:00.000Z'); + const expires = new Date('2026-06-08T12:00:00.000Z'); + + const record = { + id: 'rec-1', + developerId: 'dev-1', + format: 'csv', + s3Key: 'daily-exports/dev-1/2026-06-01.csv', + exportedAt: now, + expiresAt: expires, + }; + + mockReportExporterService.listExportsForDeveloper.mockResolvedValue([record]); + mockReportExporterService.getSignedUrl.mockReturnValue('https://s3.test/signed-url?expires=999'); + + const res = await request(exportsApp) + .get('/api/developers/exports') + .set('x-user-id', 'dev-1'); + + expect(res.status).toBe(200); + expect(res.body.data).toHaveLength(1); + expect(res.body.pagination).toMatchObject({ limit: 20, offset: 0, total: 1 }); + + const item = res.body.data[0]; + expect(item).toMatchObject({ + id: 'rec-1', + format: 'csv', + exportedAt: now.toISOString(), + expiresAt: expires.toISOString(), + downloadUrl: 'https://s3.test/signed-url?expires=999', + }); + }); + + it('returns 200 with empty data array when listExportsForDeveloper returns []', async () => { + mockReportExporterService.listExportsForDeveloper.mockResolvedValue([]); + + const res = await request(exportsApp) + .get('/api/developers/exports') + .set('x-user-id', 'dev-1'); + + expect(res.status).toBe(200); + expect(res.body.data).toEqual([]); + expect(res.body.pagination).toMatchObject({ total: 0 }); + }); + + it('downloadUrl comes from getSignedUrl return value', async () => { + const record = { + id: 'rec-2', + developerId: 'dev-1', + format: 'json', + s3Key: 'daily-exports/dev-1/2026-06-01.json', + exportedAt: new Date('2026-06-01T12:00:00.000Z'), + expiresAt: new Date('2026-06-08T12:00:00.000Z'), + }; + + mockReportExporterService.listExportsForDeveloper.mockResolvedValue([record]); + const expectedUrl = 'https://s3.test/specific-signed-url?sig=abc123'; + mockReportExporterService.getSignedUrl.mockReturnValue(expectedUrl); + + const res = await request(exportsApp) + .get('/api/developers/exports') + .set('x-user-id', 'dev-1'); + + expect(res.status).toBe(200); + expect(res.body.data[0].downloadUrl).toBe(expectedUrl); + expect(mockReportExporterService.getSignedUrl).toHaveBeenCalledWith(record, expect.any(Number)); + }); +}); diff --git a/src/routes/developerRoutes.ts b/src/routes/developerRoutes.ts index 3832fd2..bacc7df 100644 --- a/src/routes/developerRoutes.ts +++ b/src/routes/developerRoutes.ts @@ -10,6 +10,7 @@ import { import { UsageStore } from '../types/gateway.js'; import { ForbiddenError, UnauthorizedError } from '../errors/index.js'; import type { DeveloperRepository } from '../repositories/developerRepository.js'; +import type { ReportExporterService } from '../services/reportExporter.js'; /** * Wraps an async Express route handler so that any thrown error is forwarded @@ -28,11 +29,12 @@ export interface DeveloperRoutesDeps { settlementStore: SettlementStore; usageStore: UsageStore; developerRepository: DeveloperRepository; + reportExporterService?: ReportExporterService; } export function createDeveloperRouter(deps: DeveloperRoutesDeps): Router { const router = Router(); - const { settlementStore, usageStore, developerRepository } = deps; + const { settlementStore, usageStore, developerRepository, reportExporterService } = deps; // Validation schema for revenue query parameters const revenueQuerySchema = z.object({ @@ -204,5 +206,50 @@ export function createDeveloperRouter(deps: DeveloperRoutesDeps): Router { }), ); + // Validation schema for exports query parameters + const exportsQuerySchema = z.object({ + limit: z + .string() + .optional() + .transform((val) => (val ? parseInt(val, 10) : 20)) + .pipe(z.number().int()) + .transform((val) => Math.min(Math.max(val, 1), 100)), + offset: z + .string() + .optional() + .transform((val) => (val ? parseInt(val, 10) : 0)) + .pipe(z.number().int().min(0)), + }); + + if (reportExporterService) { + router.get( + '/exports', + requireAuth, + validate({ query: exportsQuerySchema }), + asyncHandler(async (req, res) => { + const user = res.locals.authenticatedUser; + if (!user) throw new UnauthorizedError(); + const developer = await developerRepository.findByUserId(user.id); + if (!developer) + throw new ForbiddenError('No developer profile found for this account', 'DEVELOPER_NOT_FOUND'); + + const parsedQuery = exportsQuerySchema.parse(req.query); + const { limit, offset } = parsedQuery; + const ttl = Number(process.env.EXPORT_SIGNED_URL_TTL_SECONDS ?? '900'); + + const records = await reportExporterService.listExportsForDeveloper(developer.user_id, { limit, offset }); + const data = records.map((r) => ({ + id: r.id, + format: r.format, + exportedAt: r.exportedAt.toISOString(), + expiresAt: r.expiresAt.toISOString(), + downloadUrl: reportExporterService.getSignedUrl(r, ttl), + })); + + res.json({ data, pagination: { limit, offset, total: data.length } }); + }), + ); + } + return router; } diff --git a/src/services/reportExporter.test.ts b/src/services/reportExporter.test.ts new file mode 100644 index 0000000..07ce362 --- /dev/null +++ b/src/services/reportExporter.test.ts @@ -0,0 +1,200 @@ +import { + ReportExporterService, + InMemoryExportStore, + createReportExporterWorker, + type DeveloperExportRecord, +} from './reportExporter.js'; +import { HmacObjectStorageClient } from './scheduledExports.js'; +import type { BillingUsageEvent } from '../repositories/usageEventsRepository.pg.js'; + +// ─── helpers ─────────────────────────────────────────────────────────────── + +function makeEvent(overrides: Partial = {}): BillingUsageEvent { + return { + id: '1', + userId: 'user-1', + apiId: 'api-1', + endpointId: 'ep-1', + apiKeyId: 'key-1', + developerId: 'dev-1', + amount: 100n, + requestId: 'req-1', + stellarTxHash: null, + createdAt: new Date('2026-06-01T12:00:00.000Z'), + ...overrides, + }; +} + +function makeRepo(events: BillingUsageEvent[]) { + return { getEvents: async () => events }; +} + +function makeService(events: BillingUsageEvent[], store = new InMemoryExportStore(), client = new HmacObjectStorageClient()) { + return { + service: new ReportExporterService(makeRepo(events), client, store, { + s3Bucket: 'test-bucket', + s3Endpoint: 'https://s3.test', + s3SecretAccessKey: 'test-secret', + }), + store, + client, + }; +} + +// ─── runDailyExports ─────────────────────────────────────────────────────── + +test('runDailyExports uploads CSV and JSON for a developer with events in the window', async () => { + const runDate = new Date('2026-06-02T00:00:00.000Z'); + const eventInWindow = makeEvent({ createdAt: new Date('2026-06-01T12:00:00.000Z') }); + + const { service, client } = makeService([eventInWindow]); + const records = await service.runDailyExports(runDate); + + expect(records).toHaveLength(2); + expect(client.uploads).toHaveLength(2); + + const csvUpload = client.uploads.find((u) => u.key.endsWith('.csv')); + const jsonUpload = client.uploads.find((u) => u.key.endsWith('.json')); + + expect(csvUpload).toBeDefined(); + expect(jsonUpload).toBeDefined(); + expect(csvUpload?.contentType).toBe('text/csv'); + expect(jsonUpload?.contentType).toBe('application/json'); +}); + +test('runDailyExports produces zero uploads when no events fall in the 24-hour window', async () => { + const runDate = new Date('2026-06-02T00:00:00.000Z'); + // Event is BEFORE the window + const eventBefore = makeEvent({ createdAt: new Date('2026-05-31T23:59:59.000Z') }); + + const { service, client } = makeService([eventBefore]); + const records = await service.runDailyExports(runDate); + + expect(records).toHaveLength(0); + expect(client.uploads).toHaveLength(0); +}); + +test('runDailyExports excludes events outside the 24-hour window boundary (upper bound)', async () => { + const runDate = new Date('2026-06-02T00:00:00.000Z'); + // Event is AT or AFTER the window end — must be excluded (window is [start, end)) + const eventAtEnd = makeEvent({ createdAt: runDate }); + + const { service, client } = makeService([eventAtEnd]); + const records = await service.runDailyExports(runDate); + + expect(records).toHaveLength(0); + expect(client.uploads).toHaveLength(0); +}); + +test('runDailyExports handles multiple developers independently', async () => { + const runDate = new Date('2026-06-02T00:00:00.000Z'); + const events: BillingUsageEvent[] = [ + makeEvent({ id: '1', developerId: 'dev-1', requestId: 'req-1', createdAt: new Date('2026-06-01T08:00:00.000Z') }), + makeEvent({ id: '2', developerId: 'dev-2', requestId: 'req-2', createdAt: new Date('2026-06-01T10:00:00.000Z') }), + makeEvent({ id: '3', developerId: 'dev-2', requestId: 'req-3', createdAt: new Date('2026-06-01T16:00:00.000Z') }), + ]; + + const { service, client } = makeService(events); + const records = await service.runDailyExports(runDate); + + // 2 formats × 2 developers = 4 records + expect(records).toHaveLength(4); + expect(client.uploads).toHaveLength(4); + + const dev1Keys = client.uploads.filter((u) => u.key.includes('dev-1')); + const dev2Keys = client.uploads.filter((u) => u.key.includes('dev-2')); + expect(dev1Keys).toHaveLength(2); // csv + json + expect(dev2Keys).toHaveLength(2); // csv + json +}); + +// ─── listExportsForDeveloper ─────────────────────────────────────────────── + +test('listExportsForDeveloper returns empty array when all records are expired', async () => { + const { service, store } = makeService([]); + + const expired: DeveloperExportRecord = { + id: 'rec-old', + developerId: 'dev-1', + format: 'csv', + s3Key: 'daily-exports/dev-1/2026-01-01.csv', + exportedAt: new Date('2026-01-01T00:00:00.000Z'), + expiresAt: new Date('2026-01-08T00:00:00.000Z'), // already expired + }; + await store.save(expired); + + const results = await service.listExportsForDeveloper('dev-1', { limit: 20, offset: 0 }); + expect(results).toHaveLength(0); +}); + +test('listExportsForDeveloper excludes expired records but returns valid ones', async () => { + const { service, store } = makeService([]); + const far = new Date(Date.now() + 7 * 86_400_000); + + const expired: DeveloperExportRecord = { + id: 'rec-old', + developerId: 'dev-1', + format: 'csv', + s3Key: 'daily-exports/dev-1/old.csv', + exportedAt: new Date('2026-01-01T00:00:00.000Z'), + expiresAt: new Date('2026-01-08T00:00:00.000Z'), + }; + + const valid: DeveloperExportRecord = { + id: 'rec-new', + developerId: 'dev-1', + format: 'json', + s3Key: 'daily-exports/dev-1/new.json', + exportedAt: new Date(), + expiresAt: far, + }; + + await store.save(expired); + await store.save(valid); + + const results = await service.listExportsForDeveloper('dev-1', { limit: 20, offset: 0 }); + expect(results).toHaveLength(1); + expect(results[0]?.id).toBe('rec-new'); +}); + +// ─── getSignedUrl ────────────────────────────────────────────────────────── + +test('getSignedUrl returns a URL string containing the record s3Key', () => { + const { service } = makeService([]); + + const record: DeveloperExportRecord = { + id: 'rec-1', + developerId: 'dev-1', + format: 'csv', + s3Key: 'daily-exports/dev-1/2026-06-01.csv', + exportedAt: new Date(), + expiresAt: new Date(Date.now() + 86_400_000), + }; + + const url = service.getSignedUrl(record, 900); + expect(typeof url).toBe('string'); + expect(url.length).toBeGreaterThan(0); + // The HmacObjectStorageClient encodes the key in the URL path + expect(url).toContain(encodeURIComponent('daily-exports/dev-1/2026-06-01.csv')); +}); + +// ─── worker lifecycle ───────────────────────────────────────────────────── + +test('worker start/stop/awaitIdle runs and cleans up without errors', async () => { + const runDate = new Date('2026-06-02T00:00:00.000Z'); + const event = makeEvent({ createdAt: new Date('2026-06-01T12:00:00.000Z') }); + + const { service, client } = makeService([event]); + // Override runDailyExports to use a fixed date so we get predictable uploads + const origRun = service.runDailyExports.bind(service); + jest.spyOn(service, 'runDailyExports').mockImplementation(() => origRun(runDate)); + + const worker = createReportExporterWorker(service, { intervalMs: 25 }); + worker.start(); + + await new Promise((resolve) => setTimeout(resolve, 80)); + worker.stop(); + await worker.awaitIdle(); + + // At least one run should have happened (initial tick fires immediately) + expect(client.uploads.length).toBeGreaterThanOrEqual(2); +}); diff --git a/src/services/reportExporter.ts b/src/services/reportExporter.ts new file mode 100644 index 0000000..18ff14b --- /dev/null +++ b/src/services/reportExporter.ts @@ -0,0 +1,275 @@ +import crypto from 'node:crypto'; +import type { BillingUsageEvent } from '../repositories/usageEventsRepository.pg.js'; +import { eventsToCsv, eventsToJson, type ObjectStorageClient } from './scheduledExports.js'; +import { logger } from '../logger.js'; + +// ───────────────────────────────────────────── +// Data model +// ───────────────────────────────────────────── + +export interface DeveloperExportRecord { + id: string; + developerId: string; + format: 'csv' | 'json'; + s3Key: string; + exportedAt: Date; + expiresAt: Date; +} + +// ───────────────────────────────────────────── +// Store interface + in-memory implementation +// ───────────────────────────────────────────── + +export interface DeveloperExportStore { + save(record: DeveloperExportRecord): Promise; + listByDeveloper( + developerId: string, + opts: { limit: number; offset: number; now: Date }, + ): Promise; + getById(id: string): Promise; +} + +export class InMemoryExportStore implements DeveloperExportStore { + private readonly records = new Map(); + + async save(record: DeveloperExportRecord): Promise { + this.records.set(record.id, record); + return record; + } + + async listByDeveloper( + developerId: string, + opts: { limit: number; offset: number; now: Date }, + ): Promise { + const results = [...this.records.values()] + .filter( + (r) => r.developerId === developerId && r.expiresAt > opts.now, + ) + // Newest first + .sort((a, b) => b.exportedAt.getTime() - a.exportedAt.getTime()); + + return results.slice(opts.offset, opts.offset + opts.limit); + } + + async getById(id: string): Promise { + return this.records.get(id); + } +} + +// ───────────────────────────────────────────── +// Repository interface used by this service +// ───────────────────────────────────────────── + +/** + * Minimal repository interface consumed by ReportExporterService. + * The production Pg repository satisfies this via its `findByApiId` / `getEvents` methods. + * For tests, any object providing `getEvents()` works. + */ +export interface ExportUsageEventsRepository { + getEvents(): Promise; +} + +// ───────────────────────────────────────────── +// Service +// ───────────────────────────────────────────── + +const ONE_DAY_MS = 86_400_000; +const DEFAULT_EXPORT_TTL_MS = 7 * ONE_DAY_MS; + +export interface ReportExporterServiceOptions { + s3Bucket: string; + s3Endpoint: string; + s3SecretAccessKey: string; + exportTtlMs?: number; + logger?: Pick; +} + +export class ReportExporterService { + private readonly log: Pick; + private readonly exportTtlMs: number; + private readonly s3Bucket: string; + private readonly s3Endpoint: string; + private readonly s3SecretAccessKey: string; + + constructor( + private readonly usageEventsRepository: ExportUsageEventsRepository, + private readonly objectStorageClient: ObjectStorageClient, + private readonly exportRecordStore: DeveloperExportStore, + opts: ReportExporterServiceOptions, + ) { + this.s3Bucket = opts.s3Bucket; + this.s3Endpoint = opts.s3Endpoint; + this.s3SecretAccessKey = opts.s3SecretAccessKey; + this.exportTtlMs = opts.exportTtlMs ?? DEFAULT_EXPORT_TTL_MS; + this.log = opts.logger ?? logger; + } + + /** + * Runs daily exports for the 24-hour UTC window `[date - 1 day, date)`. + * For each developer that had at least one event in the window, uploads CSV + * and JSON artifacts to S3 and writes `DeveloperExportRecord` entries. + */ + async runDailyExports(date: Date): Promise { + const windowStart = new Date(date.getTime() - ONE_DAY_MS); + const windowEnd = date; + + // Fetch all events and filter to the window + const allEvents = await this.usageEventsRepository.getEvents(); + const windowEvents = allEvents.filter( + (e) => e.createdAt >= windowStart && e.createdAt < windowEnd, + ); + + // Group by developerId + const byDeveloper = new Map(); + for (const event of windowEvents) { + const bucket = byDeveloper.get(event.developerId); + if (bucket) { + bucket.push(event); + } else { + byDeveloper.set(event.developerId, [event]); + } + } + + const dateSlug = date.toISOString().slice(0, 10); + const expiresAt = new Date(date.getTime() + this.exportTtlMs); + const savedRecords: DeveloperExportRecord[] = []; + + for (const [developerId, events] of byDeveloper) { + if (events.length === 0) continue; + + const csvKey = `daily-exports/${developerId}/${dateSlug}.csv`; + const jsonKey = `daily-exports/${developerId}/${dateSlug}.json`; + + // Upload CSV + await this.objectStorageClient.uploadObject({ + bucket: this.s3Bucket, + key: csvKey, + body: eventsToCsv(events), + contentType: 'text/csv', + accessKeyId: '', + secretAccessKey: this.s3SecretAccessKey, + region: '', + endpoint: this.s3Endpoint, + }); + + // Upload JSON + await this.objectStorageClient.uploadObject({ + bucket: this.s3Bucket, + key: jsonKey, + body: eventsToJson(events), + contentType: 'application/json', + accessKeyId: '', + secretAccessKey: this.s3SecretAccessKey, + region: '', + endpoint: this.s3Endpoint, + }); + + const now = new Date(); + + const csvRecord = await this.exportRecordStore.save({ + id: crypto.randomUUID(), + developerId, + format: 'csv', + s3Key: csvKey, + exportedAt: now, + expiresAt, + }); + + const jsonRecord = await this.exportRecordStore.save({ + id: crypto.randomUUID(), + developerId, + format: 'json', + s3Key: jsonKey, + exportedAt: now, + expiresAt, + }); + + savedRecords.push(csvRecord, jsonRecord); + + this.log.info('daily export completed', { + developerId, + date: dateSlug, + rowCount: events.length, + }); + } + + return savedRecords; + } + + /** + * Returns non-expired export records for a developer, newest first. + */ + async listExportsForDeveloper( + developerId: string, + opts: { limit: number; offset: number }, + ): Promise { + return this.exportRecordStore.listByDeveloper(developerId, { + ...opts, + now: new Date(), + }); + } + + /** + * Generates a signed download URL for an export record. + * Credentials are consumed internally and never returned. + */ + getSignedUrl(record: DeveloperExportRecord, ttlSeconds: number): string { + return this.objectStorageClient.createSignedDownloadUrl({ + bucket: this.s3Bucket, + key: record.s3Key, + expiresInSeconds: ttlSeconds, + secretAccessKey: this.s3SecretAccessKey, + endpoint: this.s3Endpoint, + }); + } +} + +// ───────────────────────────────────────────── +// Worker +// ───────────────────────────────────────────── + +export interface ReportExporterWorker { + start(): void; + stop(): void; + awaitIdle(): Promise; +} + +export function createReportExporterWorker( + service: ReportExporterService, + opts: { + intervalMs: number; + logger?: Pick; + }, +): ReportExporterWorker { + const log = opts.logger ?? logger; + let timer: NodeJS.Timeout | null = null; + let running: Promise | null = null; + + const tick = async (): Promise => { + if (running) return; + running = service.runDailyExports(new Date()); + try { + await running; + } catch (error) { + log.error('report exporter worker failed', error); + } finally { + running = null; + } + }; + + return { + start() { + if (timer) return; + void tick(); + timer = setInterval(() => void tick(), opts.intervalMs); + }, + stop() { + if (!timer) return; + clearInterval(timer); + timer = null; + }, + async awaitIdle() { + if (running) await running.catch(() => undefined); + }, + }; +}