Skip to content

Commit 583951c

Browse files
committed
add streming example to docs
1 parent 1d03c82 commit 583951c

3 files changed

Lines changed: 70 additions & 4 deletions

File tree

native-lib/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,3 +220,34 @@ finally:
220220
# Optional: if you used the global API and want to force cleanup
221221
dataweave.cleanup()
222222
```
223+
224+
### 6) I/O streaming with callbacks
225+
226+
Use `run_input_output_callback` to feed input and consume output through callbacks, enabling constant-memory processing of arbitrarily large data.
227+
228+
```python
229+
json_input = b'[1,2,3,4,5]'
230+
pos = 0
231+
232+
def read_cb(buf_size):
233+
nonlocal pos
234+
chunk = json_input[pos:pos + buf_size]
235+
pos += len(chunk)
236+
return chunk # return b"" when done
237+
238+
chunks = []
239+
def write_cb(data):
240+
chunks.append(data)
241+
return 0 # 0 = success
242+
243+
result = dataweave.run_input_output_callback(
244+
"output application/json deferred=true --- payload map ($ * $)",
245+
input_name="payload",
246+
input_mime_type="application/json",
247+
read_callback=read_cb,
248+
write_callback=write_cb,
249+
)
250+
251+
print(result) # {"success": True}
252+
print(b"".join(chunks)) # [1,4,9,16,25]
253+
```

native-lib/example_streaming.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,30 @@ def example_streaming_input_output_callback():
2525
input_iter = iter(range(num_elements))
2626
input_started = False
2727
input_done = False
28+
pending_token = None
2829

2930
def read_callback(buf_size):
30-
nonlocal input_started, input_done
31+
nonlocal input_started, input_done, pending_token
3132
if input_done:
3233
return b""
3334
parts = []
3435
if not input_started:
3536
parts.append(b"[")
3637
input_started = True
3738
remaining = buf_size - sum(len(p) for p in parts)
39+
if pending_token is not None:
40+
if len(pending_token) <= remaining:
41+
parts.append(pending_token)
42+
remaining -= len(pending_token)
43+
pending_token = None
44+
else:
45+
return b"".join(parts)
3846
try:
3947
while remaining > 0:
4048
i = next(input_iter)
4149
token = (b"," if i > 0 else b"") + str(i).encode("utf-8")
4250
if len(token) > remaining:
43-
# put it back via a wrapper – simpler: just include it and break
44-
parts.append(token)
51+
pending_token = token
4552
break
4653
parts.append(token)
4754
remaining -= len(token)
@@ -93,8 +100,36 @@ def write_callback(data):
93100
return False
94101

95102

103+
def doc_example():
104+
json_input = b'[1,2,3,4,5]'
105+
pos = 0
106+
107+
def read_cb(buf_size):
108+
nonlocal pos
109+
chunk = json_input[pos:pos + buf_size]
110+
pos += len(chunk)
111+
return chunk # return b"" when done
112+
113+
chunks = []
114+
def write_cb(data):
115+
chunks.append(data)
116+
return 0 # 0 = success
117+
118+
result = dataweave.run_input_output_callback(
119+
"output application/json deferred=true --- payload map ($ * $)",
120+
input_name="payload",
121+
input_mime_type="application/json",
122+
read_callback=read_cb,
123+
write_callback=write_cb,
124+
)
125+
126+
print(result) # {"success": True}
127+
print(b"".join(chunks)) # [1,4,9,16,25]
128+
129+
96130
def main():
97131
example_streaming_input_output_callback()
132+
doc_example()
98133

99134

100135
if __name__ == "__main__":

native-lib/python/src/dataweave/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ def _read_cb(_ctx, buf, buf_size):
467467
data = read_callback(buf_size)
468468
if not data:
469469
return 0 # EOF
470-
n = len(data)
470+
n = min(len(data), buf_size)
471471
ctypes.memmove(buf, data, n)
472472
return n
473473
except Exception:

0 commit comments

Comments
 (0)