feat: batch cranks #1190
Conversation
📝 Walkthrough
This PR refactors TaskSchedulerService to batch expired tasks and send their transactions concurrently. RpcClient and tx_counter are now Arc-wrapped; new helpers (apply_successful_execution, on_crank_batch_completed) handle batch results; send_crank_batch constructs per-task noop+execute transactions and dispatches them via a JoinSet; errors are stringified before DB recording. Cargo.toml dependency formatting was adjusted and solana-pubkey was bumped to 4.1; task-scheduler docs/README were updated to document min-interval and batching considerations.
Assessment against linked issues
Suggested reviewers
Manual Deploy Available
You can trigger a manual deploy of this PR branch to testnet.
Comment updated automatically when the PR is synchronized.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Around line 38-39: The README claims crank sends are semaphore-bounded but the
scheduler actually fans out sends unrestricted; update either the docs or the
code: either change the README wording to remove "bounded" or implement a
semaphore cap in the scheduler where crank sends are dispatched (the code paths
around send_transaction and the RpcSendTransactionConfig usage in the task
scheduler/crank dispatch routine) so that the send loop respects a semaphore
permit limit from RpcSendTransactionConfig before calling send_transaction,
ensuring concurrent sends are limited.
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed collapses all
per-task results into one Result causing successful tasks to be retried when any
single task fails; change its input to accept per-task results (e.g.
Vec<(DbTask, TaskSchedulerResult<Signature>)>) and iterate over each tuple: call
apply_successful_execution(task).await? for Ok(signature) entries and call
handle_failed_task_execution(task, &err).await? for Err(err) entries. Update the
on_crank_batch_completed signature and all callers to produce per-task results
(rather than a single TaskSchedulerResult<Vec<Signature>>), and keep using the
existing DbTask, TaskSchedulerResult, Signature, apply_successful_execution and
handle_failed_task_execution symbols to implement per-task handling.
- Around line 477-497: The loop currently spawns an unbounded send_transaction
future per task (inside join_set.spawn) which can flood RPC; limit concurrency
by introducing a bounded semaphore (tokio::sync::Semaphore) or use a buffered
executor (futures::stream::StreamExt::buffer_unordered) so only N concurrent
sends run. Acquire a permit before calling join_set.spawn (or before awaiting
each buffered future) and let the permit drop after the send finishes; also wrap
the rpc_client.send_transaction call with a per-call timeout
(tokio::time::timeout) and propagate or log timeout errors. Update the block
that builds ixs/tx and the async closure around rpc_client.send_transaction
(referenced as rpc_client, tx_counter, join_set.spawn, InstructionUtils::*,
Transaction::new) to use the semaphore/timeout pattern so bursts are bounded.
- Around line 353-360: The loop double-polls the delay queue (calling
self.task_queue.poll_expired(&mut cx) in the while condition and again inside
the match), which discards the first expired entry; change the loop to poll
exactly once per iteration by binding the result from poll_expired and using
that bound value (the expired variable) for processing: call
self.task_queue.poll_expired(&mut cx) once per iteration, call
expired.into_inner() to get task, then remove its id from task_queue_keys and
push it into batch; alternatively replace the while-let with a loop that matches
self.task_queue.poll_expired(&mut cx) and handles Poll::Ready(Some(expired)),
Poll::Ready(None) and Poll::Pending accordingly so no expired entry is dropped.
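A minimal sketch of the single-poll loop this prompt describes, assuming `task_queue` is a `tokio_util::time::DelayQueue`, that `task_queue_keys` is keyed by an assumed `task.id` field, and that `batch` is the local batch being filled:

```rust
use std::task::Poll;

loop {
    // Poll exactly once per iteration and bind the result, so the
    // entry returned by the first poll is never discarded.
    match self.task_queue.poll_expired(&mut cx) {
        Poll::Ready(Some(expired)) => {
            let task = expired.into_inner();
            self.task_queue_keys.remove(&task.id);
            batch.push(task);
        }
        // Queue exhausted, or nothing else is due yet: stop draining.
        Poll::Ready(None) | Poll::Pending => break,
    }
}
```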
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5be1b3a8-33d5-4c0a-82f4-7011633acd4c
⛔ Files ignored due to path filters (2)
Cargo.lock is excluded by !**/*.lock
test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
Cargo.toml
docs/task-scheduler.md
magicblock-task-scheduler/README.md
magicblock-task-scheduler/src/service.rs
- Same-tick delay-queue draining; bounded parallel `send_transaction` via semaphore.
- `Arc` for stored instructions; configurable `RpcSendTransactionConfig` for crank sends.
Performance note is out of sync with implementation.
Line 38 says crank sends are semaphore-bounded, but current scheduler code fans out sends without a semaphore cap. Please either update the doc wording or add the actual bound in code so operators get accurate behavior expectations.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-task-scheduler/README.md` around lines 38 - 39, The README claims
crank sends are semaphore-bounded but the scheduler actually fans out sends
unrestricted; update either the docs or the code: either change the README
wording to remove "bounded" or implement a semaphore cap in the scheduler where
crank sends are dispatched (the code paths around send_transaction and the
RpcSendTransactionConfig usage in the task scheduler/crank dispatch routine) so
that the send loop respects a semaphore permit limit from
RpcSendTransactionConfig before calling send_transaction, ensuring concurrent
sends are limited.
```rust
async fn on_crank_batch_completed(
    &mut self,
    batch: Vec<DbTask>,
    result: TaskSchedulerResult<Vec<Signature>>,
) -> TaskSchedulerResult<()> {
    match result {
        Ok(sig) => {
            debug!(
                "Executed crank batch ({} tasks) with signatures {:?}",
                batch.len(),
                sig
            );
            for task in &batch {
                self.apply_successful_execution(task).await?;
            }
        }
        Err(ref e) => {
            for task in &batch {
                self.handle_failed_task_execution(task, e).await?;
            }
        }
    }
    Ok(())
}
```
Batch-level error collapsing causes incorrect retries after partial success.
Line 499 collapses per-task outcomes into one Result. If one task send fails, on_crank_batch_completed (Lines 292-295) handles the entire batch as failed, including tasks that already succeeded. That risks duplicate executions and inconsistent DB state.
Please return/process per-task results (e.g., Vec<(DbTask, TaskSchedulerResult<Signature>)>) and apply success/failure per entry instead of per batch.
Also applies to: 474-500
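A minimal sketch of the per-task shape, reusing the existing symbols; `task.id` and the log text are illustrative assumptions:

```rust
// Sketch only: handle each task's outcome individually so one failed
// send no longer rolls back tasks whose transactions already landed.
async fn on_crank_batch_completed(
    &mut self,
    results: Vec<(DbTask, TaskSchedulerResult<Signature>)>,
) -> TaskSchedulerResult<()> {
    for (task, result) in &results {
        match result {
            Ok(signature) => {
                // `task.id` is an assumed field, used only for the log line.
                debug!("Executed task {} with signature {}", task.id, signature);
                self.apply_successful_execution(task).await?;
            }
            // Only genuinely failed tasks are retried or marked failed.
            Err(e) => self.handle_failed_task_execution(task, e).await?,
        }
    }
    Ok(())
}
```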
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-task-scheduler/src/service.rs` around lines 276 - 299, The current
on_crank_batch_completed collapses all per-task results into one Result causing
successful tasks to be retried when any single task fails; change its input to
accept per-task results (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)>) and
iterate over each tuple: call apply_successful_execution(task).await? for
Ok(signature) entries and call handle_failed_task_execution(task, &err).await?
for Err(err) entries. Update the on_crank_batch_completed signature and all
callers to produce per-task results (rather than a single
TaskSchedulerResult<Vec<Signature>>), and keep using the existing DbTask,
TaskSchedulerResult, Signature, apply_successful_execution and
handle_failed_task_execution symbols to implement per-task handling.
```rust
for task in tasks {
    let rpc_client = rpc_client.clone();
    let tx_counter = tx_counter.clone();
    let task = task.clone();
    join_set.spawn(async move {
        let ixs = vec![
            InstructionUtils::noop_instruction(
                tx_counter.fetch_add(1, Ordering::Relaxed),
            ),
            InstructionUtils::execute_task_instruction(
                task.instructions.clone(),
            ),
        ];
        let tx = Transaction::new(
            &[validator_authority()],
            Message::new(&ixs, Some(&validator_authority_id())),
            blockhash,
        );
        Ok(rpc_client.send_transaction(&tx).await.map_err(Box::new)?)
    });
}
```
Unbounded RPC fan-out can overload the scheduler and RPC endpoint.
This spawns one send_transaction future per task with no concurrency limit. Large due-task bursts can flood outbound RPC and degrade scheduler responsiveness. Add a bounded semaphore (or chunked join) and a per-send timeout to contain blast radius.
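One possible shape, sketched with illustrative constants (`MAX_CONCURRENT_SENDS`, `SEND_TIMEOUT`) and a hypothetical `build_crank_tx` helper standing in for the existing ixs/tx block; errors are stringified here for brevity:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

// Illustrative bounds; real values would come from scheduler config.
const MAX_CONCURRENT_SENDS: usize = 16;
const SEND_TIMEOUT: Duration = Duration::from_secs(10);

let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_SENDS));
for task in tasks {
    // Acquiring before spawning caps how many sends are in flight.
    let permit = semaphore
        .clone()
        .acquire_owned()
        .await
        .expect("semaphore closed");
    let rpc_client = rpc_client.clone();
    let tx = build_crank_tx(&task); // hypothetical: the existing ixs/tx construction
    join_set.spawn(async move {
        let result =
            tokio::time::timeout(SEND_TIMEOUT, rpc_client.send_transaction(&tx))
                .await;
        drop(permit); // release the slot once the send finishes or times out
        match result {
            Ok(sent) => sent.map_err(|e| e.to_string()),
            // A stuck send becomes an error instead of hanging the batch.
            Err(_) => Err(format!("send timed out after {:?}", SEND_TIMEOUT)),
        }
    });
}
```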
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-task-scheduler/src/service.rs` around lines 477 - 497, The loop
currently spawns an unbounded send_transaction future per task (inside
join_set.spawn) which can flood RPC; limit concurrency by introducing a bounded
semaphore (tokio::sync::Semaphore) or use a buffered executor
(futures::stream::StreamExt::buffer_unordered) so only N concurrent sends run.
Acquire a permit before calling join_set.spawn (or before awaiting each buffered
future) and let the permit drop after the send finishes; also wrap the
rpc_client.send_transaction call with a per-call timeout (tokio::time::timeout)
and propagate or log timeout errors. Update the block that builds ixs/tx and the
async closure around rpc_client.send_transaction (referenced as rpc_client,
tx_counter, join_set.spawn, InstructionUtils::*, Transaction::new) to use the
semaphore/timeout pattern so bursts are bounded.
Actionable comments posted: 1
♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)
276-299: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Per-task vs batch error handling still collapses partial successes.

`send_crank_batch` returns a single `TaskSchedulerResult<Vec<Signature>>` (Line 494 short-circuits on the first error via `collect::<Result<Vec<_>, _>>()`), so the `Err` arm here invokes `handle_failed_task_execution` for every task in `batch`, including the ones whose `send_transaction` already succeeded on-chain. That schedules retries / failure moves for already-executed tasks, producing duplicate executions and inconsistent DB state.

Return per-task outcomes from `send_crank_batch` (e.g. `Vec<(DbTask, TaskSchedulerResult<Signature>)>`) and dispatch `apply_successful_execution` / `handle_failed_task_execution` per entry instead of per batch.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 276 - 299, The current on_crank_batch_completed matches on a single TaskSchedulerResult<Vec<Signature>> from send_crank_batch and treats the whole batch as failed if send_crank_batch returns Err, which collapses partial successes; change send_crank_batch to return per-task results (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)> or Vec<TaskResult>) and update on_crank_batch_completed to iterate the per-task outcomes and call apply_successful_execution(task).await? for Ok(signature) entries and handle_failed_task_execution(task, &err).await? for Err(err) entries so only genuinely failed tasks are retried/marked failed while successful tasks are applied once. Ensure types and call sites that use send_crank_batch and TaskSchedulerResult are updated accordingly.
464-496: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Unbounded RPC fan-out and no per-send timeout still present.

The loop spawns one `send_transaction` future per task into `JoinSet` with no concurrency cap, and `rpc_client.send_transaction(&tx).await` (Line 490) has no timeout. A large burst of due tasks can flood the RPC endpoint, and any slow/stuck send blocks the batch's `join_all` (Line 493) indefinitely, in turn stalling `on_crank_batch_completed` for the whole batch. Combined with detached `tokio::spawn`s in `run`, concurrency across in-flight batches is also unbounded.

Please add a bounded `Semaphore` (or `futures::stream::buffer_unordered`) and wrap each `send_transaction` in `tokio::time::timeout` so blast radius is contained.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 464 - 496, The send_crank_batch function currently spawns an unbounded number of send_transaction futures into the JoinSet and awaits them with no per-send timeout; change it to limit concurrency (e.g. create a Semaphore or use futures::stream::iter(tasks).map(|t| ...).buffer_unordered(CONCURRENCY)) and acquire a permit for each send so only N RPC calls run concurrently, and wrap each rpc_client.send_transaction(&tx).await call in tokio::time::timeout(DURATION, ...); convert timeout expiry into a TaskSchedulerResult error (or map it to a recoverable error) so join_all/buffer_unordered does not hang forever; update references inside send_crank_batch (the JoinSet spawn/collect logic and the send_transaction call) to use the bounded concurrency + timeout approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 359-368: The spawned batch-send tasks in run() currently use
tokio::spawn and drop the JoinHandle so they remain running after shutdown;
modify the spawn logic to respect self.token by either (A) collect and manage
JoinHandles with a JoinSet/TaskTracker inside the struct and await/join them on
shutdown in run(), or (B) pass a clone of self.token into the async move and
wrap send_crank_batch/send_transaction with a tokio::select! that aborts the
send when token.cancelled() fires; ensure you reference and update the call site
that invokes send_crank_batch(rpc_client, &block, tx_counter, &batch).await as
well as the crank_tx send so tasks are cancelled and cleaned up when
token.cancelled() triggers.
---
Duplicate comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed matches on a single
TaskSchedulerResult<Vec<Signature>> from send_crank_batch and treats the whole
batch as failed if send_crank_batch returns Err, which collapses partial
successes; change send_crank_batch to return per-task results (e.g. Vec<(DbTask,
TaskSchedulerResult<Signature>)> or Vec<TaskResult>) and update
on_crank_batch_completed to iterate the per-task outcomes and call
apply_successful_execution(task).await? for Ok(signature) entries and
handle_failed_task_execution(task, &err).await? for Err(err) entries so only
genuinely failed tasks are retried/marked failed while successful tasks are
applied once. Ensure types and call sites that use send_crank_batch and
TaskSchedulerResult are updated accordingly.
- Around line 464-496: The send_crank_batch function currently spawns an
unbounded number of send_transaction futures into the JoinSet and awaits them
with no per-send timeout; change it to limit concurrency (e.g. create a
Semaphore or use futures::stream::iter(tasks).map(|t|
...).buffer_unordered(CONCURRENCY)) and acquire a permit for each send so only N
RPC calls run concurrently, and wrap each rpc_client.send_transaction(&tx).await
call in tokio::time::timeout(DURATION, ...); convert timeout expiry into a
TaskSchedulerResult error (or map it to a recoverable error) so
join_all/buffer_unordered does not hang forever; update references inside
send_crank_batch (the JoinSet spawn/collect logic and the send_transaction call)
to use the bounded concurrency + timeout approach.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: ad2ff50a-2aaa-43ac-8fa3-760b7276ae91
📒 Files selected for processing (1)
magicblock-task-scheduler/src/service.rs
```rust
    let rpc_client = self.rpc_client.clone();
    let block = self.block.clone();
    let tx_counter = self.tx_counter.clone();
    let crank_tx = crank_tx.clone();
    tokio::spawn(async move {
        let result =
            send_crank_batch(rpc_client, &block, tx_counter, &batch).await;
        let _ = crank_tx.send((batch, result));
    });
}
```
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Detached batch spawns are not tied to the cancellation token.
tokio::spawn here returns a JoinHandle that is dropped immediately, so in-flight batch sends are orphaned. On self.token.cancelled(), run returns and the receiver side of crank_rx is dropped, but these spawned tasks keep running on the runtime until send_transaction resolves. Without a per-send timeout (see related comment), they can hang well past shutdown, holding the Arc<RpcClient> and producing on-chain effects after the service is considered stopped.
Consider tracking the handles in a JoinSet/TaskTracker and either awaiting them on shutdown or wiring self.token into the spawned future (tokio::select! against token.cancelled()), so batches honor the cancellation signal.
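The token-wiring option might look like this sketch; it assumes `self.token` is a `tokio_util::sync::CancellationToken` and mirrors the clones from the snippet above:

```rust
let rpc_client = self.rpc_client.clone();
let block = self.block.clone();
let tx_counter = self.tx_counter.clone();
let crank_tx = crank_tx.clone();
let token = self.token.clone();
tokio::spawn(async move {
    tokio::select! {
        // Drop the in-flight send as soon as shutdown is signaled.
        _ = token.cancelled() => {
            debug!("crank batch send aborted by shutdown");
        }
        result = send_crank_batch(rpc_client, &block, tx_counter, &batch) => {
            // The receiver may already be gone during shutdown; ignore errors.
            let _ = crank_tx.send((batch, result));
        }
    }
});
```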
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-task-scheduler/src/service.rs` around lines 359 - 368, The spawned
batch-send tasks in run() currently use tokio::spawn and drop the JoinHandle so
they remain running after shutdown; modify the spawn logic to respect self.token
by either (A) collect and manage JoinHandles with a JoinSet/TaskTracker inside
the struct and await/join them on shutdown in run(), or (B) pass a clone of
self.token into the async move and wrap send_crank_batch/send_transaction with a
tokio::select! that aborts the send when token.cancelled() fires; ensure you
reference and update the call site that invokes send_crank_batch(rpc_client,
&block, tx_counter, &batch).await as well as the crank_tx send so tasks are
cancelled and cleaned up when token.cancelled() triggers.
Closes #1189
Summary
Batch crank executions and stop waiting for them to finish before processing more cranks.
Breaking Changes
Summary by CodeRabbit
Documentation
- Documented `min-interval` and `reset` settings; expanded performance guidance for batching, concurrency, and retry/backoff strategies.

Dependencies
- Bumped `solana-pubkey` to 4.1.
Improvements
- Expired tasks are now batched and their crank transactions dispatched concurrently.