Skip to content

Commit 7110ef6

Browse files
authored
refactor: add sense check for text based file (#32)
* feat: Added new option to check CSV headers in DuckDB CSV readers
* refactor: Small changes to the Foundry pipeline and DuckDB CSV reader to fix the header check
* feat: Added a new check to the base reader to sense-check whether we are processing a text file — applied to all reads via read_to_entity_type
* refactor: Address review comments
1 parent 0f0e72c commit 7110ef6

7 files changed

Lines changed: 181 additions & 10 deletions

File tree

src/dve/core_engine/backends/base/reader.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from pydantic import BaseModel
99
from typing_extensions import Protocol
1010

11-
from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
11+
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
1212
from dve.core_engine.backends.types import EntityName, EntityType
13+
from dve.core_engine.message import FeedbackMessage
1314
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
15+
from dve.parser.file_handling.service import open_stream
1416

1517
T = TypeVar("T")
1618
ET_co = TypeVar("ET_co", covariant=True)
@@ -116,6 +118,8 @@ def read_to_entity_type(
116118
if entity_name == Iterator[dict[str, Any]]:
117119
return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore
118120

121+
self.raise_if_not_sensible_file(resource, entity_name)
122+
119123
try:
120124
reader_func = self.__read_methods__[entity_type]
121125
except KeyError as err:
@@ -137,3 +141,36 @@ def write_parquet(
137141
138142
"""
139143
raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")
144+
145+
@staticmethod
def _check_likely_text_file(resource: URI) -> bool:
    """Quick sense check of a file to see if it looks like text.

    Not 100% foolproof, but hopefully enough to weed out most non-text
    files: a NUL byte in the first 4 KiB almost never appears in genuine
    text, *except* in UTF-16/UTF-32 encoded text, which we whitelist by
    recognising their byte-order marks.

    :param resource: URI of the file to inspect.
    :return: ``True`` if the file plausibly contains text, else ``False``.
    """
    with open_stream(resource, "rb") as fle:
        start_chunk = fle.read(4096)
    # BOM-prefixed UTF-16/UTF-32 legitimately contains NUL bytes, so accept
    # those up front. UTF-32 LE (\xff\xfe\x00\x00) is covered by the UTF-16
    # LE prefix; UTF-32 BE starts with NULs and needs its own entry,
    # otherwise the NUL check below would misclassify it as binary.
    if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff", b"\x00\x00\xfe\xff")):
        return True
    # A NUL byte outside those encodings strongly suggests binary content.
    if b"\x00" in start_chunk:
        return False
    return True
159+
160+
def raise_if_not_sensible_file(self, resource: URI, entity_name: str):
    """Sense check that *resource* is a text file.

    Delegates the byte-level heuristics to ``_check_likely_text_file`` and,
    when the file does not look like text, raises a ``MessageBearingError``
    carrying a single whole-file, submission-level feedback message.

    :param resource: URI of the file being read.
    :param entity_name: Entity the feedback message should be attributed to.
    :raises MessageBearingError: if the file does not appear to be text.
    """
    if self._check_likely_text_file(resource):
        return
    feedback = FeedbackMessage(
        entity=entity_name,
        record=None,
        failure_type="submission",
        error_location="Whole File",
        error_code="MalformedFile",
        error_message="The resource doesn't seem to be a valid text file",
    )
    raise MessageBearingError(
        "The submitted file doesn't appear to be text",
        messages=[feedback],
    )

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
get_duckdb_type_from_annotation,
1717
)
1818
from dve.core_engine.backends.implementations.duckdb.types import SQLType
19+
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
1920
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
2021
from dve.core_engine.message import FeedbackMessage
2122
from dve.core_engine.type_hints import URI, EntityName
@@ -24,7 +25,14 @@
2425

2526
@duckdb_write_parquet
2627
class DuckDBCSVReader(BaseFileReader):
27-
"""A reader for CSV files"""
28+
"""A reader for CSV files including the ability to compare the passed model
29+
to the file header, if it exists.
30+
31+
field_check: flag to compare submitted file header to the accompanying pydantic model
32+
field_check_error_code: The error code to provide if the file header doesn't contain
33+
the expected fields
34+
field_check_error_message: The error message to provide if the file header doesn't contain
35+
the expected fields"""
2836

2937
# TODO - the read_to_relation should include the schema and determine whether to
3038
# TODO - stringify or not
@@ -35,15 +43,43 @@ def __init__(
3543
delim: str = ",",
3644
quotechar: str = '"',
3745
connection: Optional[DuckDBPyConnection] = None,
46+
field_check: bool = False,
47+
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
48+
field_check_error_message: Optional[str] = "The submitted header is missing fields",
3849
**_,
3950
):
4051
self.header = header
4152
self.delim = delim
4253
self.quotechar = quotechar
4354
self._connection = connection if connection else default_connection
55+
self.field_check = field_check
56+
self.field_check_error_code = field_check_error_code
57+
self.field_check_error_message = field_check_error_message
4458

4559
super().__init__()
4660

61+
def perform_field_check(
    self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
):
    """Check that the header of the CSV aligns with the provided model.

    :param resource: URI of the CSV file to inspect.
    :param entity_name: Entity the feedback message should be attributed to.
    :param expected_schema: Pydantic model whose fields the header must cover.
    :raises ValueError: if the reader was configured without a header row.
    :raises MessageBearingError: if any model fields are absent from the header.
    """
    if not self.header:
        raise ValueError("Cannot perform field check without a CSV header")

    missing = check_csv_header_expected(resource, expected_schema, self.delim)
    if not missing:
        return

    feedback = FeedbackMessage(
        entity=entity_name,
        record=None,
        failure_type="submission",
        error_location="Whole File",
        error_code=self.field_check_error_code,
        error_message=f"{self.field_check_error_message} - missing fields: {missing}",  # pylint: disable=line-too-long
    )
    raise MessageBearingError(
        "The CSV header doesn't match what is expected",
        messages=[feedback],
    )
82+
4783
def read_to_py_iterator(
4884
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
4985
) -> Iterator[dict[str, Any]]:
@@ -58,6 +94,9 @@ def read_to_relation( # pylint: disable=unused-argument
5894
if get_content_length(resource) == 0:
5995
raise EmptyFileError(f"File at {resource} is empty.")
6096

97+
if self.field_check:
98+
self.perform_field_check(resource, entity_name, schema)
99+
61100
reader_options: dict[str, Any] = {
62101
"header": self.header,
63102
"delimiter": self.delim,
@@ -89,6 +128,9 @@ def read_to_relation( # pylint: disable=unused-argument
89128
if get_content_length(resource) == 0:
90129
raise EmptyFileError(f"File at {resource} is empty.")
91130

131+
if self.field_check:
132+
self.perform_field_check(resource, entity_name, schema)
133+
92134
reader_options: dict[str, Any] = {
93135
"has_header": self.header,
94136
"separator": self.delim,
@@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
132174
| shop1 | clothes | 2025-01-01 |
133175
"""
134176

177+
def __init__(
    self,
    *args,
    non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
    non_unique_header_error_message: Optional[str] = None,
    **kwargs,
):
    """Configure the repeating-header reader.

    :param non_unique_header_error_code: Error code reported when more than
        one distinct header combination is found in the file.
    :param non_unique_header_error_message: Optional override for the error
        message; when ``None`` a default message with the count is used.

    All other positional/keyword arguments pass through to the parent reader.
    """
    self._non_unique_header_code = non_unique_header_error_code
    self._non_unique_header_message = non_unique_header_error_message
    super().__init__(*args, **kwargs)
187+
135188
@read_function(DuckDBPyRelation)
136189
def read_to_relation( # pylint: disable=unused-argument
137190
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
@@ -156,10 +209,12 @@ def read_to_relation( # pylint: disable=unused-argument
156209
failure_type="submission",
157210
error_message=(
158211
f"Found {no_records} distinct combination of header values."
212+
if not self._non_unique_header_message
213+
else self._non_unique_header_message
159214
),
160215
error_location=entity_name,
161216
category="Bad file",
162-
error_code="NonUniqueHeader",
217+
error_code=self._non_unique_header_code,
163218
)
164219
],
165220
)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""General utilities for file readers"""
2+
3+
from typing import Optional
4+
5+
from pydantic import BaseModel
6+
7+
from dve.core_engine.type_hints import URI
8+
from dve.parser.file_handling.service import open_stream
9+
10+
11+
def check_csv_header_expected(
    resource: URI,
    expected_schema: type[BaseModel],
    delimiter: Optional[str] = ",",
    quote_char: str = '"',
) -> set[str]:
    """Check the header of a CSV contains every field of *expected_schema*.

    Only the first line of the file is read.

    :param resource: URI of the CSV file.
    :param expected_schema: Pydantic model whose field names are expected.
    :param delimiter: Field delimiter; ``None`` falls back to a comma.
    :param quote_char: Quote character wrapping header fields, if any.
    :return: The set of model field names missing from the header (empty
        when the header covers the model).
    """
    import csv  # stdlib; imported locally to leave module-level deps untouched

    with open_stream(resource) as fle:
        # Strip only the line terminator: a blanket rstrip() would also eat
        # meaningful trailing whitespace in the final field name.
        first_line = fle.readline().rstrip("\r\n")
    # csv.reader honours quoting correctly — including delimiters embedded
    # inside quoted fields and quote chars appearing within names — unlike
    # a naive replace(quote_char, "").split(delimiter).
    reader = csv.reader([first_line], delimiter=delimiter or ",", quotechar=quote_char)
    header_fields = next(reader, [])
    expected_fields = expected_schema.__fields__.keys()
    return set(expected_fields).difference(header_fields)

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ def error_report(
109109
self._logger.exception(exc)
110110
sub_stats = None
111111
report_uri = None
112+
submission_status = submission_status if submission_status else SubmissionStatus()
113+
submission_status.processing_failed = True
112114
dump_processing_errors(
113115
fh.joinuri(self.processed_files_path, submission_info.submission_id),
114116
"error_report",
@@ -148,7 +150,8 @@ def run_pipeline(
148150
sub_info, sub_status, sub_stats, report_uri = self.error_report(
149151
submission_info=submission_info, submission_status=sub_status
150152
)
151-
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
153+
if sub_stats:
154+
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
152155
except Exception as err: # pylint: disable=W0718
153156
self._logger.error(
154157
f"During processing of submission_id: {sub_id}, this exception was raised: {err}"

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from typing import Dict, List
21
import pytest
32

43
from dve.core_engine.backends.implementations.duckdb.utilities import (
@@ -16,7 +15,7 @@
1615
),
1716
],
1817
)
19-
def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]):
18+
def test_expr_mapping_to_columns(expressions: dict[str, str], expected: list[str]):
2019
observed = expr_mapping_to_columns(expressions)
2120
assert observed == expected
2221

@@ -51,6 +50,7 @@ def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str
5150
),
5251
],
5352
)
54-
def test_expr_array_to_columns(expressions: Dict[str, str], expected: list[str]):
53+
def test_expr_array_to_columns(expressions: dict[str, str], expected: list[str]):
5554
observed = expr_array_to_columns(expressions)
5655
assert observed == expected
56+

tests/test_core_engine/test_backends/test_readers/test_ddb_json.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def test_ddb_json_reader_all_str(temp_json_file):
5757
expected_fields = [fld for fld in mdl.__fields__]
5858
reader = DuckDBJSONReader()
5959
rel: DuckDBPyRelation = reader.read_to_entity_type(
60-
DuckDBPyRelation, uri, "test", stringify_model(mdl)
60+
DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl)
6161
)
6262
assert rel.columns == expected_fields
6363
assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in expected_fields}
@@ -68,7 +68,7 @@ def test_ddb_json_reader_cast(temp_json_file):
6868
uri, data, mdl = temp_json_file
6969
expected_fields = [fld for fld in mdl.__fields__]
7070
reader = DuckDBJSONReader()
71-
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl)
71+
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri.as_posix(), "test", mdl)
7272

7373
assert rel.columns == expected_fields
7474
assert dict(zip(rel.columns, rel.dtypes)) == {
@@ -82,7 +82,7 @@ def test_ddb_csv_write_parquet(temp_json_file):
8282
uri, _, mdl = temp_json_file
8383
reader = DuckDBJSONReader()
8484
rel: DuckDBPyRelation = reader.read_to_entity_type(
85-
DuckDBPyRelation, uri, "test", stringify_model(mdl)
85+
DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl)
8686
)
8787
target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix()
8888
reader.write_parquet(rel, target_loc)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import datetime as dt
2+
from pathlib import Path
3+
import tempfile
4+
from uuid import uuid4
5+
6+
import pytest
7+
from pydantic import BaseModel, create_model
8+
9+
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
10+
11+
@pytest.mark.parametrize(
    ["header_row", "delim", "schema", "expected"],
    [
        # Header exactly matches the model fields.
        (
            "field1,field2,field3",
            ",",
            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
            set(),
        ),
        # Field order does not matter.
        (
            "field2,field3,field1",
            ",",
            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
            set(),
        ),
        # Wrong delimiter: the whole line parses as one field, so every
        # model field is reported missing.
        (
            "str_field|int_field|date_field|",
            ",",
            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
            {"str_field", "int_field", "date_field"},
        ),
        # Quoted header fields are unwrapped before comparison.
        (
            '"str_field"|"int_field"|"date_field"',
            "|",
            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
            set(),
        ),
        # Trailing newline is stripped before splitting.
        (
            'str_field,int_field,date_field\n',
            ",",
            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
            set(),
        ),
    ],
)
def test_check_csv_header_expected(
    header_row: str, delim: str, schema: dict, expected: set[str]
):
    """check_csv_header_expected returns the model fields absent from the header."""
    mdl = create_model("TestModel", **schema)
    with tempfile.TemporaryDirectory() as tmpdir:
        fle = Path(tmpdir).joinpath(f"test_file_{uuid4().hex}.csv")
        # write_text opens, writes, flushes and closes in one step; the
        # previous fle.open("w+").write(...) left the handle open and relied
        # on garbage collection to flush before the read below.
        fle.write_text(header_row)
        res = check_csv_header_expected(fle.as_posix(), mdl, delim)
        assert res == expected

0 commit comments

Comments
 (0)