Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/perry-codegen/src/ext_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ const FFI_REGISTRY: &[(&str, OwnerKind)] = &[
("js_transform_stream_writable", OwnerKind::Stdlib { feature: Some("bundled-streams") }),
("js_stream_unwrap_handle", OwnerKind::Stdlib { feature: Some("bundled-streams") }),
// #1545: node:stream/web QueuingStrategy constructors.
("js_streams_strategy_high_water_mark", OwnerKind::Stdlib { feature: Some("bundled-streams") }),
("js_count_queuing_strategy_new", OwnerKind::Stdlib { feature: Some("bundled-streams") }),
("js_byte_length_queuing_strategy_new", OwnerKind::Stdlib { feature: Some("bundled-streams") }),

Expand Down
7 changes: 7 additions & 0 deletions crates/perry-codegen/src/lower_call/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,13 @@ pub(super) fn lower_builtin_new(
hwm = lower_expr(ctx, vexpr)?;
}
}
} else {
let strategy = lower_expr(ctx, &args[1])?;
hwm = ctx.block().call(
DOUBLE,
"js_streams_strategy_high_water_mark",
&[(DOUBLE, &strategy)],
);
}
}
let h = ctx.block().call(
Expand Down
1 change: 1 addition & 0 deletions crates/perry-codegen/src/runtime_decls/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1874,6 +1874,7 @@ pub fn declare_phase_b_strings(module: &mut LlModule) {
module.declare_function("js_text_encoding_stream_new", DOUBLE, &[]);
// #1545: node:stream/web QueuingStrategy constructors — take the options
// object, return a `{ highWaterMark, size }` object.
module.declare_function("js_streams_strategy_high_water_mark", DOUBLE, &[DOUBLE]);
module.declare_function("js_count_queuing_strategy_new", DOUBLE, &[DOUBLE]);
module.declare_function("js_byte_length_queuing_strategy_new", DOUBLE, &[DOUBLE]);
// Issue #562: stream subclassing (`class X extends WritableStream` etc.).
Expand Down
270 changes: 234 additions & 36 deletions crates/perry-stdlib/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

use perry_runtime::{
js_array_alloc, js_array_push, js_closure_call0, js_closure_call1, js_closure_call2,
js_object_alloc, js_object_get_field_by_name, js_object_set_field, js_object_set_field_by_name,
js_object_set_keys, js_promise_new, js_promise_reject, js_promise_resolve,
js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise,
js_nanbox_get_pointer, js_object_alloc, js_object_get_field_by_name, js_object_set_field,
js_object_set_field_by_name, js_object_set_keys, js_promise_new, js_promise_reject,
js_promise_resolve, js_string_from_bytes, ClosureHeader, JSValue, ObjectHeader, Promise,
};
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
Expand Down Expand Up @@ -84,9 +84,7 @@ struct WritableStreamData {
write_cb: i64,
close_cb: i64,
abort_cb: i64,
/// Backlog of writes when `in_flight` is true. Reserved for the
/// async-write path tracked as a #237 followup; today every write
/// runs synchronously through the user `write` callback.
/// Backlog of writes while the sink's previous `write()` Promise is pending.
write_queue: VecDeque<(u64, *mut Promise)>,
in_flight: bool,
high_water_mark: f64,
Expand Down Expand Up @@ -629,6 +627,11 @@ unsafe fn read_queuing_strategy_size(strategy: f64) -> i64 {
closure_from_bits(size.to_bits())
}

#[no_mangle]
pub unsafe extern "C" fn js_streams_strategy_high_water_mark(strategy: f64) -> f64 {
f64::from_bits(read_high_water_mark(strategy))
}

/// `new CountQueuingStrategy({ highWaterMark })`.
#[no_mangle]
pub unsafe extern "C" fn js_count_queuing_strategy_new(opts: f64) -> f64 {
Expand Down Expand Up @@ -1495,6 +1498,185 @@ pub unsafe extern "C" fn js_writable_stream_abort(stream_handle: f64, reason: f6
// WritableStreamDefaultWriter FFI
// ─────────────────────────────────────────────────────────────────────

fn writable_desired_size(s: &WritableStreamData) -> f64 {
s.high_water_mark - if s.in_flight { 1.0 } else { 0.0 } - s.write_queue.len() as f64
}

fn sync_writer_ready_promise(stream_id: usize, writer_id: usize, ready: *mut Promise) {
if let Some(w) = WRITERS.lock().unwrap().get_mut(&writer_id) {
if w.stream_handle == stream_id {
w.ready_promise = ready;
}
}
}

unsafe fn install_writable_backpressure_ready(stream_id: usize, writer_id: usize) {
let ready = js_promise_new();
if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&stream_id) {
s.ready_promise = ready;
}
sync_writer_ready_promise(stream_id, writer_id, ready);
}

fn writable_capture_usize(closure: *const ClosureHeader, idx: u32) -> usize {
let bits = perry_runtime::closure::js_closure_get_capture_ptr(closure, idx) as u64;
f64::from_bits(bits) as usize
}

fn writable_capture_promise(closure: *const ClosureHeader, idx: u32) -> *mut Promise {
perry_runtime::closure::js_closure_get_capture_ptr(closure, idx) as *mut Promise
}

extern "C" fn writable_write_fulfilled(closure: *const ClosureHeader, _value: f64) -> f64 {
unsafe {
let stream_id = writable_capture_usize(closure, 0);
let writer_id = writable_capture_usize(closure, 1);
let write_promise = writable_capture_promise(closure, 2);
finish_writable_write_success(stream_id, writer_id, write_promise);
}
f64::from_bits(TAG_UNDEFINED)
}

extern "C" fn writable_write_rejected(closure: *const ClosureHeader, reason: f64) -> f64 {
unsafe {
let stream_id = writable_capture_usize(closure, 0);
let write_promise = writable_capture_promise(closure, 2);
finish_writable_write_error(stream_id, write_promise, reason);
}
f64::from_bits(TAG_UNDEFINED)
}

unsafe fn attach_writable_write_handlers(
stream_id: usize,
writer_id: usize,
write_promise: *mut Promise,
sink_promise: *mut Promise,
) {
let fulfilled_fn = writable_write_fulfilled as *const u8;
let rejected_fn = writable_write_rejected as *const u8;
perry_runtime::closure::js_register_closure_arity(fulfilled_fn, 1);
perry_runtime::closure::js_register_closure_arity(rejected_fn, 1);

let on_fulfilled = perry_runtime::closure::js_closure_alloc(fulfilled_fn, 3);
perry_runtime::closure::js_closure_set_capture_ptr(
on_fulfilled,
0,
(stream_id as f64).to_bits() as i64,
);
perry_runtime::closure::js_closure_set_capture_ptr(
on_fulfilled,
1,
(writer_id as f64).to_bits() as i64,
);
perry_runtime::closure::js_closure_set_capture_ptr(on_fulfilled, 2, write_promise as i64);

let on_rejected = perry_runtime::closure::js_closure_alloc(rejected_fn, 3);
perry_runtime::closure::js_closure_set_capture_ptr(
on_rejected,
0,
(stream_id as f64).to_bits() as i64,
);
perry_runtime::closure::js_closure_set_capture_ptr(
on_rejected,
1,
(writer_id as f64).to_bits() as i64,
);
perry_runtime::closure::js_closure_set_capture_ptr(on_rejected, 2, write_promise as i64);

let _ = perry_runtime::promise::js_promise_then(sink_promise, on_fulfilled, on_rejected);
}

unsafe fn run_writable_write(
stream_id: usize,
writer_id: usize,
cb: i64,
chunk: f64,
promise: *mut Promise,
) {
if cb == 0 {
finish_writable_write_success(stream_id, writer_id, promise);
return;
}
let result = js_closure_call1(cb as *const ClosureHeader, chunk);
if perry_runtime::promise::js_value_is_promise(result) != 0 {
let sink_promise = js_nanbox_get_pointer(result) as *mut Promise;
if !sink_promise.is_null() {
attach_writable_write_handlers(stream_id, writer_id, promise, sink_promise);
return;
}
}
finish_writable_write_success(stream_id, writer_id, promise);
}

unsafe fn finish_writable_write_success(stream_id: usize, writer_id: usize, promise: *mut Promise) {
if !promise.is_null() {
js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED));
}

let (next, ready) = {
let mut g = WRITABLE_STREAMS.lock().unwrap();
match g.get_mut(&stream_id) {
Some(s) => {
s.in_flight = false;
let next = if s.state == WritableState::Writable {
s.write_queue.pop_front().map(|(chunk, p)| {
s.in_flight = true;
(s.write_cb, f64::from_bits(chunk), p)
})
} else {
None
};
let ready = if s.state == WritableState::Writable && writable_desired_size(s) > 0.0
{
s.ready_promise
} else {
std::ptr::null_mut()
};
(next, ready)
}
None => (None, std::ptr::null_mut()),
}
};

if !ready.is_null() {
js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED));
}
if let Some((cb, chunk, queued_promise)) = next {
run_writable_write(stream_id, writer_id, cb, chunk, queued_promise);
}
}

unsafe fn finish_writable_write_error(stream_id: usize, promise: *mut Promise, reason: f64) {
let (ready, closed, queued) = {
let mut g = WRITABLE_STREAMS.lock().unwrap();
match g.get_mut(&stream_id) {
Some(s) => {
s.in_flight = false;
s.state = WritableState::Errored;
s.error_value = reason.to_bits();
let queued: Vec<*mut Promise> = s.write_queue.drain(..).map(|(_, p)| p).collect();
(s.ready_promise, s.closed_promise, queued)
}
None => (std::ptr::null_mut(), std::ptr::null_mut(), Vec::new()),
}
};

if !promise.is_null() {
js_promise_reject(promise, reason);
}
for queued_promise in queued {
if !queued_promise.is_null() {
js_promise_reject(queued_promise, reason);
}
}
if !ready.is_null() {
js_promise_reject(ready, reason);
}
if !closed.is_null() {
js_promise_reject(closed, reason);
}
}

#[no_mangle]
pub unsafe extern "C" fn js_writer_write(writer_handle: f64, chunk: f64) -> *mut Promise {
let promise = js_promise_new();
Expand All @@ -1510,26 +1692,57 @@ pub unsafe extern "C" fn js_writer_write(writer_handle: f64, chunk: f64) -> *mut
if TRANSFORM_PAIRS.lock().unwrap().contains_key(&stream_id) {
return transform_write(stream_id, chunk);
}
let cb = match WRITABLE_STREAMS.lock().unwrap().get(&stream_id) {
Some(s) if s.state == WritableState::Writable => s.write_cb,
Some(s) if s.state == WritableState::Errored => {
let e = s.error_value;
js_promise_reject(promise, f64::from_bits(e));
return promise;
}
_ => {
let err = make_error_with_message("Stream is closed or closing");
js_promise_reject(promise, f64::from_bits(err));
return promise;
let mut start_write = None;
let needs_pending_ready;
{
let mut g = WRITABLE_STREAMS.lock().unwrap();
let s = match g.get_mut(&stream_id) {
Some(s) if s.state == WritableState::Writable => s,
Some(s) if s.state == WritableState::Errored => {
let e = s.error_value;
js_promise_reject(promise, f64::from_bits(e));
return promise;
}
_ => {
let err = make_error_with_message("Stream is closed or closing");
js_promise_reject(promise, f64::from_bits(err));
return promise;
}
};
let before = writable_desired_size(s);
if s.in_flight {
s.write_queue.push_back((chunk.to_bits(), promise));
} else {
s.in_flight = true;
start_write = Some((s.write_cb, chunk, promise));
}
};
if cb != 0 {
js_closure_call1(cb as *const ClosureHeader, chunk);
let after = writable_desired_size(s);
needs_pending_ready = before > 0.0 && after <= 0.0;
}
if needs_pending_ready {
install_writable_backpressure_ready(stream_id, writer_id);
}
if let Some((cb, chunk, write_promise)) = start_write {
run_writable_write(stream_id, writer_id, cb, chunk, write_promise);
}
js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED));
promise
}

#[no_mangle]
pub unsafe extern "C" fn js_writer_desired_size(writer_handle: f64) -> f64 {
let writer_id = writer_handle as usize;
let stream_id = match WRITERS.lock().unwrap().get(&writer_id) {
Some(w) => w.stream_handle,
None => return 0.0,
};
let g = WRITABLE_STREAMS.lock().unwrap();
match g.get(&stream_id) {
Some(s) if s.state == WritableState::Writable => writable_desired_size(s),
Some(s) if s.state == WritableState::Errored => f64::NAN,
_ => 0.0,
}
}

#[no_mangle]
pub unsafe extern "C" fn js_writer_close(writer_handle: f64) -> *mut Promise {
let writer_id = writer_handle as usize;
Expand Down Expand Up @@ -1606,21 +1819,6 @@ pub unsafe extern "C" fn js_writer_ready(writer_handle: f64) -> *mut Promise {
}
}

#[no_mangle]
pub unsafe extern "C" fn js_writer_desired_size(writer_handle: f64) -> f64 {
let writer_id = writer_handle as usize;
let stream_id = match WRITERS.lock().unwrap().get(&writer_id) {
Some(w) => w.stream_handle,
None => return 0.0,
};
let g = WRITABLE_STREAMS.lock().unwrap();
match g.get(&stream_id) {
Some(s) if s.state == WritableState::Writable => s.high_water_mark,
Some(s) if s.state == WritableState::Errored => f64::NAN,
_ => 0.0,
}
}

// ─────────────────────────────────────────────────────────────────────
// TransformStream FFI
// ─────────────────────────────────────────────────────────────────────
Expand Down
12 changes: 0 additions & 12 deletions test-parity/known_failures.json
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,6 @@
"category": "bug-open",
"reason": "node:stream: Readable.from(infiniteGen).take(N) \u2014 take helper does not abort upstream early; iteration hangs or returns wrong items. Flips to PASS when #1558 lands."
},
"node-suite/stream/web/desired-size-backpressure": {
"issue": "1545",
"added": "2026-05-24",
"category": "bug-open",
"reason": "node:stream/web: writer.desiredSize w/ CountQueuingStrategy \u2014 fails to compile (CountQueuingStrategy constructor not on globalThis). Flips to PASS when #1545 lands."
},
"node-suite/stream/compose/error-mid-chain": {
"issue": "1531",
"added": "2026-05-24",
Expand Down Expand Up @@ -290,12 +284,6 @@
"category": "bug-open",
"reason": "node:stream: compose() error does not destroy source. Flips to PASS when #1531 lands."
},
"node-suite/stream/web/desired-size-restores-after-drain": {
"issue": "1545",
"added": "2026-05-24",
"category": "bug-open",
"reason": "node:stream/web: writer.desiredSize does not restore to HWM after write resolves. Flips to PASS when #1545 lands."
},
"node-suite/stream/web/abort-during-pipeto": {
"issue": "1545",
"added": "2026-05-24",
Expand Down