53 changes: 50 additions & 3 deletions indexer/README.md
@@ -27,6 +27,26 @@ bun run indexer:test
bun run indexer:lint
```

### Runnable Indexers

Each domain indexer has a standalone entrypoint (`src/main.ts`) that can be
launched from the repository root:

```bash
# Start the payment streams indexer
bun run indexer:streams

# Start the token distributions indexer
bun run indexer:distributions
```

Or directly from each domain package:

```bash
cd indexer/streams && bun start
cd indexer/distributions && bun start
```

The nested `justfile` remains available for contributors who work from this
directory:

@@ -46,6 +66,32 @@ that reads these variables, parses numeric values (`INDEXER_PORT`,
`POLL_INTERVAL_MS`, `START_LEDGER`), and fails fast with a clear error when
required values are missing or malformed.

| Variable | Required | Description |
|---|---|---|
| `DATABASE_URL` | ✅ | PostgreSQL connection string |
| `SOROBAN_RPC_URL` | ✅ | Soroban RPC endpoint |
| `STREAMS_CONTRACT_IDS` | ✅ (streams) | Comma-separated stream contract IDs |
| `DISTRIBUTIONS_CONTRACT_IDS` | ✅ (distributions) | Comma-separated distribution contract IDs |
| `START_LEDGER` | ☐ | Ledger sequence to start from (defaults to latest) |
| `POLL_INTERVAL_MS` | ☐ | Polling interval in ms (default: 5000) |
| `LOG_LEVEL` | ☐ | Log verbosity: `debug`, `info`, `warn`, `error` |
Comment on lines +69 to +77

📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

Mark the contract ID env vars as optional here.

Lines 73-74 label `STREAMS_CONTRACT_IDS` and `DISTRIBUTIONS_CONTRACT_IDS` as required, but the runner explicitly starts with an empty `contractIds` array and only warns. Reword these as "required to index events" or "optional, but the indexer becomes a no-op without them" to match runtime behavior.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/README.md` around lines 69 - 77, Update the environment variable
table in the README to reflect that STREAMS_CONTRACT_IDS and
DISTRIBUTIONS_CONTRACT_IDS are not hard-required at startup; the current runtime
behavior in the indexer runner starts with an empty contractIds list and only
warns, so rephrase their descriptions to indicate they are optional or required
only to actually index events. Keep the wording aligned with the runner’s
behavior so the README matches the logic in the indexer setup.
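
The warn-only startup behavior this comment describes can be pictured with a minimal sketch. The runner code is not part of this diff, so the `contractIdWarning` helper and its message text are hypothetical; the sketch only assumes, as the comment states, that an empty ID list produces a warning rather than a startup failure.

```typescript
// Hypothetical helper (not from the PR): returns a warning message instead of
// throwing when no contract IDs are configured.
function contractIdWarning(envName: string, ids: readonly string[]): string | undefined {
  if (ids.length > 0) {
    return undefined; // IDs are configured, nothing to report.
  }
  return `${envName} is empty; the indexer will start but index no events`;
}

// An unset variable parses to [] (see the loadConfig tests in this PR), so
// startup continues and only a warning is logged.
const startupWarning = contractIdWarning("STREAMS_CONTRACT_IDS", []);
if (startupWarning !== undefined) {
  console.warn(startupWarning);
}
```

Under that assumption, "optional, but the indexer becomes a no-op without them" is the wording that matches what an operator would actually observe.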


## Idempotency

The `HandlerRegistry.dispatch()` method integrates with `EventRepository` to
guarantee at-most-once processing per Soroban event:

1. Before dispatching, it checks whether the event's (`contractId`, `ledger`,
`txHash`, `eventIndex`) tuple has already been recorded in the database.
2. If the event was previously processed, dispatch is skipped (returns `[]`).
3. If all matched handlers succeed (or no handlers match), the event is recorded
as processed.
4. If any handler fails, the event is **not** recorded — allowing it to be
retried on the next polling cycle.

Event identity is extracted deterministically from the event's `pagingToken`
or `id` fields by `getSorobanEventIdentity()` in `common/src/db/repository.ts`.
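
To make the identity rule concrete, here is a standalone sketch that mirrors the parsing in `getSorobanEventIdentity()` as it appears in this diff. The names `IdentitySource`, `splitTrailingIndex`, and `extractIdentity`, and the sample IDs, are illustrative only and do not exist in the PR.

```typescript
// Narrowed input shape: only the two fields the parsing rules read.
interface IdentitySource {
  id?: string;
  pagingToken?: string;
}

interface ParsedIdentity {
  txHash: string;
  eventIndex: number;
}

// Splits "prefix-N" into { txHash: "prefix", eventIndex: N } when the last
// dash-separated part is all digits; returns undefined otherwise.
function splitTrailingIndex(value: string): ParsedIdentity | undefined {
  const parts = value.split("-");
  const lastPart = parts[parts.length - 1] ?? "";
  if (!/^\d+$/.test(lastPart)) {
    return undefined;
  }
  return {
    txHash: parts.slice(0, -1).join("-"),
    eventIndex: Number.parseInt(lastPart, 10),
  };
}

function extractIdentity(source: IdentitySource): ParsedIdentity {
  // Used when neither field carries a numeric suffix.
  const fallback: ParsedIdentity = {
    txHash: source.pagingToken || source.id || "",
    eventIndex: 0,
  };
  // `id` is consulted first; `pagingToken` is tried only when `id` has no dash.
  if (source.id?.includes("-")) {
    return splitTrailingIndex(source.id) ?? fallback;
  }
  if (source.pagingToken?.includes("-")) {
    return splitTrailingIndex(source.pagingToken) ?? fallback;
  }
  return fallback;
}

console.log(extractIdentity({ id: "event-1" }));              // txHash "event", eventIndex 1
console.log(extractIdentity({ pagingToken: "0001-0002-7" })); // txHash "0001-0002", eventIndex 7
console.log(extractIdentity({ id: "plain" }));                // txHash "plain", eventIndex 0
```

One consequence of the `id`-first ordering: when `id` contains a dash but no numeric suffix, `pagingToken` is not parsed at all and the fallback applies.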

## Database & Migrations

Persistence uses PostgreSQL with [TypeORM](https://typeorm.io). Entities live
@@ -60,6 +106,7 @@ via `experimentalDecorators` and `emitDecoratorMetadata`.
## Status

This workspace provides the indexer foundation — validated config, a Soroban RPC
- client, a TypeORM persistence layer (entities and repository), a poller, and an
- event-handler registry, plus the streams and distributions handlers. The GraphQL
- API is planned but not yet implemented.
+ client, a TypeORM persistence layer (entities and repository), a poller, an
+ event-handler registry with idempotency guarantees, runnable entrypoints for the
+ streams and distributions indexers, and all matching handlers. The GraphQL API
+ is planned but not yet implemented.
12 changes: 10 additions & 2 deletions indexer/common/src/config/env.test.ts
@@ -12,18 +12,24 @@ const validEnv = {

describe("loadConfig", () => {
test("returns typed values for a valid environment", () => {
- const config = loadConfig(validEnv);
+ const config = loadConfig({
+ ...validEnv,
+ STREAMS_CONTRACT_IDS: "C123,C456",
+ DISTRIBUTIONS_CONTRACT_IDS: "C789",
+ });

expect(config).toEqual({
databaseUrl: "postgres://postgres:postgres@localhost:5432/fundable_indexer",
port: 4000,
pollIntervalMs: 5000,
startLedger: 1000,
logLevel: "debug",
streamsContractIds: ["C123", "C456"],
distributionsContractIds: ["C789"],
});
});

- test("defaults the log level and treats blank START_LEDGER as unset", () => {
+ test("defaults the log level and treats blank START_LEDGER as unset, defaulting contract IDs to empty arrays", () => {
const config = loadConfig({
INDEXER_DATABASE_URL: validEnv.INDEXER_DATABASE_URL,
INDEXER_PORT: "4000",
@@ -34,6 +40,8 @@ describe("loadConfig", () => {
expect(config.logLevel).toBe("info");
expect(config.startLedger).toBeUndefined();
expect("startLedger" in config).toBe(false);
expect(config.streamsContractIds).toEqual([]);
expect(config.distributionsContractIds).toEqual([]);
});

test("fails with a clear error when a required value is missing", () => {
32 changes: 32 additions & 0 deletions indexer/common/src/config/env.ts
@@ -19,6 +19,10 @@ export interface IndexerConfig {
readonly startLedger?: number;
/** Logging verbosity. */
readonly logLevel: "error" | "warn" | "info" | "debug";
/** Configured Stellar contract IDs for streams indexer. */
readonly streamsContractIds: string[];
/** Configured Stellar contract IDs for distributions indexer. */
readonly distributionsContractIds: string[];
}

/** A positive integer parsed from an environment string. */
@@ -40,6 +44,30 @@ const configSchema = z.object({
POLL_INTERVAL_MS: positiveIntFromString,
START_LEDGER: positiveIntFromString.optional(),
INDEXER_LOG_LEVEL: z.enum(["error", "warn", "info", "debug"]).optional().default("info"),
STREAMS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),
DISTRIBUTIONS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),
Comment on lines +47 to +70

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Deduplicate contract IDs during parsing.

This transform leaves duplicates intact. Both indexer entrypoints register one handler set per configured contract ID, so `STREAMS_CONTRACT_IDS=C1,C1` will invoke the same handler twice for a single event before the event table can help. Please normalize these arrays here and add a regression test for duplicate values.

Suggested fix
   STREAMS_CONTRACT_IDS: z
     .string()
     .optional()
     .default("")
     .transform((val) =>
       val
-        ? val
-            .split(",")
-            .map((s) => s.trim())
-            .filter(Boolean)
+        ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
         : [],
     ),
   DISTRIBUTIONS_CONTRACT_IDS: z
     .string()
     .optional()
     .default("")
     .transform((val) =>
       val
-        ? val
-            .split(",")
-            .map((s) => s.trim())
-            .filter(Boolean)
+        ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
         : [],
     ),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/config/env.ts` around lines 47 - 70, The
STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS parsing in the env schema
currently preserves duplicate entries, which can register the same handler more
than once. Update the shared transform logic in env.ts to normalize the parsed
arrays by removing duplicates while still trimming and filtering empty values,
and make sure both contract ID fields use that behavior. Add a regression test
for the env parsing path to verify repeated IDs like C1,C1 are returned only
once.
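
The requested regression check could look roughly like the following. This is a sketch under stated assumptions: `parseContractIds` is a hypothetical standalone equivalent of the suggested transform (the PR itself expresses it as a Zod `.transform()`), and plain checks stand in for the project's test runner.

```typescript
// Hypothetical standalone equivalent of the suggested transform: split on
// commas, trim, drop empty entries, then remove duplicates. A Set keeps
// first-seen order, so the configured ordering is preserved.
// Array.from(new Set(...)) is equivalent to the [...new Set(...)] spread used
// in the suggested fix.
function parseContractIds(raw: string | undefined): string[] {
  if (!raw) {
    return [];
  }
  return Array.from(new Set(raw.split(",").map((s) => s.trim()).filter(Boolean)));
}

// Regression-style checks for the duplicate case raised in the review.
console.log(parseContractIds("C1,C1"));         // one entry: "C1"
console.log(parseContractIds(" C1 , C2,,C1 ")); // two entries: "C1", "C2"
console.log(parseContractIds(undefined));       // empty array
```

Deduplicating at parse time keeps the fix in one place: every consumer of `streamsContractIds` and `distributionsContractIds` then sees unique IDs without having to guard against repeats itself.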

});

/**
@@ -69,6 +97,8 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): IndexerConfig
"POLL_INTERVAL_MS",
"START_LEDGER",
"INDEXER_LOG_LEVEL",
"STREAMS_CONTRACT_IDS",
"DISTRIBUTIONS_CONTRACT_IDS",
]) {
const value = env[key];
normalized[key] = value === undefined || value.trim() === "" ? undefined : value;
@@ -89,6 +119,8 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): IndexerConfig
port: parsed.INDEXER_PORT,
pollIntervalMs: parsed.POLL_INTERVAL_MS,
logLevel: parsed.INDEXER_LOG_LEVEL,
streamsContractIds: parsed.STREAMS_CONTRACT_IDS,
distributionsContractIds: parsed.DISTRIBUTIONS_CONTRACT_IDS,
...(parsed.START_LEDGER !== undefined ? { startLedger: parsed.START_LEDGER } : {}),
};

39 changes: 39 additions & 0 deletions indexer/common/src/db/repository.ts
@@ -1,6 +1,45 @@
import type { DataSource, Repository } from "typeorm";
import type { SorobanEventInput } from "../handlers/types.js";
import { IndexedEvent } from "./entity/IndexedEvent.js";

export interface EventIdentity {
contractId: string;
ledgerNumber: number;
txHash: string;
eventIndex: number;
}

/**
* Maps a SorobanEventInput to its deterministic event identity fields.
*/
export function getSorobanEventIdentity(event: SorobanEventInput): EventIdentity {
let txHash = event.pagingToken || event.id || "";
let eventIndex = 0;

if (event.id?.includes("-")) {
const parts = event.id.split("-");
const lastPart = parts[parts.length - 1];
if (lastPart && /^\d+$/.test(lastPart)) {
eventIndex = Number.parseInt(lastPart, 10);
txHash = parts.slice(0, -1).join("-");
}
} else if (event.pagingToken?.includes("-")) {
const parts = event.pagingToken.split("-");
const lastPart = parts[parts.length - 1];
if (lastPart && /^\d+$/.test(lastPart)) {
eventIndex = Number.parseInt(lastPart, 10);
txHash = parts.slice(0, -1).join("-");
}
}

return {
contractId: event.contractId,
ledgerNumber: event.ledger,
txHash,
eventIndex,
};
}

export class EventRepository {
private repo: Repository<IndexedEvent>;

80 changes: 70 additions & 10 deletions indexer/common/src/handlers/registry.test.ts
@@ -14,8 +14,7 @@ const baseEvent: SorobanEventInput = {
};

const ok: HandlerResult = { ok: true };
- const makeHandler = (result: HandlerResult = ok): EventHandler =>
- vi.fn().mockResolvedValue(result);
+ const makeHandler = (result: HandlerResult = ok): EventHandler => vi.fn().mockResolvedValue(result);

describe("HandlerRegistry", () => {
describe("register and matches", () => {
@@ -63,9 +62,7 @@ describe("HandlerRegistry", () => {
registry.register({ contractId: "CABC123", topic: "stream_created" }, handler);

expect(registry.matches(baseEvent)).toEqual([handler]);
- expect(
- registry.matches({ ...baseEvent, contractId: "COTHER" }),
- ).toHaveLength(0);
+ expect(registry.matches({ ...baseEvent, contractId: "COTHER" })).toHaveLength(0);
});

test("returns multiple handlers when several match", () => {
@@ -99,10 +96,7 @@ describe("HandlerRegistry", () => {

expect(h1).toHaveBeenCalledWith(baseEvent);
expect(h2).toHaveBeenCalledWith(baseEvent);
- expect(results).toEqual([
- { ok: true },
- { ok: false, error: "boom", retriable: true },
- ]);
+ expect(results).toEqual([{ ok: true }, { ok: false, error: "boom", retriable: true }]);
});

test("returns empty array when no handlers match", async () => {
@@ -119,5 +113,71 @@ describe("HandlerRegistry", () => {
expect(returned).toBe(registry);
});
});
});

describe("idempotent dispatch", () => {
// biome-ignore lint/suspicious/noExplicitAny: mock objects
let mockEventRepo: any;

beforeEach(() => {
mockEventRepo = {
isEventProcessed: vi.fn(),
recordEventProcessed: vi.fn(),
};
});

test("should skip calling handlers and return empty if event is already processed", async () => {
mockEventRepo.isEventProcessed.mockResolvedValue(true);
const registry = new HandlerRegistry();
const handler = makeHandler({ ok: true });
registry.register({}, handler);

const results = await registry.dispatch(baseEvent, mockEventRepo);

expect(results).toEqual([]);
expect(handler).not.toHaveBeenCalled();
expect(mockEventRepo.isEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1);
});

test("should run handlers and record event as processed when all handlers succeed", async () => {
mockEventRepo.isEventProcessed.mockResolvedValue(false);
mockEventRepo.recordEventProcessed.mockResolvedValue(true);
const registry = new HandlerRegistry();
const h1 = makeHandler({ ok: true });
const h2 = makeHandler({ ok: true });
registry.register({}, h1);
registry.register({}, h2);

const results = await registry.dispatch(baseEvent, mockEventRepo);

expect(results).toEqual([{ ok: true }, { ok: true }]);
expect(h1).toHaveBeenCalledWith(baseEvent);
expect(h2).toHaveBeenCalledWith(baseEvent);
expect(mockEventRepo.recordEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1);
});

test("should not record event as processed if any handler fails", async () => {
mockEventRepo.isEventProcessed.mockResolvedValue(false);
const registry = new HandlerRegistry();
const h1 = makeHandler({ ok: true });
const h2 = makeHandler({ ok: false, error: "failed", retriable: true });
registry.register({}, h1);
registry.register({}, h2);

const results = await registry.dispatch(baseEvent, mockEventRepo);

expect(results).toEqual([{ ok: true }, { ok: false, error: "failed", retriable: true }]);
expect(mockEventRepo.recordEventProcessed).not.toHaveBeenCalled();
});

test("should record event as processed if no handlers match", async () => {
mockEventRepo.isEventProcessed.mockResolvedValue(false);
mockEventRepo.recordEventProcessed.mockResolvedValue(true);
const registry = new HandlerRegistry();

const results = await registry.dispatch(baseEvent, mockEventRepo);

expect(results).toEqual([]);
expect(mockEventRepo.recordEventProcessed).toHaveBeenCalledWith("CABC123", 100, "event", 1);
});
});
});
63 changes: 53 additions & 10 deletions indexer/common/src/handlers/registry.ts
@@ -1,9 +1,5 @@
- import type {
- EventHandler,
- HandlerFilter,
- HandlerResult,
- SorobanEventInput,
- } from "./types.js";
+ import { type EventRepository, getSorobanEventIdentity } from "../db/repository.js";
+ import type { EventHandler, HandlerFilter, HandlerResult, SorobanEventInput } from "./types.js";

interface RegisteredHandler {
filter: HandlerFilter;
@@ -35,16 +31,63 @@ export class HandlerRegistry {
.map(({ handler }) => handler);
}

- async dispatch(event: SorobanEventInput): Promise<HandlerResult[]> {
+ async dispatch(event: SorobanEventInput, eventRepo?: EventRepository): Promise<HandlerResult[]> {
let identity:
| {
contractId: string;
ledgerNumber: number;
txHash: string;
eventIndex: number;
}
| undefined;

if (eventRepo) {
identity = getSorobanEventIdentity(event);
const isProcessed = await eventRepo.isEventProcessed(
identity.contractId,
identity.ledgerNumber,
identity.txHash,
identity.eventIndex,
);
if (isProcessed) {
return [];
}
}

const handlers = this.matches(event);
- return Promise.all(
+
+ if (handlers.length === 0) {
+ if (eventRepo && identity) {
+ await eventRepo.recordEventProcessed(
+ identity.contractId,
+ identity.ledgerNumber,
+ identity.txHash,
+ identity.eventIndex,
+ );
+ }
+ return [];
+ }
+
+ const results = await Promise.all(
  handlers.map((h) =>
  h(event).catch((err) => ({
  ok: false as const,
  error: err instanceof Error ? err.message : String(err),
  retriable: true,
- }))
- )
+ })),
+ ),
);

const allSucceeded = results.every((r) => r.ok);
if (allSucceeded && eventRepo && identity) {
await eventRepo.recordEventProcessed(
identity.contractId,
identity.ledgerNumber,
identity.txHash,
identity.eventIndex,
);
Comment on lines +44 to +88

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

The new idempotency flow is still race-prone across concurrent runners.

`isEventProcessed()` happens before handler execution, and the only atomic dedupe happens in `recordEventProcessed()` after side effects. If two indexers pick up the same event concurrently, both can run the handlers and only one marker row will win, so the event is still applied twice. This needs an atomic claim/reservation before dispatch, with the claim cleared or transitioned on failure.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/handlers/registry.ts` around lines 44 - 88, The
idempotency flow in the registry handler is still vulnerable to concurrent
duplicates because `isEventProcessed()` is checked before dispatch and
`recordEventProcessed()` only happens after handlers succeed. Update the event
processing path in `registry.ts` to use an atomic claim/reservation via
`eventRepo` before calling `this.matches(event)` or any handler, so only one
runner can own the event at a time. If processing fails, clear or transition
that claim appropriately; if it succeeds, finalize the processed marker in the
same flow.
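
One possible shape for the claim-before-dispatch flow is sketched below. Everything here is an assumption for illustration: `InMemoryClaimStore`, `dispatchWithClaim`, and the string event key are hypothetical and do not exist in the PR. The in-memory map stands in for a database table where the claim would typically be a single insert guarded by a unique constraint.

```typescript
type HandlerResult = { ok: true } | { ok: false; error: string; retriable: boolean };
type Handler = () => Promise<HandlerResult>;

// Hypothetical in-memory stand-in for an atomic claim table.
class InMemoryClaimStore {
  private readonly states = new Map<string, "claimed" | "processed">();

  // Returns true only for the first caller. The check and the write happen in
  // one synchronous step, so no other caller can interleave between them.
  tryClaim(key: string): boolean {
    if (this.states.has(key)) {
      return false;
    }
    this.states.set(key, "claimed");
    return true;
  }

  // Marks a claimed event as fully processed.
  finalize(key: string): void {
    this.states.set(key, "processed");
  }

  // Drops the claim so a later polling cycle can retry the event.
  release(key: string): void {
    this.states.delete(key);
  }
}

async function dispatchWithClaim(
  key: string,
  handlers: readonly Handler[],
  store: InMemoryClaimStore,
): Promise<HandlerResult[]> {
  // Claim before any handler runs; a runner that loses the claim skips the event.
  if (!store.tryClaim(key)) {
    return [];
  }
  const results = await Promise.all(
    handlers.map((h) =>
      h().catch(
        (err: unknown): HandlerResult => ({
          ok: false,
          error: err instanceof Error ? err.message : String(err),
          retriable: true,
        }),
      ),
    ),
  );
  if (results.every((r) => r.ok)) {
    store.finalize(key);
  } else {
    store.release(key);
  }
  return results;
}
```

The sketch leaves out one case a real implementation would need: a runner that crashes after claiming leaves a stale claim behind, so claims would need an expiry or a recovery step.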

}

return results;
}
}