Skip to content

Commit 7da9a7e

Browse files
committed
feat: cooperative interleaving, keyed routing, and warm isolate fixes
Worker pool: - Replace flume with tokio::sync::mpsc for proper LocalSet integration - Persistent LocalSet + spawn_local for cooperative task interleaving - try_recv() drain to batch-dispatch bursts - Add consistent-hash routing (spawn_keyed, spawn_await_keyed) - Add interleaving and spawn_local concurrency tests with pitfall docs Task executor: - Use keyed routing (hash of worker_id) for warm cache affinity - Fix event loop crash on warm reuse (spawn_local → tokio::spawn) Ops: - Interior mutability with Mutex<RequestState> for warm context reuse - update_request() to swap per-request state (log_tx, span) - Implement as_any() for downcast in warm hit callback
1 parent 4cda2b8 commit 7da9a7e

7 files changed

Lines changed: 381 additions & 110 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ bytes = "1.11.0"
3535
log = "0.4.29"
3636
tokio = "1.49.0"
3737
tokio-util = { version = "0.7", features = ["rt"] }
38-
flume = "0.12.0"
3938
once_cell = "1.19"
4039
# filetime = "0.2" # was used by disk-based snapshot cache
4140
env_logger = "0.11.6"
@@ -56,14 +55,14 @@ sqlx = { version = "0.8.6", features = [ "runtime-tokio", "postgres", "uuid", "b
5655
reqwest = { version = "0.13.1", default-features = false, features = ["rustls", "json", "stream"] }
5756

5857
# Core types
59-
openworkers-core = { git = "https://github.com/openworkers/openworkers-core", tag = "v0.13.0", features = ["hyper"] }
58+
openworkers-core = { path = "../openworkers-core", features = ["hyper"] }
6059

6160
# Transform (TS strip + export default → globalThis.default)
6261
# openworkers-transform = { path = "../openworkers-transform" }
6362
openworkers-transform = { git = "https://github.com/openworkers/openworkers-transform", tag = "v0.1.0" }
6463

6564
# Runtime backend (v8 only for now, others require older version of core)
66-
openworkers-runtime-v8 = { git = "https://github.com/openworkers/openworkers-runtime-v8", tag = "v0.13.3", optional = true, features = ["ptrcomp"] }
65+
openworkers-runtime-v8 = { path = "../openworkers-runtime-v8", optional = true, features = ["ptrcomp"] }
6766

6867
# WASM runtime (optional)
6968
# openworkers-runtime-wasm = { path = "../openworkers-runtime-wasm", optional = true }

src/ops.rs

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use openworkers_core::{
3535
};
3636
use std::collections::HashMap;
3737
use std::sync::Arc;
38+
use std::sync::Mutex;
3839
use std::sync::atomic::{AtomicU64, Ordering};
3940

4041
use tracing::Instrument;
@@ -115,26 +116,36 @@ impl BindingConfigs {
115116
/// Database pool type alias
116117
pub type DbPool = sqlx::Pool<sqlx::Postgres>;
117118

119+
/// Per-request state that changes between requests for context reuse.
120+
///
121+
/// When warm isolates are enabled, the same `RunnerOperations` handle persists
122+
/// across requests. Only `log_tx` and `span` change per-request.
123+
pub struct RequestState {
124+
pub log_tx: Option<LogTx>,
125+
pub span: tracing::Span,
126+
}
127+
118128
/// Runner's implementation of OperationsHandler
119129
///
120130
/// Implements fetch via reqwest, bindings via config lookup, and logs via channel.
131+
///
132+
/// For warm isolates: `request_state` is swappable via interior mutability so that
133+
/// the same ops handle (captured in the event loop) can serve multiple requests.
121134
pub struct RunnerOperations {
122135
/// Stats for this worker
123136
pub stats: Arc<OperationsStats>,
124137
/// User ID for logging/quotas
125138
pub user_id: Option<String>,
126139
/// Worker ID for logging/quotas
127140
pub worker_id: Option<String>,
128-
/// Log sender (optional - if None, uses default stderr)
129-
pub log_tx: Option<LogTx>,
141+
/// Per-request state (log_tx + span), swappable between requests
142+
request_state: Mutex<RequestState>,
130143
/// Binding configs for this worker
131144
pub bindings: BindingConfigs,
132145
/// Database pool for KV operations
133146
pub db_pool: Option<DbPool>,
134147
/// Binding limiters (rate limiting for fetch, KV, database, storage)
135148
pub limiters: BindingLimiters,
136-
/// Tracing span for async operations (propagates trace context)
137-
pub span: tracing::Span,
138149
}
139150

140151
impl RunnerOperations {
@@ -143,11 +154,13 @@ impl RunnerOperations {
143154
stats: Arc::new(OperationsStats::new()),
144155
user_id: None,
145156
worker_id: None,
146-
log_tx: None,
157+
request_state: Mutex::new(RequestState {
158+
log_tx: None,
159+
span: tracing::Span::none(),
160+
}),
147161
bindings: BindingConfigs::new(),
148162
db_pool: None,
149163
limiters: BindingLimiters::default(),
150-
span: tracing::Span::none(),
151164
}
152165
}
153166

@@ -158,8 +171,8 @@ impl RunnerOperations {
158171
}
159172

160173
/// Attach tracing span for context propagation
161-
pub fn with_span(mut self, span: tracing::Span) -> Self {
162-
self.span = span;
174+
pub fn with_span(self, span: tracing::Span) -> Self {
175+
self.request_state.lock().unwrap().span = span;
163176
self
164177
}
165178

@@ -173,8 +186,8 @@ impl RunnerOperations {
173186
self
174187
}
175188

176-
pub fn with_log_tx(mut self, log_tx: LogTx) -> Self {
177-
self.log_tx = Some(log_tx);
189+
pub fn with_log_tx(self, log_tx: LogTx) -> Self {
190+
self.request_state.lock().unwrap().log_tx = Some(log_tx);
178191
self
179192
}
180193

@@ -188,6 +201,20 @@ impl RunnerOperations {
188201
self
189202
}
190203

204+
/// Swap per-request state (log handler + tracing span) for warm context reuse.
205+
///
206+
/// Called when an existing ops handle is reused for a new request.
207+
pub fn update_request(&self, log_tx: LogTx, span: tracing::Span) {
208+
let mut state = self.request_state.lock().unwrap();
209+
state.log_tx = Some(log_tx);
210+
state.span = span;
211+
}
212+
213+
/// Get a clone of the current span
214+
fn span(&self) -> tracing::Span {
215+
self.request_state.lock().unwrap().span.clone()
216+
}
217+
191218
/// Execute a binding fetch with the given config (shared by assets and storage).
192219
fn do_binding_fetch(
193220
&self,
@@ -196,7 +223,7 @@ impl RunnerOperations {
196223
request: HttpRequest,
197224
) -> OpFuture<'_, Result<HttpResponse, String>> {
198225
let binding_name = binding.to_string();
199-
let span = self.span.clone();
226+
let span = self.span();
200227

201228
Box::pin(
202229
async move {
@@ -261,6 +288,10 @@ impl Default for RunnerOperations {
261288
}
262289

263290
impl OperationsHandler for RunnerOperations {
291+
fn as_any(&self) -> &dyn std::any::Any {
292+
self
293+
}
294+
264295
/// Handle direct fetch: `fetch("https://example.com")`
265296
///
266297
/// This is a pass-through fetch with no auth modification.
@@ -269,7 +300,7 @@ impl OperationsHandler for RunnerOperations {
269300
/// Special case: URLs matching `*.workers.rocks` or `*.workers.dev.localhost`
270301
/// are routed internally to avoid DNS lookup and external network hop.
271302
fn handle_fetch(&self, request: HttpRequest) -> OpFuture<'_, Result<HttpResponse, String>> {
272-
let span = self.span.clone();
303+
let span = self.span();
273304
Box::pin(
274305
async move {
275306
// Acquire fetch limiter permit (blocks if at concurrent limit, errors if total exceeded)
@@ -366,7 +397,7 @@ impl OperationsHandler for RunnerOperations {
366397
}
367398
};
368399

369-
let span = self.span.clone();
400+
let span = self.span();
370401
Box::pin(
371402
async move {
372403
// Acquire storage limiter permit
@@ -436,7 +467,7 @@ impl OperationsHandler for RunnerOperations {
436467
};
437468

438469
let namespace_id = config.id.clone();
439-
let span = self.span.clone();
470+
let span = self.span();
440471

441472
Box::pin(
442473
async move {
@@ -489,7 +520,7 @@ impl OperationsHandler for RunnerOperations {
489520

490521
// Get shared pool for schema mode
491522
let shared_pool = self.db_pool.clone();
492-
let span = self.span.clone();
523+
let span = self.span();
493524

494525
Box::pin(
495526
async move {
@@ -590,7 +621,7 @@ impl OperationsHandler for RunnerOperations {
590621
}
591622
};
592623

593-
let span = self.span.clone();
624+
let span = self.span();
594625

595626
Box::pin(
596627
async move {
@@ -641,11 +672,15 @@ impl OperationsHandler for RunnerOperations {
641672
self.stats.log_count.fetch_add(1, Ordering::Relaxed);
642673

643674
// Send to log channel if available (for log collection/storage)
644-
if let Some(ref tx) = self.log_tx {
645-
let _ = tx.send(LogEvent {
646-
level,
647-
message: message.clone(),
648-
});
675+
{
676+
let state = self.request_state.lock().unwrap();
677+
678+
if let Some(ref tx) = state.log_tx {
679+
let _ = tx.send(LogEvent {
680+
level,
681+
message: message.clone(),
682+
});
683+
}
649684
}
650685

651686
// Also log via the log crate for debugging
@@ -670,6 +705,14 @@ mod tests {
670705
assert!(ops.bindings.kv.is_empty());
671706
}
672707

708+
#[test]
709+
fn test_runner_operations_update_request() {
710+
let ops = RunnerOperations::new();
711+
let (tx, _rx) = std::sync::mpsc::channel();
712+
ops.update_request(tx, tracing::Span::none());
713+
assert!(ops.request_state.lock().unwrap().log_tx.is_some());
714+
}
715+
673716
#[test]
674717
fn test_runner_operations_with_user() {
675718
let ops = RunnerOperations::new().with_user_id("user-123".to_string());

src/task_executor.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tokio::sync::OwnedSemaphorePermit;
44
use tracing::Instrument;
55

66
use crate::log::WorkerLogHandler;
7-
use crate::ops::{DbPool, RunnerOperations};
7+
use crate::ops::{DbPool, LogTx, RunnerOperations};
88
use crate::store::{CodeType, WorkerWithBindings};
99
use crate::worker::{Worker, create_worker, prepare_script};
1010
use crate::worker_pool::{TaskPermit, WORKER_POOL};
@@ -81,6 +81,10 @@ struct TaskComponents {
8181
ops: Arc<RunnerOperations>,
8282
code_type: CodeType,
8383
log_handler: WorkerLogHandler,
84+
/// Raw log sender for warm hit callback (to update cached ops)
85+
log_tx: LogTx,
86+
/// Tracing span for warm hit callback
87+
span: tracing::Span,
8488
}
8589

8690
/// Prepare task components: parse script, setup logging, and create operations handle.
@@ -101,7 +105,7 @@ fn prepare_task_components(config: &TaskExecutionConfig) -> Option<TaskComponent
101105
let ops = Arc::new(
102106
RunnerOperations::new()
103107
.with_worker_id(config.worker_data.id.clone())
104-
.with_log_tx(log_tx)
108+
.with_log_tx(log_tx.clone())
105109
.with_bindings(config.worker_data.bindings.clone())
106110
.with_db_pool(config.db_pool.clone())
107111
.with_span(config.span.clone()),
@@ -112,6 +116,8 @@ fn prepare_task_components(config: &TaskExecutionConfig) -> Option<TaskComponent
112116
ops,
113117
code_type: config.worker_data.code_type.clone(),
114118
log_handler,
119+
log_tx,
120+
span: config.span.clone(),
115121
})
116122
}
117123

@@ -184,20 +190,45 @@ pub async fn execute_task_await_v8_pooled(
184190
let execute_mode = V8ExecuteMode::get();
185191
let span = config.span.clone();
186192

193+
// Route to a consistent thread based on worker_id for warm cache affinity
194+
let routing_key = worker_id_for_snapshot.clone();
195+
187196
WORKER_POOL
188-
.spawn_await(move || {
197+
.spawn_await_keyed(&routing_key, move || {
189198
async move {
190199
// Wrap permit to automatically notify drain monitor on drop
191200
let _permit = TaskPermit::new(permit);
192201

193202
let result = match execute_mode {
194203
V8ExecuteMode::Pinned => {
195204
// Thread-pinned pool (default, best performance)
205+
// Pass worker_id + version for warm context caching.
206+
// The warm hit callback updates the cached ops' per-request state
207+
// so the existing event loop sends logs to the new handler.
208+
let warm_log_tx = components.log_tx.clone();
209+
let warm_span = components.span.clone();
210+
let on_warm_hit: openworkers_runtime_v8::WarmHitCallback =
211+
Box::new(move |cached_ops| {
212+
// Downcast to RunnerOperations to call update_request.
213+
// This updates the cached ops' per-request state (log_tx, span)
214+
// so the existing event loop sends logs to the new handler.
215+
if let Some(runner_ops) =
216+
cached_ops.as_any().downcast_ref::<RunnerOperations>()
217+
{
218+
runner_ops.update_request(warm_log_tx, warm_span);
219+
}
220+
});
221+
196222
openworkers_runtime_v8::execute_pinned(
197-
&owner_id,
198-
components.script,
199-
components.ops,
200-
task,
223+
openworkers_runtime_v8::PinnedExecuteRequest {
224+
owner_id,
225+
worker_id: worker_id_for_snapshot.clone(),
226+
version: version_for_snapshot,
227+
script: components.script,
228+
ops: components.ops,
229+
task,
230+
on_warm_hit: Some(on_warm_hit),
231+
},
201232
)
202233
.await
203234
}

0 commit comments

Comments
 (0)