feat(spider-execution-manager): Add the runtime that drives the main task-dispatch loop.#329
feat(spider-execution-manager): Add the runtime that drives the main task-dispatch loop.#329LinZhihao-723 wants to merge 22 commits into
Conversation
|
Warning Review limit reached
More reviews will be available in 46 minutes and 31 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (17)
WalkthroughThis PR introduces the execution manager, a new node-level service that supervises task-executor subprocesses. It adds session tracking, client contracts for distributed services, a heartbeat actor, process pooling with stdin/stdout framing, a task dispatch runtime loop, the executor binary itself, and comprehensive integration tests validating the full execution path. ChangesCore Session Tracking & Client Contracts
Liveness Heartbeat Actor
Process Pool and Task Executor Protocol
Runtime Main Loop and Orchestration
Task Executor Binary and Serialization
Integration Tests and Test Infrastructure
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (1)
tests/huntsman/task-executor/tests/test_process_pool.rs (1)
115-121: ⚡ Quick winBound
ProcessPool::executewaits to prevent hanging test runs.If executor/pool IPC wedges, these awaits can hang the test indefinitely. Add an outer test timeout for each
executecall (or a small helper that applies it).Also applies to: 133-135, 154-157, 170-172, 183-189, 202-204
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/huntsman/task-executor/tests/test_process_pool.rs` around lines 115 - 121, The test currently awaits pool.execute(...) directly (e.g., calls to ProcessPool::execute with make_request and NORMAL_TIMEOUT), which can hang if IPC wedges; wrap each execute call in a short outer timeout (using tokio::time::timeout or a test helper like run_with_timeout) and fail the test if the outer timeout elapses. Update every occurrence where pool.execute(...) is awaited (including the calls that use make_request("fibonacci", ...), NORMAL_TIMEOUT, and similar variants) to use that timeout wrapper (or introduce a small helper function that takes the future returned by pool.execute and the outer timeout duration) so tests abort instead of hanging when IPC stalls.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/spider-execution-manager/src/process_pool.rs`:
- Around line 246-247: The FramedWrite/FramedRead usage creates
LengthDelimitedCodec with the 8 MiB default which will reject larger payloads;
update the codec construction for requests and responses in process_pool.rs to
use LengthDelimitedCodec::builder().max_frame_length(N).new_write(...) and
.new_read(...) (or call set_max_frame_length) with an explicit N (pick a
project-appropriate limit) and ensure the subprocess side uses the identical
limit; also propagate a clear “frame too large” error from the codec handling so
callers of requests/responses see a descriptive error when the limit is
exceeded.
In `@components/spider-execution-manager/src/runtime.rs`:
- Around line 176-185: The run method currently drops any
tokio::spawn(report_outcome(...)) tasks, risking lost reports; update the
runtime to track outcome tasks with a tokio::task::JoinSet (add a field like
outcome_joinset: JoinSet<ResultType> to the runtime struct), replace direct
tokio::spawn(report_outcome(...)) calls in main_loop/other places with
outcome_joinset.spawn(report_outcome(...)), and during shutdown (in run after
cancelling cancellation_token and awaiting liveness_join) drain the JoinSet by
awaiting join_next in a loop (or collect/await all remaining joins) to ensure
all report_outcome tasks complete before returning; keep existing error logging
(tracing::warn) for any failed joins.
- Around line 265-266: The comment claiming "Retries and logging are handled
inside `report_outcome`" is incorrect because `report_outcome` only logs errors;
update the comment at the fire-and-forget outcome-report site to accurately
state that outcome reporting is dispatched asynchronously and that
`report_outcome` currently only logs failures (it does not perform retries), and
note that retry/backoff would need to be implemented inside `report_outcome` (or
by the caller) if desired; reference `report_outcome` and the fire-and-forget
dispatch so reviewers can locate and, if choosing to add retries later,
implement retry logic with configured attempts and backoff inside
`report_outcome`.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs`:
- Around line 8-14: Update the top-level docs to accurately reflect the binary
contract: state that packages are resolved to
`${SPIDER_TDL_PACKAGE_DIR}/${package}/lib${package}.so` (note the `lib` prefix
used by run_task) and revise the execution model text to say the binary
processes many Execute requests sequentially on a single-threaded tokio runtime
(no concurrency) rather than implying a single task runs for the lifetime of the
process; update references in the header comments to match the behavior
implemented in main and run_task.
In `@tests/huntsman/integration-test-tasks/src/lib.rs`:
- Line 8: Update the intra-doc link in the module doc for task_decl::instrument
to reference the actual exported constant name INSTRUMENT_SLEEP_US (replace the
broken `INSTRUMENT_SLEEP` link with `INSTRUMENT_SLEEP_US`) so the rustdoc link
resolves correctly; ensure any surrounding text still makes sense (e.g., "sleeps
for a known [`INSTRUMENT_SLEEP_US`] duration").
In `@tests/huntsman/task-executor/tests/overhead_instrument.rs`:
- Around line 21-24: The code opens/writes the report file (using File::create /
File and a PathBuf named report_path) without ensuring the parent directory
exists, which causes failures when the directory is missing; before
creating/writing the file in the test (the block that calls
File::create(report_path) / writes the report), call std::fs::create_dir_all on
report_path.parent() (handling the None case safely) to create the directory
tree, then proceed to create and write the file.
In `@tests/huntsman/task-executor/tests/test_executor.rs`:
- Around line 22-27: The test currently awaits ExecutorHandle IPC calls without
any deadlines which can hang; wrap all awaited calls on the ExecutorHandle
(e.g., handle.recv().await, handle.shutdown_clean().await,
handle.try_recv().await, handle.wait_for_exit().await) with a hard timeout
(tokio::time::timeout with a reasonable Duration) and assert or unwrap the
timeout result so the test fails fast on hangs; update calls around
ExecutorHandle::spawn, send/recv/try_recv, shutdown_clean, and wait_for_exit to
use timeout and propagate a clear test failure when a timeout occurs.
---
Nitpick comments:
In `@tests/huntsman/task-executor/tests/test_process_pool.rs`:
- Around line 115-121: The test currently awaits pool.execute(...) directly
(e.g., calls to ProcessPool::execute with make_request and NORMAL_TIMEOUT),
which can hang if IPC wedges; wrap each execute call in a short outer timeout
(using tokio::time::timeout or a test helper like run_with_timeout) and fail the
test if the outer timeout elapses. Update every occurrence where
pool.execute(...) is awaited (including the calls that use
make_request("fibonacci", ...), NORMAL_TIMEOUT, and similar variants) to use
that timeout wrapper (or introduce a small helper function that takes the future
returned by pool.execute and the outer timeout duration) so tests abort instead
of hanging when IPC stalls.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5659315f-8271-4f2f-9ad3-dfceb77d9564
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (28)
Cargo.tomlcomponents/spider-core/Cargo.tomlcomponents/spider-core/src/lib.rscomponents/spider-core/src/session.rscomponents/spider-execution-manager/Cargo.tomlcomponents/spider-execution-manager/src/client.rscomponents/spider-execution-manager/src/client/liveness.rscomponents/spider-execution-manager/src/client/scheduler.rscomponents/spider-execution-manager/src/client/storage.rscomponents/spider-execution-manager/src/lib.rscomponents/spider-execution-manager/src/liveness.rscomponents/spider-execution-manager/src/process_pool.rscomponents/spider-execution-manager/src/runtime.rscomponents/spider-task-executor/Cargo.tomlcomponents/spider-task-executor/src/bin/spider_task_executor.rscomponents/spider-task-executor/src/error.rscomponents/spider-task-executor/src/lib.rscomponents/spider-task-executor/src/manager.rscomponents/spider-task-executor/src/protocol.rstaskfiles/test.yamltests/huntsman/integration-test-tasks/Cargo.tomltests/huntsman/integration-test-tasks/src/lib.rstests/huntsman/task-executor/Cargo.tomltests/huntsman/task-executor/src/lib.rstests/huntsman/task-executor/tests/overhead_instrument.rstests/huntsman/task-executor/tests/test_executor.rstests/huntsman/task-executor/tests/test_process_pool.rstests/huntsman/tdl-integration/tests/complex.rs
| requests: FramedWrite::new(stdin, LengthDelimitedCodec::new()), | ||
| responses: FramedRead::new(stdout, LengthDelimitedCodec::new()), |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
What is the default maximum frame length for tokio_util::codec::LengthDelimitedCodec::new(), and how do you configure a larger max_frame_length for both FramedRead and FramedWrite?
💡 Result:
Default max frame length: 8 MiB (8 * 1024 * 1024 bytes) for tokio_util::codec::LengthDelimitedCodec::new/Default configuration. [1][2] How to configure a larger max_frame_length for both FramedRead and FramedWrite: 1) For FramedRead (AsyncRead -> Stream of frames): Use the LengthDelimitedCodec::builder and set max_frame_length before calling new_read. Example: let framed_read = tokio_util::codec::LengthDelimitedCodec::builder.max_frame_length(16 * 1024 * 1024) // larger limit.new_read(reader); The builder’s max_frame_length setting is “the largest size this codec will accept” and larger frames are rejected; it affects decoding. [1][3] 2) For FramedWrite (AsyncWrite -> Sink of frames): Use the same builder and set max_frame_length before calling new_write. The configuration option applies to both encoding and decoding. [1][4] Example: let framed_write = tokio_util::codec::LengthDelimitedCodec::builder.max_frame_length(16 * 1024 * 1024) // larger limit.new_write(writer); The builder applies max_frame_length to both encoding (payload length checked) and decoding (frames accepted). [1][4] Note: There is also an imperative setter set_max_frame_length(&mut self, val: usize) on LengthDelimitedCodec; the change takes effect the next time a frame is decoded. [5][4]
Citations:
- 1: https://people.eecs.berkeley.edu/~pschafhalter/pub/erdos/doc/tokio_util/codec/length_delimited/struct.Builder.html
- 2: https://docs.rs/tokio-util/latest/tokio_util/codec/length_delimited/
- 3: https://docs.rs/tokio-util/latest/tokio_util/codec/length_delimited/index.html
- 4: https://docs.rs/tokio-util/latest/src/tokio_util/codec/length_delimited.rs.html
- 5: https://docs.rs/tokio-util/latest/tokio_util/codec/length_delimited/struct.LengthDelimitedCodec.html
Set explicit IPC frame-size limits for LengthDelimitedCodec (avoid the 8 MiB default).
components/spider-execution-manager/src/process_pool.rs constructs LengthDelimitedCodec::new() for both FramedWrite/FramedRead, whose default max_frame_length is 8 MiB; payloads larger than this will be rejected by the codec. Configure max_frame_length explicitly and mirror the same limit on the subprocess side (e.g., LengthDelimitedCodec::builder().max_frame_length(N).new_read(...) / .new_write(...), or set_max_frame_length) and surface a clear “frame too large” error when exceeded.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-execution-manager/src/process_pool.rs` around lines 246 -
247, The FramedWrite/FramedRead usage creates LengthDelimitedCodec with the 8
MiB default which will reject larger payloads; update the codec construction for
requests and responses in process_pool.rs to use
LengthDelimitedCodec::builder().max_frame_length(N).new_write(...) and
.new_read(...) (or call set_max_frame_length) with an explicit N (pick a
project-appropriate limit) and ensure the subprocess side uses the identical
limit; also propagate a clear “frame too large” error from the codec handling so
callers of requests/responses see a descriptive error when the limit is
exceeded.
| pub async fn run(self) -> Result<(), RuntimeError> { | ||
| tracing::info!(em_id = ? self.em_id, "Runtime main loop starting."); | ||
| let result = self.main_loop().await; | ||
| tracing::info!(em_id = ? self.em_id, "Runtime main loop exited. Shutting down."); | ||
| self.cancellation_token.cancel(); | ||
| if let Err(err) = self.liveness_join.await { | ||
| tracing::warn!(err = ? err, "Liveness actor task did not exit cleanly."); | ||
| } | ||
| result | ||
| } |
There was a problem hiding this comment.
Spawned outcome reports may be lost on shutdown.
When run returns, any in-flight tokio::spawn(report_outcome(...)) tasks are not awaited or tracked. If the tokio runtime shuts down immediately after, these reports may never reach storage.
Consider tracking spawned tasks via a JoinSet and draining them during shutdown, or documenting this as an accepted trade-off.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-execution-manager/src/runtime.rs` around lines 176 - 185,
The run method currently drops any tokio::spawn(report_outcome(...)) tasks,
risking lost reports; update the runtime to track outcome tasks with a
tokio::task::JoinSet (add a field like outcome_joinset: JoinSet<ResultType> to
the runtime struct), replace direct tokio::spawn(report_outcome(...)) calls in
main_loop/other places with outcome_joinset.spawn(report_outcome(...)), and
during shutdown (in run after cancelling cancellation_token and awaiting
liveness_join) drain the JoinSet by awaiting join_next in a loop (or
collect/await all remaining joins) to ensure all report_outcome tasks complete
before returning; keep existing error logging (tracing::warn) for any failed
joins.
| // Fire-and-forget the outcome report so the main loop can dispatch the next task | ||
| // without waiting on storage. Retries and logging are handled inside `report_outcome`. |
There was a problem hiding this comment.
Misleading comment about retry handling.
The comment states "Retries and logging are handled inside report_outcome", but report_outcome (lines 468-479) only logs errors—there is no retry logic. Either update the comment to reflect the actual behaviour or implement retries.
📝 Suggested fix
- // Fire-and-forget the outcome report so the main loop can dispatch the next task
- // without waiting on storage. Retries and logging are handled inside `report_outcome`.
+ // Fire-and-forget the outcome report so the main loop can dispatch the next task
+ // without waiting on storage. Errors are logged inside `report_outcome` but not
+ // retried.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Fire-and-forget the outcome report so the main loop can dispatch the next task | |
| // without waiting on storage. Retries and logging are handled inside `report_outcome`. | |
| // Fire-and-forget the outcome report so the main loop can dispatch the next task | |
| // without waiting on storage. Errors are logged inside `report_outcome` but not | |
| // retried. |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-execution-manager/src/runtime.rs` around lines 265 - 266,
The comment claiming "Retries and logging are handled inside `report_outcome`"
is incorrect because `report_outcome` only logs errors; update the comment at
the fire-and-forget outcome-report site to accurately state that outcome
reporting is dispatched asynchronously and that `report_outcome` currently only
logs failures (it does not perform retries), and note that retry/backoff would
need to be implemented inside `report_outcome` (or by the caller) if desired;
reference `report_outcome` and the fire-and-forget dispatch so reviewers can
locate and, if choosing to add retries later, implement retry logic with
configured attempts and backoff inside `report_outcome`.
| //! Package resolution: each `Execute` request names a TDL package; the executor looks for | ||
| //! `${SPIDER_TDL_PACKAGE_DIR}/${package}/${package}.so` and caches the loaded library by name. | ||
| //! | ||
| //! Execution model: requests are processed strictly sequentially on a single-threaded tokio | ||
| //! runtime. Tokio is used only to match the async I/O surface on the execution manager side; | ||
| //! the executor itself has no concurrency requirements, and exactly one task runs for the | ||
| //! lifetime of the process. |
There was a problem hiding this comment.
Fix the top-level docs to match the binary’s actual contract.
Line 9 omits the lib prefix used by run_task, and Lines 11-14 say one task runs for the lifetime of the process even though main serves many requests sequentially. Those comments read like the subprocess contract, so they should match the implementation.
📝 Suggested doc fix
-//! Package resolution: each `Execute` request names a TDL package; the executor looks for
-//! `${SPIDER_TDL_PACKAGE_DIR}/${package}/${package}.so` and caches the loaded library by name.
+//! Package resolution: each `Execute` request names a TDL package; the executor looks for
+//! `${SPIDER_TDL_PACKAGE_DIR}/${package}/lib{package}.so` and caches the loaded library by name.
//!
//! Execution model: requests are processed strictly sequentially on a single-threaded tokio
//! runtime. Tokio is used only to match the async I/O surface on the execution manager side;
-//! the executor itself has no concurrency requirements, and exactly one task runs for the
-//! lifetime of the process.
+//! the executor itself has no concurrency requirements, and at most one task runs at a time
+//! during the process lifetime.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| //! Package resolution: each `Execute` request names a TDL package; the executor looks for | |
| //! `${SPIDER_TDL_PACKAGE_DIR}/${package}/${package}.so` and caches the loaded library by name. | |
| //! | |
| //! Execution model: requests are processed strictly sequentially on a single-threaded tokio | |
| //! runtime. Tokio is used only to match the async I/O surface on the execution manager side; | |
| //! the executor itself has no concurrency requirements, and exactly one task runs for the | |
| //! lifetime of the process. | |
| //! Package resolution: each `Execute` request names a TDL package; the executor looks for | |
| //! `${SPIDER_TDL_PACKAGE_DIR}/${package}/lib{package}.so` and caches the loaded library by name. | |
| //! | |
| //! Execution model: requests are processed strictly sequentially on a single-threaded tokio | |
| //! runtime. Tokio is used only to match the async I/O surface on the execution manager side; | |
| //! the executor itself has no concurrency requirements, and at most one task runs at a time | |
| //! during the process lifetime. |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs` around lines
8 - 14, Update the top-level docs to accurately reflect the binary contract:
state that packages are resolved to
`${SPIDER_TDL_PACKAGE_DIR}/${package}/lib${package}.so` (note the `lib` prefix
used by run_task) and revise the execution model text to say the binary
processes many Execute requests sequentially on a single-threaded tokio runtime
(no concurrency) rather than implying a single task runs for the lifetime of the
process; update references in the header comments to match the behavior
implemented in main and run_task.
| use std::{ | ||
| fs::File, | ||
| io::Write, | ||
| path::PathBuf, |
There was a problem hiding this comment.
Create the output directory before writing the report file.
Line 179 can fail when Line 177 resolves to a non-existent directory. Please create the directory tree first so report emission is robust.
🛠️ Suggested patch
use std::{
- fs::File,
+ fs::{create_dir_all, File},
io::Write,
path::PathBuf,
time::{Duration, Instant},
};
@@
- let path = output_dir.join(OUTPUT_FILE);
+ create_dir_all(&output_dir).unwrap_or_else(|err| {
+ panic!("create output dir {} failed: {err}", output_dir.display())
+ });
+ let path = output_dir.join(OUTPUT_FILE);Also applies to: 177-179
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/huntsman/task-executor/tests/overhead_instrument.rs` around lines 21 -
24, The code opens/writes the report file (using File::create / File and a
PathBuf named report_path) without ensuring the parent directory exists, which
causes failures when the directory is missing; before creating/writing the file
in the test (the block that calls File::create(report_path) / writes the
report), call std::fs::create_dir_all on report_path.parent() (handling the None
case safely) to create the directory tree, then proceed to create and write the
file.
| let mut handle = ExecutorHandle::spawn(); | ||
| let input: u64 = 10; | ||
| handle | ||
| .send(&execute_request("fibonacci", encode_single_input(&input))) | ||
| .await; | ||
| let Response::Result { outcome, .. } = handle.recv().await; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify unbounded awaits in these integration tests.
rg -nP 'handle\.(recv|try_recv|wait_for_exit|shutdown_clean)\(\)\.await' tests/huntsman/task-executor/tests/test_executor.rsRepository: y-scope/spider
Length of output: 362
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/huntsman/task-executor/tests/test_executor.rs"
echo "---- file header (first 120 lines) ----"
nl -ba "$FILE" | head -n 120
echo
echo "---- context around unbounded awaits ----"
for L in 20 25 35 45 60 75 82 87 90; do
echo "### around line $L"
nl -ba "$FILE" | sed -n "$((L-5)),$((L+6))p"
doneRepository: y-scope/spider
Length of output: 159
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/huntsman/task-executor/tests/test_executor.rs"
echo "---- file header (first 120 lines) ----"
cat -n "$FILE" | head -n 120
echo
echo "---- context around unbounded awaits ----"
for L in 20 27 40 50 67 80 85 87 90; do
echo "### around line $L"
cat -n "$FILE" | sed -n "$((L-5)),$((L+6))p"
doneRepository: y-scope/spider
Length of output: 8230
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/huntsman/task-executor/tests/test_executor.rs"
echo "---- tokio::time / Duration imports in file ----"
rg -n "tokio::time|Duration|STEP_TIMEOUT|timeout\(" "$FILE" || true
echo
echo "---- full import section ----"
sed -n '1,30p' "$FILE"Repository: y-scope/spider
Length of output: 1186
Add bounded timeouts to prevent integration test hangs
tests/huntsman/task-executor/tests/test_executor.rs currently awaits executor IPC without any step deadlines, so a stuck child/process framing can hang the test run:
handle.recv().await(lines 27, 50)handle.shutdown_clean().await(lines 40, 67)handle.try_recv().await(line 80)handle.wait_for_exit().await(line 85)
⏱️ Proposed hardening
+use std::time::Duration;
+
+const STEP_TIMEOUT: Duration = Duration::from_secs(10);
@@
- let Response::Result { outcome, .. } = handle.recv().await;
+ let Response::Result { outcome, .. } = tokio::time::timeout(STEP_TIMEOUT, handle.recv())
+ .await
+ .expect("timed out waiting for executor response");
@@
- handle.shutdown_clean().await;
+ tokio::time::timeout(STEP_TIMEOUT, handle.shutdown_clean())
+ .await
+ .expect("timed out during clean shutdown");
@@
- let frame = handle.try_recv().await;
+ let frame = tokio::time::timeout(STEP_TIMEOUT, handle.try_recv())
+ .await
+ .expect("timed out waiting for EOF");
@@
- let status = handle.wait_for_exit().await;
+ let status = tokio::time::timeout(STEP_TIMEOUT, handle.wait_for_exit())
+ .await
+ .expect("timed out waiting for process exit");🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/huntsman/task-executor/tests/test_executor.rs` around lines 22 - 27,
The test currently awaits ExecutorHandle IPC calls without any deadlines which
can hang; wrap all awaited calls on the ExecutorHandle (e.g.,
handle.recv().await, handle.shutdown_clean().await, handle.try_recv().await,
handle.wait_for_exit().await) with a hard timeout (tokio::time::timeout with a
reasonable Duration) and assert or unwrap the timeout result so the test fails
fast on hangs; update calls around ExecutorHandle::spawn, send/recv/try_recv,
shutdown_clean, and wait_for_exit to use timeout and propagate a clear test
failure when a timeout occurs.
Introduces tests/huntsman/test-utils, a shared support crate that consolidates the executor subprocess harness, TDL wire-payload helpers, and in-process mock client implementations (scheduler/storage/liveness) so multiple integration suites can reuse them. task-executor-tests is converted to a thin test-only crate that depends on it. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The scheduler and storage mocks are only needed by the execution-manager runtime tests, which land on a later branch. Drop them here so this branch's test-utils exposes just the liveness client mock that the liveness work uses. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Description
This PR adds
spider_execution_manager::runtime, the last piece of the execution-manager: a generic owner of the process pool, liveness actor, and clients that pulls task assignments from the scheduler, dispatches them through the executor pool, and reports outcomes back to storage.Runtime<SchedulerClientType, StorageClientType>— generic over the two client types the loop calls every iteration. Storage carries+ Clone + 'staticbecause the outcome reporter is spawned onto a detached tokio task and needs an owned client. The liveness client is only generic at thecreatefactory (the runtime hands its onlyArcto the actor at boot and never touches it again).Bootstrap (
Runtime::create) — registers with storage viaLivenessClient::register, seeds aSessionTrackerwith the returned session id, builds the initialProcessPool, and spawns the liveness actor. By the timecreatereturns, the actor has already sent its first heartbeat and the runtime is ready to serve.Main loop (
Runtime::run/Runtime::main_loop) — each iteration:SchedulerClient::next_taskis raced against the cancellation token viatokio::select!withbiased, so shutdown is observed promptly even when the scheduler is idle. AnySchedulerErroris treated as fatal and propagated via?(the client is responsible for absorbing transient transport blips internally; a surfaced error means it's already given up).session_idis compared against the sharedSessionTracker::current(): a strictly-older bundle is logged and dropped; a strictly-newer bundle nudges the liveness actor viaLivenessHandle::refreshso the tracker re-syncs through the next heartbeat. The tracker is never advanced from the main loop — the liveness actor is the only writer.StorageClient::register_task_instanceis racing-aware (cancellation interrupts the await).StaleSessiontriggers a liveness refresh and drops the assignment;InvalidInputis treated as a contract bug and is fatal; every other storage error is logged and the assignment is dropped.ExecuteRequestis built from the registration'sExecutionContext(thehard_timeoutis derived fromtimeout_policy.hard_timeout_ms) and submitted toProcessPool::execute. The pool already enforces the hard timeout, so the call is awaited to completion — losing the in-flight outcome to a cancellation-race would prevent step 5 from reporting it. PoolInternalErrors bubble up via?(executor binary missing, encode failures, etc.) and bail the runtime.tokio::spawn(report_outcome(...))decouples result reporting from the next iteration's scheduler poll. The reporter consumes the outcome end-to-end: builds aReport(Success(Option<Vec<u8>>)orFailure(String)with a message derived from theOutcomevariant), then transfers ownership intoStorageClient::report_task_success/report_task_failure. No retry logic in this layer — same rationale as the scheduler call.Shutdown semantics —
run(self)wrapsmain_loopwith explicit teardown: cancel the token (covering the scheduler-error / process-pool-error exit paths where the loop bailed without external cancellation), awaitliveness_join, then returnresult. Atokio_util::sync::DropGuardfield guarantees the token is cancelled even when therunfuture itself is dropped at an outer await point.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Release Notes
New Features
Tests