feat(indexer): add idempotency and runnable entrypoints (issues #54 &…#73
feat(indexer): add idempotency and runnable entrypoints (issues #54 &…#73intelliDean wants to merge 1 commit into
Conversation
…ble-Protocol#54 & Fundable-Protocol#56) - Integrate EventRepository idempotency checks into HandlerRegistry.dispatch(): * Check if event (contractId, ledger, txHash, eventIndex) was already processed * Skip dispatch and return [] for previously seen events * Record event as processed only when all matched handlers succeed * Vacuously record events with no matching handlers * Fail open on any handler error (allow retry) - Add STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS to env config with comma-separated parsing and validation tests - Implement shared runIndexer() orchestrator (common/src/poller/runner.ts): * Boots TypeORM DataSource with graceful fail-fast on config/DB/RPC errors * Decodes Soroban event topic XDR via scValToNative * Runs polling loop calling HandlerRegistry.dispatch() with EventRepository * Handles SIGINT/SIGTERM for graceful shutdown and DB cleanup - Add main.ts entrypoints for streams and distributions indexers: * Registers all domain handlers against their respective contract IDs * Calls runIndexer() with domain-specific entities - Wire start scripts in package.json files: * indexer/streams: bun start -> src/main.ts * indexer/distributions: bun start -> src/main.ts * root: indexer:streams, indexer:distributions convenience scripts - Expand indexer/README.md with runnable indexers, config table, and idempotency documentation Closes Fundable-Protocol#54 Closes Fundable-Protocol#56
|
@intelliDean Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀 |
📝 WalkthroughWalkthroughAdds ChangesIdempotent Dispatch and Event Identity
runIndexer Runner and Domain Entrypoints
Sequence Diagram(s)sequenceDiagram
participant main as streams/distributions main.ts
participant runIndexer
participant DataSource as TypeORM DataSource
participant SorobanPoller
participant HandlerRegistry
participant EventRepository
main->>runIndexer: runIndexer(options)
runIndexer->>runIndexer: loadIndexerConfig()
runIndexer->>DataSource: initialize()
runIndexer->>SorobanPoller: getLatestLedger()
loop polling loop
runIndexer->>SorobanPoller: processLedgerRange(start, end)
SorobanPoller-->>runIndexer: raw events
runIndexer->>runIndexer: decode topics/value via scValToNative
runIndexer->>HandlerRegistry: dispatch(event, eventRepo)
HandlerRegistry->>EventRepository: isEventProcessed(identity)
alt not yet processed
HandlerRegistry->>HandlerRegistry: run matched handlers
HandlerRegistry->>EventRepository: recordEventProcessed(identity)
end
HandlerRegistry-->>runIndexer: HandlerResult[]
end
Note over runIndexer: SIGINT/SIGTERM received
runIndexer->>DataSource: destroy()
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@indexer/common/src/config/env.ts`:
- Around line 47-70: The STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS
parsing in the env schema currently preserves duplicate entries, which can
register the same handler more than once. Update the shared transform logic in
env.ts to normalize the parsed arrays by removing duplicates while still
trimming and filtering empty values, and make sure both contract ID fields use
that behavior. Add a regression test for the env parsing path to verify repeated
IDs like C1,C1 are returned only once.
In `@indexer/common/src/handlers/registry.ts`:
- Around line 44-88: The idempotency flow in the registry handler is still
vulnerable to concurrent duplicates because `isEventProcessed()` is checked
before dispatch and `recordEventProcessed()` only happens after handlers
succeed. Update the event processing path in `registry.ts` to use an atomic
claim/reservation via `eventRepo` before calling `this.matches(event)` or any
handler, so only one runner can own the event at a time. If processing fails,
clear or transition that claim appropriately; if it succeeds, finalize the
processed marker in the same flow.
In `@indexer/common/src/poller/runner.ts`:
- Around line 172-175: Bubble failed handler results back up from processEvent
in runner.ts so SorobanPoller.processLedgerRange can stop advancing on failures.
After registry.dispatch(event, eventRepo) in processEvent, inspect the returned
HandlerResult[] and throw when any result has ok: false, so the existing
updateCursor() guard is triggered and failed events are retried instead of
skipped.
- Around line 57-59: The database connection failure log in runner.ts currently
prints config.databaseUrl verbatim, which can expose embedded credentials;
update the catch block in the connection logic to redact the URL before logging,
while still keeping useful connection context in the console.error message. Use
the existing catch path around the database connect attempt in runner.ts and
replace the direct databaseUrl interpolation with a sanitized/redacted value.
- Around line 112-113: The ledger batching in runner.ts is off by one: the
current chunk calculation in the polling loop makes an inclusive range cover 11
ledgers instead of 10. Update the endLedger computation in the polling logic
around currentLedger/latestLedger so the batch size matches the “up to 10
ledgers at a time” intent, using the same symbols already in use and adjusting
the upper bound to the correct inclusive end.
In `@indexer/distributions/src/main.ts`:
- Around line 13-24: `distributionsContractIds` is being used directly for both
`HandlerRegistry.register()` and `runIndexer`, which can register the same
handler multiple times when the config contains duplicate contract IDs.
Normalize the list once in `main` by deduplicating it before the registration
loop, then reuse that deduped list for both the registry wiring and the
`runIndexer` call so `distributionCreatedHandler`, `tokensClaimedHandler`,
`distributionPausedHandler`, and `distributionResumedHandler` are each
registered only once per contract.
In `@indexer/README.md`:
- Around line 69-77: Update the environment variable table in the README to
reflect that STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS are not
hard-required at startup; the current runtime behavior in the indexer runner
starts with an empty contractIds list and only warns, so rephrase their
descriptions to indicate they are optional or required only to actually index
events. Keep the wording aligned with the runner’s behavior so the README
matches the logic in the indexer setup.
In `@indexer/streams/package.json`:
- Around line 12-13: The dev script is pointing at the barrel file instead of
the actual startup entrypoint. Update the package.json script named dev to watch
src/main.ts, matching start, so local development runs the same indexer startup
path; use the existing dev and start script entries as the symbols to update.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 13754fb5-ac5b-48ed-8871-339963534d22
📒 Files selected for processing (27)
indexer/README.mdindexer/common/src/config/env.test.tsindexer/common/src/config/env.tsindexer/common/src/db/repository.tsindexer/common/src/handlers/registry.test.tsindexer/common/src/handlers/registry.tsindexer/common/src/handlers/types.tsindexer/common/src/index.tsindexer/common/src/poller/index.tsindexer/common/src/poller/runner.test.tsindexer/common/src/poller/runner.tsindexer/distributions/package.jsonindexer/distributions/src/handlers/distribution-created.handler.tsindexer/distributions/src/handlers/distribution-pause.handler.tsindexer/distributions/src/handlers/tokens-claimed.handler.tsindexer/distributions/src/handlers/types.tsindexer/distributions/src/main.tsindexer/streams/package.jsonindexer/streams/src/db/entity/CancelAction.tsindexer/streams/src/db/entity/Stream.tsindexer/streams/src/db/entity/WithdrawalAction.tsindexer/streams/src/db/migrations/00001_InitialStreamsSchema.tsindexer/streams/src/handlers/stream-cancel.handler.tsindexer/streams/src/handlers/stream-funded.handler.tsindexer/streams/src/handlers/stream-withdrawal.handler.tsindexer/streams/src/main.tspackage.json
| STREAMS_CONTRACT_IDS: z | ||
| .string() | ||
| .optional() | ||
| .default("") | ||
| .transform((val) => | ||
| val | ||
| ? val | ||
| .split(",") | ||
| .map((s) => s.trim()) | ||
| .filter(Boolean) | ||
| : [], | ||
| ), | ||
| DISTRIBUTIONS_CONTRACT_IDS: z | ||
| .string() | ||
| .optional() | ||
| .default("") | ||
| .transform((val) => | ||
| val | ||
| ? val | ||
| .split(",") | ||
| .map((s) => s.trim()) | ||
| .filter(Boolean) | ||
| : [], | ||
| ), |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Deduplicate contract IDs during parsing.
This transform leaves duplicates intact. Both indexer entrypoints register one handler set per configured contract ID, so STREAMS_CONTRACT_IDS=C1,C1 will invoke the same handler twice for a single event before the event table can help. Please normalize these arrays here and add a regression test for duplicate values.
Suggested fix
STREAMS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
- ? val
- .split(",")
- .map((s) => s.trim())
- .filter(Boolean)
+ ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
: [],
),
DISTRIBUTIONS_CONTRACT_IDS: z
.string()
.optional()
.default("")
.transform((val) =>
val
- ? val
- .split(",")
- .map((s) => s.trim())
- .filter(Boolean)
+ ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))]
: [],
),📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| STREAMS_CONTRACT_IDS: z | |
| .string() | |
| .optional() | |
| .default("") | |
| .transform((val) => | |
| val | |
| ? val | |
| .split(",") | |
| .map((s) => s.trim()) | |
| .filter(Boolean) | |
| : [], | |
| ), | |
| DISTRIBUTIONS_CONTRACT_IDS: z | |
| .string() | |
| .optional() | |
| .default("") | |
| .transform((val) => | |
| val | |
| ? val | |
| .split(",") | |
| .map((s) => s.trim()) | |
| .filter(Boolean) | |
| : [], | |
| ), | |
| STREAMS_CONTRACT_IDS: z | |
| .string() | |
| .optional() | |
| .default("") | |
| .transform((val) => | |
| val | |
| ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))] | |
| : [], | |
| ), | |
| DISTRIBUTIONS_CONTRACT_IDS: z | |
| .string() | |
| .optional() | |
| .default("") | |
| .transform((val) => | |
| val | |
| ? [...new Set(val.split(",").map((s) => s.trim()).filter(Boolean))] | |
| : [], | |
| ), |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/common/src/config/env.ts` around lines 47 - 70, The
STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS parsing in the env schema
currently preserves duplicate entries, which can register the same handler more
than once. Update the shared transform logic in env.ts to normalize the parsed
arrays by removing duplicates while still trimming and filtering empty values,
and make sure both contract ID fields use that behavior. Add a regression test
for the env parsing path to verify repeated IDs like C1,C1 are returned only
once.
| if (eventRepo) { | ||
| identity = getSorobanEventIdentity(event); | ||
| const isProcessed = await eventRepo.isEventProcessed( | ||
| identity.contractId, | ||
| identity.ledgerNumber, | ||
| identity.txHash, | ||
| identity.eventIndex, | ||
| ); | ||
| if (isProcessed) { | ||
| return []; | ||
| } | ||
| } | ||
|
|
||
| const handlers = this.matches(event); | ||
| return Promise.all( | ||
|
|
||
| if (handlers.length === 0) { | ||
| if (eventRepo && identity) { | ||
| await eventRepo.recordEventProcessed( | ||
| identity.contractId, | ||
| identity.ledgerNumber, | ||
| identity.txHash, | ||
| identity.eventIndex, | ||
| ); | ||
| } | ||
| return []; | ||
| } | ||
|
|
||
| const results = await Promise.all( | ||
| handlers.map((h) => | ||
| h(event).catch((err) => ({ | ||
| ok: false as const, | ||
| error: err instanceof Error ? err.message : String(err), | ||
| retriable: true, | ||
| })) | ||
| ) | ||
| })), | ||
| ), | ||
| ); | ||
|
|
||
| const allSucceeded = results.every((r) => r.ok); | ||
| if (allSucceeded && eventRepo && identity) { | ||
| await eventRepo.recordEventProcessed( | ||
| identity.contractId, | ||
| identity.ledgerNumber, | ||
| identity.txHash, | ||
| identity.eventIndex, | ||
| ); |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
The new idempotency flow is still race-prone across concurrent runners.
isEventProcessed() happens before handler execution, and the only atomic dedupe happens in recordEventProcessed() after side effects. If two indexers pick up the same event concurrently, both can run the handlers and only one marker row will win, so the event is still applied twice. This needs an atomic claim/reservation before dispatch, with the claim cleared or transitioned on failure.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/common/src/handlers/registry.ts` around lines 44 - 88, The
idempotency flow in the registry handler is still vulnerable to concurrent
duplicates because `isEventProcessed()` is checked before dispatch and
`recordEventProcessed()` only happens after handlers succeed. Update the event
processing path in `registry.ts` to use an atomic claim/reservation via
`eventRepo` before calling `this.matches(event)` or any handler, so only one
runner can own the event at a time. If processing fails, clear or transition
that claim appropriately; if it succeeds, finalize the processed marker in the
same flow.
| } catch (err) { | ||
| console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err); | ||
| process.exit(1); |
There was a problem hiding this comment.
🔒 Security & Privacy | 🟠 Major | ⚡ Quick win
Redact the DB URL in the connection error log.
Line 58 prints config.databaseUrl verbatim. Postgres URLs commonly embed credentials, so a startup failure will leak secrets into logs and CI output.
Suggested fix
} catch (err) {
- console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err);
+ const safeDatabaseTarget = (() => {
+ try {
+ const url = new URL(config.databaseUrl);
+ return `${url.protocol}//${url.hostname}${url.port ? `:${url.port}` : ""}${url.pathname}`;
+ } catch {
+ return "<redacted>";
+ }
+ })();
+ console.error(`[${name}] Failed to connect to database at ${safeDatabaseTarget}:`, err);
process.exit(1);
return;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| } catch (err) { | |
| console.error(`[${name}] Failed to connect to database at ${config.databaseUrl}:`, err); | |
| process.exit(1); | |
| } catch (err) { | |
| const safeDatabaseTarget = (() => { | |
| try { | |
| const url = new URL(config.databaseUrl); | |
| return `${url.protocol}//${url.hostname}${url.port ? `:${url.port}` : ""}${url.pathname}`; | |
| } catch { | |
| return "<redacted>"; | |
| } | |
| })(); | |
| console.error(`[${name}] Failed to connect to database at ${safeDatabaseTarget}:`, err); | |
| process.exit(1); | |
| return; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/common/src/poller/runner.ts` around lines 57 - 59, The database
connection failure log in runner.ts currently prints config.databaseUrl
verbatim, which can expose embedded credentials; update the catch block in the
connection logic to redact the URL before logging, while still keeping useful
connection context in the console.error message. Use the existing catch path
around the database connect attempt in runner.ts and replace the direct
databaseUrl interpolation with a sanitized/redacted value.
| // Process up to 10 ledgers at a time | ||
| const endLedger = Math.min(currentLedger + 10, latestLedger); |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
Fix the ledger chunk off-by-one.
Line 112 says "up to 10 ledgers at a time", but currentLedger + 10 makes the inclusive range contain 11 ledgers. Use + 9 if 10 is the intended batch size.
Suggested fix
- const endLedger = Math.min(currentLedger + 10, latestLedger);
+ const endLedger = Math.min(currentLedger + 9, latestLedger);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Process up to 10 ledgers at a time | |
| const endLedger = Math.min(currentLedger + 10, latestLedger); | |
| // Process up to 10 ledgers at a time | |
| const endLedger = Math.min(currentLedger + 9, latestLedger); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/common/src/poller/runner.ts` around lines 112 - 113, The ledger
batching in runner.ts is off by one: the current chunk calculation in the
polling loop makes an inclusive range cover 11 ledgers instead of 10. Update the
endLedger computation in the polling logic around currentLedger/latestLedger so
the batch size matches the “up to 10 ledgers at a time” intent, using the same
symbols already in use and adjusting the upper bound to the correct inclusive
end.
| const processEvent = async (event: SorobanEventInput) => { | ||
| // Dispatch to handlers with idempotency tracking | ||
| await registry.dispatch(event, eventRepo); | ||
| }; |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Bubble failed handler results back to the poller.
Line 174 awaits registry.dispatch() but never inspects the returned HandlerResult[]. HandlerRegistry.dispatch() converts handler failures into { ok: false } results, while SorobanPoller.processLedgerRange() only withholds updateCursor() when processEvent() throws. As written, a failed handler leaves the event unrecorded and still advances currentLedger, so transient failures get skipped instead of retried.
Suggested fix
const processEvent = async (event: SorobanEventInput) => {
// Dispatch to handlers with idempotency tracking
- await registry.dispatch(event, eventRepo);
+ const results = await registry.dispatch(event, eventRepo);
+ const failedResult = results.find((result) => !result.ok);
+ if (failedResult) {
+ throw new Error(failedResult.error);
+ }
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const processEvent = async (event: SorobanEventInput) => { | |
| // Dispatch to handlers with idempotency tracking | |
| await registry.dispatch(event, eventRepo); | |
| }; | |
| const processEvent = async (event: SorobanEventInput) => { | |
| // Dispatch to handlers with idempotency tracking | |
| const results = await registry.dispatch(event, eventRepo); | |
| const failedResult = results.find((result) => !result.ok); | |
| if (failedResult) { | |
| throw new Error(failedResult.error); | |
| } | |
| }; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/common/src/poller/runner.ts` around lines 172 - 175, Bubble failed
handler results back up from processEvent in runner.ts so
SorobanPoller.processLedgerRange can stop advancing on failures. After
registry.dispatch(event, eventRepo) in processEvent, inspect the returned
HandlerResult[] and throw when any result has ok: false, so the existing
updateCursor() guard is triggered and failed events are retried instead of
skipped.
| for (const contractId of config.distributionsContractIds) { | ||
| registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler); | ||
| registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler); | ||
| registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler); | ||
| registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler); | ||
| } | ||
|
|
||
| await runIndexer({ | ||
| name: "distributions-indexer", | ||
| contractIds: config.distributionsContractIds, | ||
| registry, | ||
| entities: [], // No domain-specific entities yet for distributions indexer |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Deduplicate distributionsContractIds before wiring the registry.
HandlerRegistry.register() appends entries, and matches() dispatches to every match. If DISTRIBUTIONS_CONTRACT_IDS contains the same contract more than once, this loop registers duplicate handlers and the same event gets processed multiple times before idempotency is recorded. Normalize the list once and reuse it for both registration and runIndexer.
Suggested fix
async function main() {
const config = loadIndexerConfig();
+ const contractIds = [...new Set(config.distributionsContractIds)];
const registry = new HandlerRegistry();
- for (const contractId of config.distributionsContractIds) {
+ for (const contractId of contractIds) {
registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler);
registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler);
registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler);
registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler);
}
await runIndexer({
name: "distributions-indexer",
- contractIds: config.distributionsContractIds,
+ contractIds,
registry,
entities: [], // No domain-specific entities yet for distributions indexer
});
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for (const contractId of config.distributionsContractIds) { | |
| registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler); | |
| registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler); | |
| registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler); | |
| registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler); | |
| } | |
| await runIndexer({ | |
| name: "distributions-indexer", | |
| contractIds: config.distributionsContractIds, | |
| registry, | |
| entities: [], // No domain-specific entities yet for distributions indexer | |
| const config = loadIndexerConfig(); | |
| const contractIds = [...new Set(config.distributionsContractIds)]; | |
| const registry = new HandlerRegistry(); | |
| for (const contractId of contractIds) { | |
| registry.register({ contractId, topic: "distribution_created" }, distributionCreatedHandler); | |
| registry.register({ contractId, topic: "tokens_claimed" }, tokensClaimedHandler); | |
| registry.register({ contractId, topic: "distribution_paused" }, distributionPausedHandler); | |
| registry.register({ contractId, topic: "distribution_resumed" }, distributionResumedHandler); | |
| } | |
| await runIndexer({ | |
| name: "distributions-indexer", | |
| contractIds, | |
| registry, | |
| entities: [], // No domain-specific entities yet for distributions indexer |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/distributions/src/main.ts` around lines 13 - 24,
`distributionsContractIds` is being used directly for both
`HandlerRegistry.register()` and `runIndexer`, which can register the same
handler multiple times when the config contains duplicate contract IDs.
Normalize the list once in `main` by deduplicating it before the registration
loop, then reuse that deduped list for both the registry wiring and the
`runIndexer` call so `distributionCreatedHandler`, `tokensClaimedHandler`,
`distributionPausedHandler`, and `distributionResumedHandler` are each
registered only once per contract.
| | Variable | Required | Description | | ||
| |---|---|---| | ||
| | `DATABASE_URL` | ✅ | PostgreSQL connection string | | ||
| | `SOROBAN_RPC_URL` | ✅ | Soroban RPC endpoint | | ||
| | `STREAMS_CONTRACT_IDS` | ✅ (streams) | Comma-separated stream contract IDs | | ||
| | `DISTRIBUTIONS_CONTRACT_IDS` | ✅ (distributions) | Comma-separated distribution contract IDs | | ||
| | `START_LEDGER` | ☐ | Ledger sequence to start from (defaults to latest) | | ||
| | `POLL_INTERVAL_MS` | ☐ | Polling interval in ms (default: 5000) | | ||
| | `LOG_LEVEL` | ☐ | Log verbosity: `debug`, `info`, `warn`, `error` | |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Mark the contract ID env vars as optional here.
Lines 73-74 label STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS as required, but the runner explicitly starts with an empty contractIds array and only warns. Reword these as "required to index events" or "optional, but the indexer becomes a no-op without them" to match runtime behavior.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/README.md` around lines 69 - 77, Update the environment variable
table in the README to reflect that STREAMS_CONTRACT_IDS and
DISTRIBUTIONS_CONTRACT_IDS are not hard-required at startup; the current runtime
behavior in the indexer runner starts with an empty contractIds list and only
warns, so rephrase their descriptions to indicate they are optional or required
only to actually index events. Keep the wording aligned with the runner’s
behavior so the README matches the logic in the indexer setup.
| "dev": "bun --watch src/index.ts", | ||
| "start": "bun src/main.ts", |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
for file in indexer/streams/src/index.ts indexer/streams/src/main.ts; do
if [ -f "$file" ]; then
echo "---- $file ----"
sed -n '1,160p' "$file"
echo
fi
doneRepository: Fundable-Protocol/Backend
Length of output: 1875
Point dev at the real entrypoint too
src/index.ts is just a barrel, so bun --watch src/index.ts won’t run the indexer startup path. Update dev to watch src/main.ts so local development matches start.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@indexer/streams/package.json` around lines 12 - 13, The dev script is
pointing at the barrel file instead of the actual startup entrypoint. Update the
package.json script named dev to watch src/main.ts, matching start, so local
development runs the same indexer startup path; use the existing dev and start
script entries as the symbols to update.
… #56)
Integrate EventRepository idempotency checks into HandlerRegistry.dispatch():
Add STREAMS_CONTRACT_IDS and DISTRIBUTIONS_CONTRACT_IDS to env config with comma-separated parsing and validation tests
Implement shared runIndexer() orchestrator (common/src/poller/runner.ts):
Add main.ts entrypoints for streams and distributions indexers:
Wire start scripts in package.json files:
Expand indexer/README.md with runnable indexers, config table, and idempotency documentation
Closes #54
Closes #56
Summary
Describe what changed and why.
Area
src/)indexer/common/)indexer/streams/)indexer/distributions/)Scope
Verification
bun run type-checkbun run testbun run lintbun run indexer:type-checkif indexer files changedbun run indexer:testif indexer files changedbun run indexer:lintif indexer files changedIndexer Safety
Notes
Closes #
Summary by CodeRabbit
New Features
Bug Fixes
Documentation