Skip to content

Commit 1d03c82

Browse files
committed
remove session-based streaming api
1 parent c668cca commit 1d03c82

4 files changed

Lines changed: 4 additions & 913 deletions

File tree

native-lib/example_streaming.py

Lines changed: 0 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import dataweave
1010
import resource
1111
import psutil, os
12-
import threading
13-
import json
1412
import time
1513

1614
def example_streaming_input_output_callback():
@@ -95,132 +93,8 @@ def write_callback(data):
9593
return False
9694

9795

98-
def example_streaming_input_output():
99-
print("\nTesting streaming input and output (square numbers)...")
100-
try:
101-
start_time = time.monotonic()
102-
num_elements = 1_000_000 * 50
103-
chunk_size = 1024 * 64
104-
105-
input_stream = dataweave.open_input_stream("application/json", "utf-8")
106-
107-
script = """output application/json deferred=true
108-
---
109-
payload map ($ * $)"""
110-
111-
def feed_input():
112-
try:
113-
input_stream.write(b"[")
114-
for i in range(num_elements):
115-
if i > 0:
116-
input_stream.write(b",")
117-
input_stream.write(str(i).encode("utf-8"))
118-
input_stream.write(b"]")
119-
finally:
120-
input_stream.close()
121-
122-
feeder = threading.Thread(target=feed_input, daemon=True)
123-
feeder.start()
124-
125-
with dataweave.run_stream(script, inputs={"payload": input_stream}) as stream:
126-
usage = resource.getrusage(resource.RUSAGE_SELF)
127-
current_rss = psutil.Process(os.getpid()).memory_info().rss
128-
print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
129-
chunk_count = 0
130-
total_bytes = 0
131-
while True:
132-
chunk = stream.read(chunk_size)
133-
if not chunk:
134-
break
135-
chunk_count += 1
136-
total_bytes += len(chunk)
137-
if chunk_count % 5000 == 0:
138-
usage = resource.getrusage(resource.RUSAGE_SELF)
139-
current_rss = psutil.Process(os.getpid()).memory_info().rss
140-
print(f"--- chunk {chunk_count}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
141-
142-
feeder.join()
143-
144-
peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
145-
elapsed = time.monotonic() - start_time
146-
mins, secs = divmod(elapsed, 60)
147-
print(f"\n[OK] Streaming input/output done ({chunk_count} chunks, {total_bytes/ 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}")
148-
print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
149-
return True
150-
except Exception as e:
151-
print(f"[FAIL] Streaming input/output failed: {e}")
152-
import traceback
153-
traceback.print_exc()
154-
return False
155-
156-
def example_streaming_output_larger_than_memory():
157-
print("\nTesting streaming output larger than memory...")
158-
try:
159-
script = """output application/json deferred=true
160-
---
161-
{items: (1 to pow(1000, 2)*10) map {id: $, name: "item_" ++ $}}"""
162-
with dataweave.run_stream(script) as stream:
163-
print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}")
164-
chunk_count = 0
165-
total_bytes = 0
166-
while True:
167-
chunk = stream.read(1024*1024*10) # deferred=true uses 8k chunks
168-
if not chunk:
169-
break
170-
chunk_count += 1
171-
total_bytes += len(chunk)
172-
usage = resource.getrusage(resource.RUSAGE_SELF)
173-
current_rss = psutil.Process(os.getpid()).memory_info().rss
174-
# print script output
175-
# sys.stdout.write(chunk.decode(stream.charset or "utf-8"))
176-
# sys.stdout.flush()
177-
print(f"--- chunk {chunk_count}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
178-
peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
179-
print(f"\n[OK] Streaming done ({chunk_count} chunks, {total_bytes} bytes)")
180-
print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
181-
return True
182-
except Exception as e:
183-
print(f"[FAIL] Streaming chunked read failed: {e}")
184-
return False
185-
186-
187-
def example_streaming_adding_chunks():
188-
print("\nTesting streaming adding chunks...")
189-
try:
190-
script = """output application/json deferred=true
191-
---
192-
{items: (1 to pow(1000, 2)*10) map {id: $, name: "item_" ++ $}}"""
193-
with dataweave.run_stream(script) as stream:
194-
print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}")
195-
chunks = []
196-
total_bytes = 0
197-
while True:
198-
chunk = stream.read(1024*1024*16)
199-
if not chunk:
200-
break
201-
total_bytes += len(chunk)
202-
chunks.append(chunk)
203-
usage = resource.getrusage(resource.RUSAGE_SELF)
204-
current_rss = psutil.Process(os.getpid()).memory_info().rss
205-
if len(chunks) % 1000 == 0:
206-
print(f"--- chunk {len(chunks)}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
207-
full = b"".join(chunks)
208-
assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
209-
assert b"item_1" in full, "Expected 'item_1' in result"
210-
assert b"item_100" in full, "Expected 'item_100' in result"
211-
print(f"\n[OK] Streaming chunked read works ({len(chunks)} chunks)")
212-
#print(f"\n[OK] Streaming done ({chunk_count} chunks, {total_bytes} bytes)")
213-
return True
214-
except Exception as e:
215-
print(f"[FAIL] Streaming chunked read failed: {e}")
216-
return False
217-
218-
21996
def main():
22097
example_streaming_input_output_callback()
221-
# example_streaming_input_output()
222-
# example_streaming_output_larger_than_memory()
223-
# example_streaming_adding_chunks()
22498

22599

226100
if __name__ == "__main__":

0 commit comments

Comments
 (0)