Skip to content

feat(indexer): add idempotency and runnable entrypoints (issues #54 &…#73

Open
intelliDean wants to merge 1 commit into
Fundable-Protocol:devfrom
intelliDean:dev
Open

feat(indexer): add idempotency and runnable entrypoints (issues #54 &…#73
intelliDean wants to merge 1 commit into
Fundable-Protocol:devfrom
intelliDean:dev

Conversation

@intelliDean

@intelliDean intelliDean commented Jun 29, 2026

Copy link
Copy Markdown

#56)

  • Integrate EventRepository idempotency checks into HandlerRegistry.dispatch():

    • Check if event (contractId, ledger, txHash, eventIndex) was already processed
    • Skip dispatch and return [] for previously seen events
    • Record event as processed only when all matched handlers succeed
    • Vacuously record events with no matching handlers
    • Fail open on any handler error (allow retry)
  • Add STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS to env config with comma-separated parsing and validation tests

  • Implement shared runIndexer() orchestrator (common/src/poller/runner.ts):

    • Boots TypeORM DataSource with graceful fail-fast on config/DB/RPC errors
    • Decodes Soroban event topic XDR via scValToNative
    • Runs polling loop calling HandlerRegistry.dispatch() with EventRepository
    • Handles SIGINT/SIGTERM for graceful shutdown and DB cleanup
  • Add main.ts entrypoints for streams and distributions indexers:

    • Registers all domain handlers against their respective contract IDs
    • Calls runIndexer() with domain-specific entities
  • Wire start scripts in package.json files:

    • indexer/streams: bun start -> src/main.ts
    • indexer/distributions: bun start -> src/main.ts
    • root: indexer:streams, indexer:distributions convenience scripts
  • Expand indexer/README.md with runnable indexers, config table, and idempotency documentation

Closes #54
Closes #56

Summary

Describe what changed and why.

Area

  • Backend API (src/)
  • Indexer common infrastructure (indexer/common/)
  • Streams indexer (indexer/streams/)
  • Distributions indexer (indexer/distributions/)
  • Tooling, docs, CI, or Docker

Scope

  • This PR addresses one scoped issue or task
  • Unrelated formatting, generated files, and follow-up work were left out
  • Backend and indexer package boundaries were respected

Verification

  • bun run type-check
  • bun run test
  • bun run lint
  • bun run indexer:type-check if indexer files changed
  • bun run indexer:test if indexer files changed
  • bun run indexer:lint if indexer files changed

Indexer Safety

  • Event processing changes are idempotent or do not affect event processing
  • Cursor changes advance only after successful processing
  • Event names and payload shapes were confirmed from contracts, if relevant
  • Backfill and replay behavior was considered, if relevant

Notes

Closes #

Summary by CodeRabbit

  • New Features

    • Added runnable start commands for the indexer workspaces, making it easier to launch the streams and distributions services from the repo root or within each package.
    • Added support for separate contract ID lists for streams and distributions, with environment-based configuration.
  • Bug Fixes

    • Improved event handling reliability by preventing duplicate processing and ensuring events are marked complete only after successful handling.
  • Documentation

    • Expanded the indexer README with setup, configuration, and standalone run instructions.

…ble-Protocol#54 & Fundable-Protocol#56)

- Integrate EventRepository idempotency checks into HandlerRegistry.dispatch():
  * Check if event (contractId, ledger, txHash, eventIndex) was already processed
  * Skip dispatch and return [] for previously seen events
  * Record event as processed only when all matched handlers succeed
  * Vacuously record events with no matching handlers
  * Fail open on any handler error (allow retry)

- Add STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS to env config with
  comma-separated parsing and validation tests

- Implement shared runIndexer() orchestrator (common/src/poller/runner.ts):
  * Boots TypeORM DataSource with graceful fail-fast on config/DB/RPC errors
  * Decodes Soroban event topic XDR via scValToNative
  * Runs polling loop calling HandlerRegistry.dispatch() with EventRepository
  * Handles SIGINT/SIGTERM for graceful shutdown and DB cleanup

- Add main.ts entrypoints for streams and distributions indexers:
  * Registers all domain handlers against their respective contract IDs
  * Calls runIndexer() with domain-specific entities

- Wire start scripts in package.json files:
  * indexer/streams: bun start -> src/main.ts
  * indexer/distributions: bun start -> src/main.ts
  * root: indexer:streams, indexer:distributions convenience scripts

- Expand indexer/README.md with runnable indexers, config table, and
  idempotency documentation

Closes Fundable-Protocol#54
Closes Fundable-Protocol#56
@drips-wave

drips-wave Bot commented Jun 29, 2026

Copy link
Copy Markdown

@intelliDean Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits.

You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀

Learn more about application limits

@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

Adds streamsContractIds/distributionsContractIds config fields, a getSorobanEventIdentity helper and EventIdentity type, idempotent HandlerRegistry.dispatch backed by EventRepository, a generic runIndexer polling loop, standalone entrypoints for streams and distributions indexers, workspace start scripts, and updated README documentation.

Changes

Idempotent Dispatch and Event Identity

Layer / File(s) Summary
Contract ID config fields
indexer/common/src/config/env.ts, indexer/common/src/config/env.test.ts
IndexerConfig adds streamsContractIds and distributionsContractIds; loadConfig parses STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS as comma-split string arrays, defaulting to empty. Tests assert populated and empty-default cases.
EventIdentity derivation
indexer/common/src/db/repository.ts
Exports EventIdentity interface and getSorobanEventIdentity helper that extracts txHash and eventIndex from event.id or event.pagingToken using numeric trailing-segment parsing with fallbacks.
Idempotent HandlerRegistry.dispatch
indexer/common/src/handlers/registry.ts, indexer/common/src/handlers/types.ts, indexer/common/src/handlers/registry.test.ts
dispatch accepts an optional EventRepository; short-circuits with [] when already processed, records processed state when all handlers succeed, skips recording on any handler failure, and records when no handlers match. Four new idempotency tests cover each branch.

runIndexer Runner and Domain Entrypoints

Layer / File(s) Summary
runIndexer implementation and tests
indexer/common/src/poller/runner.ts, indexer/common/src/poller/runner.test.ts, indexer/common/src/poller/index.ts, indexer/common/src/index.ts
Exports RunnerOptions and runIndexer: initializes config, TypeORM DataSource, EventRepository, SorobanClient, and SorobanPoller; runs a chunked polling loop with event decoding, idempotent dispatch, retry delays, and SIGINT/SIGTERM graceful shutdown. Tests cover config failure, DB connection failure, and clean shutdown via SIGINT.
Streams indexer entrypoint
indexer/streams/src/main.ts, indexer/streams/package.json, indexer/streams/src/handlers/*, indexer/streams/src/db/...
main.ts registers stream_funded, stream_withdrawal, and stream_cancel handlers per contract ID and calls runIndexer with stream entities. Adds start script. Handler and entity files have formatting-only changes.
Distributions indexer entrypoint
indexer/distributions/src/main.ts, indexer/distributions/package.json, indexer/distributions/src/handlers/*
main.ts registers distribution_created, tokens_claimed, distribution_paused, and distribution_resumed handlers per contract ID and calls runIndexer. Adds start script. Handler files have formatting-only changes.
Root scripts and README
package.json, indexer/README.md
Adds indexer:streams and indexer:distributions root scripts. README adds Runnable Indexers section, env variable table, idempotency explanation, and updated status list.

Sequence Diagram(s)

sequenceDiagram
  participant main as streams/distributions main.ts
  participant runIndexer
  participant DataSource as TypeORM DataSource
  participant SorobanPoller
  participant HandlerRegistry
  participant EventRepository

  main->>runIndexer: runIndexer(options)
  runIndexer->>runIndexer: loadIndexerConfig()
  runIndexer->>DataSource: initialize()
  runIndexer->>SorobanPoller: getLatestLedger()
  loop polling loop
    runIndexer->>SorobanPoller: processLedgerRange(start, end)
    SorobanPoller-->>runIndexer: raw events
    runIndexer->>runIndexer: decode topics/value via scValToNative
    runIndexer->>HandlerRegistry: dispatch(event, eventRepo)
    HandlerRegistry->>EventRepository: isEventProcessed(identity)
    alt not yet processed
      HandlerRegistry->>HandlerRegistry: run matched handlers
      HandlerRegistry->>EventRepository: recordEventProcessed(identity)
    end
    HandlerRegistry-->>runIndexer: HandlerResult[]
  end
  Note over runIndexer: SIGINT/SIGTERM received
  runIndexer->>DataSource: destroy()
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • Fundable-Protocol/Backend#42: Establishes the indexer foundation that this PR extends with event identity derivation and the runIndexer polling loop.
  • Fundable-Protocol/Backend#49: Introduced the HandlerRegistry and basic dispatch logic that this PR modifies to add idempotent processing.
  • Fundable-Protocol/Backend#51: Introduced the loadConfig/IndexerConfig env-loader that this PR extends with contract ID array fields.

Suggested reviewers

  • mubarak23

🐇 A ledger arrives, events in tow,
The rabbit checks — "processed before? No!"
Handlers fire, identity stored,
SIGINT arrives, connections restored.
Two indexers wake, streams and distributions hum,
At-most-once magic — the rabbit beats the drum! 🥁

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Out of Scope Changes check ⚠️ Warning Several files contain pure formatting-only edits in handlers/entities/migrations that aren't needed for #54/#56. Remove the unrelated formatting churn or split it into a separate cleanup PR so this change stays focused on the two linked issues.
Docstring Coverage ⚠️ Warning Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly reflects the two main changes: idempotent indexing and runnable indexer entrypoints.
Linked Issues check ✅ Passed The PR appears to satisfy #54 and #56 with idempotent dispatch, runner entrypoints, scripts, docs, and tests.
Description check ✅ Passed The PR description follows the template, covers the requested summary, scope, verification, safety, and notes, and is mostly complete.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@indexer/common/src/config/env.ts`:
- Around line 47-70: The STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS
parsing in the env schema currently preserves duplicate entries, which can
register the same handler more than once. Update the shared transform logic in
env.ts to normalize the parsed arrays by removing duplicates while still
trimming and filtering empty values, and make sure both contract ID fields use
that behavior. Add a regression test for the env parsing path to verify repeated
IDs like C1,C1 are returned only once.

In `@indexer/common/src/handlers/registry.ts`:
- Around line 44-88: The idempotency flow in the registry handler is still
vulnerable to concurrent duplicates because `isEventProcessed()` is checked
before dispatch and `recordEventProcessed()` only happens after handlers
succeed. Update the event processing path in `registry.ts` to use an atomic
claim/reservation via `eventRepo` before calling `this.matches(event)` or any
handler, so only one runner can own the event at a time. If processing fails,
clear or transition that claim appropriately; if it succeeds, finalize the
processed marker in the same flow.

In `@indexer/common/src/poller/runner.ts`:
- Around line 172-175: Bubble failed handler results back up from processEvent
in runner.ts so SorobanPoller.processLedgerRange can stop advancing on failures.
After registry.dispatch(event, eventRepo) in processEvent, inspect the returned
HandlerResult[] and throw when any result has ok: false, so the existing
updateCursor() guard is triggered and failed events are retried instead of
skipped.
- Around line 57-59: The database connection failure log in runner.ts currently
prints config.databaseUrl verbatim, which can expose embedded credentials;
update the catch block in the connection logic to redact the URL before logging,
while still keeping useful connection context in the console.error message. Use
the existing catch path around the database connect attempt in runner.ts and
replace the direct databaseUrl interpolation with a sanitized/redacted value.
- Around line 112-113: The ledger batching in runner.ts is off by one: the
current chunk calculation in the polling loop makes an inclusive range cover 11
ledgers instead of 10. Update the endLedger computation in the polling logic
around currentLedger/latestLedger so the batch size matches the “up to 10
ledgers at a time” intent, using the same symbols already in use and adjusting
the upper bound to the correct inclusive end.

In `@indexer/distributions/src/main.ts`:
- Around line 13-24: `distributionsContractIds` is being used directly for both
`HandlerRegistry.register()` and `runIndexer`, which can register the same
handler multiple times when the config contains duplicate contract IDs.
Normalize the list once in `main` by deduplicating it before the registration
loop, then reuse that deduped list for both the registry wiring and the
`runIndexer` call so `distributionCreatedHandler`, `tokensClaimedHandler`,
`distributionPausedHandler`, and `distributionResumedHandler` are each
registered only once per contract.

In `@indexer/README.md`:
- Around line 69-77: Update the environment variable table in the README to
reflect that STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS are not
hard-required at startup; the current runtime behavior in the indexer runner
starts with an empty contractIds list and only warns, so rephrase their
descriptions to indicate they are optional or required only to actually index
events. Keep the wording aligned with the runner’s behavior so the README
matches the logic in the indexer setup.

In `@indexer/streams/package.json`:
- Around line 12-13: The dev script is pointing at the barrel file instead of
the actual startup entrypoint. Update the package.json script named dev to watch
src/main.ts, matching start, so local development runs the same indexer startup
path; use the existing dev and start script entries as the symbols to update.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 13754fb5-ac5b-48ed-8871-339963534d22

📥 Commits

Reviewing files that changed from the base of the PR and between 43f39f6 and f63f661.

📒 Files selected for processing (27)
  • indexer/README.md
  • indexer/common/src/config/env.test.ts
  • indexer/common/src/config/env.ts
  • indexer/common/src/db/repository.ts
  • indexer/common/src/handlers/registry.test.ts
  • indexer/common/src/handlers/registry.ts
  • indexer/common/src/handlers/types.ts
  • indexer/common/src/index.ts
  • indexer/common/src/poller/index.ts
  • indexer/common/src/poller/runner.test.ts
  • indexer/common/src/poller/runner.ts
  • indexer/distributions/package.json
  • indexer/distributions/src/handlers/distribution-created.handler.ts
  • indexer/distributions/src/handlers/distribution-pause.handler.ts
  • indexer/distributions/src/handlers/tokens-claimed.handler.ts
  • indexer/distributions/src/handlers/types.ts
  • indexer/distributions/src/main.ts
  • indexer/streams/package.json
  • indexer/streams/src/db/entity/CancelAction.ts
  • indexer/streams/src/db/entity/Stream.ts
  • indexer/streams/src/db/entity/WithdrawalAction.ts
  • indexer/streams/src/db/migrations/00001_InitialStreamsSchema.ts
  • indexer/streams/src/handlers/stream-cancel.handler.ts
  • indexer/streams/src/handlers/stream-funded.handler.ts
  • indexer/streams/src/handlers/stream-withdrawal.handler.ts
  • indexer/streams/src/main.ts
  • package.json

Comment on lines +47 to +70
STREAMS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),
DISTRIBUTIONS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Deduplicate contract IDs during parsing.

This transform leaves duplicates intact. Both indexer entrypoints register one handler set per configured contract ID, so STREAMS_CONTRACT_IDS=C1,C1 will invoke the same handler twice for a single event before the event table can help. Please normalize these arrays here and add a regression test for duplicate values.

Suggested fix
   STREAMS_CONTRACT_IDS: z
     .string()
     .optional()
     .default("")
     .transform((val) =>
       val
-        ? val
-            .split(",")
-            .map((s) => s.trim())
-            .filter(Boolean)
+        ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
         : [],
     ),
   DISTRIBUTIONS_CONTRACT_IDS: z
     .string()
     .optional()
     .default("")
     .transform((val) =>
       val
-        ? val
-            .split(",")
-            .map((s) => s.trim())
-            .filter(Boolean)
+        ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
         : [],
     ),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
STREAMS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),
DISTRIBUTIONS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? val
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: [],
),
STREAMS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
: [],
),
DISTRIBUTIONS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
: [],
),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/config/env.ts` around lines 47 - 70, The
STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS parsing in the env schema
currently preserves duplicate entries, which can register the same handler more
than once. Update the shared transform logic in env.ts to normalize the parsed
arrays by removing duplicates while still trimming and filtering empty values,
and make sure both contract ID fields use that behavior. Add a regression test
for the env parsing path to verify repeated IDs like C1,C1 are returned only
once.

Comment on lines +44 to +88
if (eventRepo) {
identity = getSorobanEventIdentity(event);
const isProcessed = await eventRepo.isEventProcessed(
identity.contractId,
identity.ledgerNumber,
identity.txHash,
identity.eventIndex,
);
if (isProcessed) {
return [];
}
}

const handlers = this.matches(event);
return Promise.all(

if (handlers.length === 0) {
if (eventRepo && identity) {
await eventRepo.recordEventProcessed(
identity.contractId,
identity.ledgerNumber,
identity.txHash,
identity.eventIndex,
);
}
return [];
}

const results = await Promise.all(
handlers.map((h) =>
h(event).catch((err) => ({
ok: false as const,
error: err instanceof Error ? err.message : String(err),
retriable: true,
}))
)
})),
),
);

const allSucceeded = results.every((r) => r.ok);
if (allSucceeded && eventRepo && identity) {
await eventRepo.recordEventProcessed(
identity.contractId,
identity.ledgerNumber,
identity.txHash,
identity.eventIndex,
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

The new idempotency flow is still race-prone across concurrent runners.

isEventProcessed() happens before handler execution, and the only atomic dedupe happens in recordEventProcessed() after side effects. If two indexers pick up the same event concurrently, both can run the handlers and only one marker row will win, so the event is still applied twice. This needs an atomic claim/reservation before dispatch, with the claim cleared or transitioned on failure.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/handlers/registry.ts` around lines 44 - 88, The
idempotency flow in the registry handler is still vulnerable to concurrent
duplicates because `isEventProcessed()` is checked before dispatch and
`recordEventProcessed()` only happens after handlers succeed. Update the event
processing path in `registry.ts` to use an atomic claim/reservation via
`eventRepo` before calling `this.matches(event)` or any handler, so only one
runner can own the event at a time. If processing fails, clear or transition
that claim appropriately; if it succeeds, finalize the processed marker in the
same flow.

Comment on lines +57 to +59
} catch (err) {
console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err);
process.exit(1);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 Security & Privacy | 🟠 Major | ⚡ Quick win

Redact the DB URL in the connection error log.

Line 58 prints config.databaseUrl verbatim. Postgres URLs commonly embed credentials, so a startup failure will leak secrets into logs and CI output.

Suggested fix
   } catch (err) {
-    console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err);
+    const safeDatabaseTarget = (() => {
+      try {
+        const url = new URL(config.databaseUrl);
+        return `${url.protocol}//${url.hostname}${url.port ? `:${url.port}` : ""}${url.pathname}`;
+      } catch {
+        return "<redacted>";
+      }
+    })();
+    console.error(`[${name}] Failed to connect to database at ${safeDatabaseTarget}:`, err);
     process.exit(1);
     return;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (err) {
console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err);
process.exit(1);
} catch (err) {
const safeDatabaseTarget = (() => {
try {
const url = new URL(config.databaseUrl);
return `${url.protocol}//${url.hostname}${url.port ? `:${url.port}` : ""}${url.pathname}`;
} catch {
return "<redacted>";
}
})();
console.error(`[${name}] Failed to connect to database at ${safeDatabaseTarget}:`, err);
process.exit(1);
return;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/poller/runner.ts` around lines 57 - 59, The database
connection failure log in runner.ts currently prints config.databaseUrl
verbatim, which can expose embedded credentials; update the catch block in the
connection logic to redact the URL before logging, while still keeping useful
connection context in the console.error message. Use the existing catch path
around the database connect attempt in runner.ts and replace the direct
databaseUrl interpolation with a sanitized/redacted value.

Comment on lines +112 to +113
// Process up to 10 ledgers at a time
const endLedger = Math.min(currentLedger + 10, latestLedger);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Fix the ledger chunk off-by-one.

Line 112 says "up to 10 ledgers at a time", but currentLedger + 10 makes the inclusive range contain 11 ledgers. Use + 9 if 10 is the intended batch size.

Suggested fix
-      const endLedger = Math.min(currentLedger + 10, latestLedger);
+      const endLedger = Math.min(currentLedger + 9, latestLedger);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Process up to 10 ledgers at a time
const endLedger = Math.min(currentLedger + 10, latestLedger);
// Process up to 10 ledgers at a time
const endLedger = Math.min(currentLedger + 9, latestLedger);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/poller/runner.ts` around lines 112 - 113, The ledger
batching in runner.ts is off by one: the current chunk calculation in the
polling loop makes an inclusive range cover 11 ledgers instead of 10. Update the
endLedger computation in the polling logic around currentLedger/latestLedger so
the batch size matches the “up to 10 ledgers at a time” intent, using the same
symbols already in use and adjusting the upper bound to the correct inclusive
end.

Comment on lines +172 to +175
const processEvent = async (event: SorobanEventInput) => {
// Dispatch to handlers with idempotency tracking
await registry.dispatch(event, eventRepo);
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Bubble failed handler results back to the poller.

Line 174 awaits registry.dispatch() but never inspects the returned HandlerResult[]. HandlerRegistry.dispatch() converts handler failures into { ok: false } results, while SorobanPoller.processLedgerRange() only withholds updateCursor() when processEvent() throws. As written, a failed handler leaves the event unrecorded and still advances currentLedger, so transient failures get skipped instead of retried.

Suggested fix
       const processEvent = async (event: SorobanEventInput) => {
         // Dispatch to handlers with idempotency tracking
-        await registry.dispatch(event, eventRepo);
+        const results = await registry.dispatch(event, eventRepo);
+        const failedResult = results.find((result) => !result.ok);
+        if (failedResult) {
+          throw new Error(failedResult.error);
+        }
       };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const processEvent = async (event: SorobanEventInput) => {
// Dispatch to handlers with idempotency tracking
await registry.dispatch(event, eventRepo);
};
const processEvent = async (event: SorobanEventInput) => {
// Dispatch to handlers with idempotency tracking
const results = await registry.dispatch(event, eventRepo);
const failedResult = results.find((result) => !result.ok);
if (failedResult) {
throw new Error(failedResult.error);
}
};
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/poller/runner.ts` around lines 172 - 175, Bubble failed
handler results back up from processEvent in runner.ts so
SorobanPoller.processLedgerRange can stop advancing on failures. After
registry.dispatch(event, eventRepo) in processEvent, inspect the returned
HandlerResult[] and throw when any result has ok: false, so the existing
updateCursor() guard is triggered and failed events are retried instead of
skipped.

Comment on lines +13 to +24
for (const contractId of config.distributionsContractIds) {
registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler);
registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler);
registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler);
registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler);
}

await runIndexer({
name: "distributions-indexer",
contractIds: config.distributionsContractIds,
registry,
entities: [], // No domain-specific entities yet for distributions indexer

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Deduplicate distributionsContractIds before wiring the registry.

HandlerRegistry.register() appends entries, and matches() dispatches to every match. If DISTRIBUTIONS_CONTRACT_IDS contains the same contract more than once, this loop registers duplicate handlers and the same event gets processed multiple times before idempotency is recorded. Normalize the list once and reuse it for both registration and runIndexer.

Suggested fix
 async function main() {
   const config = loadIndexerConfig();
+  const contractIds = [...new Set(config.distributionsContractIds)];
   const registry = new HandlerRegistry();

-  for (const contractId of config.distributionsContractIds) {
+  for (const contractId of contractIds) {
     registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler);
     registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler);
     registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler);
     registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler);
   }

   await runIndexer({
     name: "distributions-indexer",
-    contractIds: config.distributionsContractIds,
+    contractIds,
     registry,
     entities: [], // No domain-specific entities yet for distributions indexer
   });
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (const contractId of config.distributionsContractIds) {
registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler);
registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler);
registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler);
registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler);
}
await runIndexer({
name: "distributions-indexer",
contractIds: config.distributionsContractIds,
registry,
entities: [], // No domain-specific entities yet for distributions indexer
const config = loadIndexerConfig();
const contractIds = [...new Set(config.distributionsContractIds)];
const registry = new HandlerRegistry();
for (const contractId of contractIds) {
registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler);
registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler);
registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler);
registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler);
}
await runIndexer({
name: "distributions-indexer",
contractIds,
registry,
entities: [], // No domain-specific entities yet for distributions indexer
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/distributions/src/main.ts` around lines 13 - 24,
`distributionsContractIds` is being used directly for both
`HandlerRegistry.register()` and `runIndexer`, which can register the same
handler multiple times when the config contains duplicate contract IDs.
Normalize the list once in `main` by deduplicating it before the registration
loop, then reuse that deduped list for both the registry wiring and the
`runIndexer` call so `distributionCreatedHandler`, `tokensClaimedHandler`,
`distributionPausedHandler`, and `distributionResumedHandler` are each
registered only once per contract.

Comment thread indexer/README.md
Comment on lines +69 to +77
| Variable | Required | Description |
|---|---|---|
| `DATABASE_URL` | ✅ | PostgreSQL connection string |
| `SOROBAN_RPC_URL` | ✅ | Soroban RPC endpoint |
| `STREAMS_CONTRACT_IDS` | ✅ (streams) | Comma-separated stream contract IDs |
| `DISTRIBUTIONS_CONTRACT_IDS` | ✅ (distributions) | Comma-separated distribution contract IDs |
| `START_LEDGER` | ☐ | Ledger sequence to start from (defaults to latest) |
| `POLL_INTERVAL_MS` | ☐ | Polling interval in ms (default: 5000) |
| `LOG_LEVEL` | ☐ | Log verbosity: `debug`, `info`, `warn`, `error` |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

Mark the contract ID env vars as optional here.

Lines 73-74 label STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS as required, but the runner explicitly starts with an empty contractIds array and only warns. Reword these as "required to index events" or "optional, but the indexer becomes a no-op without them" to match runtime behavior.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/README.md` around lines 69 - 77, Update the environment variable
table in the README to reflect that STREAMS_CONTRACT_IDS and
DISTRIBUTIONS_CONTRACT_IDS are not hard-required at startup; the current runtime
behavior in the indexer runner starts with an empty contractIds list and only
warns, so rephrase their descriptions to indicate they are optional or required
only to actually index events. Keep the wording aligned with the runner’s
behavior so the README matches the logic in the indexer setup.

Comment on lines 12 to +13
"dev": "bun --watch src/index.ts",
"start": "bun src/main.ts",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

for file in indexer/streams/src/index.ts indexer/streams/src/main.ts; do
  if [ -f "$file" ]; then
    echo "---- $file ----"
    sed -n '1,160p' "$file"
    echo
  fi
done

Repository: Fundable-Protocol/Backend

Length of output: 1875


Point dev at the real entrypoint too

src/index.ts is just a barrel, so bun --watch src/index.ts won’t run the indexer startup path. Update dev to watch src/main.ts so local development matches start.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/streams/package.json` around lines 12 - 13, The dev script is
pointing at the barrel file instead of the actual startup entrypoint. Update the
package.json script named dev to watch src/main.ts, matching start, so local
development runs the same indexer startup path; use the existing dev and start
script entries as the symbols to update.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add runnable indexer entrypoints for streams and distributions Integrate indexed event idempotency into poller dispatch

1 participant