Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions crates/perry-stdlib/src/common/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,23 @@ pub unsafe extern "C" fn js_handle_method_dispatch(

// zlib Transform streams (#1843): `zlib.createGzip()` etc. return handles
// in the 0x60000+ range; their `.write`/`.end`/`.on`/`.pipe`/`.flush`/
// `.close` calls lose their static type and route here. Gated on the
// registry AND the method vocabulary so a handle-id reused across another
// subsystem's registry can't misroute (handle id-spaces aren't unified —
// see the long comment above).
// `.params`/`.reset`/`.close` calls lose their static type and route here.
// Gated on the registry AND the method vocabulary so a handle-id reused
// across another subsystem's registry can't misroute (handle id-spaces
// aren't unified — see the long comment above).
#[cfg(feature = "compression")]
if matches!(
method_name,
"write" | "end" | "on" | "once" | "pipe" | "flush" | "close" | "destroy"
"write"
| "end"
| "on"
| "once"
| "pipe"
| "flush"
| "params"
| "reset"
| "close"
| "destroy"
) && crate::zlib::is_zlib_stream_handle(handle)
{
// zlib streams are synchronous, so nothing else triggers the pump
Expand All @@ -321,7 +330,17 @@ pub unsafe extern "C" fn js_handle_method_dispatch(
#[cfg(all(feature = "external-zlib-pump", not(feature = "compression")))]
if matches!(
method_name,
"write" | "end" | "on" | "once" | "addListener" | "pipe" | "flush" | "close" | "destroy"
"write"
| "end"
| "on"
| "once"
| "addListener"
| "pipe"
| "flush"
| "params"
| "reset"
| "close"
| "destroy"
) {
extern "C" {
fn js_ext_zlib_is_stream_handle(handle: i64) -> i32;
Expand Down Expand Up @@ -793,10 +812,10 @@ unsafe fn dispatch_net_socket(handle: i64, method: &str, args: &[f64]) -> f64 {
/// Dispatch a method call on a zlib Transform-stream handle (#1843).
///
/// `createGzip()` / `createDeflate()` / `createBrotliCompress()` / … return
/// handles whose `.write`/`.end`/`.on`/`.pipe`/`.flush`/`.close` lose their
/// static type and arrive here. Compression is synchronous and buffered in the
/// runtime: `.write()` accumulates input, `.end()` runs the codec and queues
/// 'data'/'end' onto the deferred-event pump.
/// handles whose `.write`/`.end`/`.on`/`.pipe`/`.flush`/`.params`/`.reset`/
/// `.close` lose their static type and arrive here. Compression is synchronous
/// and buffered in the runtime: `.write()` accumulates input, `.end()` runs the
/// codec and queues 'data'/'end' onto the deferred-event pump.
#[cfg(feature = "compression")]
unsafe fn dispatch_zlib_stream(handle: i64, method: &str, args: &[f64]) -> f64 {
fn unbox_to_i64(v: f64) -> i64 {
Expand Down Expand Up @@ -845,6 +864,20 @@ unsafe fn dispatch_zlib_stream(handle: i64, method: &str, args: &[f64]) -> f64 {
crate::zlib::zlib_stream_flush(handle, cb);
f64::from_bits(UNDEFINED)
}
"params" => {
let cb = args
.iter()
.rev()
.find(|a| (a.to_bits() >> 48) == 0x7FFD)
.map(|a| unbox_to_i64(*a))
.unwrap_or(0);
crate::zlib::zlib_stream_params(handle, cb);
f64::from_bits(UNDEFINED)
}
"reset" => {
crate::zlib::zlib_stream_reset(handle);
f64::from_bits(UNDEFINED)
}
_ => f64::from_bits(UNDEFINED),
}
}
Expand Down Expand Up @@ -1057,6 +1090,9 @@ pub unsafe extern "C" fn js_handle_property_dispatch(
// bind a closure here so the typeof short-circuit sees "function".
#[cfg(feature = "compression")]
if crate::zlib::is_zlib_stream_handle(handle) {
if property_name == "bytesWritten" {
return crate::zlib::zlib_stream_bytes_written(handle);
}
let method: Option<&'static [u8]> = match property_name {
"write" => Some(b"write"),
"end" => Some(b"end"),
Expand All @@ -1065,6 +1101,8 @@ pub unsafe extern "C" fn js_handle_property_dispatch(
"emit" => Some(b"emit"),
"pipe" => Some(b"pipe"),
"flush" => Some(b"flush"),
"params" => Some(b"params"),
"reset" => Some(b"reset"),
"removeListener" => Some(b"removeListener"),
"removeAllListeners" => Some(b"removeAllListeners"),
_ => None,
Expand Down
71 changes: 60 additions & 11 deletions crates/perry-stdlib/src/zlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ struct ZlibStreamState {
/// Only used by `createUnzip` (buffer-until-end auto-detect).
input: Vec<u8>,
ended: bool,
bytes_written: usize,
pending_bytes_written: usize,
/// Destinations registered via `.pipe(dest)` — stored as NaN-boxed bits;
/// 'data'/'end' are forwarded to each via dynamic method dispatch.
pipes: Vec<u64>,
Expand Down Expand Up @@ -546,6 +548,8 @@ fn create_zlib_stream(codec: Codec) -> i64 {
codec_state: make_codec_state(codec),
input: Vec::new(),
ended: false,
bytes_written: 0,
pending_bytes_written: 0,
pipes: Vec::new(),
},
);
Expand Down Expand Up @@ -799,19 +803,22 @@ pub unsafe fn zlib_stream_write(handle: i64, chunk: f64) {
let event = {
let mut g = ZLIB_STREAMS.lock().unwrap();
match g.get_mut(&handle) {
Some(s) if !s.ended => match s.codec_state.as_mut() {
Some(cs) => match cs.write_chunk(&bytes) {
Ok(()) => {
let out = cs.drain();
(!out.is_empty()).then(|| ZlibEvent::Data(handle, out))
Some(s) if !s.ended => {
s.pending_bytes_written = s.pending_bytes_written.saturating_add(bytes.len());
match s.codec_state.as_mut() {
Some(cs) => match cs.write_chunk(&bytes) {
Ok(()) => {
let out = cs.drain();
(!out.is_empty()).then(|| ZlibEvent::Data(handle, out))
}
Err(e) => Some(ZlibEvent::Error(handle, e.to_string())),
},
None => {
s.input.extend_from_slice(&bytes);
None
}
Err(e) => Some(ZlibEvent::Error(handle, e.to_string())),
},
None => {
s.input.extend_from_slice(&bytes);
None
}
},
}
_ => return,
}
};
Expand Down Expand Up @@ -854,6 +861,46 @@ pub fn zlib_stream_flush(handle: i64, cb: i64) {
perry_runtime::event_pump::js_notify_main_thread();
}

/// `stream.params(level, strategy, cb?)` — Perry does not currently retune
/// compression levels, but Node exposes this as a function and invokes the
/// callback asynchronously when parameters are unchanged.
pub fn zlib_stream_params(_handle: i64, cb: i64) {
if cb != 0 {
ZLIB_PENDING_EVENTS
.lock()
.unwrap()
.push(ZlibEvent::Callback(cb));
perry_runtime::event_pump::js_notify_main_thread();
}
}

/// `stream.reset()` — reset buffered codec state and byte accounting.
pub fn zlib_stream_reset(handle: i64) {
let mut g = ZLIB_STREAMS.lock().unwrap();
if let Some(s) = g.get_mut(&handle) {
s.codec_state = make_codec_state(s.codec);
s.input.clear();
s.ended = false;
s.bytes_written = 0;
s.pending_bytes_written = 0;
}
}

pub fn zlib_stream_bytes_written(handle: i64) -> f64 {
ZLIB_STREAMS
.lock()
.unwrap()
.get(&handle)
.map(|s| s.bytes_written as f64)
.unwrap_or(0.0)
}

fn publish_zlib_bytes_written(handle: i64) {
if let Some(s) = ZLIB_STREAMS.lock().unwrap().get_mut(&handle) {
s.bytes_written = s.pending_bytes_written;
}
}

fn finish_zlib_stream(handle: i64) {
let (codec_state, codec, input) = {
let mut g = ZLIB_STREAMS.lock().unwrap();
Expand Down Expand Up @@ -1015,6 +1062,7 @@ pub unsafe extern "C" fn js_zlib_process_pending() -> i32 {
for ev in events {
match ev {
ZlibEvent::Data(id, bytes) => {
publish_zlib_bytes_written(id);
let cbs = listeners_for(id, "data");
if !cbs.is_empty() {
if let Some(buf_f64) = make_buffer(&bytes) {
Expand All @@ -1032,6 +1080,7 @@ pub unsafe extern "C" fn js_zlib_process_pending() -> i32 {
}
}
ZlibEvent::End(id) => {
publish_zlib_bytes_written(id);
for cb in listeners_for(id, "end") {
if cb != 0 {
js_closure_call0(cb as *const ClosureHeader);
Expand Down
26 changes: 26 additions & 0 deletions test-parity/node-suite/zlib/streams/instance-methods-and-bytes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import * as zlib from "node:zlib";

const gzip = zlib.createGzip();

console.log("params typeof:", typeof gzip.params);
console.log("reset typeof:", typeof gzip.reset);
console.log("bytesWritten typeof:", typeof gzip.bytesWritten);
console.log("bytesWritten initial:", gzip.bytesWritten);
console.log("bytesRead typeof:", typeof (gzip as any).bytesRead);

gzip.destroy();

const written = zlib.createGzip();
written.on("data", () => {});
const finished = new Promise<void>((resolve, reject) => {
written.on("end", () => {
console.log("bytesWritten on end:", written.bytesWritten);
resolve();
});
written.on("error", reject);
});

written.end(Buffer.from("abc"));
console.log("bytesWritten after end call:", written.bytesWritten);

await finished;