Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## v0.7.6 (2026-04-30)

### Fix

- make ddb xml reader connection args consistent with other ddb readers
- remove default connection from ddb json reader
- remove default connection in csv reader causing hanging in multithreaded setups
- configured refdata loader to be instantiated when required without class vars #99

### Refactor

- add backend kwargs for readers so that reader args not determinable at config-write time can be passed through

## v0.7.5 (2026-04-29)

### Fix
Expand Down
170 changes: 81 additions & 89 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues"
Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md"

[tool.poetry]
version = "0.7.5"
version = "0.7.6"
packages = [
{ include = "dve", from = "src" },
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@

import duckdb as ddb
import polars as pl
from duckdb import (
DuckDBPyConnection,
DuckDBPyRelation,
StarExpression,
default_connection,
read_csv,
)
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
Expand Down Expand Up @@ -61,7 +55,7 @@ def __init__(
self.header = header
self.delim = delim
self.quotechar = quotechar
self._connection = connection if connection else default_connection
self._connection = connection if connection else ddb.connect(":memory:")
self.field_check = field_check
self.field_check_error_code = field_check_error_code
self.field_check_error_message = field_check_error_message
Expand Down Expand Up @@ -181,7 +175,7 @@ def read_to_relation( # pylint: disable=unused-argument
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
df = df.select(pl_exprs)

return ddb.sql("SELECT * FROM df")
return self._connection.sql("SELECT * FROM df")


class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from collections.abc import Iterator
from typing import Any, Optional

from duckdb import DuckDBPyRelation, read_json
import duckdb
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
Expand All @@ -26,9 +27,11 @@ def __init__(
self,
*,
json_format: Optional[str] = "array",
connection: Optional[DuckDBPyConnection] = None,
**_,
):
self._json_format = json_format
self._connection = duckdb.connect(":memory:") if not connection else connection

super().__init__()

Expand All @@ -50,5 +53,7 @@ def read_to_relation( # pylint: disable=unused-argument
}

return self.add_record_index(
read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
self._connection.read_json(
resource, columns=ddb_schema, format=self._json_format # type: ignore
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

from typing import Optional

import duckdb
import polars as pl
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import read_function
Expand All @@ -24,8 +25,8 @@
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""

def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
self.ddb_connection = ddb_connection if ddb_connection else default_connection
def __init__(self, *, connection: Optional[DuckDBPyConnection] = None, **kwargs):
    """Create an XML stream reader bound to a duckdb connection.

    Args:
        connection: duckdb connection to run queries on. If omitted, a
            private in-memory connection is created rather than relying on
            duckdb's shared default connection.
        **kwargs: Forwarded to the parent ``XMLStreamReader``.
    """
    self._connection = connection if connection else duckdb.connect(":memory:")
    super().__init__(**kwargs)

@read_function(DuckDBPyRelation)
Expand All @@ -49,4 +50,4 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
)
)
return self.ddb_connection.sql("select * from _lazy_frame")
return self._connection.sql("select * from _lazy_frame")
1 change: 1 addition & 0 deletions src/dve/pipeline/duckdb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
submitted_files_path,
job_run_id,
logger,
{"connection": self._connection},
)

def init_reference_data_loader(
Expand Down
15 changes: 13 additions & 2 deletions src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from functools import lru_cache
from itertools import starmap
from threading import Lock
from typing import Optional, Union
from typing import Any, Optional, Union
from uuid import uuid4

import polars as pl
Expand Down Expand Up @@ -49,6 +49,7 @@
)


# pylint: disable=R0904
class BaseDVEPipeline:
"""
Base class for running a DVE Pipeline either by a given step or a full e2e process.
Expand All @@ -64,6 +65,7 @@ def __init__(
submitted_files_path: Optional[URI],
job_run_id: Optional[int] = None,
logger: Optional[logging.Logger] = None,
backend_reader_kwargs: Optional[dict[str, Any]] = None,
):
self._submitted_files_path = submitted_files_path
self._processed_files_path = processed_files_path
Expand All @@ -76,6 +78,7 @@ def __init__(
self._summary_lock = Lock()
self._rec_tracking_lock = Lock()
self._aggregates_lock = Lock()
self._backend_reader_kwargs = backend_reader_kwargs

if self._data_contract:
self._data_contract.logger = self._logger
Expand Down Expand Up @@ -107,6 +110,12 @@ def step_implementations(self) -> Optional[BaseStepImplementations[EntityType]]:
"""The step implementations to apply the business rules to a given dataset"""
return self._step_implementations

@property
def backend_reader_kwargs(self) -> dict[str, Any] | None:
    """Backend-specific keyword arguments supplied to every reader.

    These are arguments the configured backend requires but that cannot be
    known when the pipeline config is written, e.g. a live duckdb
    connection. ``None`` when no backend kwargs were provided.
    """
    return self._backend_reader_kwargs

@staticmethod
def get_entity_count(entity: EntityType) -> int:
"""Get a row count of an entity stored as parquet"""
Expand Down Expand Up @@ -203,7 +212,9 @@ def write_file_to_parquet(

for model_name, model in models.items():
self._logger.info(f"Transforming {model_name} to stringified parquet")
reader: BaseFileReader = load_reader(dataset, model_name, ext)
reader: BaseFileReader = load_reader(
dataset, model_name, ext, self.backend_reader_kwargs
)
try:
if not entity_type:
reader.write_parquet(
Expand Down
13 changes: 10 additions & 3 deletions src/dve/pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import json
from threading import Lock
from typing import Optional
from typing import Any, Optional

from pydantic.main import ModelMetaclass
from pyspark.sql import SparkSession
Expand Down Expand Up @@ -45,10 +45,17 @@ def load_config(
return models, config, dataset


def load_reader(dataset: Dataset, model_name: str, file_extension: str):
def load_reader(
    dataset: Dataset,
    model_name: str,
    file_extension: str,
    backend_reader_kwargs: Optional[dict[str, Any]] = None,
):
    """Load the configured reader for the given dataset, model name and file extension.

    Args:
        dataset: Dataset whose per-model reader configuration is consulted.
        model_name: Name of the model/entity within the dataset.
        file_extension: File extension used to select the reader; the lookup
            key is the lower-cased extension prefixed with a dot.
        backend_reader_kwargs: Backend-specific reader arguments not known at
            config-write time (e.g. a duckdb connection). Passed to the
            reader alongside the configured kwargs; a key present in both
            raises ``TypeError`` (duplicate keyword argument).

    Returns:
        An instantiated reader from the reader registry.
    """
    reader_config = dataset[model_name].reader_config[f".{file_extension.lower()}"]
    # `or {}` keeps call-level unpacking, so duplicate keys still fail loudly.
    reader = _READER_REGISTRY[reader_config.reader](
        **reader_config.kwargs_, **(backend_reader_kwargs or {})
    )
    return reader


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from pathlib import Path
from typing import Any, Dict, List, Tuple

import duckdb
import pytest
from duckdb import DuckDBPyRelation, default_connection
from duckdb import DuckDBPyRelation
from duckdb.typing import DuckDBPyType

from dve.core_engine.backends.implementations.duckdb.contract import DuckDBDataContract
Expand Down Expand Up @@ -34,7 +35,7 @@

def test_duckdb_data_contract_csv(temp_csv_file):
uri, _, _, mdl = temp_csv_file
connection = default_connection
connection = duckdb.connect(":memory:")

contract_meta = json.dumps(
{
Expand Down Expand Up @@ -106,7 +107,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):

def test_duckdb_data_contract_xml(temp_xml_file):
uri, header_model, header_data, class_model, class_data = temp_xml_file
connection = default_connection
connection = duckdb.connect()
contract_meta = json.dumps(
{
"contract": {
Expand Down Expand Up @@ -153,10 +154,10 @@ def test_duckdb_data_contract_xml(temp_xml_file):
contract_dict = json.loads(contract_meta).get("contract")
entities: Dict[str, DuckDBPyRelation] = {
"test_header": DuckDBXMLStreamReader(
ddb_connection=connection, root_tag="root", record_tag="Header"
connection=connection, root_tag="root", record_tag="Header"
).read_to_relation(str(uri), "header", header_model),
"test_class_info": DuckDBXMLStreamReader(
ddb_connection=connection, root_tag="root", record_tag="ClassData"
connection=connection, root_tag="root", record_tag="ClassData"
).read_to_relation(str(uri), "class_info", class_model),
}
entity_locations: dict[str, URI] = {}
Expand Down Expand Up @@ -218,7 +219,7 @@ def test_ddb_data_contract_read_and_write_basic_parquet(
):
# can we read in a stringified parquet and run the data contract on it?
# basic file - simple data structures
connection = default_connection
connection = duckdb.connect(":memory:")
parquet_uri, contract_meta, _ = simple_all_string_parquet
data_contract = DuckDBDataContract(connection)
# check can read
Expand Down Expand Up @@ -279,7 +280,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
# can we read in a stringified parquet and run the data contract on it?
# more complex file - nested, arrays of structs
parquet_uri, contract_meta, _ = nested_all_string_parquet
connection = default_connection
connection = duckdb.connect()
data_contract = DuckDBDataContract(connection)
# check can read
entity = data_contract.read_parquet(path=parquet_uri)
Expand Down Expand Up @@ -337,7 +338,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors,
nested_parquet_custom_dc_err_details):
parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors
connection = default_connection
connection = duckdb.connect(":memory:")
data_contract = DuckDBDataContract(connection)

entity = data_contract.read_parquet(path=parquet_uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Iterator, List, Optional, Set, Tuple, Type

import duckdb
import numpy as np
import polars as pl
import pytest
Expand All @@ -12,7 +13,6 @@
ConstantExpression,
DuckDBPyRelation,
StarExpression,
default_connection,
)

from dve.core_engine.backends.base.core import EntityManager
Expand Down Expand Up @@ -49,7 +49,7 @@
simple_typecast_parquet,
)

DUCKDB_STEP_BACKEND = DuckDBStepImplementations(default_connection)
DUCKDB_STEP_BACKEND = DuckDBStepImplementations(duckdb.connect())
"""The backend for the duckdb steps."""


Expand Down
Loading
Loading