Skip to content

Commit bafdd00

Browse files
committed
feature: add filter strategy
1 parent 0163cec commit bafdd00

6 files changed

Lines changed: 151 additions & 0 deletions

File tree

src/filter.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use super::{RustMatcher, StreamsonError};
2+
use pyo3::prelude::*;
3+
use streamson_lib::strategy;
4+
5+
/// Low level Python wrapper for Filter strategy
6+
#[pyclass]
7+
pub struct Filter {
8+
filter: strategy::Filter,
9+
}
10+
11+
#[pymethods]
12+
impl Filter {
13+
/// Create a new instance of Filter
14+
#[new]
15+
pub fn new() -> PyResult<Self> {
16+
let filter = strategy::Filter::new();
17+
Ok(Self { filter })
18+
}
19+
20+
/// Adds matcher for Filter
21+
///
22+
/// # Arguments
23+
/// * `matcher` - matcher to be added (`Simple` or `Depth`)
24+
pub fn add_matcher(&mut self, matcher: &RustMatcher) {
25+
self.filter.add_matcher(Box::new(matcher.inner.clone()));
26+
}
27+
28+
/// Process data for Filter strategy
29+
///
30+
/// # Arguments
31+
/// * `data` - input data to be processed
32+
///
33+
/// # Returns
34+
/// * `vector of tuples` - (path_or_none, data)
35+
pub fn process(&mut self, data: &[u8]) -> PyResult<String> {
36+
match self.filter.process(data) {
37+
Err(err) => Err(StreamsonError::from(err).into()),
38+
Ok(data) => Ok(String::from_utf8(data)?),
39+
}
40+
}
41+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
pub mod extract;
2+
pub mod filter;
23
pub mod trigger;
34

45
pub use extract::Extract;
6+
pub use filter::Filter;
57
pub use trigger::{PythonHandler, Trigger};
68

79
use pyo3::{class::PyNumberProtocol, create_exception, exceptions, prelude::*};
@@ -87,6 +89,7 @@ impl PyNumberProtocol for RustMatcher {
8789
#[pymodule]
8890
fn streamson(_py: Python, m: &PyModule) -> PyResult<()> {
8991
m.add_class::<Extract>()?;
92+
m.add_class::<Filter>()?;
9093
m.add_class::<RustMatcher>()?;
9194
m.add_class::<Trigger>()?;
9295
m.add_class::<PythonHandler>()?;

streamson/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .extract import extract_async, extract_fd, extract_iter # noqa
2+
from .filter import filter_async, filter_fd, filter_iter # noqa
23
from .matcher import DepthMatcher, Matcher, SimpleMatcher # noqa
34
from .trigger import trigger_async, trigger_fd, trigger_iter # noqa

streamson/filter.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import typing
2+
3+
from streamson.streamson import Filter
4+
5+
from .matcher import Matcher
6+
7+
8+
def filter_iter(
    input_gen: typing.Iterable[bytes],
    matcher: Matcher,
) -> typing.Generator[str, None, None]:
    """Filters json parts from generator specified by given matcher

    :param: input_gen: input generator (any iterable of byte chunks works)
    :param: matcher: used matcher

    :yields: filtered data - one string for every input chunk
    """
    filter_strategy = Filter()
    filter_strategy.add_matcher(matcher.inner)
    for item in input_gen:
        # Each chunk is passed to the same strategy instance, which is
        # created once above and reused for the whole input.
        yield filter_strategy.process(item)
22+
23+
24+
def filter_fd(
    input_fd: typing.IO[bytes],
    matcher: Matcher,
    buffer_size: int = 1024 * 1024,
) -> typing.Generator[str, None, None]:
    """Filters json parts from input file specified by given matcher

    :param: input_fd: input fd
    :param: matcher: used matcher
    :param: buffer_size: how many bytes can be read from a file at once

    :yields: filtered data - one string for every chunk read from the file
    """
    filter_strategy = Filter()
    filter_strategy.add_matcher(matcher.inner)

    input_data = input_fd.read(buffer_size)

    # Reading stops at the first empty (falsy) read.
    while input_data:
        yield filter_strategy.process(input_data)
        input_data = input_fd.read(buffer_size)
44+
45+
46+
async def filter_async(
    input_gen: typing.AsyncIterable[bytes],
    matcher: Matcher,
) -> typing.AsyncGenerator[str, None]:
    """Filters json parts from given async generator specified by given matcher

    :param: input_gen: input generator (any async iterable of byte chunks works)
    :param: matcher: used matcher

    :yields: filtered data - one string for every input chunk
    """
    filter_strategy = Filter()
    filter_strategy.add_matcher(matcher.inner)

    async for input_data in input_gen:
        yield filter_strategy.process(input_data)

tests/test_filter.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from enum import Enum, auto
2+
3+
import pytest
4+
5+
import streamson
6+
7+
8+
class Kind(Enum):
    """Selects which synchronous filter entry point a test case exercises."""

    # Input is read from a file-like object (used with `streamson.filter_fd`)
    FD = auto()
    # Input is taken from an iterator of chunks (used with `streamson.filter_iter`)
    ITER = auto()
11+
12+
13+
@pytest.mark.parametrize("kind", [Kind.FD, Kind.ITER])
def test_simple(io_reader, data, kind):
    """Filtered output joined over all chunks matches the expected JSON text."""
    matcher = streamson.SimpleMatcher('{"users"}[]')

    # Pick the entry point under test; both produce an iterator of strings.
    if kind == Kind.ITER:
        chunks = streamson.filter_iter(iter(data), matcher)
    elif kind == Kind.FD:
        # A tiny buffer size forces the input to be processed in many pieces.
        chunks = streamson.filter_fd(io_reader, matcher, 5)
    else:
        chunks = ()

    output_data = "".join(chunks)

    assert output_data == '{"users": [], "groups": ["admins", "users"]}'

tests/test_filter_async.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import pytest
2+
3+
import streamson
4+
5+
6+
@pytest.mark.asyncio
async def test_simple(make_async_gen, handler):
    """Output of the async filter joined over all chunks matches the expected JSON text."""
    matcher = streamson.SimpleMatcher('{"users"}[]')
    filtered = streamson.filter_async(make_async_gen()(), matcher)

    # Drain the async generator and glue the pieces together.
    pieces = [rec async for rec in filtered]
    output_data = "".join(pieces)

    assert output_data == '{"users": []}'

0 commit comments

Comments
 (0)