Skip to content

Commit 6f6d218

Browse files
Merge pull request #34 from NHSDigital/release_v052
Release v0.5.2
2 parents 0f0e72c + b111c1c commit 6f6d218

20 files changed

Lines changed: 317 additions & 56 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
## v0.5.2 (2026-02-02)
2+
3+
### Refactor
4+
5+
- allow passing of custom loggers into pipeline objects
6+
- ensure traceback in broad exceptions
7+
- improve the logging around dve processing errors and align reporting to module name rather than legacy name
8+
- add sense check for text based file (#32)
9+
110
## v0.5.1 (2026-01-28)
211

312
### Fix

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ behave:
1717
${activate} behave
1818

1919
pytest:
20-
${activate} pytest tests/
20+
${activate} pytest -c pytest-dev.ini
2121

2222
all-tests: pytest behave
2323

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nhs_dve"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
description = "`nhs data validation engine` is a framework used to validate data"
55
authors = ["NHS England <england.contactus@nhs.net>"]
66
readme = "README.md"

pytest-dev.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[pytest]
2+
log_cli = true
3+
log_cli_level = INFO

src/dve/core_engine/backends/base/reader.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from pydantic import BaseModel
99
from typing_extensions import Protocol
1010

11-
from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
11+
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
1212
from dve.core_engine.backends.types import EntityName, EntityType
13+
from dve.core_engine.message import FeedbackMessage
1314
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
15+
from dve.parser.file_handling.service import open_stream
1416

1517
T = TypeVar("T")
1618
ET_co = TypeVar("ET_co", covariant=True)
@@ -116,6 +118,8 @@ def read_to_entity_type(
116118
if entity_name == Iterator[dict[str, Any]]:
117119
return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore
118120

121+
self.raise_if_not_sensible_file(resource, entity_name)
122+
119123
try:
120124
reader_func = self.__read_methods__[entity_type]
121125
except KeyError as err:
@@ -137,3 +141,36 @@ def write_parquet(
137141
138142
"""
139143
raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")
144+
145+
@staticmethod
146+
def _check_likely_text_file(resource: URI) -> bool:
147+
"""Quick sense check of file to see if it looks like text
148+
- not 100% foolproof, but hopefully enough to weed out most
149+
non-text files"""
150+
with open_stream(resource, "rb") as fle:
151+
start_chunk = fle.read(4096)
152+
# check for BOM character - utf-16 can contain NULL bytes
153+
if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
154+
return True
155+
# if a null byte is present - unlikely to be text
156+
if b"\x00" in start_chunk:
157+
return False
158+
return True
159+
160+
def raise_if_not_sensible_file(self, resource: URI, entity_name: str):
161+
"""Sense check that the file is a text file. Raise an error if it doesn't
162+
appear to be the case."""
163+
if not self._check_likely_text_file(resource):
164+
raise MessageBearingError(
165+
"The submitted file doesn't appear to be text",
166+
messages=[
167+
FeedbackMessage(
168+
entity=entity_name,
169+
record=None,
170+
failure_type="submission",
171+
error_location="Whole File",
172+
error_code="MalformedFile",
173+
error_message="The resource doesn't seem to be a valid text file",
174+
)
175+
],
176+
)

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
get_duckdb_type_from_annotation,
1717
)
1818
from dve.core_engine.backends.implementations.duckdb.types import SQLType
19+
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
1920
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
2021
from dve.core_engine.message import FeedbackMessage
2122
from dve.core_engine.type_hints import URI, EntityName
@@ -24,7 +25,14 @@
2425

2526
@duckdb_write_parquet
2627
class DuckDBCSVReader(BaseFileReader):
27-
"""A reader for CSV files"""
28+
"""A reader for CSV files including the ability to compare the passed model
29+
to the file header, if it exists.
30+
31+
field_check: flag to compare submitted file header to the accompanying pydantic model
32+
field_check_error_code: The error code to provide if the file header doesn't contain
33+
the expected fields
34+
field_check_error_message: The error message to provide if the file header doesn't contain
35+
the expected fields"""
2836

2937
# TODO - the read_to_relation should include the schema and determine whether to
3038
# TODO - stringify or not
@@ -35,15 +43,43 @@ def __init__(
3543
delim: str = ",",
3644
quotechar: str = '"',
3745
connection: Optional[DuckDBPyConnection] = None,
46+
field_check: bool = False,
47+
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
48+
field_check_error_message: Optional[str] = "The submitted header is missing fields",
3849
**_,
3950
):
4051
self.header = header
4152
self.delim = delim
4253
self.quotechar = quotechar
4354
self._connection = connection if connection else default_connection
55+
self.field_check = field_check
56+
self.field_check_error_code = field_check_error_code
57+
self.field_check_error_message = field_check_error_message
4458

4559
super().__init__()
4660

61+
def perform_field_check(
62+
self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
63+
):
64+
"""Check that the header of the CSV aligns with the provided model"""
65+
if not self.header:
66+
raise ValueError("Cannot perform field check without a CSV header")
67+
68+
if missing := check_csv_header_expected(resource, expected_schema, self.delim):
69+
raise MessageBearingError(
70+
"The CSV header doesn't match what is expected",
71+
messages=[
72+
FeedbackMessage(
73+
entity=entity_name,
74+
record=None,
75+
failure_type="submission",
76+
error_location="Whole File",
77+
error_code=self.field_check_error_code,
78+
error_message=f"{self.field_check_error_message} - missing fields: {missing}", # pylint: disable=line-too-long
79+
)
80+
],
81+
)
82+
4783
def read_to_py_iterator(
4884
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
4985
) -> Iterator[dict[str, Any]]:
@@ -58,6 +94,9 @@ def read_to_relation( # pylint: disable=unused-argument
5894
if get_content_length(resource) == 0:
5995
raise EmptyFileError(f"File at {resource} is empty.")
6096

97+
if self.field_check:
98+
self.perform_field_check(resource, entity_name, schema)
99+
61100
reader_options: dict[str, Any] = {
62101
"header": self.header,
63102
"delimiter": self.delim,
@@ -89,6 +128,9 @@ def read_to_relation( # pylint: disable=unused-argument
89128
if get_content_length(resource) == 0:
90129
raise EmptyFileError(f"File at {resource} is empty.")
91130

131+
if self.field_check:
132+
self.perform_field_check(resource, entity_name, schema)
133+
92134
reader_options: dict[str, Any] = {
93135
"has_header": self.header,
94136
"separator": self.delim,
@@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
132174
| shop1 | clothes | 2025-01-01 |
133175
"""
134176

177+
def __init__(
178+
self,
179+
*args,
180+
non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
181+
non_unique_header_error_message: Optional[str] = None,
182+
**kwargs,
183+
):
184+
self._non_unique_header_code = non_unique_header_error_code
185+
self._non_unique_header_message = non_unique_header_error_message
186+
super().__init__(*args, **kwargs)
187+
135188
@read_function(DuckDBPyRelation)
136189
def read_to_relation( # pylint: disable=unused-argument
137190
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
@@ -156,10 +209,12 @@ def read_to_relation( # pylint: disable=unused-argument
156209
failure_type="submission",
157210
error_message=(
158211
f"Found {no_records} distinct combination of header values."
212+
if not self._non_unique_header_message
213+
else self._non_unique_header_message
159214
),
160215
error_location=entity_name,
161216
category="Bad file",
162-
error_code="NonUniqueHeader",
217+
error_code=self._non_unique_header_code,
163218
)
164219
],
165220
)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""General utilities for file readers"""
2+
3+
from typing import Optional
4+
5+
from pydantic import BaseModel
6+
7+
from dve.core_engine.type_hints import URI
8+
from dve.parser.file_handling.service import open_stream
9+
10+
11+
def check_csv_header_expected(
12+
resource: URI,
13+
expected_schema: type[BaseModel],
14+
delimiter: Optional[str] = ",",
15+
quote_char: str = '"',
16+
) -> set[str]:
17+
"""Check the header of a CSV matches the expected fields"""
18+
with open_stream(resource) as fle:
19+
header_fields = fle.readline().rstrip().replace(quote_char, "").split(delimiter)
20+
expected_fields = expected_schema.__fields__.keys()
21+
return set(expected_fields).difference(header_fields)

src/dve/core_engine/exceptions.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
"""Exceptions emitted by the pipeline."""
22

3-
from collections.abc import Iterator
3+
import traceback
44
from typing import Optional
55

6-
from dve.core_engine.backends.implementations.spark.types import SparkEntities
7-
from dve.core_engine.message import FeedbackMessage
8-
from dve.core_engine.type_hints import Messages
9-
106

117
class CriticalProcessingError(ValueError):
128
"""An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
1511
self,
1612
error_message: str,
1713
*args: object,
18-
messages: Optional[Messages],
19-
entities: Optional[SparkEntities] = None
14+
messages: Optional[list[str]] = None,
2015
) -> None:
2116
super().__init__(error_message, *args)
2217
self.error_message = error_message
2318
"""The error message explaining the critical processing error."""
2419
self.messages = messages
25-
"""The messages gathered at the time the error was emitted."""
26-
self.entities = entities
27-
"""The entities as they exist at the time the error was emitted."""
28-
29-
@property
30-
def critical_messages(self) -> Iterator[FeedbackMessage]:
31-
"""Critical messages which caused the processing error."""
32-
yield from filter(lambda message: message.is_critical, self.messages) # type: ignore
20+
"""The stacktrace for the messages."""
3321

3422
@classmethod
3523
def from_exception(cls, exc: Exception):
3624
"""Create from broader exception, for recording in processing errors"""
37-
return cls(error_message=repr(exc), entities=None, messages=[])
25+
return cls(error_message=repr(exc), messages=traceback.format_exception(exc))
3826

3927

4028
class EntityTypeMismatch(TypeError):

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""DuckDB implementation for `Pipeline` object."""
22

3+
import logging
34
from typing import Optional
45

56
from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -20,7 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
2021
"""
2122
Modified Pipeline class for running a DVE Pipeline with Spark
2223
"""
23-
24+
# pylint: disable=R0913
2425
def __init__(
2526
self,
2627
processed_files_path: URI,
@@ -30,6 +31,7 @@ def __init__(
3031
submitted_files_path: Optional[URI],
3132
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
3233
job_run_id: Optional[int] = None,
34+
logger: Optional[logging.Logger] = None,
3335
):
3436
self._connection = connection
3537
super().__init__(
@@ -41,6 +43,7 @@ def __init__(
4143
submitted_files_path,
4244
reference_data_loader,
4345
job_run_id,
46+
logger,
4447
)
4548

4649
# pylint: disable=arguments-differ

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ def file_transformation(
5757
try:
5858
return super().file_transformation(submission_info)
5959
except Exception as exc: # pylint: disable=W0718
60-
self._logger.error(f"File transformation raised exception: {exc}")
61-
self._logger.exception(exc)
60+
self._logger.exception("File transformation raised exception:")
6261
dump_processing_errors(
6362
fh.joinuri(self.processed_files_path, submission_info.submission_id),
6463
"file_transformation",
@@ -73,8 +72,7 @@ def apply_data_contract(
7372
try:
7473
return super().apply_data_contract(submission_info, submission_status)
7574
except Exception as exc: # pylint: disable=W0718
76-
self._logger.error(f"Apply data contract raised exception: {exc}")
77-
self._logger.exception(exc)
75+
self._logger.exception("Apply data contract raised exception:")
7876
dump_processing_errors(
7977
fh.joinuri(self.processed_files_path, submission_info.submission_id),
8078
"contract",
@@ -89,8 +87,7 @@ def apply_business_rules(
8987
try:
9088
return super().apply_business_rules(submission_info, submission_status)
9189
except Exception as exc: # pylint: disable=W0718
92-
self._logger.error(f"Apply business rules raised exception: {exc}")
93-
self._logger.exception(exc)
90+
self._logger.exception("Apply business rules raised exception:")
9491
dump_processing_errors(
9592
fh.joinuri(self.processed_files_path, submission_info.submission_id),
9693
"business_rules",
@@ -105,10 +102,11 @@ def error_report(
105102
try:
106103
return super().error_report(submission_info, submission_status)
107104
except Exception as exc: # pylint: disable=W0718
108-
self._logger.error(f"Error reports raised exception: {exc}")
109-
self._logger.exception(exc)
105+
self._logger.exception("Error reports raised exception:")
110106
sub_stats = None
111107
report_uri = None
108+
submission_status = submission_status if submission_status else SubmissionStatus()
109+
submission_status.processing_failed = True
112110
dump_processing_errors(
113111
fh.joinuri(self.processed_files_path, submission_info.submission_id),
114112
"error_report",
@@ -148,7 +146,8 @@ def run_pipeline(
148146
sub_info, sub_status, sub_stats, report_uri = self.error_report(
149147
submission_info=submission_info, submission_status=sub_status
150148
)
151-
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
149+
if sub_stats:
150+
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
152151
except Exception as err: # pylint: disable=W0718
153152
self._logger.error(
154153
f"During processing of submission_id: {sub_id}, this exception was raised: {err}"

0 commit comments

Comments
 (0)