Skip to content

Commit a892bd4

Browse files
committed
feature: added convert strategy
1 parent bafdd00 commit a892bd4

12 files changed

Lines changed: 259 additions & 10 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ crate-type = ["cdylib"]
3232

3333
[dependencies]
3434
pyo3 = { version = "~0.11.1", features = ["extension-module"] }
35-
streamson-lib = "~5.0.0"
35+
streamson-lib = "~6.0.0"

src/convert.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use super::{RustMatcher, StreamsonError};
2+
use pyo3::{prelude::*, types::PyBytes};
3+
use std::sync::{Arc, Mutex};
4+
use streamson_lib::{error, handler, path::Path, strategy};
5+
6+
/// TODO
7+
#[pyclass]
8+
#[derive(Clone)]
9+
pub struct PythonConverter {
10+
callable: PyObject,
11+
use_path: bool,
12+
}
13+
14+
#[pymethods]
15+
impl PythonConverter {
16+
/// Create instance of PythonConverter
17+
///
18+
/// # Arguments
19+
/// * `callable` - python callable (3 arguments)
20+
#[new]
21+
pub fn new(callable: PyObject, use_path: bool) -> Self {
22+
Self { callable, use_path }
23+
}
24+
}
25+
26+
impl handler::Handler for PythonConverter {
27+
fn handle(
28+
&mut self,
29+
path: &Path,
30+
matcher_idx: usize,
31+
data: Option<&[u8]>,
32+
) -> Result<Option<Vec<u8>>, error::Handler> {
33+
let gil = Python::acquire_gil();
34+
let res = self
35+
.callable
36+
.call1(
37+
gil.python(),
38+
(
39+
if self.use_path {
40+
Some(path.to_string())
41+
} else {
42+
None
43+
},
44+
matcher_idx,
45+
data.unwrap().to_vec(),
46+
),
47+
)
48+
.map_err(|_| error::Handler::new("Failed to call handler function"))?;
49+
let bytes = res.cast_as::<PyBytes>(gil.python()).unwrap();
50+
Ok(FromPyObject::extract(bytes)
51+
.map_err(|_| error::Handler::new("Function does not return bytes."))?)
52+
}
53+
}
54+
55+
/// Low level Python wrapper for Convert strategy
56+
#[pyclass]
57+
pub struct Convert {
58+
convert: strategy::Convert,
59+
}
60+
61+
#[pymethods]
62+
impl Convert {
63+
/// Create a new instance of Convert
64+
///
65+
/// # Arguments
66+
/// * `export_path` - indicator whether path is required in further processing
67+
#[new]
68+
pub fn new() -> PyResult<Self> {
69+
let convert = strategy::Convert::new();
70+
Ok(Self { convert })
71+
}
72+
73+
/// Adds matcher for Convert
74+
///
75+
/// # Arguments
76+
/// * `matcher` - matcher to be added (`Simple` or `Depth`)
77+
/// * `handlers` - list of handlers to process
78+
pub fn add_matcher(&mut self, matcher: &RustMatcher, mut handlers: Vec<PythonConverter>) {
79+
let mut res: Vec<Arc<Mutex<dyn handler::Handler>>> = vec![];
80+
handlers
81+
.drain(..)
82+
.for_each(|e| res.push(Arc::new(Mutex::new(e))));
83+
84+
self.convert
85+
.add_matcher(Box::new(matcher.inner.clone()), res)
86+
}
87+
88+
pub fn process(&mut self, input_data: &[u8]) -> PyResult<String> {
89+
match self.convert.process(input_data) {
90+
Err(err) => Err(StreamsonError::from(err).into()),
91+
Ok(data_list) => {
92+
let mut res = String::new();
93+
for out_data in data_list {
94+
res += &String::from_utf8(out_data.to_vec())?;
95+
}
96+
Ok(res)
97+
}
98+
}
99+
}
100+
}

src/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl Filter {
3131
/// * `data` - input data to be processed
3232
///
3333
/// # Returns
34-
/// * `vector of tuples` - (path_or_none, data)
34+
/// * `filtered output`
3535
pub fn process(&mut self, data: &[u8]) -> PyResult<String> {
3636
match self.filter.process(data) {
3737
Err(err) => Err(StreamsonError::from(err).into()),

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
pub mod convert;
12
pub mod extract;
23
pub mod filter;
34
pub mod trigger;
45

6+
pub use convert::{Convert, PythonConverter};
57
pub use extract::Extract;
68
pub use filter::Filter;
79
pub use trigger::{PythonHandler, Trigger};
@@ -88,6 +90,8 @@ impl PyNumberProtocol for RustMatcher {
8890
/// This module is a python module implemented in Rust.
8991
#[pymodule]
9092
fn streamson(_py: Python, m: &PyModule) -> PyResult<()> {
93+
m.add_class::<Convert>()?;
94+
m.add_class::<PythonConverter>()?;
9195
m.add_class::<Extract>()?;
9296
m.add_class::<Filter>()?;
9397
m.add_class::<RustMatcher>()?;

src/trigger.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl handler::Handler for PythonHandler {
3939
path: &Path,
4040
matcher_idx: usize,
4141
data: Option<&[u8]>,
42-
) -> Result<(), error::Handler> {
42+
) -> Result<Option<Vec<u8>>, error::Handler> {
4343
let gil = Python::acquire_gil();
4444
self.callable
4545
.call1(
@@ -55,7 +55,7 @@ impl handler::Handler for PythonHandler {
5555
),
5656
)
5757
.map_err(|_| error::Handler::new("Calling python failed"))?;
58-
Ok(())
58+
Ok(None)
5959
}
6060
}
6161

streamson/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from .convert import convert_async, convert_fd, convert_iter # noqa
12
from .extract import extract_async, extract_fd, extract_iter # noqa
23
from .filter import filter_async, filter_fd, filter_iter # noqa
34
from .matcher import DepthMatcher, Matcher, SimpleMatcher # noqa

streamson/convert.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import typing
2+
3+
from streamson.streamson import Convert, PythonConverter
4+
5+
from .matcher import Matcher
6+
7+
8+
def convert_iter(
    input_gen: typing.Generator[bytes, None, None],
    handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
    matcher: Matcher,
    require_path: bool = True,
) -> typing.Generator[str, None, None]:
    """Runs converting handlers on matched data taken from a generator

    :param input_gen: input generator
    :param handlers: list of handlers which should be called on match
    :param matcher: used matcher
    :param require_path: is path required for handlers

    :yields: converted data
    """
    strategy = Convert()
    converters = [PythonConverter(handler, require_path) for handler in handlers]
    strategy.add_matcher(matcher.inner, converters)

    for chunk in input_gen:
        # The result of `process` is iterated and every element is yielded separately.
        # NOTE(review): if `process` returns a `str`, this yields single characters - confirm.
        for piece in strategy.process(chunk):
            yield piece
27+
28+
29+
def convert_fd(
    input_fd: typing.IO[bytes],
    handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
    matcher: Matcher,
    buffer_size: int = 1024 * 1024,
    require_path: bool = True,
) -> typing.Generator[str, None, None]:
    """Runs converting handlers on matched data read from a file object

    :param input_fd: binary file object to read the input from
    :param handlers: list of handlers which should be called on match
    :param matcher: used matcher
    :param buffer_size: how many bytes can be read from a file at once
    :param require_path: is path required for handlers

    :yields: converted data
    """
    strategy = Convert()
    converters = [PythonConverter(handler, require_path) for handler in handlers]
    strategy.add_matcher(matcher.inner, converters)

    while True:
        chunk = input_fd.read(buffer_size)
        if not chunk:
            # An empty read ends the stream
            break
        for piece in strategy.process(chunk):
            yield piece
54+
55+
56+
async def convert_async(
    input_gen: typing.AsyncGenerator[bytes, None],
    handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
    matcher: Matcher,
    require_path: bool = True,
) -> typing.AsyncGenerator[str, None]:
    """Runs converting handlers on matched data taken from an async generator

    :param input_gen: input async generator
    :param handlers: list of handlers which should be called on match
    :param matcher: used matcher
    :param require_path: is path required for handlers

    :yields: converted data
    """
    strategy = Convert()
    converters = [PythonConverter(handler, require_path) for handler in handlers]
    strategy.add_matcher(matcher.inner, converters)

    async for chunk in input_gen:
        # Every element obtained by iterating the `process` result is yielded separately
        for piece in strategy.process(chunk):
            yield piece

streamson/trigger.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
def trigger_iter(
99
input_gen: typing.Generator[bytes, None, None],
10-
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
10+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
1111
matcher: Matcher,
1212
require_path: bool = True,
1313
) -> typing.Generator[bytes, None, None]:
@@ -28,7 +28,7 @@ def trigger_iter(
2828

2929
def trigger_fd(
3030
input_fd: typing.IO[bytes],
31-
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
31+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
3232
matcher: Matcher,
3333
buffer_size: int = 1024 * 1024,
3434
require_path: bool = True,
@@ -55,7 +55,7 @@ def trigger_fd(
5555

5656
async def trigger_async(
5757
input_gen: typing.AsyncGenerator[bytes, None],
58-
handlers: typing.List[typing.Callable[[typing.Optional[str], int, bytes], None]],
58+
handlers: typing.List[typing.Callable[[typing.Optional[str], int, typing.Optional[bytes]], typing.Optional[bytes]]],
5959
matcher: Matcher,
6060
require_path: bool = True,
6161
):

tests/conftest.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,17 @@ def io_reader() -> io.BytesIO:
3636

3737
@pytest.fixture(scope="function")
3838
def handler():
39-
output: typing.List[typing.Tuple[typing.Optional[str], int, bytes]] = []
39+
output: typing.List[typing.Tuple[typing.Optional[str], int, typing.Optional[bytes]]] = []
4040

4141
def store_to_output(path: typing.Optional[str], matcher_idx: int, data: bytes):
4242
output.append((path, matcher_idx, data))
4343

4444
return store_to_output, output
45+
46+
47+
@pytest.fixture(scope="function")
def converter():
    """Provides a converter callable which replaces any matched data with `"***"`"""

    def convert_to_stars(path: typing.Optional[str], matcher_idx: int, data: typing.Optional[bytes]):
        # All arguments are ignored; the same JSON string is returned for every match
        masked = br'"***"'
        return masked

    return convert_to_stars

0 commit comments

Comments
 (0)