Skip to content

Commit d323e24

Browse files
committed
feature: async generator integration
1 parent 63e94f3 commit d323e24

3 files changed

Lines changed: 292 additions & 2 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ isort = {version = "*", extras = ["pyproject"]}
4141
ijson = "*"
4242
mypy = "*"
4343
pytest = "^5.2"
44+
pytest-asyncio = "^0.14"
4445
pre-commit = {version = "~2.4.0", python = "^3.6.1"}
4546
maturin = "~0.8.0"
4647
safety = "~1.9.0"

streamson/__init__.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def extract_iter(
5353
convert: typing.Callable[[str], typing.Any] = lambda x: x,
5454
require_path: bool = True,
5555
) -> typing.Generator[typing.Tuple[str, typing.Any], None, None]:
56-
"""Extracts json specified by given list of simple matches
56+
"""Extracts json from generator specified by given simple matcher
5757
:param: input_gen: input generator
5858
:param: matcher: used matcher
5959
:param: convert: function used to convert raw data
@@ -78,7 +78,7 @@ def extract_fd(
7878
convert: typing.Callable[[str], typing.Any] = lambda x: x,
7979
require_path: bool = True,
8080
) -> typing.Generator[typing.Tuple[str, typing.Any], None, None]:
81-
"""Extracts json specified by given list of simple matches
81+
"""Extracts json from input file specified by given simple matcher
8282
:param: input_fd: input fd
8383
:param: buffer_size: how many bytes can be read from a file at once
8484
:param: matcher: used matcher
@@ -99,3 +99,28 @@ def extract_fd(
9999
res = streamson.pop()
100100

101101
input_data = input_fd.read(buffer_size)
102+
103+
104+
async def extract_async(
    input_gen: typing.AsyncIterable[bytes],
    matcher: Matcher,
    convert: typing.Callable[[str], typing.Any] = lambda x: x,
    require_path: bool = True,
) -> typing.AsyncGenerator[typing.Tuple[typing.Optional[str], typing.Any], None]:
    """Extracts json from given async generator specified by given simple matcher

    :param: input_gen: async source of raw input chunks (consumed with ``async for``)
    :param: matcher: used matcher
    :param: convert: function used to convert raw data
    :param: require_path: is path required in output stream

    :yields: path and converted data
    """
    streamson = _Streamson(matcher.inner, require_path)
    async for input_data in input_gen:
        streamson.feed(input_data)

        # Drain every match made available by the chunk just fed;
        # ``pop`` returns ``None`` once nothing more is pending.
        for path, data in iter(streamson.pop, None):
            yield path, convert(data)

tests/test_extract_async.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
import json
2+
3+
import hyperjson
4+
import pytest
5+
6+
import streamson
7+
8+
9+
def make_async_gen():
    """Return an async generator function that streams one JSON document in three chunks."""
    chunks = (
        b'{"users": ',
        b'["john", "carl", "bob"',
        b"]}",
    )

    async def async_in():
        for chunk in chunks:
            yield chunk

    return async_in
17+
18+
19+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_simple(convert, extract_path):
    """A simple matcher on ``{"users"}[]`` yields the three user entries in order."""

    def shown(path):
        # The path slot is None whenever paths are not requested.
        return path if extract_path else None

    matcher = streamson.SimpleMatcher('{"users"}[]')
    source = make_async_gen()()

    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}[0]'), convert('"john"')),
        (shown('{"users"}[1]'), convert('"carl"')),
        (shown('{"users"}[2]'), convert('"bob"')),
    ]
53+
54+
55+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_depth(convert, extract_path):
    """Depth matchers: ``DepthMatcher(1)`` gives four records, ``DepthMatcher(0, 1)`` gives two."""

    def shown(path):
        # The path slot is None whenever paths are not requested.
        return path if extract_path else None

    users_array = '["john", "carl", "bob"]'

    # DepthMatcher(1): the three array items followed by the array itself.
    matcher = streamson.DepthMatcher(1)
    source = make_async_gen()()
    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}[0]'), convert('"john"')),
        (shown('{"users"}[1]'), convert('"carl"')),
        (shown('{"users"}[2]'), convert('"bob"')),
        (shown('{"users"}'), convert(users_array)),
    ]

    # DepthMatcher(0, 1): the array followed by the whole document.
    matcher = streamson.DepthMatcher(0, 1)
    source = make_async_gen()()
    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}'), convert(users_array)),
        (shown(""), convert('{"users": ["john", "carl", "bob"]}')),
    ]
111+
112+
113+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_invert(convert, extract_path):
    """An inverted ``DepthMatcher(2)`` yields the users array and then the whole document."""

    def shown(path):
        # The path slot is None whenever paths are not requested.
        return path if extract_path else None

    matcher = ~streamson.DepthMatcher(2)
    source = make_async_gen()()

    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}'), convert('["john", "carl", "bob"]')),
        (shown(""), convert('{"users": ["john", "carl", "bob"]}')),
    ]
151+
152+
153+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_all(convert, extract_path):
    """Two simple matchers combined with ``&`` yield only the entry both of them match."""
    users_items = streamson.SimpleMatcher('{"users"}[]')
    second_item = streamson.SimpleMatcher("{}[1]")
    matcher = users_items & second_item

    source = make_async_gen()()
    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    expected_path = '{"users"}[1]' if extract_path else None
    assert res == [(expected_path, convert('"carl"'))]
185+
186+
187+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_any(convert, extract_path):
    """A depth matcher and a simple matcher combined with ``|`` yield the items and the array."""

    def shown(path):
        # The path slot is None whenever paths are not requested.
        return path if extract_path else None

    depth_two = streamson.DepthMatcher(2, 2)
    users_key = streamson.SimpleMatcher('{"users"}')
    matcher = depth_two | users_key

    source = make_async_gen()()
    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}[0]'), convert('"john"')),
        (shown('{"users"}[1]'), convert('"carl"')),
        (shown('{"users"}[2]'), convert('"bob"')),
        (shown('{"users"}'), convert('["john", "carl", "bob"]')),
    ]
224+
225+
226+
@pytest.mark.parametrize(
    "convert,extract_path",
    [
        pytest.param(lambda x: x, True, id="raw-path"),
        pytest.param(lambda x: x, False, id="raw-nopath"),
        pytest.param(json.dumps, True, id="json-path"),
        pytest.param(json.dumps, False, id="json-nopath"),
        pytest.param(hyperjson.dumps, True, id="hyperjson-path"),
        pytest.param(hyperjson.dumps, False, id="hyperjson-nopath"),
    ],
)
@pytest.mark.asyncio
async def test_complex(convert, extract_path):
    """An ``|`` combination ANDed with an inverted matcher drops the first user entry."""

    def shown(path):
        # The path slot is None whenever paths are not requested.
        return path if extract_path else None

    included = streamson.DepthMatcher(2, 2) | streamson.SimpleMatcher('{"users"}')
    excluded = streamson.SimpleMatcher('{"users"}[0]')
    matcher = included & ~excluded

    source = make_async_gen()()
    res = [rec async for rec in streamson.extract_async(source, matcher, convert, extract_path)]

    assert res == [
        (shown('{"users"}[1]'), convert('"carl"')),
        (shown('{"users"}[2]'), convert('"bob"')),
        (shown('{"users"}'), convert('["john", "carl", "bob"]')),
    ]

0 commit comments

Comments
 (0)