
feat: batch cranks #1190

Draft

Dodecahedr0x wants to merge 3 commits into master from fix/crank-perfs

Conversation

@Dodecahedr0x (Contributor) commented May 12, 2026

Closes #1189

Summary

Batch crank executions and stop waiting for them to finish before processing more cranks.

Breaking Changes

  • None

Summary by CodeRabbit

  • Documentation

    • Updated task scheduler docs and README: new min-interval and reset settings; expanded performance guidance for batching, concurrency, and retry/backoff strategies.
  • Dependencies

    • Bumped solana-pubkey from 3.0 to 4.1.
  • Improvements

    • Task scheduler now executes due tasks in concurrent crank batches, with improved batch send logic and clearer failure recording/rescheduling.


coderabbitai Bot commented May 12, 2026

📝 Walkthrough

This PR refactors TaskSchedulerService to batch expired tasks and send their transactions concurrently. RpcClient and tx_counter are now Arc-wrapped; new helpers (apply_successful_execution, on_crank_batch_completed) handle batch results; send_crank_batch constructs per-task noop+execute transactions and dispatches them via a JoinSet; errors are stringified before DB recording. Cargo.toml dependency formatting was adjusted and solana-pubkey was bumped to 4.1; task-scheduler docs/README were updated to document min-interval and batching considerations.

Assessment against linked issues

| Objective | Addressed | Explanation |
| --- | --- | --- |
| Batch execution of tasks to improve crank throughput [#1189] | ✅ | |

Suggested reviewers

  • GabrielePicco
  • taco-paco


github-actions Bot commented May 12, 2026

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

coderabbitai Bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Around line 38-39: The README claims crank sends are semaphore-bounded but the
scheduler actually fans out sends unrestricted; update either the docs or the
code: either change the README wording to remove "bounded" or implement a
semaphore cap in the scheduler where crank sends are dispatched (the code paths
around send_transaction and the RpcSendTransactionConfig usage in the task
scheduler/crank dispatch routine) so that the send loop respects a semaphore
permit limit from RpcSendTransactionConfig before calling send_transaction,
ensuring concurrent sends are limited.

In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed collapses all
per-task results into one Result causing successful tasks to be retried when any
single task fails; change its input to accept per-task results (e.g.
Vec<(DbTask, TaskSchedulerResult<Signature>)>) and iterate over each tuple: call
apply_successful_execution(task).await? for Ok(signature) entries and call
handle_failed_task_execution(task, &err).await? for Err(err) entries. Update the
on_crank_batch_completed signature and all callers to produce per-task results
(rather than a single TaskSchedulerResult<Vec<Signature>>), and keep using the
existing DbTask, TaskSchedulerResult, Signature, apply_successful_execution and
handle_failed_task_execution symbols to implement per-task handling.
- Around line 477-497: The loop currently spawns an unbounded send_transaction
future per task (inside join_set.spawn) which can flood RPC; limit concurrency
by introducing a bounded semaphore (tokio::sync::Semaphore) or use a buffered
executor (futures::stream::StreamExt::buffer_unordered) so only N concurrent
sends run. Acquire a permit before calling join_set.spawn (or before awaiting
each buffered future) and let the permit drop after the send finishes; also wrap
the rpc_client.send_transaction call with a per-call timeout
(tokio::time::timeout) and propagate or log timeout errors. Update the block
that builds ixs/tx and the async closure around rpc_client.send_transaction
(referenced as rpc_client, tx_counter, join_set.spawn, InstructionUtils::*,
Transaction::new) to use the semaphore/timeout pattern so bursts are bounded.
- Around line 353-360: The loop double-polls the delay queue (calling
self.task_queue.poll_expired(&mut cx) in the while condition and again inside
the match), which discards the first expired entry; change the loop to poll
exactly once per iteration by binding the result from poll_expired and using
that bound value (the expired variable) for processing: call
self.task_queue.poll_expired(&mut cx) once per iteration, call
expired.into_inner() to get task, then remove its id from task_queue_keys and
push it into batch; alternatively replace the while-let with a loop that matches
self.task_queue.poll_expired(&mut cx) and handles Poll::Ready(Some(expired)),
Poll::Ready(None) and Poll::Pending accordingly so no expired entry is dropped.
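The double-poll finding above has no inline thread in this section. A minimal sketch of the single-poll pattern the prompt describes, assuming `tokio_util::time::DelayQueue` with an illustrative `u64` key type and helper name:

```rust
use std::task::{Context, Poll};
use tokio_util::time::DelayQueue;

/// Drain everything already expired, polling exactly once per iteration
/// so no expired entry is discarded between polls.
fn drain_expired(queue: &mut DelayQueue<u64>, cx: &mut Context<'_>) -> Vec<u64> {
    let mut batch = Vec::new();
    // The poll result is bound once and reused; there is no second poll
    // in the loop body that could swallow an entry.
    while let Poll::Ready(Some(expired)) = queue.poll_expired(cx) {
        batch.push(expired.into_inner());
    }
    batch
}
```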

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5be1b3a8-33d5-4c0a-82f4-7011633acd4c

📥 Commits

Reviewing files that changed from the base of the PR and between 4fac49a and 12b286a.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • Cargo.toml
  • docs/task-scheduler.md
  • magicblock-task-scheduler/README.md
  • magicblock-task-scheduler/src/service.rs

Comment on lines +38 to +39
- Same-tick delay-queue draining; bounded parallel `send_transaction` via semaphore.
- `Arc` for stored instructions; configurable `RpcSendTransactionConfig` for crank sends.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Performance note is out of sync with implementation.

Line 38 says crank sends are semaphore-bounded, but current scheduler code fans out sends without a semaphore cap. Please either update the doc wording or add the actual bound in code so operators get accurate behavior expectations.
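For illustration, a minimal sketch of one way to bound concurrent sends with a `tokio::sync::Semaphore`; the cap, the task count, and the sleep standing in for `send_transaction` are all assumptions, not code from this PR:

```rust
use std::{sync::Arc, time::Duration};
use tokio::{sync::Semaphore, task::JoinSet};

const MAX_CONCURRENT_SENDS: usize = 16; // illustrative cap, not from the PR

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_SENDS));
    let mut join_set: JoinSet<()> = JoinSet::new();

    for task_id in 0..100u32 {
        // Acquiring before spawning also back-pressures the dispatch loop.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_set.spawn(async move {
            let _permit = permit; // released when the send future finishes
            // Stand-in for rpc_client.send_transaction(&tx).await.
            tokio::time::sleep(Duration::from_millis(10)).await;
            println!("sent task {task_id}");
        });
    }
    while join_set.join_next().await.is_some() {}
}
```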


Comment on lines +276 to 299
```rust
async fn on_crank_batch_completed(
    &mut self,
    batch: Vec<DbTask>,
    result: TaskSchedulerResult<Vec<Signature>>,
) -> TaskSchedulerResult<()> {
    match result {
        Ok(sig) => {
            debug!(
                "Executed crank batch ({} tasks) with signatures {:?}",
                batch.len(),
                sig
            );
            for task in &batch {
                self.apply_successful_execution(task).await?;
            }
        }
        Err(ref e) => {
            for task in &batch {
                self.handle_failed_task_execution(task, e).await?;
            }
        }
    }
    Ok(())
}
```

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Batch-level error collapsing causes incorrect retries after partial success.

Line 499 collapses per-task outcomes into one Result. If one task send fails, on_crank_batch_completed (Lines 292-295) handles the entire batch as failed, including tasks that already succeeded. That risks duplicate executions and inconsistent DB state.

Please return/process per-task results (e.g., Vec<(DbTask, TaskSchedulerResult<Signature>)>) and apply success/failure per entry instead of per batch.

Also applies to: 474-500
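As a rough sketch of the per-task shape being requested, with `DbTask`, `Signature`, and the two handlers mocked out as stand-ins for the PR's real symbols:

```rust
// Stand-ins for the PR's real types; only the control flow matters here.
#[derive(Debug)]
struct DbTask { id: u64 }
type Signature = String;
type TaskSchedulerResult<T> = Result<T, String>;

async fn apply_successful_execution(task: &DbTask) -> TaskSchedulerResult<()> {
    println!("task {} applied", task.id);
    Ok(())
}

async fn handle_failed_task_execution(task: &DbTask, err: &str) -> TaskSchedulerResult<()> {
    println!("task {} rescheduled after error: {err}", task.id);
    Ok(())
}

// Each task carries its own outcome, so one failed send no longer drags
// already-successful tasks into the retry path.
async fn on_crank_batch_completed(
    results: Vec<(DbTask, TaskSchedulerResult<Signature>)>,
) -> TaskSchedulerResult<()> {
    for (task, result) in results {
        match result {
            Ok(_sig) => apply_successful_execution(&task).await?,
            Err(err) => handle_failed_task_execution(&task, &err).await?,
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> TaskSchedulerResult<()> {
    on_crank_batch_completed(vec![
        (DbTask { id: 1 }, Ok("sig1".into())),
        (DbTask { id: 2 }, Err("blockhash expired".into())),
    ])
    .await
}
```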


Comment thread on magicblock-task-scheduler/src/service.rs (Outdated)
Comment on lines +477 to 497
```rust
for task in tasks {
    let rpc_client = rpc_client.clone();
    let tx_counter = tx_counter.clone();
    let task = task.clone();
    join_set.spawn(async move {
        let ixs = vec![
            InstructionUtils::noop_instruction(
                tx_counter.fetch_add(1, Ordering::Relaxed),
            ),
            InstructionUtils::execute_task_instruction(
                task.instructions.clone(),
            ),
        ];
        let tx = Transaction::new(
            &[validator_authority()],
            Message::new(&ixs, Some(&validator_authority_id())),
            blockhash,
        );
        Ok(rpc_client.send_transaction(&tx).await.map_err(Box::new)?)
    });
}
```

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Unbounded RPC fan-out can overload the scheduler and RPC endpoint.

This spawns one send_transaction future per task with no concurrency limit. Large due-task bursts can flood outbound RPC and degrade scheduler responsiveness. Add a bounded semaphore (or chunked join) and a per-send timeout to contain blast radius.
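One possible shape, using `futures::stream::buffer_unordered` plus `tokio::time::timeout`; the concurrency cap, timeout value, and the sleep standing in for the real transaction send are illustrative assumptions:

```rust
use std::time::Duration;
use futures::stream::{self, StreamExt};

const CONCURRENCY: usize = 8; // illustrative, not from the PR
const SEND_TIMEOUT: Duration = Duration::from_secs(10); // illustrative

#[tokio::main]
async fn main() {
    let results: Vec<Result<u32, String>> = stream::iter(0..100u32)
        .map(|task_id| async move {
            // Stand-in for building the tx and calling send_transaction.
            let send = tokio::time::sleep(Duration::from_millis(5));
            tokio::time::timeout(SEND_TIMEOUT, send)
                .await
                .map(|_| task_id)
                .map_err(|_| format!("send for task {task_id} timed out"))
        })
        // At most CONCURRENCY sends are in flight at any moment.
        .buffer_unordered(CONCURRENCY)
        .collect()
        .await;
    println!("completed {} sends", results.len());
}
```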


coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)

276-299: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Per-task vs batch error handling still collapses partial successes.

send_crank_batch returns a single TaskSchedulerResult<Vec<Signature>> (Line 494 short-circuits on the first error via collect::<Result<Vec<_>, _>>()), so the Err arm here invokes handle_failed_task_execution for every task in batch — including the ones whose send_transaction already succeeded on-chain. That schedules retries / failure moves for already-executed tasks, producing duplicate executions and inconsistent DB state.

Return per-task outcomes from send_crank_batch (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)>) and dispatch apply_successful_execution / handle_failed_task_execution per entry instead of per batch.


464-496: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Unbounded RPC fan-out and no per-send timeout still present.

The loop spawns one send_transaction future per task into JoinSet with no concurrency cap, and rpc_client.send_transaction(&tx).await (Line 490) has no timeout. A large burst of due tasks can flood the RPC endpoint, and any slow/stuck send blocks the batch's join_all (Line 493) indefinitely, in turn stalling on_crank_batch_completed for the whole batch. Combined with detached tokio::spawns in run, concurrency across in-flight batches is also unbounded.

Please add a bounded Semaphore (or futures::stream::buffer_unordered) and wrap each send_transaction in tokio::time::timeout so blast radius is contained.

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 359-368: The spawned batch-send tasks in run() currently use
tokio::spawn and drop the JoinHandle so they remain running after shutdown;
modify the spawn logic to respect self.token by either (A) collect and manage
JoinHandles with a JoinSet/TaskTracker inside the struct and await/join them on
shutdown in run(), or (B) pass a clone of self.token into the async move and
wrap send_crank_batch/send_transaction with a tokio::select! that aborts the
send when token.cancelled() fires; ensure you reference and update the call site
that invokes send_crank_batch(rpc_client, &block, tx_counter, &batch).await as
well as the crank_tx send so tasks are cancelled and cleaned up when
token.cancelled() triggers.

---

Duplicate comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed matches on a single
TaskSchedulerResult<Vec<Signature>> from send_crank_batch and treats the whole
batch as failed if send_crank_batch returns Err, which collapses partial
successes; change send_crank_batch to return per-task results (e.g. Vec<(DbTask,
TaskSchedulerResult<Signature>)> or Vec<TaskResult>) and update
on_crank_batch_completed to iterate the per-task outcomes and call
apply_successful_execution(task).await? for Ok(signature) entries and
handle_failed_task_execution(task, &err).await? for Err(err) entries so only
genuinely failed tasks are retried/marked failed while successful tasks are
applied once. Ensure types and call sites that use send_crank_batch and
TaskSchedulerResult are updated accordingly.
- Around line 464-496: The send_crank_batch function currently spawns an
unbounded number of send_transaction futures into the JoinSet and awaits them
with no per-send timeout; change it to limit concurrency (e.g. create a
Semaphore or use futures::stream::iter(tasks).map(|t|
...).buffer_unordered(CONCURRENCY)) and acquire a permit for each send so only N
RPC calls run concurrently, and wrap each rpc_client.send_transaction(&tx).await
call in tokio::time::timeout(DURATION, ...); convert timeout expiry into a
TaskSchedulerResult error (or map it to a recoverable error) so
join_all/buffer_unordered does not hang forever; update references inside
send_crank_batch (the JoinSet spawn/collect logic and the send_transaction call)
to use the bounded concurrency + timeout approach.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ad2ff50a-2aaa-43ac-8fa3-760b7276ae91

📥 Commits

Reviewing files that changed from the base of the PR and between 12b286a and 68df949.

📒 Files selected for processing (1)
  • magicblock-task-scheduler/src/service.rs

Comment on lines +359 to +368
```rust
let rpc_client = self.rpc_client.clone();
let block = self.block.clone();
let tx_counter = self.tx_counter.clone();
let crank_tx = crank_tx.clone();
tokio::spawn(async move {
    let result =
        send_crank_batch(rpc_client, &block, tx_counter, &batch).await;
    let _ = crank_tx.send((batch, result));
});
```

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Detached batch spawns are not tied to the cancellation token.

tokio::spawn here returns a JoinHandle that is dropped immediately, so in-flight batch sends are orphaned. On self.token.cancelled(), run returns and the receiver side of crank_rx is dropped, but these spawned tasks keep running on the runtime until send_transaction resolves. Without a per-send timeout (see related comment), they can hang well past shutdown, holding the Arc<RpcClient> and producing on-chain effects after the service is considered stopped.

Consider tracking the handles in a JoinSet/TaskTracker and either awaiting them on shutdown or wiring self.token into the spawned future (tokio::select! against token.cancelled()), so batches honor the cancellation signal.
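A minimal sketch of the TaskTracker-plus-cancellation wiring suggested here; the batch loop and the sleep standing in for `send_crank_batch` are assumptions for illustration:

```rust
use std::time::Duration;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let tracker = TaskTracker::new();

    for batch_id in 0..4u32 {
        let token = token.clone();
        tracker.spawn(async move {
            tokio::select! {
                // Shutdown aborts the in-flight send instead of orphaning it.
                _ = token.cancelled() => println!("batch {batch_id} cancelled"),
                // Stand-in for send_crank_batch(...).await.
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                    println!("batch {batch_id} sent");
                }
            }
        });
    }

    token.cancel(); // simulate the service's cancellation token firing
    tracker.close();
    tracker.wait().await; // nothing keeps running past shutdown
}
```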




Development

Successfully merging this pull request may close these issues.

  • feat: enhance crank perfs (#1189)
