From f2996837f6202f666789b916512f4fd59cabf5f0 Mon Sep 17 00:00:00 2001 From: Andrew DiZenzo <59515127+andrewtdiz@users.noreply.github.com> Date: Fri, 29 May 2026 20:52:42 -0700 Subject: [PATCH] fix(stream/web): track writer backpressure desired size --- crates/perry-codegen/src/ext_registry.rs | 1 + .../perry-codegen/src/lower_call/builtin.rs | 7 + .../src/runtime_decls/strings.rs | 1 + crates/perry-stdlib/src/streams.rs | 270 +++++++++++++++--- test-parity/known_failures.json | 12 - 5 files changed, 243 insertions(+), 48 deletions(-) diff --git a/crates/perry-codegen/src/ext_registry.rs b/crates/perry-codegen/src/ext_registry.rs index 845a1bcde..f5b42799c 100644 --- a/crates/perry-codegen/src/ext_registry.rs +++ b/crates/perry-codegen/src/ext_registry.rs @@ -144,6 +144,7 @@ const FFI_REGISTRY: &[(&str, OwnerKind)] = &[ ("js_transform_stream_writable", OwnerKind::Stdlib { feature: Some("bundled-streams") }), ("js_stream_unwrap_handle", OwnerKind::Stdlib { feature: Some("bundled-streams") }), // #1545: node:stream/web QueuingStrategy constructors. + ("js_streams_strategy_high_water_mark", OwnerKind::Stdlib { feature: Some("bundled-streams") }), ("js_count_queuing_strategy_new", OwnerKind::Stdlib { feature: Some("bundled-streams") }), ("js_byte_length_queuing_strategy_new", OwnerKind::Stdlib { feature: Some("bundled-streams") }), diff --git a/crates/perry-codegen/src/lower_call/builtin.rs b/crates/perry-codegen/src/lower_call/builtin.rs index 445295671..bfdadaa31 100644 --- a/crates/perry-codegen/src/lower_call/builtin.rs +++ b/crates/perry-codegen/src/lower_call/builtin.rs @@ -869,6 +869,13 @@ pub(super) fn lower_builtin_new( hwm = lower_expr(ctx, vexpr)?; } } + } else { + let strategy = lower_expr(ctx, &args[1])?; + hwm = ctx.block().call( + DOUBLE, + "js_streams_strategy_high_water_mark", + &[(DOUBLE, &strategy)], + ); } } let h = ctx.block().call( diff --git a/crates/perry-codegen/src/runtime_decls/strings.rs b/crates/perry-codegen/src/runtime_decls/strings.rs index 75b4a8681..40a4dbaf9 100644 --- a/crates/perry-codegen/src/runtime_decls/strings.rs +++ b/crates/perry-codegen/src/runtime_decls/strings.rs @@ -1874,6 +1874,7 @@ pub fn declare_phase_b_strings(module: &mut LlModule) { module.declare_function("js_text_encoding_stream_new", DOUBLE, &[]); // #1545: node:stream/web QueuingStrategy constructors — take the options // object, return a `{ highWaterMark, size }` object. + module.declare_function("js_streams_strategy_high_water_mark", DOUBLE, &[DOUBLE]); module.declare_function("js_count_queuing_strategy_new", DOUBLE, &[DOUBLE]); module.declare_function("js_byte_length_queuing_strategy_new", DOUBLE, &[DOUBLE]); // Issue #562: stream subclassing (`class X extends WritableStream` etc.). diff --git a/crates/perry-stdlib/src/streams.rs b/crates/perry-stdlib/src/streams.rs index cd689a569..e6377ba9d 100644 --- a/crates/perry-stdlib/src/streams.rs +++ b/crates/perry-stdlib/src/streams.rs @@ -26,9 +26,9 @@ use perry_runtime::{ js_array_alloc, js_array_push, js_closure_call0, js_closure_call1, js_closure_call2, - js_object_alloc, js_object_get_field_by_name, js_object_set_field, js_object_set_field_by_name, - js_object_set_keys, js_promise_new, js_promise_reject, js_promise_resolve, - js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise, + js_nanbox_get_pointer, js_object_alloc, js_object_get_field_by_name, js_object_set_field, + js_object_set_field_by_name, js_object_set_keys, js_promise_new, js_promise_reject, + js_promise_resolve, js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise, }; use std::collections::{HashMap, VecDeque}; use std::sync::Mutex; @@ -84,9 +84,7 @@ struct WritableStreamData { write_cb: i64, close_cb: i64, abort_cb: i64, - /// Backlog of writes when `in_flight` is true. Reserved for the - /// async-write path tracked as a #237 followup; today every write - /// runs synchronously through the user `write` callback. + /// Backlog of writes while the sink's previous `write()` Promise is pending. write_queue: VecDeque<(u64, *mut Promise)>, in_flight: bool, high_water_mark: f64, @@ -629,6 +627,11 @@ unsafe fn read_queuing_strategy_size(strategy: f64) -> i64 { closure_from_bits(size.to_bits()) } +#[no_mangle] +pub unsafe extern "C" fn js_streams_strategy_high_water_mark(strategy: f64) -> f64 { + f64::from_bits(read_high_water_mark(strategy)) +} + /// `new CountQueuingStrategy({ highWaterMark })`. #[no_mangle] pub unsafe extern "C" fn js_count_queuing_strategy_new(opts: f64) -> f64 { @@ -1495,6 +1498,185 @@ pub unsafe extern "C" fn js_writable_stream_abort(stream_handle: f64, reason: f6 // WritableStreamDefaultWriter FFI // ───────────────────────────────────────────────────────────────────── +fn writable_desired_size(s: &WritableStreamData) -> f64 { + s.high_water_mark - if s.in_flight { 1.0 } else { 0.0 } - s.write_queue.len() as f64 +} + +fn sync_writer_ready_promise(stream_id: usize, writer_id: usize, ready: *mut Promise) { + if let Some(w) = WRITERS.lock().unwrap().get_mut(&writer_id) { + if w.stream_handle == stream_id { + w.ready_promise = ready; + } + } +} + +unsafe fn install_writable_backpressure_ready(stream_id: usize, writer_id: usize) { + let ready = js_promise_new(); + if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&stream_id) { + s.ready_promise = ready; + } + sync_writer_ready_promise(stream_id, writer_id, ready); +} + +fn writable_capture_usize(closure: *const ClosureHeader, idx: u32) -> usize { + let bits = perry_runtime::closure::js_closure_get_capture_ptr(closure, idx) as u64; + f64::from_bits(bits) as usize +} + +fn writable_capture_promise(closure: *const ClosureHeader, idx: u32) -> *mut Promise { + perry_runtime::closure::js_closure_get_capture_ptr(closure, idx) as *mut Promise +} + +extern "C" fn writable_write_fulfilled(closure: *const ClosureHeader, _value: f64) -> f64 { + unsafe { + let stream_id = writable_capture_usize(closure, 0); + let writer_id = writable_capture_usize(closure, 1); + let write_promise = writable_capture_promise(closure, 2); + finish_writable_write_success(stream_id, writer_id, write_promise); + } + f64::from_bits(TAG_UNDEFINED) +} + +extern "C" fn writable_write_rejected(closure: *const ClosureHeader, reason: f64) -> f64 { + unsafe { + let stream_id = writable_capture_usize(closure, 0); + let write_promise = writable_capture_promise(closure, 2); + finish_writable_write_error(stream_id, write_promise, reason); + } + f64::from_bits(TAG_UNDEFINED) +} + +unsafe fn attach_writable_write_handlers( + stream_id: usize, + writer_id: usize, + write_promise: *mut Promise, + sink_promise: *mut Promise, +) { + let fulfilled_fn = writable_write_fulfilled as *const u8; + let rejected_fn = writable_write_rejected as *const u8; + perry_runtime::closure::js_register_closure_arity(fulfilled_fn, 1); + perry_runtime::closure::js_register_closure_arity(rejected_fn, 1); + + let on_fulfilled = perry_runtime::closure::js_closure_alloc(fulfilled_fn, 3); + perry_runtime::closure::js_closure_set_capture_ptr( + on_fulfilled, + 0, + (stream_id as f64).to_bits() as i64, + ); + perry_runtime::closure::js_closure_set_capture_ptr( + on_fulfilled, + 1, + (writer_id as f64).to_bits() as i64, + ); + perry_runtime::closure::js_closure_set_capture_ptr(on_fulfilled, 2, write_promise as i64); + + let on_rejected = perry_runtime::closure::js_closure_alloc(rejected_fn, 3); + perry_runtime::closure::js_closure_set_capture_ptr( + on_rejected, + 0, + (stream_id as f64).to_bits() as i64, + ); + perry_runtime::closure::js_closure_set_capture_ptr( + on_rejected, + 1, + (writer_id as f64).to_bits() as i64, + ); + perry_runtime::closure::js_closure_set_capture_ptr(on_rejected, 2, write_promise as i64); + + let _ = perry_runtime::promise::js_promise_then(sink_promise, on_fulfilled, on_rejected); +} + +unsafe fn run_writable_write( + stream_id: usize, + writer_id: usize, + cb: i64, + chunk: f64, + promise: *mut Promise, +) { + if cb == 0 { + finish_writable_write_success(stream_id, writer_id, promise); + return; + } + let result = js_closure_call1(cb as *const ClosureHeader, chunk); + if perry_runtime::promise::js_value_is_promise(result) != 0 { + let sink_promise = js_nanbox_get_pointer(result) as *mut Promise; + if !sink_promise.is_null() { + attach_writable_write_handlers(stream_id, writer_id, promise, sink_promise); + return; + } + } + finish_writable_write_success(stream_id, writer_id, promise); +} + +unsafe fn finish_writable_write_success(stream_id: usize, writer_id: usize, promise: *mut Promise) { + if !promise.is_null() { + js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); + } + + let (next, ready) = { + let mut g = WRITABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => { + s.in_flight = false; + let next = if s.state == WritableState::Writable { + s.write_queue.pop_front().map(|(chunk, p)| { + s.in_flight = true; + (s.write_cb, f64::from_bits(chunk), p) + }) + } else { + None + }; + let ready = if s.state == WritableState::Writable && writable_desired_size(s) > 0.0 + { + s.ready_promise + } else { + std::ptr::null_mut() + }; + (next, ready) + } + None => (None, std::ptr::null_mut()), + } + }; + + if !ready.is_null() { + js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED)); + } + if let Some((cb, chunk, queued_promise)) = next { + run_writable_write(stream_id, writer_id, cb, chunk, queued_promise); + } +} + +unsafe fn finish_writable_write_error(stream_id: usize, promise: *mut Promise, reason: f64) { + let (ready, closed, queued) = { + let mut g = WRITABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => { + s.in_flight = false; + s.state = WritableState::Errored; + s.error_value = reason.to_bits(); + let queued: Vec<*mut Promise> = s.write_queue.drain(..).map(|(_, p)| p).collect(); + (s.ready_promise, s.closed_promise, queued) + } + None => (std::ptr::null_mut(), std::ptr::null_mut(), Vec::new()), + } + }; + + if !promise.is_null() { + js_promise_reject(promise, reason); + } + for queued_promise in queued { + if !queued_promise.is_null() { + js_promise_reject(queued_promise, reason); + } + } + if !ready.is_null() { + js_promise_reject(ready, reason); + } + if !closed.is_null() { + js_promise_reject(closed, reason); + } +} + #[no_mangle] pub unsafe extern "C" fn js_writer_write(writer_handle: f64, chunk: f64) -> *mut Promise { let promise = js_promise_new(); @@ -1510,26 +1692,57 @@ pub unsafe extern "C" fn js_writer_write(writer_handle: f64, chunk: f64) -> *mut if TRANSFORM_PAIRS.lock().unwrap().contains_key(&stream_id) { return transform_write(stream_id, chunk); } - let cb = match WRITABLE_STREAMS.lock().unwrap().get(&stream_id) { - Some(s) if s.state == WritableState::Writable => s.write_cb, - Some(s) if s.state == WritableState::Errored => { - let e = s.error_value; - js_promise_reject(promise, f64::from_bits(e)); - return promise; - } - _ => { - let err = make_error_with_message("Stream is closed or closing"); - js_promise_reject(promise, f64::from_bits(err)); - return promise; + let mut start_write = None; + let needs_pending_ready; + { + let mut g = WRITABLE_STREAMS.lock().unwrap(); + let s = match g.get_mut(&stream_id) { + Some(s) if s.state == WritableState::Writable => s, + Some(s) if s.state == WritableState::Errored => { + let e = s.error_value; + js_promise_reject(promise, f64::from_bits(e)); + return promise; + } + _ => { + let err = make_error_with_message("Stream is closed or closing"); + js_promise_reject(promise, f64::from_bits(err)); + return promise; + } + }; + let before = writable_desired_size(s); + if s.in_flight { + s.write_queue.push_back((chunk.to_bits(), promise)); + } else { + s.in_flight = true; + start_write = Some((s.write_cb, chunk, promise)); } - }; - if cb != 0 { - js_closure_call1(cb as *const ClosureHeader, chunk); + let after = writable_desired_size(s); + needs_pending_ready = before > 0.0 && after <= 0.0; + } + if needs_pending_ready { + install_writable_backpressure_ready(stream_id, writer_id); + } + if let Some((cb, chunk, write_promise)) = start_write { + run_writable_write(stream_id, writer_id, cb, chunk, write_promise); } - js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); promise } +#[no_mangle] +pub unsafe extern "C" fn js_writer_desired_size(writer_handle: f64) -> f64 { + let writer_id = writer_handle as usize; + let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.stream_handle, + None => return 0.0, + }; + let g = WRITABLE_STREAMS.lock().unwrap(); + match g.get(&stream_id) { + Some(s) if s.state == WritableState::Writable => writable_desired_size(s), + Some(s) if s.state == WritableState::Errored => f64::NAN, + _ => 0.0, + } +} + #[no_mangle] pub unsafe extern "C" fn js_writer_close(writer_handle: f64) -> *mut Promise { let writer_id = writer_handle as usize; @@ -1606,21 +1819,6 @@ pub unsafe extern "C" fn js_writer_ready(writer_handle: f64) -> *mut Promise { } } -#[no_mangle] -pub unsafe extern "C" fn js_writer_desired_size(writer_handle: f64) -> f64 { - let writer_id = writer_handle as usize; - let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { - Some(w) => w.stream_handle, - None => return 0.0, - }; - let g = WRITABLE_STREAMS.lock().unwrap(); - match g.get(&stream_id) { - Some(s) if s.state == WritableState::Writable => s.high_water_mark, - Some(s) if s.state == WritableState::Errored => f64::NAN, - _ => 0.0, - } -} - // ───────────────────────────────────────────────────────────────────── // TransformStream FFI // ───────────────────────────────────────────────────────────────────── diff --git a/test-parity/known_failures.json b/test-parity/known_failures.json index 752f0b041..d803f533c 100644 --- a/test-parity/known_failures.json +++ b/test-parity/known_failures.json @@ -242,12 +242,6 @@ "category": "bug-open", "reason": "node:stream: Readable.from(infiniteGen).take(N) \u2014 take helper does not abort upstream early; iteration hangs or returns wrong items. Flips to PASS when #1558 lands." }, - "node-suite/stream/web/desired-size-backpressure": { - "issue": "1545", - "added": "2026-05-24", - "category": "bug-open", - "reason": "node:stream/web: writer.desiredSize w/ CountQueuingStrategy \u2014 fails to compile (CountQueuingStrategy constructor not on globalThis). Flips to PASS when #1545 lands." - }, "node-suite/stream/compose/error-mid-chain": { "issue": "1531", "added": "2026-05-24", @@ -290,12 +284,6 @@ "category": "bug-open", "reason": "node:stream: compose() error does not destroy source. Flips to PASS when #1531 lands." }, - "node-suite/stream/web/desired-size-restores-after-drain": { - "issue": "1545", - "added": "2026-05-24", - "category": "bug-open", - "reason": "node:stream/web: writer.desiredSize does not restore to HWM after write resolves. Flips to PASS when #1545 lands." - }, "node-suite/stream/web/abort-during-pipeto": { "issue": "1545", "added": "2026-05-24",