diff --git a/indexer/README.md b/indexer/README.md index b6ac419..4aaeebd 100644 --- a/indexer/README.md +++ b/indexer/README.md @@ -27,6 +27,26 @@ bun run indexer:test bun run indexer:lint ``` +### Runnable Indexers + +Each domain indexer has a standalone entrypoint (`src/main.ts`) that can be +launched from the repository root: + +```bash +# Start the payment streams indexer +bun run indexer:streams + +# Start the token distributions indexer +bun run indexer:distributions +``` + +Or directly from each domain package: + +```bash +cd indexer/streams && bun start +cd indexer/distributions && bun start +``` + The nested `justfile` remains available for contributors who work from this directory: @@ -46,6 +66,32 @@ that reads these variables, parses numeric values (`INDEXER_PORT`, `POLL_INTERVAL_MS`, `START_LEDGER`), and fails fast with a clear error when required values are missing or malformed. +| Variable | Required | Description | +|---|---|---| +| `DATABASE_URL` | ✅ | PostgreSQL connection string | +| `SOROBAN_RPC_URL` | ✅ | Soroban RPC endpoint | +| `STREAMS_CONTRACT_IDS` | ✅ (streams) | Comma-separated stream contract IDs | +| `DISTRIBUTIONS_CONTRACT_IDS` | ✅ (distributions) | Comma-separated distribution contract IDs | +| `START_LEDGER` | ☐ | Ledger sequence to start from (defaults to latest) | +| `POLL_INTERVAL_MS` | ☐ | Polling interval in ms (default: 5000) | +| `LOG_LEVEL` | ☐ | Log verbosity: `debug`, `info`, `warn`, `error` | + +## Idempotency + +The `HandlerRegistry.dispatch()` method integrates with `EventRepository` to +guarantee at-most-once processing per Soroban event: + +1. Before dispatching, it checks whether the event's (`contractId`, `ledger`, + `txHash`, `eventIndex`) tuple has already been recorded in the database. +2. If the event was previously processed, dispatch is skipped (returns `[]`). +3. If all matched handlers succeed (or no handlers match), the event is recorded + as processed. +4. If any handler fails, the event is **not** recorded — allowing it to be + retried on the next polling cycle. + +Event identity is extracted deterministically from the event's `pagingToken` +or `id` fields by `getSorobanEventIdentity()` in `common/src/db/repository.ts`. + ## Database & Migrations Persistence uses PostgreSQL with [TypeORM](https://typeorm.io). Entities live @@ -60,6 +106,7 @@ via `experimentalDecorators` and `emitDecoratorMetadata`. ## Status This workspace provides the indexer foundation — validated config, a Soroban RPC -client, a TypeORM persistence layer (entities and repository), a poller, and an -event-handler registry, plus the streams and distributions handlers. The GraphQL -API is planned but not yet implemented. +client, a TypeORM persistence layer (entities and repository), a poller, an +event-handler registry with idempotency guarantees, runnable entrypoints for the +streams and distributions indexers, and all matching handlers. The GraphQL API +is planned but not yet implemented. diff --git a/indexer/common/src/config/env.test.ts b/indexer/common/src/config/env.test.ts index 6ed0c56..bf6f9ab 100644 --- a/indexer/common/src/config/env.test.ts +++ b/indexer/common/src/config/env.test.ts @@ -12,7 +12,11 @@ const validEnv = { describe("loadConfig", () => { test("returns typed values for a valid environment", () => { - const config = loadConfig(validEnv); + const config = loadConfig({ + ...validEnv, + STREAMS_CONTRACT_IDS: "C123,C456", + DISTRIBUTIONS_CONTRACT_IDS: "C789", + }); expect(config).toEqual({ databaseUrl: "postgres://postgres:postgres@localhost:5432/fundable_indexer", @@ -20,10 +24,12 @@ describe("loadConfig", () => { pollIntervalMs: 5000, startLedger: 1000, logLevel: "debug", + streamsContractIds: ["C123", "C456"], + distributionsContractIds: ["C789"], }); }); - test("defaults the log level and treats blank START_LEDGER as unset", () => { + test("defaults the log level and treats blank START_LEDGER as unset, defaulting contract IDs to empty arrays", () => { const config = loadConfig({ INDEXER_DATABASE_URL: validEnv.INDEXER_DATABASE_URL, INDEXER_PORT: "4000", @@ -34,6 +40,8 @@ describe("loadConfig", () => { expect(config.logLevel).toBe("info"); expect(config.startLedger).toBeUndefined(); expect("startLedger" in config).toBe(false); + expect(config.streamsContractIds).toEqual([]); + expect(config.distributionsContractIds).toEqual([]); }); test("fails with a clear error when a required value is missing", () => { diff --git a/indexer/common/src/config/env.ts b/indexer/common/src/config/env.ts index d53210a..32f358e 100644 --- a/indexer/common/src/config/env.ts +++ b/indexer/common/src/config/env.ts @@ -19,6 +19,10 @@ export interface IndexerConfig { readonly startLedger?: number; /** Logging verbosity. */ readonly logLevel: "error" | "warn" | "info" | "debug"; + /** Configured Stellar contract IDs for streams indexer. */ + readonly streamsContractIds: string[]; + /** Configured Stellar contract IDs for distributions indexer. */ + readonly distributionsContractIds: string[]; } /** A positive integer parsed from an environment string. */ @@ -40,6 +44,30 @@ const configSchema = z.object({ POLL_INTERVAL_MS: positiveIntFromString, START_LEDGER: positiveIntFromString.optional(), INDEXER_LOG_LEVEL: z.enum(["error", "warn", "info", "debug"]).optional().default("info"), + STREAMS_CONTRACT_IDS: z + .string() + .optional() + .default("") + .transform((val) => + val + ? val + .split(",") + .map((s) => s.trim()) + .filter(Boolean) + : [], + ), + DISTRIBUTIONS_CONTRACT_IDS: z + .string() + .optional() + .default("") + .transform((val) => + val + ? val + .split(",") + .map((s) => s.trim()) + .filter(Boolean) + : [], + ), }); /** @@ -69,6 +97,8 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): IndexerConfig "POLL_INTERVAL_MS", "START_LEDGER", "INDEXER_LOG_LEVEL", + "STREAMS_CONTRACT_IDS", + "DISTRIBUTIONS_CONTRACT_IDS", ]) { const value = env[key]; normalized[key] = value === undefined || value.trim() === "" ? undefined : value; @@ -89,6 +119,8 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): IndexerConfig port: parsed.INDEXER_PORT, pollIntervalMs: parsed.POLL_INTERVAL_MS, logLevel: parsed.INDEXER_LOG_LEVEL, + streamsContractIds: parsed.STREAMS_CONTRACT_IDS, + distributionsContractIds: parsed.DISTRIBUTIONS_CONTRACT_IDS, ...(parsed.START_LEDGER !== undefined ? { startLedger: parsed.START_LEDGER } : {}), }; diff --git a/indexer/common/src/db/repository.ts b/indexer/common/src/db/repository.ts index b97e5f7..56c92d8 100644 --- a/indexer/common/src/db/repository.ts +++ b/indexer/common/src/db/repository.ts @@ -1,6 +1,45 @@ import type { DataSource, Repository } from "typeorm"; +import type { SorobanEventInput } from "../handlers/types.js"; import { IndexedEvent } from "./entity/IndexedEvent.js"; +export interface EventIdentity { + contractId: string; + ledgerNumber: number; + txHash: string; + eventIndex: number; +} + +/** + * Maps a SorobanEventInput to its deterministic event identity fields. + */ +export function getSorobanEventIdentity(event: SorobanEventInput): EventIdentity { + let txHash = event.pagingToken || event.id || ""; + let eventIndex = 0; + + if (event.id?.includes("-")) { + const parts = event.id.split("-"); + const lastPart = parts[parts.length - 1]; + if (lastPart && /^\d+$/.test(lastPart)) { + eventIndex = Number.parseInt(lastPart, 10); + txHash = parts.slice(0, -1).join("-"); + } + } else if (event.pagingToken?.includes("-")) { + const parts = event.pagingToken.split("-"); + const lastPart = parts[parts.length - 1]; + if (lastPart && /^\d+$/.test(lastPart)) { + eventIndex = Number.parseInt(lastPart, 10); + txHash = parts.slice(0, -1).join("-"); + } + } + + return { + contractId: event.contractId, + ledgerNumber: event.ledger, + txHash, + eventIndex, + }; +} + export class EventRepository { private repo: Repository; diff --git a/indexer/common/src/handlers/registry.test.ts b/indexer/common/src/handlers/registry.test.ts index 41cd21c..460ddec 100644 --- a/indexer/common/src/handlers/registry.test.ts +++ b/indexer/common/src/handlers/registry.test.ts @@ -14,8 +14,7 @@ const baseEvent: SorobanEventInput = { }; const ok: HandlerResult = { ok: true }; -const makeHandler = (result: HandlerResult = ok): EventHandler => - vi.fn().mockResolvedValue(result); +const makeHandler = (result: HandlerResult = ok): EventHandler => vi.fn().mockResolvedValue(result); describe("HandlerRegistry", () => { describe("register and matches", () => { @@ -63,9 +62,7 @@ describe("HandlerRegistry", () => { registry.register({ contractId: "CABC123", topic: "stream_created" }, handler); expect(registry.matches(baseEvent)).toEqual([handler]); - expect( - registry.matches({ ...baseEvent, contractId: "COTHER" }), - ).toHaveLength(0); + expect(registry.matches({ ...baseEvent, contractId: "COTHER" })).toHaveLength(0); }); test("returns multiple handlers when several match", () => { @@ -99,10 +96,7 @@ describe("HandlerRegistry", () => { expect(h1).toHaveBeenCalledWith(baseEvent); expect(h2).toHaveBeenCalledWith(baseEvent); - expect(results).toEqual([ - { ok: true }, - { ok: false, error: "boom", retriable: true }, - ]); + expect(results).toEqual([{ ok: true }, { ok: false, error: "boom", retriable: true }]); }); test("returns empty array when no handlers match", async () => { @@ -119,5 +113,71 @@ describe("HandlerRegistry", () => { expect(returned).toBe(registry); }); }); -}); + describe("idempotent dispatch", () => { + // biome-ignore lint/suspicious/noExplicitAny: mock objects + let mockEventRepo: any; + + beforeEach(() => { + mockEventRepo = { + isEventProcessed: vi.fn(), + recordEventProcessed: vi.fn(), + }; + }); + + test("should skip calling handlers and return empty if event is already processed", async () => { + mockEventRepo.isEventProcessed.mockResolvedValue(true); + const registry = new HandlerRegistry(); + const handler = makeHandler({ ok: true }); + registry.register({}, handler); + + const results = await registry.dispatch(baseEvent, mockEventRepo); + + expect(results).toEqual([]); + expect(handler).not.toHaveBeenCalled(); + expect(mockEventRepo.isEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1); + }); + + test("should run handlers and record event as processed when all handlers succeed", async () => { + mockEventRepo.isEventProcessed.mockResolvedValue(false); + mockEventRepo.recordEventProcessed.mockResolvedValue(true); + const registry = new HandlerRegistry(); + const h1 = makeHandler({ ok: true }); + const h2 = makeHandler({ ok: true }); + registry.register({}, h1); + registry.register({}, h2); + + const results = await registry.dispatch(baseEvent, mockEventRepo); + + expect(results).toEqual([{ ok: true }, { ok: true }]); + expect(h1).toHaveBeenCalledWith(baseEvent); + expect(h2).toHaveBeenCalledWith(baseEvent); + expect(mockEventRepo.recordEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1); + }); + + test("should not record event as processed if any handler fails", async () => { + mockEventRepo.isEventProcessed.mockResolvedValue(false); + const registry = new HandlerRegistry(); + const h1 = makeHandler({ ok: true }); + const h2 = makeHandler({ ok: false, error: "failed", retriable: true }); + registry.register({}, h1); + registry.register({}, h2); + + const results = await registry.dispatch(baseEvent, mockEventRepo); + + expect(results).toEqual([{ ok: true }, { ok: false, error: "failed", retriable: true }]); + expect(mockEventRepo.recordEventProcessed).not.toHaveBeenCalled(); + }); + + test("should record event as processed if no handlers match", async () => { + mockEventRepo.isEventProcessed.mockResolvedValue(false); + mockEventRepo.recordEventProcessed.mockResolvedValue(true); + const registry = new HandlerRegistry(); + + const results = await registry.dispatch(baseEvent, mockEventRepo); + + expect(results).toEqual([]); + expect(mockEventRepo.recordEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1); + }); + }); +}); diff --git a/indexer/common/src/handlers/registry.ts b/indexer/common/src/handlers/registry.ts index 07a965e..437c057 100644 --- a/indexer/common/src/handlers/registry.ts +++ b/indexer/common/src/handlers/registry.ts @@ -1,9 +1,5 @@ -import type { - EventHandler, - HandlerFilter, - HandlerResult, - SorobanEventInput, -} from "./types.js"; +import { type EventRepository, getSorobanEventIdentity } from "../db/repository.js"; +import type { EventHandler, HandlerFilter, HandlerResult, SorobanEventInput } from "./types.js"; interface RegisteredHandler { filter: HandlerFilter; @@ -35,16 +31,63 @@ export class HandlerRegistry { .map(({ handler }) => handler); } - async dispatch(event: SorobanEventInput): Promise { + async dispatch(event: SorobanEventInput, eventRepo?: EventRepository): Promise { + let identity: + | { + contractId: string; + ledgerNumber: number; + txHash: string; + eventIndex: number; + } + | undefined; + + if (eventRepo) { + identity = getSorobanEventIdentity(event); + const isProcessed = await eventRepo.isEventProcessed( + identity.contractId, + identity.ledgerNumber, + identity.txHash, + identity.eventIndex, + ); + if (isProcessed) { + return []; + } + } + const handlers = this.matches(event); - return Promise.all( + + if (handlers.length === 0) { + if (eventRepo && identity) { + await eventRepo.recordEventProcessed( + identity.contractId, + identity.ledgerNumber, + identity.txHash, + identity.eventIndex, + ); + } + return []; + } + + const results = await Promise.all( handlers.map((h) => h(event).catch((err) => ({ ok: false as const, error: err instanceof Error ? err.message : String(err), retriable: true, - })) - ) + })), + ), ); + + const allSucceeded = results.every((r) => r.ok); + if (allSucceeded && eventRepo && identity) { + await eventRepo.recordEventProcessed( + identity.contractId, + identity.ledgerNumber, + identity.txHash, + identity.eventIndex, + ); + } + + return results; } } diff --git a/indexer/common/src/handlers/types.ts b/indexer/common/src/handlers/types.ts index 444c418..8bb5dbe 100644 --- a/indexer/common/src/handlers/types.ts +++ b/indexer/common/src/handlers/types.ts @@ -8,9 +8,7 @@ export interface SorobanEventInput { pagingToken: string; } -export type HandlerResult = - | { ok: true } - | { ok: false; error: string; retriable: boolean }; +export type HandlerResult = { ok: true } | { ok: false; error: string; retriable: boolean }; export type EventHandler = (event: SorobanEventInput) => Promise; diff --git a/indexer/common/src/index.ts b/indexer/common/src/index.ts index da3ade8..85dca52 100644 --- a/indexer/common/src/index.ts +++ b/indexer/common/src/index.ts @@ -12,6 +12,15 @@ export { } from "./config/index.js"; export { createSorobanClient, sorobanClient } from "./rpc/client.js"; export { IndexedEvent } from "./db/entity/IndexedEvent.js"; -export { EventRepository } from "./db/repository.js"; -export { SorobanPoller, type PollerOptions, type PollResult } from "./poller/index.js"; +export { + EventRepository, + getSorobanEventIdentity, + type EventIdentity, +} from "./db/repository.js"; +export { + SorobanPoller, + type PollerOptions, + type PollResult, +} from "./poller/index.js"; +export { runIndexer, type RunnerOptions } from "./poller/runner.js"; export * from "./handlers/index.js"; diff --git a/indexer/common/src/poller/index.ts b/indexer/common/src/poller/index.ts index ae30e14..e8c621a 100644 --- a/indexer/common/src/poller/index.ts +++ b/indexer/common/src/poller/index.ts @@ -86,7 +86,10 @@ export class SorobanPoller { return { success: true, lastProcessedLedger: endLedger }; } catch (error) { // Return the error to surface it. Cursor is intentionally not advanced. - return { success: false, error: error instanceof Error ? error : new Error(String(error)) }; + return { + success: false, + error: error instanceof Error ? error : new Error(String(error)), + }; } } } diff --git a/indexer/common/src/poller/runner.test.ts b/indexer/common/src/poller/runner.test.ts new file mode 100644 index 0000000..ef9bf22 --- /dev/null +++ b/indexer/common/src/poller/runner.test.ts @@ -0,0 +1,153 @@ +import { DataSource } from "typeorm"; +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { loadIndexerConfig } from "../config/index.js"; +import { HandlerRegistry } from "../handlers/registry.js"; +import { createSorobanClient } from "../rpc/client.js"; +import { runIndexer } from "./runner.js"; + +vi.mock("typeorm", async (importOriginal) => { + // biome-ignore lint/suspicious/noExplicitAny: mock + const actual = (await importOriginal()) as any; + const mockDataSource = { + initialize: vi.fn(), + destroy: vi.fn(), + getRepository: vi.fn().mockReturnValue({ + count: vi.fn(), + createQueryBuilder: vi.fn(() => ({ + insert: vi.fn().mockReturnThis(), + into: vi.fn().mockReturnThis(), + values: vi.fn().mockReturnThis(), + orIgnore: vi.fn().mockReturnThis(), + execute: vi.fn(), + })), + }), + }; + return { + ...actual, + DataSource: vi.fn(() => mockDataSource), + }; +}); + +vi.mock("../config/index.js", () => ({ + loadIndexerConfig: vi.fn(), +})); + +vi.mock("../rpc/client.js", () => { + const mockClient = { + getLatestLedger: vi.fn(), + getEvents: vi.fn(), + }; + return { + createSorobanClient: vi.fn(() => mockClient), + }; +}); + +describe("runIndexer", () => { + // biome-ignore lint/suspicious/noExplicitAny: mock + let mockExit: any; + // biome-ignore lint/suspicious/noExplicitAny: mock + let mockRegistry: any; + + beforeEach(() => { + vi.clearAllMocks(); + // biome-ignore lint/suspicious/noExplicitAny: mock + mockExit = vi.spyOn(process, "exit").mockImplementation((() => {}) as any); + mockRegistry = new HandlerRegistry(); + }); + + test("should fail fast if config validation fails", async () => { + vi.mocked(loadIndexerConfig).mockImplementation(() => { + throw new Error("Invalid configuration"); + }); + + await runIndexer({ + name: "test-indexer", + contractIds: [], + registry: mockRegistry, + }); + + expect(mockExit).toHaveBeenCalledWith(1); + }); + + test("should fail fast if database connection fails", async () => { + vi.mocked(loadIndexerConfig).mockReturnValue({ + databaseUrl: "postgres://fail", + port: 4000, + pollIntervalMs: 10, + logLevel: "info", + streamsContractIds: [], + distributionsContractIds: [], + }); + + // biome-ignore lint/suspicious/noExplicitAny: mock + const mockDb = new DataSource({} as any); + vi.mocked(mockDb.initialize).mockRejectedValue(new Error("DB Connection Error")); + + await runIndexer({ + name: "test-indexer", + contractIds: [], + registry: mockRegistry, + }); + + expect(mockExit).toHaveBeenCalledWith(1); + }); + + test("should start polling and run the loop, stopping gracefully on shutdown signal", async () => { + vi.mocked(loadIndexerConfig).mockReturnValue({ + databaseUrl: "postgres://ok", + port: 4000, + pollIntervalMs: 5, + logLevel: "info", + streamsContractIds: ["C123"], + distributionsContractIds: [], + startLedger: 100, + }); + + // biome-ignore lint/suspicious/noExplicitAny: mock + const mockDb = new DataSource({} as any); + // biome-ignore lint/suspicious/noExplicitAny: mock + vi.mocked(mockDb.initialize).mockResolvedValue({} as any); + // biome-ignore lint/suspicious/noExplicitAny: mock + vi.mocked(mockDb.destroy).mockResolvedValue({} as any); + + const mockClient = createSorobanClient(); + vi.mocked(mockClient.getLatestLedger).mockResolvedValue({ + sequence: 101, + // biome-ignore lint/suspicious/noExplicitAny: mock + } as any); + vi.mocked(mockClient.getEvents).mockResolvedValue({ + events: [ + { + contractId: "C123", + ledger: 100, + ledgerClosedAt: "2026-01-01", + topic: [], + value: null, + id: "event-1", + pagingToken: "paging-1", + }, + ], + // biome-ignore lint/suspicious/noExplicitAny: mock + } as any); + + // Run the indexer in the background, then trigger SIGINT to terminate the loop + const runnerPromise = runIndexer({ + name: "test-indexer", + contractIds: ["C123"], + registry: mockRegistry, + }); + + // Wait a brief moment to allow the loop to run at least once + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Emit SIGINT to stop the running loop + // biome-ignore lint/suspicious/noExplicitAny: mock + process.emit("SIGINT" as any); + + await runnerPromise; + + expect(mockDb.initialize).toHaveBeenCalled(); + expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockExit).not.toHaveBeenCalled(); + }); +}); diff --git a/indexer/common/src/poller/runner.ts b/indexer/common/src/poller/runner.ts new file mode 100644 index 0000000..72d1bde --- /dev/null +++ b/indexer/common/src/poller/runner.ts @@ -0,0 +1,210 @@ +import { type Contract, scValToNative, type xdr } from "@stellar/stellar-sdk"; +import { DataSource } from "typeorm"; +import { type IndexerConfig, loadIndexerConfig } from "../config/index.js"; +import { IndexedEvent } from "../db/entity/IndexedEvent.js"; +import { EventRepository } from "../db/repository.js"; +import type { HandlerRegistry } from "../handlers/registry.js"; +import type { SorobanEventInput } from "../handlers/types.js"; +import { createSorobanClient } from "../rpc/client.js"; +import { SorobanPoller } from "./index.js"; + +export interface RunnerOptions { + name: string; + contractIds: string[]; + registry: HandlerRegistry; + // biome-ignore lint/complexity/noBannedTypes: TypeORM entity constructors are plain Functions + entities?: Function[]; +} + +export async function runIndexer(options: RunnerOptions): Promise { + const { name, contractIds, registry, entities = [] } = options; + console.info(`[${name}] Initializing indexer...`); + + // 1. Load validated config + let config: IndexerConfig; + try { + config = loadIndexerConfig(); + } catch (err) { + if (err instanceof Error) { + console.error(err.message); + } else { + console.error("Unknown configuration validation error", err); + } + process.exit(1); + return; + } + + if (contractIds.length === 0) { + console.warn( + `[${name}] No contract IDs configured. Indexer will start but won't poll any contract events.`, + ); + } else { + console.info(`[${name}] Configured contract IDs: ${contractIds.join(", ")}`); + } + + // 2. Initialize database + const dataSource = new DataSource({ + type: "postgres", + url: config.databaseUrl, + entities: [IndexedEvent, ...entities], + synchronize: false, + logging: config.logLevel === "debug", + }); + + try { + await dataSource.initialize(); + console.info(`[${name}] Database connection established.`); + } catch (err) { + console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err); + process.exit(1); + return; + } + + const eventRepo = new EventRepository(dataSource); + const sorobanClient = createSorobanClient(); + const poller = new SorobanPoller({ + maxRetries: 3, + retryDelayMs: 1000, + }); + + // Determine starting ledger + let currentLedger = config.startLedger; + if (currentLedger === undefined) { + try { + const latestLedger = await sorobanClient.getLatestLedger(); + currentLedger = latestLedger.sequence; + console.info( + `[${name}] No START_LEDGER configured. Starting from latest ledger: ${currentLedger}`, + ); + } catch (err) { + console.error(`[${name}] Failed to fetch latest ledger from RPC:`, err); + await dataSource.destroy(); + process.exit(1); + return; + } + } else { + console.info(`[${name}] Starting from configured ledger: ${currentLedger}`); + } + + let isRunning = true; + + const shutdown = async (signal: string) => { + console.info(`[${name}] Received ${signal}. Shutting down gracefully...`); + isRunning = false; + }; + + process.on("SIGINT", () => shutdown("SIGINT")); + process.on("SIGTERM", () => shutdown("SIGTERM")); + + console.info(`[${name}] Indexer loop started. Interval: ${config.pollIntervalMs}ms`); + + while (isRunning) { + try { + const latestLedgerInfo = await sorobanClient.getLatestLedger(); + const latestLedger = latestLedgerInfo.sequence; + + if (currentLedger > latestLedger) { + // We are caught up, wait + await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs)); + continue; + } + + // Process up to 10 ledgers at a time + const endLedger = Math.min(currentLedger + 10, latestLedger); + + const fetchEvents = async (start: number, end: number) => { + if (contractIds.length === 0) { + return []; + } + + const filters = contractIds.map((id) => ({ + type: "contract" as const, + contractIds: [id], + })); + + const rpcResponse = await sorobanClient.getEvents({ + startLedger: start, + filters, + }); + + const allEvents = rpcResponse.events || []; + const rangeEvents = allEvents.filter( + (event) => event.ledger >= start && event.ledger <= end, + ); + + return rangeEvents.map((event) => { + const topic = event.topic.map((t: xdr.ScVal) => { + try { + const native = scValToNative(t); + return typeof native === "string" ? native : String(native); + } catch { + return String(t); + } + }); + + let data: unknown; + if (event.value) { + try { + data = scValToNative(event.value); + } catch { + data = event.value; + } + } + + const contractId = event.contractId + ? typeof event.contractId === "string" + ? event.contractId + : (event.contractId as Contract).contractId() + : ""; + + return { + contractId, + ledger: event.ledger, + ledgerClosedAt: event.ledgerClosedAt, + topic, + data, + id: event.id, + pagingToken: event.pagingToken, + } as SorobanEventInput; + }); + }; + + const processEvent = async (event: SorobanEventInput) => { + // Dispatch to handlers with idempotency tracking + await registry.dispatch(event, eventRepo); + }; + + const updateCursor = async (ledger: number) => { + console.info(`[${name}] Processed up to ledger ${ledger}`); + currentLedger = ledger + 1; + }; + + const result = await poller.processLedgerRange( + currentLedger, + endLedger, + fetchEvents, + processEvent, + updateCursor, + ); + + if (!result.success) { + console.error(`[${name}] Error during polling:`, result.error); + await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs)); + } else { + await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs)); + } + } catch (err) { + console.error(`[${name}] Unexpected error in polling loop:`, err); + await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs)); + } + } + + // Graceful cleanup + try { + await dataSource.destroy(); + console.info(`[${name}] Database connection closed.`); + } catch (err) { + console.error(`[${name}] Error closing database connection:`, err); + } + console.info(`[${name}] Shutdown complete.`); +} diff --git a/indexer/distributions/package.json b/indexer/distributions/package.json index 40a5dcc..72d5eb8 100644 --- a/indexer/distributions/package.json +++ b/indexer/distributions/package.json @@ -10,6 +10,7 @@ "build": "tsc -p tsconfig.json", "codegen": "bun --print \"'distributions: no codegen configured yet'\"", "dev": "bun --watch src/index.ts", + "start": "bun src/main.ts", "lint": "biome check .", "test": "vitest run src", "type-check": "tsc -p tsconfig.json --noEmit" diff --git a/indexer/distributions/src/handlers/distribution-created.handler.ts b/indexer/distributions/src/handlers/distribution-created.handler.ts index ea8cee0..558f16a 100644 --- a/indexer/distributions/src/handlers/distribution-created.handler.ts +++ b/indexer/distributions/src/handlers/distribution-created.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseDistributionCreated } from "./types.js"; export const distributionCreatedHandler: EventHandler = async ( diff --git a/indexer/distributions/src/handlers/distribution-pause.handler.ts b/indexer/distributions/src/handlers/distribution-pause.handler.ts index 2f73ff3..3e332c0 100644 --- a/indexer/distributions/src/handlers/distribution-pause.handler.ts +++ b/indexer/distributions/src/handlers/distribution-pause.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseDistributionPaused, parseDistributionResumed } from "./types.js"; export const distributionPausedHandler: EventHandler = async ( diff --git a/indexer/distributions/src/handlers/tokens-claimed.handler.ts b/indexer/distributions/src/handlers/tokens-claimed.handler.ts index 10b5302..6d1d33a 100644 --- a/indexer/distributions/src/handlers/tokens-claimed.handler.ts +++ b/indexer/distributions/src/handlers/tokens-claimed.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseTokensClaimed } from "./types.js"; export const tokensClaimedHandler: EventHandler = async ( diff --git a/indexer/distributions/src/handlers/types.ts b/indexer/distributions/src/handlers/types.ts index 7aa6f4b..3549cf5 100644 --- a/indexer/distributions/src/handlers/types.ts +++ b/indexer/distributions/src/handlers/types.ts @@ -39,9 +39,7 @@ function num(v: unknown): number { return Number.isFinite(n) ? n : 0; } -export function parseDistributionCreated( - data: unknown, -): DistributionCreatedPayload { +export function parseDistributionCreated(data: unknown): DistributionCreatedPayload { const d = record(data); return { distributionId: str(d.distributionId ?? d.distribution_id), @@ -63,9 +61,7 @@ export function parseTokensClaimed(data: unknown): TokensClaimedPayload { }; } -export function parseDistributionPaused( - data: unknown, -): DistributionPausedPayload { +export function parseDistributionPaused(data: unknown): DistributionPausedPayload { const d = record(data); return { distributionId: str(d.distributionId ?? d.distribution_id), @@ -74,9 +70,7 @@ export function parseDistributionPaused( }; } -export function parseDistributionResumed( - data: unknown, -): DistributionResumedPayload { +export function parseDistributionResumed(data: unknown): DistributionResumedPayload { const d = record(data); return { distributionId: str(d.distributionId ?? d.distribution_id), diff --git a/indexer/distributions/src/main.ts b/indexer/distributions/src/main.ts new file mode 100644 index 0000000..b3405c1 --- /dev/null +++ b/indexer/distributions/src/main.ts @@ -0,0 +1,31 @@ +import { HandlerRegistry, loadIndexerConfig, runIndexer } from "@fundable-indexer/common"; +import { + distributionCreatedHandler, + distributionPausedHandler, + distributionResumedHandler, + tokensClaimedHandler, +} from "./handlers/index.js"; + +async function main() { + const config = loadIndexerConfig(); + const registry = new HandlerRegistry(); + + for (const contractId of config.distributionsContractIds) { + registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler); + registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler); + registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler); + registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler); + } + + await runIndexer({ + name: "distributions-indexer", + contractIds: config.distributionsContractIds, + registry, + entities: [], // No domain-specific entities yet for distributions indexer + }); +} + +main().catch((err) => { + console.error("Distributions indexer exited with fatal error:", err); + process.exit(1); +}); diff --git a/indexer/streams/package.json b/indexer/streams/package.json index 5d03564..825b60f 100644 --- a/indexer/streams/package.json +++ b/indexer/streams/package.json @@ -10,6 +10,7 @@ "build": "tsc -p tsconfig.json", "codegen": "bun --print \"'streams: no codegen configured yet'\"", "dev": "bun --watch src/index.ts", + "start": "bun src/main.ts", "lint": "biome check .", "test": "vitest run src", "type-check": "tsc -p tsconfig.json --noEmit" diff --git a/indexer/streams/src/db/entity/CancelAction.ts b/indexer/streams/src/db/entity/CancelAction.ts index d2d4310..1c72861 100644 --- a/indexer/streams/src/db/entity/CancelAction.ts +++ b/indexer/streams/src/db/entity/CancelAction.ts @@ -15,17 +15,26 @@ export class CancelAction { id!: string; @Index() - @Column({ type: "varchar", comment: "The ID of the stream this cancel action belongs to" }) + @Column({ + type: "varchar", + comment: "The ID of the stream this cancel action belongs to", + }) streamId!: string; @ManyToOne(() => Stream, { onDelete: "CASCADE" }) @JoinColumn({ name: "streamId" }) stream!: Stream; - @Column({ type: "varchar", comment: "The address that triggered the cancellation" }) + @Column({ + type: "varchar", + comment: "The address that triggered the cancellation", + }) canceler!: string; - @Column({ type: "varchar", comment: "Transaction hash where cancellation occurred" }) + @Column({ + type: "varchar", + comment: "Transaction hash where cancellation occurred", + }) txHash!: string; @Column({ type: "bigint", comment: "Timestamp of the cancellation" }) diff --git a/indexer/streams/src/db/entity/Stream.ts b/indexer/streams/src/db/entity/Stream.ts index 2f12bb0..b3ae6a8 100644 --- a/indexer/streams/src/db/entity/Stream.ts +++ b/indexer/streams/src/db/entity/Stream.ts @@ -17,19 +17,36 @@ export class Stream { @Column({ type: "varchar", comment: "The token asset address" }) token!: string; - @Column({ type: "bigint", comment: "The total amount of tokens in the stream" }) + @Column({ + type: "bigint", + comment: "The total amount of tokens in the stream", + }) totalAmount!: string; - @Column({ type: "bigint", comment: "The start time of the stream as unix timestamp" }) + @Column({ + type: "bigint", + comment: "The start time of the stream as unix timestamp", + }) startTime!: string; - @Column({ type: "bigint", comment: "The end time of the stream as unix timestamp" }) + @Column({ + type: "bigint", + comment: "The end time of the stream as unix timestamp", + }) endTime!: string; - @Column({ type: "bigint", default: "0", comment: "Total amount withdrawn so far" }) + @Column({ + type: "bigint", + default: "0", + comment: "Total amount withdrawn so far", + }) amountWithdrawn!: string; - @Column({ type: "boolean", default: false, comment: "Whether the stream was canceled" }) + @Column({ + type: "boolean", + default: false, + comment: "Whether the stream was canceled", + }) canceled!: boolean; @CreateDateColumn() diff --git a/indexer/streams/src/db/entity/WithdrawalAction.ts b/indexer/streams/src/db/entity/WithdrawalAction.ts index d1c4c23..9453905 100644 --- a/indexer/streams/src/db/entity/WithdrawalAction.ts +++ b/indexer/streams/src/db/entity/WithdrawalAction.ts @@ -15,7 +15,10 @@ export class WithdrawalAction { id!: string; @Index() - @Column({ type: "varchar", comment: "The ID of the stream this withdrawal belongs to" }) + @Column({ + type: "varchar", + comment: "The ID of the stream this withdrawal belongs to", + }) streamId!: string; @ManyToOne(() => Stream, { onDelete: "CASCADE" }) @@ -28,7 +31,10 @@ export class WithdrawalAction { @Column({ type: "bigint", comment: "The amount withdrawn" }) amount!: string; - @Column({ type: "varchar", comment: "Transaction hash where withdrawal occurred" }) + @Column({ + type: "varchar", + comment: "Transaction hash where withdrawal occurred", + }) txHash!: string; @Column({ type: "bigint", comment: "Timestamp of the withdrawal" }) diff --git a/indexer/streams/src/db/migrations/00001_InitialStreamsSchema.ts b/indexer/streams/src/db/migrations/00001_InitialStreamsSchema.ts index f84a80f..708bd22 100644 --- a/indexer/streams/src/db/migrations/00001_InitialStreamsSchema.ts +++ b/indexer/streams/src/db/migrations/00001_InitialStreamsSchema.ts @@ -1,10 +1,15 @@ -import { type MigrationInterface, type QueryRunner, Table, TableIndex } from "typeorm"; +import { + type MigrationInterface, + type QueryRunner, + Table, + TableIndex, +} from "typeorm"; export class InitialStreamsSchema00001 implements MigrationInterface { - public async up(queryRunner: QueryRunner): Promise { - // Note: In a real environment, this can be auto-generated by TypeORM - // These tables match the Stream, WithdrawalAction, and CancelAction entities. - await queryRunner.query(` + public async up(queryRunner: QueryRunner): Promise { + // Note: In a real environment, this can be auto-generated by TypeORM + // These tables match the Stream, WithdrawalAction, and CancelAction entities. + await queryRunner.query(` CREATE TABLE "stream" ( "id" varchar PRIMARY KEY NOT NULL, "sender" varchar NOT NULL, @@ -42,11 +47,11 @@ export class InitialStreamsSchema00001 implements MigrationInterface { ); CREATE INDEX "IDX_cancel_streamId" ON "stream_cancel_action" ("streamId"); `); - } + } - public async down(queryRunner: QueryRunner): Promise { - await queryRunner.query(`DROP TABLE "stream_cancel_action"`); - await queryRunner.query(`DROP TABLE "stream_withdrawal_action"`); - await queryRunner.query(`DROP TABLE "stream"`); - } + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "stream_cancel_action"`); + await queryRunner.query(`DROP TABLE "stream_withdrawal_action"`); + await queryRunner.query(`DROP TABLE "stream"`); + } } diff --git a/indexer/streams/src/handlers/stream-cancel.handler.ts b/indexer/streams/src/handlers/stream-cancel.handler.ts index 2b3a916..688a7c4 100644 --- a/indexer/streams/src/handlers/stream-cancel.handler.ts +++ b/indexer/streams/src/handlers/stream-cancel.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseStreamCancel } from "./types.js"; export const streamCancelHandler: EventHandler = async ( @@ -12,15 +8,27 @@ export const streamCancelHandler: EventHandler = async ( const payload = parseStreamCancel(event.data); if (!payload.streamId) { - return { ok: false, error: "Missing streamId in cancel event", retriable: false }; + return { + ok: false, + error: "Missing streamId in cancel event", + retriable: false, + }; } if (!payload.cancelledBy) { - return { ok: false, error: "Missing cancelledBy in cancel event", retriable: false }; + return { + ok: false, + error: "Missing cancelledBy in cancel event", + retriable: false, + }; } if (!payload.transactionHash) { - return { ok: false, error: "Missing transactionHash in cancel event", retriable: false }; + return { + ok: false, + error: "Missing transactionHash in cancel event", + retriable: false, + }; } // TODO(#32): update stream status to CANCELLED via repository once DB schema is merged diff --git a/indexer/streams/src/handlers/stream-funded.handler.ts b/indexer/streams/src/handlers/stream-funded.handler.ts index c0e951c..45467eb 100644 --- a/indexer/streams/src/handlers/stream-funded.handler.ts +++ b/indexer/streams/src/handlers/stream-funded.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseStreamFunded } from "./types.js"; export const streamFundedHandler: EventHandler = async ( @@ -12,23 +8,43 @@ export const streamFundedHandler: EventHandler = async ( const payload = parseStreamFunded(event.data); if (!payload.streamId) { - return { ok: false, error: "Missing streamId in funded event", retriable: false }; + return { + ok: false, + error: "Missing streamId in funded event", + retriable: false, + }; } if (!payload.sender) { - return { ok: false, error: "Missing sender in funded event", retriable: false }; + return { + ok: false, + error: "Missing sender in funded event", + retriable: false, + }; } if (!payload.token) { - return { ok: false, error: "Missing token in funded event", retriable: false }; + return { + ok: false, + error: "Missing token in funded event", + retriable: false, + }; } if (!payload.amount) { - return { ok: false, error: "Missing amount in funded event", retriable: false }; + return { + ok: false, + error: "Missing amount in funded event", + retriable: false, + }; } if (!payload.transactionHash) { - return { ok: false, error: "Missing transactionHash in funded event", retriable: false }; + return { + ok: false, + error: "Missing transactionHash in funded event", + retriable: false, + }; } // TODO(#32): persist deposit via stream repository once DB schema is merged diff --git a/indexer/streams/src/handlers/stream-withdrawal.handler.ts b/indexer/streams/src/handlers/stream-withdrawal.handler.ts index 1e03219..f41f006 100644 --- a/indexer/streams/src/handlers/stream-withdrawal.handler.ts +++ b/indexer/streams/src/handlers/stream-withdrawal.handler.ts @@ -1,8 +1,4 @@ -import type { - EventHandler, - HandlerResult, - SorobanEventInput, -} from "@fundable-indexer/common"; +import type { EventHandler, HandlerResult, SorobanEventInput } from "@fundable-indexer/common"; import { parseStreamWithdrawal } from "./types.js"; export const streamWithdrawalHandler: EventHandler = async ( @@ -12,19 +8,35 @@ export const streamWithdrawalHandler: EventHandler = async ( const payload = parseStreamWithdrawal(event.data); if (!payload.streamId) { - return { ok: false, error: "Missing streamId in withdrawal event", retriable: false }; + return { + ok: false, + error: "Missing streamId in withdrawal event", + retriable: false, + }; } if (!payload.recipient) { - return { ok: false, error: "Missing recipient in withdrawal event", retriable: false }; + return { + ok: false, + error: "Missing recipient in withdrawal event", + retriable: false, + }; } if (!payload.amount) { - return { ok: false, error: "Missing amount in withdrawal event", retriable: false }; + return { + ok: false, + error: "Missing amount in withdrawal event", + retriable: false, + }; } if (!payload.transactionHash) { - return { ok: false, error: "Missing transactionHash in withdrawal event", retriable: false }; + return { + ok: false, + error: "Missing transactionHash in withdrawal event", + retriable: false, + }; } // TODO(#32): record withdrawal action via repository once DB schema is merged diff --git a/indexer/streams/src/main.ts b/indexer/streams/src/main.ts new file mode 100644 index 0000000..2a3a2c3 --- /dev/null +++ b/indexer/streams/src/main.ts @@ -0,0 +1,32 @@ +import { HandlerRegistry, loadIndexerConfig, runIndexer } from "@fundable-indexer/common"; +import { CancelAction } from "./db/entity/CancelAction.js"; +import { Stream } from "./db/entity/Stream.js"; +import { WithdrawalAction } from "./db/entity/WithdrawalAction.js"; +import { + streamCancelHandler, + streamFundedHandler, + streamWithdrawalHandler, +} from "./handlers/index.js"; + +async function main() { + const config = loadIndexerConfig(); + const registry = new HandlerRegistry(); + + for (const contractId of config.streamsContractIds) { + registry.register({ contractId, topic: "stream_funded" }, streamFundedHandler); + registry.register({ contractId, topic: "stream_withdrawal" }, streamWithdrawalHandler); + registry.register({ contractId, topic: "stream_cancel" }, streamCancelHandler); + } + + await runIndexer({ + name: "streams-indexer", + contractIds: config.streamsContractIds, + registry, + entities: [Stream, WithdrawalAction, CancelAction], + }); +} + +main().catch((err) => { + console.error("Streams indexer exited with fatal error:", err); + process.exit(1); +}); diff --git a/package.json b/package.json index 6fa0fd9..78be1f1 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,9 @@ "indexer:format": "cd indexer && biome format --write .", "indexer:lint": "cd indexer && biome check .", "indexer:test": "turbo run test --filter=@fundable-indexer/common --filter=@fundable-indexer/streams --filter=@fundable-indexer/distributions", - "indexer:type-check": "turbo run type-check --filter=@fundable-indexer/common --filter=@fundable-indexer/streams --filter=@fundable-indexer/distributions" + "indexer:type-check": "turbo run type-check --filter=@fundable-indexer/common --filter=@fundable-indexer/streams --filter=@fundable-indexer/distributions", + "indexer:streams": "bun --filter=@fundable-indexer/streams start", + "indexer:distributions": "bun --filter=@fundable-indexer/distributions start" }, "c8": { "include": [