Skip to content

Commit 6d342c5

Browse files
Release v073
## v0.7.3 (2026-04-16) ### Fix - make time format more strict to stop invalid time date flowing (#92) - enhance duckdb casting to be less permissive of poorly formatted dates and trim whitespace ### Refactor - integrated duckdb casting into data contract and added initial spark casting
2 parents 81bfb53 + 3f057dd commit 6d342c5

11 files changed

Lines changed: 387 additions & 32 deletions

File tree

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
## v0.7.3 (2026-04-16)
2+
3+
### Fix
4+
5+
- make time format more strict to stop invalid time date flowing (#92)
6+
- enhance duckdb casting to be less permissive of poorly formatted dates and trim whitespace
7+
8+
### Refactor
9+
10+
- integrated duckdb casting into data contract and added initial spark casting
11+
112
## v0.7.2 (2026-04-02)
213

314
### Fix

poetry.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues"
2424
Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md"
2525

2626
[tool.poetry]
27-
version = "0.7.2"
27+
version = "0.7.3"
2828
packages = [
2929
{ include = "dve", from = "src" },
3030
]

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
duckdb_read_parquet,
3232
duckdb_record_index,
3333
duckdb_write_parquet,
34+
get_duckdb_cast_statement_from_annotation,
3435
get_duckdb_type_from_annotation,
3536
relation_is_empty,
3637
)
@@ -101,18 +102,7 @@ def create_entity_from_py_iterator( # pylint: disable=unused-argument
101102
_lazy_df = pl.LazyFrame(records, polars_schema) # type: ignore # pylint: disable=unused-variable
102103
return self._connection.sql("select * from _lazy_df")
103104

104-
@staticmethod
105-
def generate_ddb_cast_statement(
106-
column_name: str, dtype: DuckDBPyType, null_flag: bool = False
107-
) -> str:
108-
"""Helper method to generate sql statements for casting datatypes (permissively).
109-
Current duckdb python API doesn't play well with this currently.
110-
"""
111-
if not null_flag:
112-
return f'try_cast("{column_name}" AS {dtype}) AS "{column_name}"'
113-
return f'cast(NULL AS {dtype}) AS "{column_name}"'
114-
115-
# pylint: disable=R0914
105+
# pylint: disable=R0914,R0915
116106
def apply_data_contract(
117107
self,
118108
working_dir: URI,
@@ -180,12 +170,16 @@ def apply_data_contract(
180170

181171
casting_statements = [
182172
(
183-
self.generate_ddb_cast_statement(column, dtype)
173+
get_duckdb_cast_statement_from_annotation(column, mdl_fld.annotation)
174+
+ f""" AS "{column}" """
184175
if column in relation.columns
185-
else self.generate_ddb_cast_statement(column, dtype, null_flag=True)
176+
else f"CAST(NULL AS {ddb_schema[column]}) AS {column}"
186177
)
187-
for column, dtype in ddb_schema.items()
178+
for column, mdl_fld in entity_fields.items()
188179
]
180+
casting_statements.append(
181+
f"CAST({RECORD_INDEX_COLUMN_NAME} AS {get_duckdb_type_from_annotation(int)}) AS {RECORD_INDEX_COLUMN_NAME}" # pylint: disable=C0301
182+
)
189183
try:
190184
relation = relation.project(", ".join(casting_statements))
191185
except Exception as err: # pylint: disable=broad-except

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,108 @@ def duckdb_record_index(cls):
313313
setattr(cls, "add_record_index", _add_duckdb_record_index)
314314
setattr(cls, "drop_record_index", _drop_duckdb_record_index)
315315
return cls
316+
317+
318+
def _cast_as_ddb_type(field_expr: str, type_annotation: Any) -> str:
319+
"""Cast to Duck DB type"""
320+
return f"""try_cast({field_expr} as {get_duckdb_type_from_annotation(type_annotation)})"""
321+
322+
323+
def _ddb_safely_quote_name(field_name: str) -> str:
324+
"""Quote field names in case reserved"""
325+
try:
326+
sep_idx = field_name.index(".")
327+
return f'"{field_name[: sep_idx]}"' + field_name[sep_idx:]
328+
except ValueError:
329+
return f'"{field_name}"'
330+
331+
332+
# pylint: disable=R0801,R0911,R0912
333+
def get_duckdb_cast_statement_from_annotation(
334+
element_name: str,
335+
type_annotation: Any,
336+
parent_element: bool = True,
337+
date_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
338+
timestamp_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}((\+|\-)[0-9]{2}:[0-9]{2})?$", # pylint: disable=C0301
339+
time_regex: str = r"^[0-9]{2}:[0-9]{2}:[0-9]{2}$",
340+
) -> str:
341+
"""Generate casting statements for duckdb relations from type annotations"""
342+
type_origin = get_origin(type_annotation)
343+
344+
quoted_name = _ddb_safely_quote_name(element_name)
345+
346+
# An `Optional` or `Union` type, check to ensure non-heterogenity.
347+
if type_origin is Union:
348+
python_type = _get_non_heterogenous_type(get_args(type_annotation))
349+
return get_duckdb_cast_statement_from_annotation(
350+
element_name, python_type, parent_element, date_regex, timestamp_regex
351+
)
352+
353+
# Type hint is e.g. `List[str]`, check to ensure non-heterogenity.
354+
if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)):
355+
element_type = _get_non_heterogenous_type(get_args(type_annotation))
356+
stmt = f"list_transform({quoted_name}, x -> {get_duckdb_cast_statement_from_annotation('x',element_type, False, date_regex, timestamp_regex)})" # pylint: disable=C0301
357+
return stmt if not parent_element else _cast_as_ddb_type(stmt, type_annotation)
358+
359+
if type_origin is Annotated:
360+
python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable
361+
return get_duckdb_cast_statement_from_annotation(
362+
element_name, python_type, parent_element, date_regex, timestamp_regex
363+
) # add other expected params here
364+
# Ensure that we have a concrete type at this point.
365+
if not isinstance(type_annotation, type):
366+
raise ValueError(f"Unsupported type annotation {type_annotation!r}")
367+
368+
if (
369+
# Type hint is a dict subclass, but not dict. Possibly a `TypedDict`.
370+
(issubclass(type_annotation, dict) and type_annotation is not dict)
371+
# Type hint is a dataclass.
372+
or is_dataclass(type_annotation)
373+
# Type hint is a `pydantic` model.
374+
or (type_origin is None and issubclass(type_annotation, BaseModel))
375+
):
376+
fields: dict[str, str] = {}
377+
for field_name, field_annotation in get_type_hints(type_annotation).items():
378+
# Technically non-string keys are disallowed, but people are bad.
379+
if not isinstance(field_name, str):
380+
raise ValueError(
381+
f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}"
382+
) # pragma: no cover
383+
if get_origin(field_annotation) is ClassVar:
384+
continue
385+
386+
fields[field_name] = get_duckdb_cast_statement_from_annotation(
387+
f"{element_name}.{field_name}", field_annotation, False, date_regex, timestamp_regex
388+
)
389+
390+
if not fields:
391+
raise ValueError(
392+
f"No type annotations in dict/dataclass type (got {type_annotation!r})"
393+
)
394+
cast_exprs = ",".join([f'"{nme}":= {stmt}' for nme, stmt in fields.items()])
395+
stmt = f"struct_pack({cast_exprs})"
396+
return stmt if not parent_element else _cast_as_ddb_type(stmt, type_annotation)
397+
398+
if type_annotation is list:
399+
raise ValueError(
400+
f"List must have type annotation (e.g. `List[str]`), got {type_annotation!r}"
401+
)
402+
if type_annotation is dict or type_origin is dict:
403+
raise ValueError(f"dict must be `typing.TypedDict` subclass, got {type_annotation!r}")
404+
405+
for type_ in type_annotation.mro():
406+
# datetime is subclass of date, so needs to be handled first
407+
if issubclass(type_, datetime):
408+
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{timestamp_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIMESTAMP) ELSE NULL END" # pylint: disable=C0301
409+
return stmt
410+
if issubclass(type_, date):
411+
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{date_regex}') THEN TRY_CAST(TRIM({quoted_name}) as DATE) ELSE NULL END" # pylint: disable=C0301
412+
return stmt
413+
if issubclass(type_, time):
414+
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
415+
return stmt
416+
duck_type = get_duckdb_type_from_annotation(type_)
417+
if duck_type:
418+
stmt = f"trim({quoted_name})"
419+
return _cast_as_ddb_type(stmt, type_) if parent_element else stmt
420+
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")

src/dve/core_engine/backends/implementations/spark/spark_helpers.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,3 +439,103 @@ def spark_record_index(cls):
439439
setattr(cls, "add_record_index", _add_spark_record_index)
440440
setattr(cls, "drop_record_index", _drop_spark_record_index)
441441
return cls
442+
443+
444+
def _cast_as_spark_type(field_expr: str, field_type: Any) -> Column:
445+
"""Cast to spark type"""
446+
return sf.expr(field_expr).cast(get_type_from_annotation(field_type))
447+
448+
449+
def _spark_safely_quote_name(field_name: str) -> str:
450+
"""Quote field names in case reserved"""
451+
try:
452+
sep_idx = field_name.index(".")
453+
return f"`{field_name[: sep_idx]}`" + field_name[sep_idx:]
454+
except ValueError:
455+
return f"`{field_name}`"
456+
457+
458+
# pylint: disable=R0801
459+
def get_spark_cast_statement_from_annotation(
460+
element_name: str,
461+
type_annotation: Any,
462+
parent_element: bool = True,
463+
date_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
464+
timestamp_regex: str = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}((\\+|\\-)[0-9]{2}:[0-9]{2})?$", # pylint: disable=C0301
465+
):
466+
"""Generate casting statements for spark dataframes based on type annotations"""
467+
type_origin = get_origin(type_annotation)
468+
469+
quoted_name = _spark_safely_quote_name(element_name)
470+
471+
# An `Optional` or `Union` type, check to ensure non-heterogenity.
472+
if type_origin is Union:
473+
python_type = _get_non_heterogenous_type(get_args(type_annotation))
474+
return get_spark_cast_statement_from_annotation(
475+
element_name, python_type, parent_element, date_regex, timestamp_regex
476+
)
477+
478+
# Type hint is e.g. `List[str]`, check to ensure non-heterogenity.
479+
if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)):
480+
element_type = _get_non_heterogenous_type(get_args(type_annotation))
481+
stmt = f"transform({quoted_name}, x -> {get_spark_cast_statement_from_annotation('x',element_type, False, date_regex, timestamp_regex)})" # pylint: disable=C0301
482+
return stmt if not parent_element else _cast_as_spark_type(stmt, type_annotation)
483+
484+
if type_origin is Annotated:
485+
python_type, *_ = get_args(type_annotation) # pylint: disable=unused-variable
486+
return get_spark_cast_statement_from_annotation(
487+
element_name, python_type, parent_element, date_regex, timestamp_regex
488+
) # add other expected params here
489+
# Ensure that we have a concrete type at this point.
490+
if not isinstance(type_annotation, type):
491+
raise ValueError(f"Unsupported type annotation {type_annotation!r}")
492+
493+
if (
494+
# Type hint is a dict subclass, but not dict. Possibly a `TypedDict`.
495+
(issubclass(type_annotation, dict) and type_annotation is not dict)
496+
# Type hint is a dataclass.
497+
or is_dataclass(type_annotation)
498+
# Type hint is a `pydantic` model.
499+
or (type_origin is None and issubclass(type_annotation, BaseModel))
500+
):
501+
fields: dict[str, str] = {}
502+
for field_name, field_annotation in get_type_hints(type_annotation).items():
503+
# Technically non-string keys are disallowed, but people are bad.
504+
if not isinstance(field_name, str):
505+
raise ValueError(
506+
f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}"
507+
) # pragma: no cover
508+
if get_origin(field_annotation) is ClassVar:
509+
continue
510+
511+
fields[field_name] = get_spark_cast_statement_from_annotation(
512+
f"{element_name}.{field_name}", field_annotation, False, date_regex, timestamp_regex
513+
)
514+
515+
if not fields:
516+
raise ValueError(
517+
f"No type annotations in dict/dataclass type (got {type_annotation!r})"
518+
)
519+
cast_exprs = ",".join([f"{stmt} AS `{nme}`" for nme, stmt in fields.items()])
520+
stmt = f"struct({cast_exprs})"
521+
return stmt if not parent_element else _cast_as_spark_type(stmt, type_annotation)
522+
if type_annotation is list:
523+
raise ValueError(
524+
f"List must have type annotation (e.g. `List[str]`), got {type_annotation!r}"
525+
)
526+
if type_annotation is dict or type_origin is dict:
527+
raise ValueError(f"dict must be `typing.TypedDict` subclass, got {type_annotation!r}")
528+
529+
for type_ in type_annotation.mro():
530+
# datetime is subclass of date, so needs to be handled first
531+
if issubclass(type_, dt.datetime):
532+
stmt = rf"CASE WHEN REGEXP(TRIM({quoted_name}), '{timestamp_regex}') THEN TRIM({quoted_name}) ELSE NULL END" # pylint: disable=C0301
533+
return _cast_as_spark_type(stmt, type_) if parent_element else stmt
534+
if issubclass(type_, dt.date):
535+
stmt = rf"CASE WHEN REGEXP(TRIM({quoted_name}), '{date_regex}') THEN TRIM({quoted_name}) ELSE NULL END" # pylint: disable=C0301
536+
return _cast_as_spark_type(stmt, type_) if parent_element else stmt
537+
spark_type = get_type_from_annotation(type_)
538+
if spark_type:
539+
stmt = f"trim({quoted_name})"
540+
return _cast_as_spark_type(stmt, type_) if parent_element else stmt
541+
raise ValueError(f"No equivalent Spark type for {type_annotation!r}")

src/dve/metadata_parser/domain_types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,8 @@ def validate(cls, value: Union[dt.time, dt.datetime, str]) -> dt.time | None:
519519
raise ValueError("Provided time has timezone, but this is forbidden for this field")
520520
if cls.TIMEZONE_TREATMENT == "require" and not new_time.tzinfo:
521521
raise ValueError("Provided time missing timezone, but this is required for this field")
522-
522+
if isinstance(value, str) and cls.TIME_FORMAT and value != str(new_time):
523+
raise ValueError("Provided time is not matching expected time format supplied.")
523524
return new_time
524525

525526
@classmethod

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
4242
write_to.parent.mkdir(parents=True, exist_ok=True)
4343
write_to = write_to.as_posix()
4444
self.write_parquet( # type: ignore # pylint: disable=E1101
45-
self._audit_tables._processing_status.get_relation().filter( # pylint: disable=W0212
45+
self._audit_tables._processing_status.get_relation().filter( # pylint: disable=W0212
4646
f"submission_id = '{submission_info.submission_id}'"
4747
),
4848
fh.joinuri(write_to, "processing_status.parquet"),
4949
)
5050
self.write_parquet( # type: ignore # pylint: disable=E1101
51-
self._audit_tables._submission_statistics.get_relation().filter( # pylint: disable=W0212
51+
self._audit_tables._submission_statistics.get_relation().filter( # pylint: disable=W0212
5252
f"submission_id = '{submission_info.submission_id}'"
5353
),
5454
fh.joinuri(write_to, "submission_statistics.parquet"),

0 commit comments

Comments
 (0)