Skip to content

Commit a6a0c36

Browse files
release v0.7.6 #100
bump: version 0.7.5 → 0.7.6 fix: remove default connection in csv reader causing hanging in multithreaded setups fix: remove default connection from ddb json reader fix: ddb xml reader connection args consistent with other ddb readers test: add foundry ddb pipeline multithreading test test: fix tests and linting and remove all default_connections
2 parents 3386b0e + b11e705 commit a6a0c36

15 files changed

Lines changed: 237 additions & 145 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
## v0.7.6 (2026-04-30)
2+
3+
### Fix
4+
5+
- ddb xml reader connection args consistent with other ddb readers
6+
- remove default connection from ddb json reader
7+
- remove default connection in csv reader causing hanging in multithreaded setups
8+
- configured refdata loader to be instantiated when required without class vars #99
9+
10+
### Refactor
11+
12+
- add in backend kwargs for readers to allow reader args not determinable at config write time to be passed
13+
114
## v0.7.5 (2026-04-29)
215

316
### Fix

poetry.lock

Lines changed: 81 additions & 89 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues"
2424
Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md"
2525

2626
[tool.poetry]
27-
version = "0.7.5"
27+
version = "0.7.6"
2828
packages = [
2929
{ include = "dve", from = "src" },
3030
]

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,7 @@
66

77
import duckdb as ddb
88
import polars as pl
9-
from duckdb import (
10-
DuckDBPyConnection,
11-
DuckDBPyRelation,
12-
StarExpression,
13-
default_connection,
14-
read_csv,
15-
)
9+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv
1610
from pydantic import BaseModel
1711

1812
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
@@ -61,7 +55,7 @@ def __init__(
6155
self.header = header
6256
self.delim = delim
6357
self.quotechar = quotechar
64-
self._connection = connection if connection else default_connection
58+
self._connection = connection if connection else ddb.connect(":memory:")
6559
self.field_check = field_check
6660
self.field_check_error_code = field_check_error_code
6761
self.field_check_error_message = field_check_error_message
@@ -181,7 +175,7 @@ def read_to_relation( # pylint: disable=unused-argument
181175
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
182176
df = df.select(pl_exprs)
183177

184-
return ddb.sql("SELECT * FROM df")
178+
return self._connection.sql("SELECT * FROM df")
185179

186180

187181
class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):

src/dve/core_engine/backends/implementations/duckdb/readers/json.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from collections.abc import Iterator
55
from typing import Any, Optional
66

7-
from duckdb import DuckDBPyRelation, read_json
7+
import duckdb
8+
from duckdb import DuckDBPyConnection, DuckDBPyRelation
89
from pydantic import BaseModel
910

1011
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
@@ -26,9 +27,11 @@ def __init__(
2627
self,
2728
*,
2829
json_format: Optional[str] = "array",
30+
connection: Optional[DuckDBPyConnection] = None,
2931
**_,
3032
):
3133
self._json_format = json_format
34+
self._connection = duckdb.connect(":memory:") if not connection else connection
3235

3336
super().__init__()
3437

@@ -50,5 +53,7 @@ def read_to_relation( # pylint: disable=unused-argument
5053
}
5154

5255
return self.add_record_index(
53-
read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
56+
self._connection.read_json(
57+
resource, columns=ddb_schema, format=self._json_format # type: ignore
58+
)
5459
)

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
from typing import Optional
55

6+
import duckdb
67
import polars as pl
7-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection
8+
from duckdb import DuckDBPyConnection, DuckDBPyRelation
89
from pydantic import BaseModel
910

1011
from dve.core_engine.backends.base.reader import read_function
@@ -24,8 +25,8 @@
2425
class DuckDBXMLStreamReader(XMLStreamReader):
2526
"""A reader for XML files"""
2627

27-
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
28-
self.ddb_connection = ddb_connection if ddb_connection else default_connection
28+
def __init__(self, *, connection: Optional[DuckDBPyConnection] = None, **kwargs):
29+
self._connection = connection if connection else duckdb.connect(":memory:")
2930
super().__init__(**kwargs)
3031

3132
@read_function(DuckDBPyRelation)
@@ -49,4 +50,4 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
4950
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
5051
)
5152
)
52-
return self.ddb_connection.sql("select * from _lazy_frame")
53+
return self._connection.sql("select * from _lazy_frame")

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
submitted_files_path,
4646
job_run_id,
4747
logger,
48+
{"connection": self._connection},
4849
)
4950

5051
def init_reference_data_loader(

src/dve/pipeline/pipeline.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from functools import lru_cache
1010
from itertools import starmap
1111
from threading import Lock
12-
from typing import Optional, Union
12+
from typing import Any, Optional, Union
1313
from uuid import uuid4
1414

1515
import polars as pl
@@ -49,6 +49,7 @@
4949
)
5050

5151

52+
# pylint: disable=R0904
5253
class BaseDVEPipeline:
5354
"""
5455
Base class for running a DVE Pipeline either by a given step or a full e2e process.
@@ -64,6 +65,7 @@ def __init__(
6465
submitted_files_path: Optional[URI],
6566
job_run_id: Optional[int] = None,
6667
logger: Optional[logging.Logger] = None,
68+
backend_reader_kwargs: Optional[dict[str, Any]] = None,
6769
):
6870
self._submitted_files_path = submitted_files_path
6971
self._processed_files_path = processed_files_path
@@ -76,6 +78,7 @@ def __init__(
7678
self._summary_lock = Lock()
7779
self._rec_tracking_lock = Lock()
7880
self._aggregates_lock = Lock()
81+
self._backend_reader_kwargs = backend_reader_kwargs
7982

8083
if self._data_contract:
8184
self._data_contract.logger = self._logger
@@ -107,6 +110,12 @@ def step_implementations(self) -> Optional[BaseStepImplementations[EntityType]]:
107110
"""The step implementations to apply the business rules to a given dataset"""
108111
return self._step_implementations
109112

113+
@property
114+
def backend_reader_kwargs(self) -> dict[str, Any] | None:
115+
"""Important required arguments for all readers related to the specific backend
116+
that can't be specified at time of writing config eg. duckdb connection"""
117+
return self._backend_reader_kwargs
118+
110119
@staticmethod
111120
def get_entity_count(entity: EntityType) -> int:
112121
"""Get a row count of an entity stored as parquet"""
@@ -203,7 +212,9 @@ def write_file_to_parquet(
203212

204213
for model_name, model in models.items():
205214
self._logger.info(f"Transforming {model_name} to stringified parquet")
206-
reader: BaseFileReader = load_reader(dataset, model_name, ext)
215+
reader: BaseFileReader = load_reader(
216+
dataset, model_name, ext, self.backend_reader_kwargs
217+
)
207218
try:
208219
if not entity_type:
209220
reader.write_parquet(

src/dve/pipeline/utils.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import json
55
from threading import Lock
6-
from typing import Optional
6+
from typing import Any, Optional
77

88
from pydantic.main import ModelMetaclass
99
from pyspark.sql import SparkSession
@@ -45,10 +45,17 @@ def load_config(
4545
return models, config, dataset
4646

4747

48-
def load_reader(dataset: Dataset, model_name: str, file_extension: str):
48+
def load_reader(
49+
dataset: Dataset,
50+
model_name: str,
51+
file_extension: str,
52+
backend_reader_kwargs: Optional[dict[str, Any]] = None,
53+
):
4954
"""Loads the readers for the diven feed, model name and file extension"""
5055
reader_config = dataset[model_name].reader_config[f".{file_extension.lower()}"]
51-
reader = _READER_REGISTRY[reader_config.reader](**reader_config.kwargs_)
56+
reader = _READER_REGISTRY[reader_config.reader](
57+
**reader_config.kwargs_, **backend_reader_kwargs if backend_reader_kwargs else {}
58+
)
5259
return reader
5360

5461

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
from pathlib import Path
33
from typing import Any, Dict, List, Tuple
44

5+
import duckdb
56
import pytest
6-
from duckdb import DuckDBPyRelation, default_connection
7+
from duckdb import DuckDBPyRelation
78
from duckdb.typing import DuckDBPyType
89

910
from dve.core_engine.backends.implementations.duckdb.contract import DuckDBDataContract
@@ -34,7 +35,7 @@
3435

3536
def test_duckdb_data_contract_csv(temp_csv_file):
3637
uri, _, _, mdl = temp_csv_file
37-
connection = default_connection
38+
connection = duckdb.connect(":memory:")
3839

3940
contract_meta = json.dumps(
4041
{
@@ -106,7 +107,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):
106107

107108
def test_duckdb_data_contract_xml(temp_xml_file):
108109
uri, header_model, header_data, class_model, class_data = temp_xml_file
109-
connection = default_connection
110+
connection = duckdb.connect()
110111
contract_meta = json.dumps(
111112
{
112113
"contract": {
@@ -153,10 +154,10 @@ def test_duckdb_data_contract_xml(temp_xml_file):
153154
contract_dict = json.loads(contract_meta).get("contract")
154155
entities: Dict[str, DuckDBPyRelation] = {
155156
"test_header": DuckDBXMLStreamReader(
156-
ddb_connection=connection, root_tag="root", record_tag="Header"
157+
connection=connection, root_tag="root", record_tag="Header"
157158
).read_to_relation(str(uri), "header", header_model),
158159
"test_class_info": DuckDBXMLStreamReader(
159-
ddb_connection=connection, root_tag="root", record_tag="ClassData"
160+
connection=connection, root_tag="root", record_tag="ClassData"
160161
).read_to_relation(str(uri), "class_info", class_model),
161162
}
162163
entity_locations: dict[str, URI] = {}
@@ -218,7 +219,7 @@ def test_ddb_data_contract_read_and_write_basic_parquet(
218219
):
219220
# can we read in a stringified parquet and run the data contract on it?
220221
# basic file - simple data structures
221-
connection = default_connection
222+
connection = duckdb.connect(":memory:")
222223
parquet_uri, contract_meta, _ = simple_all_string_parquet
223224
data_contract = DuckDBDataContract(connection)
224225
# check can read
@@ -279,7 +280,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
279280
# can we read in a stringified parquet and run the data contract on it?
280281
# more complex file - nested, arrays of structs
281282
parquet_uri, contract_meta, _ = nested_all_string_parquet
282-
connection = default_connection
283+
connection = duckdb.connect()
283284
data_contract = DuckDBDataContract(connection)
284285
# check can read
285286
entity = data_contract.read_parquet(path=parquet_uri)
@@ -337,7 +338,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
337338
def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors,
338339
nested_parquet_custom_dc_err_details):
339340
parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors
340-
connection = default_connection
341+
connection = duckdb.connect(":memory:")
341342
data_contract = DuckDBDataContract(connection)
342343

343344
entity = data_contract.read_parquet(path=parquet_uri)

0 commit comments

Comments
 (0)