Skip to content

Commit 141e1b3

Browse files
committed
refactor: merge in main and resolve conflicts and linting issues
2 parents c64da59 + 6f6d218 commit 141e1b3

22 files changed

Lines changed: 410 additions & 80 deletions

File tree

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
## v0.5.2 (2026-02-02)
2+
3+
### Refactor
4+
5+
- allow passing of custom loggers into pipeline objects
6+
- ensure traceback in broad exceptions
7+
- improve the logging around dve processing errors and align reporting to module name rather than legacy name
8+
- add sense check for text based file (#32)
9+
10+
## v0.5.1 (2026-01-28)
11+
12+
### Fix
13+
14+
- deal with pathing assumption that file had been moved to processed_file_path during file transformation
15+
116
## v0.5.0 (2026-01-16)
217

318
### Feat

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ behave:
1717
${activate} behave
1818

1919
pytest:
20-
${activate} pytest tests/
20+
${activate} pytest -c pytest-dev.ini
2121

2222
all-tests: pytest behave
2323

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nhs_dve"
3-
version = "0.5.0"
3+
version = "0.5.2"
44
description = "`nhs data validation engine` is a framework used to validate data"
55
authors = ["NHS England <england.contactus@nhs.net>"]
66
readme = "README.md"

pytest-dev.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[pytest]
2+
log_cli = true
3+
log_cli_level = INFO

src/dve/common/error_utils.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ def dump_feedback_errors(
6666

6767

6868
def dump_processing_errors(
69-
working_folder: URI, step_name: DVEStage, errors: Union[list[CriticalProcessingError], Messages]
70-
) -> URI:
69+
working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
70+
):
7171
"""Write out critical processing errors"""
7272
if not working_folder:
7373
raise AttributeError("processed files path not passed")
@@ -76,27 +76,17 @@ def dump_processing_errors(
7676
if not errors:
7777
raise AttributeError("errors list not passed")
7878

79-
error_file: URI = get_processing_errors_uri(working_folder)
79+
error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
8080
processed = []
8181

8282
for error in errors:
83-
if isinstance(error, CriticalProcessingError):
84-
if msgs := error.messages:
85-
for msg in msgs:
86-
processed.append(
87-
{
88-
"step_name": step_name,
89-
"error_location": msg.error_location,
90-
"error_level": msg.error_type,
91-
"error_message": msg.error_message,
92-
}
93-
)
9483
processed.append(
9584
{
9685
"step_name": step_name,
9786
"error_location": "processing",
9887
"error_level": "integrity",
9988
"error_message": error.error_message,
89+
"error_traceback": error.messages,
10090
}
10191
)
10292

src/dve/core_engine/backends/base/contract.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from dve.core_engine.backends.readers import get_reader
2222
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
2323
from dve.core_engine.backends.utilities import dedup_messages, stringify_model
24+
from dve.core_engine.exceptions import CriticalProcessingError
2425
from dve.core_engine.loggers import get_logger
2526
from dve.core_engine.message import FeedbackMessage
2627
from dve.core_engine.type_hints import (
@@ -395,7 +396,16 @@ def apply(
395396
processing_errors_uri = get_processing_errors_uri(working_dir)
396397
entities, messages, successful = self.read_raw_entities(entity_locations, contract_metadata)
397398
if not successful:
398-
dump_processing_errors(working_dir, "data_contract", messages)
399+
dump_processing_errors(
400+
working_dir,
401+
"data_contract",
402+
[
403+
CriticalProcessingError(
404+
"Issue occurred while reading raw entities",
405+
[msg.error_message for msg in messages],
406+
)
407+
],
408+
)
399409
return {}, feedback_errors_uri, successful, processing_errors_uri
400410

401411
try:
@@ -409,7 +419,16 @@ def apply(
409419
"data contract",
410420
self.logger,
411421
)
412-
dump_processing_errors(working_dir, "data_contract", new_messages)
422+
dump_processing_errors(
423+
working_dir,
424+
"data_contract",
425+
[
426+
CriticalProcessingError(
427+
"Issue occurred while applying data_contract",
428+
[msg.error_message for msg in new_messages],
429+
)
430+
],
431+
)
413432

414433
if contract_metadata.cache_originals:
415434
for entity_name in list(entities):

src/dve/core_engine/backends/base/reader.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from pydantic import BaseModel
99
from typing_extensions import Protocol
1010

11-
from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
11+
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
1212
from dve.core_engine.backends.types import EntityName, EntityType
13+
from dve.core_engine.message import FeedbackMessage
1314
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
15+
from dve.parser.file_handling.service import open_stream
1416

1517
T = TypeVar("T")
1618
ET_co = TypeVar("ET_co", covariant=True)
@@ -116,6 +118,8 @@ def read_to_entity_type(
116118
if entity_name == Iterator[dict[str, Any]]:
117119
return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore
118120

121+
self.raise_if_not_sensible_file(resource, entity_name)
122+
119123
try:
120124
reader_func = self.__read_methods__[entity_type]
121125
except KeyError as err:
@@ -137,3 +141,36 @@ def write_parquet(
137141
138142
"""
139143
raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")
144+
145+
@staticmethod
def _check_likely_text_file(resource: URI) -> bool:
    """Quick sense check of a file to see whether it looks like text.

    Only the first 4096 bytes of the resource are inspected, so this is a
    heuristic - it is not foolproof, but hopefully enough to weed out most
    non-text files.

    Returns:
        True when the sample starts with a UTF-16/UTF-32 byte order mark or
        contains no NULL bytes, False otherwise.
    """
    with open_stream(resource, "rb") as fle:
        start_chunk = fle.read(4096)

    # BOM-prefixed UTF-16/UTF-32 text legitimately contains NULL bytes, so it
    # has to be accepted before the NULL-byte test below. The little-endian
    # UTF-32 BOM (ff fe 00 00) is already matched by the UTF-16 LE prefix; the
    # big-endian one (00 00 fe ff) must be listed explicitly or it would be
    # rejected as binary.
    if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff", b"\x00\x00\xfe\xff")):
        return True

    # a NULL byte anywhere else in the sample - unlikely to be text
    return b"\x00" not in start_chunk
159+
160+
def raise_if_not_sensible_file(self, resource: URI, entity_name: str):
    """Guard against submissions that do not look like text files.

    Runs the quick text sense check on the resource. When the check fails a
    MessageBearingError is raised carrying a single "MalformedFile"
    submission message for the entity; otherwise nothing happens.
    """
    if self._check_likely_text_file(resource):
        return

    malformed_file_message = FeedbackMessage(
        entity=entity_name,
        record=None,
        failure_type="submission",
        error_location="Whole File",
        error_code="MalformedFile",
        error_message="The resource doesn't seem to be a valid text file",
    )
    raise MessageBearingError(
        "The submitted file doesn't appear to be text",
        messages=[malformed_file_message],
    )

src/dve/core_engine/backends/base/rules.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def evaluate(self, entities, *, config: AbstractStep) -> tuple[Messages, StageSu
195195
if success:
196196
success = False
197197
msg = f"Critical failure in rule {self._step_metadata_to_location(config)}"
198-
self.logger.error(msg)
198+
self.logger.exception(msg)
199199
self.logger.error(str(message))
200200

201201
return messages, success
@@ -404,7 +404,7 @@ def apply_sync_filters(
404404
[
405405
CriticalProcessingError(
406406
"Issue occurred while applying filter logic",
407-
messages=temp_messages,
407+
messages=[msg.error_message for msg in temp_messages if msg.error_message],
408408
)
409409
],
410410
)
@@ -429,7 +429,7 @@ def apply_sync_filters(
429429
[
430430
CriticalProcessingError(
431431
"Issue occurred while generating FeedbackMessages",
432-
messages=temp_messages,
432+
[msg.error_message for msg in temp_messages],
433433
)
434434
],
435435
)
@@ -454,7 +454,7 @@ def apply_sync_filters(
454454
[
455455
CriticalProcessingError(
456456
"Issue occurred while generating FeedbackMessages",
457-
messages=temp_messages,
457+
[msg.error_message for msg in temp_messages],
458458
)
459459
],
460460
)
@@ -485,7 +485,7 @@ def apply_sync_filters(
485485
[
486486
CriticalProcessingError(
487487
"Issue occurred while filtering error records",
488-
messages=temp_messages,
488+
[msg.error_message for msg in temp_messages],
489489
)
490490
],
491491
)
@@ -513,7 +513,7 @@ def apply_sync_filters(
513513
[
514514
CriticalProcessingError(
515515
"Issue occurred while generating FeedbackMessages",
516-
messages=temp_messages,
516+
[msg.error_message for msg in temp_messages],
517517
)
518518
],
519519
)
@@ -570,7 +570,7 @@ def apply_rules(
570570
[
571571
CriticalProcessingError(
572572
"Issue occurred while applying pre filter steps",
573-
messages=stage_messages,
573+
[msg.error_message for msg in stage_messages],
574574
)
575575
],
576576
)
@@ -616,7 +616,7 @@ def apply_rules(
616616
[
617617
CriticalProcessingError(
618618
"Issue occurred while applying post filter steps",
619-
messages=stage_messages,
619+
[msg.error_message for msg in stage_messages],
620620
)
621621
],
622622
)

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
get_duckdb_type_from_annotation,
1717
)
1818
from dve.core_engine.backends.implementations.duckdb.types import SQLType
19+
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
1920
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
2021
from dve.core_engine.message import FeedbackMessage
2122
from dve.core_engine.type_hints import URI, EntityName
@@ -24,7 +25,14 @@
2425

2526
@duckdb_write_parquet
2627
class DuckDBCSVReader(BaseFileReader):
27-
"""A reader for CSV files"""
28+
"""A reader for CSV files including the ability to compare the passed model
29+
to the file header, if it exists.
30+
31+
field_check: flag to compare submitted file header to the accompanying pydantic model
32+
field_check_error_code: The error code to provide if the file header doesn't contain
33+
the expected fields
34+
field_check_error_message: The error message to provide if the file header doesn't contain
35+
the expected fields"""
2836

2937
# TODO - the read_to_relation should include the schema and determine whether to
3038
# TODO - stringify or not
@@ -35,15 +43,43 @@ def __init__(
3543
delim: str = ",",
3644
quotechar: str = '"',
3745
connection: Optional[DuckDBPyConnection] = None,
46+
field_check: bool = False,
47+
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
48+
field_check_error_message: Optional[str] = "The submitted header is missing fields",
3849
**_,
3950
):
4051
self.header = header
4152
self.delim = delim
4253
self.quotechar = quotechar
4354
self._connection = connection if connection else default_connection
55+
self.field_check = field_check
56+
self.field_check_error_code = field_check_error_code
57+
self.field_check_error_message = field_check_error_message
4458

4559
super().__init__()
4660

61+
def perform_field_check(
    self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
):
    """Check that the header of the CSV aligns with the provided model.

    Raises:
        ValueError: if the reader is configured without a header row, as
            there is then nothing to compare the model against.
        MessageBearingError: if the header check reports missing fields. The
            error carries one submission-level FeedbackMessage built from the
            reader's configured field check error code and message.
    """
    if not self.header:
        raise ValueError("Cannot perform field check without a CSV header")

    # Pass the reader's own quote character as well as its delimiter, so a
    # header quoted with a non-default character is unwrapped before the
    # field names are compared (previously the '"' default was always used).
    missing = check_csv_header_expected(
        resource, expected_schema, self.delim, self.quotechar
    )
    if not missing:
        return

    raise MessageBearingError(
        "The CSV header doesn't match what is expected",
        messages=[
            FeedbackMessage(
                entity=entity_name,
                record=None,
                failure_type="submission",
                error_location="Whole File",
                error_code=self.field_check_error_code,
                error_message=f"{self.field_check_error_message} - missing fields: {missing}",  # pylint: disable=line-too-long
            )
        ],
    )
82+
4783
def read_to_py_iterator(
4884
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
4985
) -> Iterator[dict[str, Any]]:
@@ -58,6 +94,9 @@ def read_to_relation( # pylint: disable=unused-argument
5894
if get_content_length(resource) == 0:
5995
raise EmptyFileError(f"File at {resource} is empty.")
6096

97+
if self.field_check:
98+
self.perform_field_check(resource, entity_name, schema)
99+
61100
reader_options: dict[str, Any] = {
62101
"header": self.header,
63102
"delimiter": self.delim,
@@ -89,6 +128,9 @@ def read_to_relation( # pylint: disable=unused-argument
89128
if get_content_length(resource) == 0:
90129
raise EmptyFileError(f"File at {resource} is empty.")
91130

131+
if self.field_check:
132+
self.perform_field_check(resource, entity_name, schema)
133+
92134
reader_options: dict[str, Any] = {
93135
"has_header": self.header,
94136
"separator": self.delim,
@@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
132174
| shop1 | clothes | 2025-01-01 |
133175
"""
134176

177+
def __init__(
    self,
    *args,
    non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
    non_unique_header_error_message: Optional[str] = None,
    **kwargs,
):
    """Store the non-unique header error settings, then initialise the parent.

    non_unique_header_error_code: error code kept for the non-unique header
        feedback message
    non_unique_header_error_message: optional error message kept for the
        non-unique header feedback message
    All other positional and keyword arguments are forwarded unchanged to
    the parent initialiser.
    """
    self._non_unique_header_message = non_unique_header_error_message
    self._non_unique_header_code = non_unique_header_error_code
    super().__init__(*args, **kwargs)
187+
135188
@read_function(DuckDBPyRelation)
136189
def read_to_relation( # pylint: disable=unused-argument
137190
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
@@ -156,10 +209,12 @@ def read_to_relation( # pylint: disable=unused-argument
156209
failure_type="submission",
157210
error_message=(
158211
f"Found {no_records} distinct combination of header values."
212+
if not self._non_unique_header_message
213+
else self._non_unique_header_message
159214
),
160215
error_location=entity_name,
161216
category="Bad file",
162-
error_code="NonUniqueHeader",
217+
error_code=self._non_unique_header_code,
163218
)
164219
],
165220
)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""General utilities for file readers"""
2+
3+
from typing import Optional
4+
5+
from pydantic import BaseModel
6+
7+
from dve.core_engine.type_hints import URI
8+
from dve.parser.file_handling.service import open_stream
9+
10+
11+
def check_csv_header_expected(
    resource: URI,
    expected_schema: type[BaseModel],
    delimiter: Optional[str] = ",",
    quote_char: str = '"',
) -> set[str]:
    """Return the expected fields that are missing from a CSV file's header.

    Only the first line of the resource is read and treated as the header.

    Args:
        resource: location of the CSV file to inspect.
        expected_schema: model whose `__fields__` names are looked for in
            the header.
        delimiter: field separator used in the file.
        quote_char: character used to quote fields in the file.

    Returns:
        The names declared on `expected_schema` that do not appear in the
        header - an empty set when every expected field is present.
    """
    import csv  # local import so the module's import block is left untouched

    with open_stream(resource) as fle:
        # a leading byte order mark would otherwise be glued onto the first
        # field name and make that field look missing
        header_line = fle.readline().lstrip("\ufeff").rstrip()

    single_char_dialect = (
        delimiter is not None
        and len(delimiter) == 1
        and len(quote_char) == 1
        and delimiter != quote_char
    )
    if single_char_dialect:
        # Parse with the csv module so a quoted field containing the
        # delimiter (or an escaped quote) stays a single field name instead
        # of being split into pieces that could mask a missing field.
        header_fields = next(
            csv.reader([header_line], delimiter=delimiter, quotechar=quote_char),
            [],
        )
    else:
        # csv.reader only accepts single-character dialect options; keep the
        # plain strip-and-split behaviour for anything else (including a
        # delimiter of None, which splits on whitespace).
        header_fields = header_line.replace(quote_char, "").split(delimiter)

    return set(expected_schema.__fields__).difference(header_fields)

0 commit comments

Comments
 (0)