Skip to content

Commit 0163cec

Browse files
committed
feature: add trigger strategy
1 parent 0ab663c commit 0163cec

11 files changed

Lines changed: 361 additions & 46 deletions

File tree

src/extract.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,34 @@ use super::{RustMatcher, StreamsonError};
22
use pyo3::prelude::*;
33
use streamson_lib::strategy;
44

5-
/// Low level Python wrapper for Simple matcher and Buffer handler
5+
/// Low level Python wrapper for Extract strategy
66
#[pyclass]
77
pub struct Extract {
88
extract: strategy::Extract,
99
}
1010

1111
#[pymethods]
1212
impl Extract {
13-
/// Create a new instance of Streamson
13+
/// Create a new instance of Extract
1414
///
1515
/// # Arguments
16-
/// * `matches` - a list of valid simple matches (e.g. `{"users"}`, `[]{"name"}`, `[0]{}`)
1716
/// * `export_path` - indicator whether path is required in further processing
1817
#[new]
19-
pub fn new(matcher: &RustMatcher, export_path: Option<bool>) -> PyResult<Self> {
18+
pub fn new(export_path: Option<bool>) -> PyResult<Self> {
2019
let export_path = export_path.unwrap_or(true);
21-
let mut extract = strategy::Extract::new().set_export_path(export_path);
22-
extract.add_matcher(Box::new(matcher.inner.clone()));
20+
let extract = strategy::Extract::new().set_export_path(export_path);
2321
Ok(Self { extract })
2422
}
2523

26-
/// Feeds Streamson processor with data
24+
/// Adds matcher for Extract
25+
///
26+
/// # Arguments
27+
/// * `matcher` - matcher to be added (`Simple` or `Depth`)
28+
pub fn add_matcher(&mut self, matcher: &RustMatcher) {
29+
self.extract.add_matcher(Box::new(matcher.inner.clone()));
30+
}
31+
32+
/// Process data for Extract strategy
2733
///
2834
/// # Arguments
2935
/// * `data` - input data to be processed

src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
pub mod extract;
2+
pub mod trigger;
3+
24
pub use extract::Extract;
5+
pub use trigger::{PythonHandler, Trigger};
36

47
use pyo3::{class::PyNumberProtocol, create_exception, exceptions, prelude::*};
58

@@ -85,6 +88,8 @@ impl PyNumberProtocol for RustMatcher {
8588
fn streamson(_py: Python, m: &PyModule) -> PyResult<()> {
8689
m.add_class::<Extract>()?;
8790
m.add_class::<RustMatcher>()?;
91+
m.add_class::<Trigger>()?;
92+
m.add_class::<PythonHandler>()?;
8893

8994
Ok(())
9095
}

src/trigger.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use super::{RustMatcher, StreamsonError};
2+
use pyo3::prelude::*;
3+
use std::sync::{Arc, Mutex};
4+
use streamson_lib::{error, handler, path::Path, strategy};
5+
6+
/// Streamson handler which calls python callable
7+
#[pyclass]
8+
#[derive(Clone)]
9+
pub struct PythonHandler {
10+
callable: PyObject,
11+
require_path: bool,
12+
}
13+
14+
#[pymethods]
15+
impl PythonHandler {
16+
/// Create instance of PythonHandler
17+
///
18+
/// # Arguments
19+
/// * `callable` - python callable (3 arguments)
20+
/// * `require_path` - should path be passed to handler
21+
#[new]
22+
pub fn new(callable: PyObject, require_path: bool) -> Self {
23+
Self {
24+
callable,
25+
require_path,
26+
}
27+
}
28+
}
29+
30+
impl handler::Handler for PythonHandler {
31+
/// Call python function as a part of rust handler
32+
///
33+
/// # Arguments
34+
/// * `path` - matched path
35+
/// * `matcher_idx` - index of triggered matcher
36+
/// * `data` - matched data
37+
fn handle(
38+
&mut self,
39+
path: &Path,
40+
matcher_idx: usize,
41+
data: Option<&[u8]>,
42+
) -> Result<(), error::Handler> {
43+
let gil = Python::acquire_gil();
44+
self.callable
45+
.call1(
46+
gil.python(),
47+
(
48+
if self.require_path {
49+
Some(path.to_string())
50+
} else {
51+
None
52+
},
53+
matcher_idx,
54+
data,
55+
),
56+
)
57+
.map_err(|_| error::Handler::new("Calling python failed"))?;
58+
Ok(())
59+
}
60+
}
61+
62+
/// Low level Python wrapper for Trigger strategy
63+
#[pyclass]
64+
pub struct Trigger {
65+
trigger: strategy::Trigger,
66+
}
67+
68+
#[pymethods]
69+
impl Trigger {
70+
/// Create a new instance of Trigger
71+
///
72+
/// # Arguments
73+
/// * `export_path` - indicator whether path is required in further processing
74+
#[new]
75+
pub fn new() -> PyResult<Self> {
76+
let trigger = strategy::Trigger::new();
77+
Ok(Self { trigger })
78+
}
79+
80+
/// Adds matcher for Trigger
81+
///
82+
/// # Arguments
83+
/// * `matcher` - matcher to be added (`Simple` or `Depth`)
84+
pub fn add_matcher(&mut self, matcher: &RustMatcher, mut handlers: Vec<PythonHandler>) {
85+
let mut res: Vec<Arc<Mutex<dyn handler::Handler>>> = vec![];
86+
handlers
87+
.drain(..)
88+
.for_each(|e| res.push(Arc::new(Mutex::new(e))));
89+
self.trigger
90+
.add_matcher(Box::new(matcher.inner.clone()), &res);
91+
}
92+
93+
pub fn process(&mut self, data: &[u8]) -> PyResult<String> {
94+
match self.trigger.process(data) {
95+
Err(err) => Err(StreamsonError::from(err).into()),
96+
Ok(()) => Ok(String::from_utf8(data.to_vec())?),
97+
}
98+
}
99+
}

streamson/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from .extract import extract_async, extract_fd, extract_iter # noqa
22
from .matcher import DepthMatcher, Matcher, SimpleMatcher # noqa
3+
from .trigger import trigger_async, trigger_fd, trigger_iter # noqa

streamson/extract.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@ def extract_iter(
1111
convert: typing.Callable[[str], typing.Any] = lambda x: x,
1212
require_path: bool = True,
1313
) -> typing.Generator[typing.Tuple[str, typing.Any], None, None]:
14-
"""Extracts json from generator specified by given simple matcher
14+
"""Extracts json from generator specified by given matcher
1515
:param: input_gen: input generator
1616
:param: matcher: used matcher
1717
:param: convert: function used to convert raw data
1818
:param: require_path: is path required in output stream
1919
2020
:yields: path and converted data
2121
"""
22-
extract = Extract(matcher.inner, require_path)
22+
extract = Extract(require_path)
23+
extract.add_matcher(matcher.inner)
2324
for item in input_gen:
2425
for path, data in extract.process(item):
2526
yield path, convert(data)
@@ -32,7 +33,7 @@ def extract_fd(
3233
convert: typing.Callable[[str], typing.Any] = lambda x: x,
3334
require_path: bool = True,
3435
) -> typing.Generator[typing.Tuple[str, typing.Any], None, None]:
35-
"""Extracts json from input file specified by given simple matcher
36+
"""Extracts json from input file specified by given matcher
3637
:param: input_fd: input fd
3738
:param: buffer_size: how many bytes can be read from a file at once
3839
:param: matcher: used matcher
@@ -41,7 +42,9 @@ def extract_fd(
4142
4243
:yields: path and converted data
4344
"""
44-
extract = Extract(matcher.inner, require_path)
45+
extract = Extract(require_path)
46+
extract.add_matcher(matcher.inner)
47+
4548
input_data = input_fd.read(buffer_size)
4649

4750
while input_data:
@@ -56,15 +59,17 @@ async def extract_async(
5659
convert: typing.Callable[[str], typing.Any] = lambda x: x,
5760
require_path: bool = True,
5861
):
59-
"""Extracts json from given async generator specified by given simple matcher
62+
"""Extracts json from given async generator specified by given matcher
6063
:param: input_gen: input generator
6164
:param: matcher: used matcher
6265
:param: convert: function used to convert raw data
6366
:param: require_path: is path required in output stream
6467
6568
:yields: path and converted data
6669
"""
67-
extract = Extract(matcher.inner, require_path)
70+
extract = Extract(require_path)
71+
extract.add_matcher(matcher.inner)
72+
6873
async for input_data in input_gen:
6974
for path, data in extract.process(input_data):
7075
yield path, convert(data)

streamson/trigger.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import typing
2+
3+
from streamson.streamson import PythonHandler, Trigger
4+
5+
from .matcher import Matcher
6+
7+
8+
def trigger_iter(
9+
input_gen: typing.Generator[bytes, None, None],
10+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
11+
matcher: Matcher,
12+
require_path: bool = True,
13+
) -> typing.Generator[bytes, None, None]:
14+
"""Triggers handlers on matched input
15+
:param input_gen: input generator
16+
:param handlers: list of handlers which should be called on match
17+
:param matcher: used matcher
18+
:param: require_path: is path required for handlers
19+
20+
:yields: input data
21+
"""
22+
trigger = Trigger()
23+
trigger.add_matcher(matcher.inner, [PythonHandler(handler, require_path) for handler in handlers])
24+
for item in input_gen:
25+
trigger.process(item)
26+
yield item
27+
28+
29+
def trigger_fd(
30+
input_fd: typing.IO[bytes],
31+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
32+
matcher: Matcher,
33+
buffer_size: int = 1024 * 1024,
34+
require_path: bool = True,
35+
) -> typing.Generator[bytes, None, None]:
36+
"""Triggers handlers on matched data from a file description
37+
:param input_fd: input generator
38+
:param handlers: list of handlers which should be called on match
39+
:param matcher: used matcher
40+
:param: buffer_size: how many bytes can be read from a file at once
41+
:param: require_path: is path required for handlers
42+
43+
:yields: input data
44+
"""
45+
trigger = Trigger()
46+
trigger.add_matcher(matcher.inner, [PythonHandler(handler, require_path) for handler in handlers])
47+
48+
input_data = input_fd.read(buffer_size)
49+
50+
while input_data:
51+
trigger.process(input_data)
52+
yield input_data
53+
input_data = input_fd.read(buffer_size)
54+
55+
56+
async def trigger_async(
57+
input_gen: typing.AsyncGenerator[bytes, None],
58+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
59+
matcher: Matcher,
60+
require_path: bool = True,
61+
):
62+
"""Triggers handlers on matched data from async generator
63+
:param: input_gen: input generator
64+
:param handlers: list of handlers which should be called on match
65+
:param: matcher: used matcher
66+
:param: require_path: is path required for handlers
67+
68+
:yields: input data
69+
"""
70+
trigger = Trigger()
71+
trigger.add_matcher(matcher.inner, [PythonHandler(handler, require_path) for handler in handlers])
72+
73+
async for input_data in input_gen:
74+
trigger.process(input_data)
75+
yield input_data

tests/conftest.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import io
2+
import json
3+
import typing
4+
5+
import pytest
6+
7+
DATA_JSON = {
8+
"users": ["john", "carl", "bob"],
9+
"groups": ["admins", "users"],
10+
}
11+
12+
13+
@pytest.fixture(scope="function")
14+
def make_async_gen():
15+
def make():
16+
async def async_in():
17+
18+
yield b'{"users": '
19+
yield b'["john", "carl", "bob"'
20+
yield b"]}"
21+
22+
return async_in
23+
24+
return make
25+
26+
27+
@pytest.fixture
28+
def data() -> typing.List[bytes]:
29+
return [json.dumps(DATA_JSON).encode()]
30+
31+
32+
@pytest.fixture
33+
def io_reader() -> io.BytesIO:
34+
return io.BytesIO(json.dumps(DATA_JSON).encode())
35+
36+
37+
@pytest.fixture(scope="function")
38+
def handler():
39+
output: typing.List[typing.Tuple[typing.Optional[str], int, bytes]] = []
40+
41+
def store_to_output(path: typing.Optional[str], matcher_idx: int, data: bytes):
42+
output.append((path, matcher_idx, data))
43+
44+
return store_to_output, output

tests/test_extract.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,17 @@
1-
import io
21
import json
3-
import typing
42
from enum import Enum, auto
53

64
import hyperjson
75
import pytest
86

97
import streamson
108

11-
DATA_JSON = {
12-
"users": ["john", "carl", "bob"],
13-
"groups": ["admins", "users"],
14-
}
15-
169

1710
class Kind(Enum):
1811
FD = auto()
1912
ITER = auto()
2013

2114

22-
@pytest.fixture
23-
def data() -> typing.List[bytes]:
24-
return [json.dumps(DATA_JSON).encode()]
25-
26-
27-
@pytest.fixture
28-
def io_reader() -> io.BytesIO:
29-
return io.BytesIO(json.dumps(DATA_JSON).encode())
30-
31-
3215
@pytest.mark.parametrize(
3316
"kind,convert,extract_path",
3417
[

0 commit comments

Comments
 (0)