diff --git a/crates/perry-stdlib/src/common/dispatch.rs b/crates/perry-stdlib/src/common/dispatch.rs index 29019e267..33c5533ed 100644 --- a/crates/perry-stdlib/src/common/dispatch.rs +++ b/crates/perry-stdlib/src/common/dispatch.rs @@ -296,14 +296,23 @@ pub unsafe extern "C" fn js_handle_method_dispatch( // zlib Transform streams (#1843): `zlib.createGzip()` etc. return handles // in the 0x60000+ range; their `.write`/`.end`/`.on`/`.pipe`/`.flush`/ - // `.close` calls lose their static type and route here. Gated on the - // registry AND the method vocabulary so a handle-id reused across another - // subsystem's registry can't misroute (handle id-spaces aren't unified — - // see the long comment above). + // `.params`/`.reset`/`.close` calls lose their static type and route here. + // Gated on the registry AND the method vocabulary so a handle-id reused + // across another subsystem's registry can't misroute (handle id-spaces + // aren't unified — see the long comment above). #[cfg(feature = "compression")] if matches!( method_name, - "write" | "end" | "on" | "once" | "pipe" | "flush" | "close" | "destroy" + "write" + | "end" + | "on" + | "once" + | "pipe" + | "flush" + | "params" + | "reset" + | "close" + | "destroy" ) && crate::zlib::is_zlib_stream_handle(handle) { // zlib streams are synchronous, so nothing else triggers the pump @@ -321,7 +330,17 @@ pub unsafe extern "C" fn js_handle_method_dispatch( #[cfg(all(feature = "external-zlib-pump", not(feature = "compression")))] if matches!( method_name, - "write" | "end" | "on" | "once" | "addListener" | "pipe" | "flush" | "close" | "destroy" + "write" + | "end" + | "on" + | "once" + | "addListener" + | "pipe" + | "flush" + | "params" + | "reset" + | "close" + | "destroy" ) { extern "C" { fn js_ext_zlib_is_stream_handle(handle: i64) -> i32; @@ -793,10 +812,10 @@ unsafe fn dispatch_net_socket(handle: i64, method: &str, args: &[f64]) -> f64 { /// Dispatch a method call on a zlib Transform-stream handle (#1843). /// /// `createGzip()` / `createDeflate()` / `createBrotliCompress()` / … return -/// handles whose `.write`/`.end`/`.on`/`.pipe`/`.flush`/`.close` lose their -/// static type and arrive here. Compression is synchronous and buffered in the -/// runtime: `.write()` accumulates input, `.end()` runs the codec and queues -/// 'data'/'end' onto the deferred-event pump. +/// handles whose `.write`/`.end`/`.on`/`.pipe`/`.flush`/`.params`/`.reset`/ +/// `.close` lose their static type and arrive here. Compression is synchronous +/// and buffered in the runtime: `.write()` accumulates input, `.end()` runs the +/// codec and queues 'data'/'end' onto the deferred-event pump. #[cfg(feature = "compression")] unsafe fn dispatch_zlib_stream(handle: i64, method: &str, args: &[f64]) -> f64 { fn unbox_to_i64(v: f64) -> i64 { @@ -845,6 +864,20 @@ unsafe fn dispatch_zlib_stream(handle: i64, method: &str, args: &[f64]) -> f64 { crate::zlib::zlib_stream_flush(handle, cb); f64::from_bits(UNDEFINED) } + "params" => { + let cb = args + .iter() + .rev() + .find(|a| (a.to_bits() >> 48) == 0x7FFD) + .map(|a| unbox_to_i64(*a)) + .unwrap_or(0); + crate::zlib::zlib_stream_params(handle, cb); + f64::from_bits(UNDEFINED) + } + "reset" => { + crate::zlib::zlib_stream_reset(handle); + f64::from_bits(UNDEFINED) + } _ => f64::from_bits(UNDEFINED), } } @@ -1057,6 +1090,9 @@ pub unsafe extern "C" fn js_handle_property_dispatch( // bind a closure here so the typeof short-circuit sees "function". #[cfg(feature = "compression")] if crate::zlib::is_zlib_stream_handle(handle) { + if property_name == "bytesWritten" { + return crate::zlib::zlib_stream_bytes_written(handle); + } let method: Option<&'static [u8]> = match property_name { "write" => Some(b"write"), "end" => Some(b"end"), @@ -1065,6 +1101,8 @@ pub unsafe extern "C" fn js_handle_property_dispatch( "emit" => Some(b"emit"), "pipe" => Some(b"pipe"), "flush" => Some(b"flush"), + "params" => Some(b"params"), + "reset" => Some(b"reset"), "removeListener" => Some(b"removeListener"), "removeAllListeners" => Some(b"removeAllListeners"), _ => None, diff --git a/crates/perry-stdlib/src/zlib.rs b/crates/perry-stdlib/src/zlib.rs index 842137de8..86f312a2d 100644 --- a/crates/perry-stdlib/src/zlib.rs +++ b/crates/perry-stdlib/src/zlib.rs @@ -476,6 +476,8 @@ struct ZlibStreamState { /// Only used by `createUnzip` (buffer-until-end auto-detect). input: Vec, ended: bool, + bytes_written: usize, + pending_bytes_written: usize, /// Destinations registered via `.pipe(dest)` — stored as NaN-boxed bits; /// 'data'/'end' are forwarded to each via dynamic method dispatch. pipes: Vec, @@ -546,6 +548,8 @@ fn create_zlib_stream(codec: Codec) -> i64 { codec_state: make_codec_state(codec), input: Vec::new(), ended: false, + bytes_written: 0, + pending_bytes_written: 0, pipes: Vec::new(), }, ); @@ -799,19 +803,22 @@ pub unsafe fn zlib_stream_write(handle: i64, chunk: f64) { let event = { let mut g = ZLIB_STREAMS.lock().unwrap(); match g.get_mut(&handle) { - Some(s) if !s.ended => match s.codec_state.as_mut() { - Some(cs) => match cs.write_chunk(&bytes) { - Ok(()) => { - let out = cs.drain(); - (!out.is_empty()).then(|| ZlibEvent::Data(handle, out)) + Some(s) if !s.ended => { + s.pending_bytes_written = s.pending_bytes_written.saturating_add(bytes.len()); + match s.codec_state.as_mut() { + Some(cs) => match cs.write_chunk(&bytes) { + Ok(()) => { + let out = cs.drain(); + (!out.is_empty()).then(|| ZlibEvent::Data(handle, out)) + } + Err(e) => Some(ZlibEvent::Error(handle, e.to_string())), + }, + None => { + s.input.extend_from_slice(&bytes); + None } - Err(e) => Some(ZlibEvent::Error(handle, e.to_string())), - }, - None => { - s.input.extend_from_slice(&bytes); - None } - }, + } _ => return, } }; @@ -854,6 +861,46 @@ pub fn zlib_stream_flush(handle: i64, cb: i64) { perry_runtime::event_pump::js_notify_main_thread(); } +/// `stream.params(level, strategy, cb?)` — Perry does not currently retune +/// compression levels, but Node exposes this as a function and invokes the +/// callback asynchronously when parameters are unchanged. +pub fn zlib_stream_params(_handle: i64, cb: i64) { + if cb != 0 { + ZLIB_PENDING_EVENTS + .lock() + .unwrap() + .push(ZlibEvent::Callback(cb)); + perry_runtime::event_pump::js_notify_main_thread(); + } +} + +/// `stream.reset()` — reset buffered codec state and byte accounting. +pub fn zlib_stream_reset(handle: i64) { + let mut g = ZLIB_STREAMS.lock().unwrap(); + if let Some(s) = g.get_mut(&handle) { + s.codec_state = make_codec_state(s.codec); + s.input.clear(); + s.ended = false; + s.bytes_written = 0; + s.pending_bytes_written = 0; + } +} + +pub fn zlib_stream_bytes_written(handle: i64) -> f64 { + ZLIB_STREAMS + .lock() + .unwrap() + .get(&handle) + .map(|s| s.bytes_written as f64) + .unwrap_or(0.0) +} + +fn publish_zlib_bytes_written(handle: i64) { + if let Some(s) = ZLIB_STREAMS.lock().unwrap().get_mut(&handle) { + s.bytes_written = s.pending_bytes_written; + } +} + fn finish_zlib_stream(handle: i64) { let (codec_state, codec, input) = { let mut g = ZLIB_STREAMS.lock().unwrap(); @@ -1015,6 +1062,7 @@ pub unsafe extern "C" fn js_zlib_process_pending() -> i32 { for ev in events { match ev { ZlibEvent::Data(id, bytes) => { + publish_zlib_bytes_written(id); let cbs = listeners_for(id, "data"); if !cbs.is_empty() { if let Some(buf_f64) = make_buffer(&bytes) { @@ -1032,6 +1080,7 @@ pub unsafe extern "C" fn js_zlib_process_pending() -> i32 { } } ZlibEvent::End(id) => { + publish_zlib_bytes_written(id); for cb in listeners_for(id, "end") { if cb != 0 { js_closure_call0(cb as *const ClosureHeader); diff --git a/test-parity/node-suite/zlib/streams/instance-methods-and-bytes.ts b/test-parity/node-suite/zlib/streams/instance-methods-and-bytes.ts new file mode 100644 index 000000000..a9bb08301 --- /dev/null +++ b/test-parity/node-suite/zlib/streams/instance-methods-and-bytes.ts @@ -0,0 +1,26 @@ +import * as zlib from "node:zlib"; + +const gzip = zlib.createGzip(); + +console.log("params typeof:", typeof gzip.params); +console.log("reset typeof:", typeof gzip.reset); +console.log("bytesWritten typeof:", typeof gzip.bytesWritten); +console.log("bytesWritten initial:", gzip.bytesWritten); +console.log("bytesRead typeof:", typeof (gzip as any).bytesRead); + +gzip.destroy(); + +const written = zlib.createGzip(); +written.on("data", () => {}); +const finished = new Promise((resolve, reject) => { + written.on("end", () => { + console.log("bytesWritten on end:", written.bytesWritten); + resolve(); + }); + written.on("error", reject); +}); + +written.end(Buffer.from("abc")); +console.log("bytesWritten after end call:", written.bytesWritten); + +await finished;