From d10f98f7a4107e72af6e6a14020cca13d655bb24 Mon Sep 17 00:00:00 2001 From: "george.robertson1" <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 29 Apr 2026 23:55:30 +0100 Subject: [PATCH 1/7] fix: remove default connection in csv reader causing hanging in multithreaded setups --- .../backends/implementations/duckdb/readers/csv.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index 5fb5bcc..867f442 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -10,7 +10,6 @@ DuckDBPyConnection, DuckDBPyRelation, StarExpression, - default_connection, read_csv, ) from pydantic import BaseModel @@ -61,7 +60,7 @@ def __init__( self.header = header self.delim = delim self.quotechar = quotechar - self._connection = connection if connection else default_connection + self._connection = connection if connection else ddb.connect(":memory:") self.field_check = field_check self.field_check_error_code = field_check_error_code self.field_check_error_message = field_check_error_message @@ -181,7 +180,7 @@ def read_to_relation( # pylint: disable=unused-argument ] + [pl.col(RECORD_INDEX_COLUMN_NAME)] df = df.select(pl_exprs) - return ddb.sql("SELECT * FROM df") + return self._connection.sql("SELECT * FROM df") class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader): From 0ff64b962231c09f73c9649d9c7223710fdf574b Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:27:14 +0100 Subject: [PATCH 2/7] fix: remove default connection from ddb json reader --- .../backends/implementations/duckdb/readers/json.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py index 8afb5a4..dd12721 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py @@ -4,7 +4,8 @@ from collections.abc import Iterator from typing import Any, Optional -from duckdb import DuckDBPyRelation, read_json +import duckdb +from duckdb import DuckDBPyConnection, DuckDBPyRelation from pydantic import BaseModel from dve.core_engine.backends.base.reader import BaseFileReader, read_function @@ -26,9 +27,11 @@ def __init__( self, *, json_format: Optional[str] = "array", + connection: Optional[DuckDBPyConnection] = None, **_, ): self._json_format = json_format + self._connection = duckdb.connect(":memory:") if not connection else connection super().__init__() @@ -50,5 +53,5 @@ def read_to_relation( # pylint: disable=unused-argument } return self.add_record_index( - read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore + self._connection.read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore ) From 8fa895e993f3a1fd46de931c2f32879c8d58e91a Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:31:31 +0100 Subject: [PATCH 3/7] test: add foundry ddb pipeline multithreading test --- .../test_foundry_ddb_pipeline.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index 666bd90..9b7b60d 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -2,13 +2,17 @@ # pylint: disable=missing-function-docstring # pylint: disable=protected-access +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path import shutil import tempfile +from typing import Any from uuid import uuid4 +from duckdb import DuckDBPyConnection, connect import polars as pl +import pytest from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader @@ -24,6 +28,28 @@ movies_test_files ) +@pytest.fixture(scope="function") +def prep_multithreading_test(): + sub_details: dict[str, tuple[DuckDBPyConnection, str, DDBAuditingManager]] = {} + for idx in range(1, 10): + db = f"dve_{uuid4().hex}" + tmp_dir = tempfile.mkdtemp(prefix="ddb_foundry_testing") + db_file = Path(tmp_dir, db + ".duckdb") + conn = connect(database=db_file, read_only=False) + ref_db_file = Path(tmp_dir, "movies_refdata.duckdb").as_posix() + conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata") + conn.read_parquet( + get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix() + ).to_table("movies_refdata.sequels") + sub_details[f"submission_{idx}"] = (conn, tmp_dir, DDBAuditingManager(None, None, conn)) + + yield sub_details + for con, db_dir, aud in sub_details.values(): + con.close() + shutil.rmtree(db_dir) + aud.__exit__(None, None, None) + + def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn): db_file, conn = temp_ddb_conn processing_folder = planet_test_files @@ -205,3 +231,39 @@ def test_foundry_runner_error_at_bi_rules(movies_test_files, temp_ddb_conn): assert len(list(fh.iter_prefix(audit_files))) == 2 assert audit_manager.get_submission_status(sub_id).processing_failed assert audit_manager.get_latest_processing_records().select("submission_result").pl().to_dicts()[0]["submission_result"] == "processing_failed" + + +def test_foundry_runner_multithreaded(prep_multithreading_test): + # get test files in submitted_files location - movies + sub_utils = prep_multithreading_test + sub_infos: dict[str, SubmissionInfo] = {} + with tempfile.TemporaryDirectory(prefix="ddb_mt_sub") as sub_dir, tempfile.TemporaryDirectory(prefix="ddb_mt_process") as proc_dir: + # get test files in submitted_files location + movie_file = get_test_file_path("movies").joinpath("good_movies.json") + for idx in range(1, 10): + dest = Path(sub_dir, f"good_movies_{idx}.json") + shutil.copyfile(movie_file, dest) + sub_infos[f"submission_{idx}"] = SubmissionInfo(submission_id=f"submission_{idx}", + dataset_id="movies", + file_name=f"good_movies_{idx}", + file_extension="json", + submitting_org="TEST", + datetime_received=datetime(2025,11,5)) + with ThreadPoolExecutor() as pool: + futures: dict[Future, str] = {} + for sub in sub_infos: + conn, _, aud = sub_utils.get(sub) + target = FoundryDDBPipeline(processed_files_path=proc_dir, + audit_tables=aud, + connection=conn, + rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(), + submitted_files_path=sub_dir,).run_pipeline + futures[pool.submit(target, (sub_infos.get(sub)))] = sub + + for future in as_completed(futures): + sub = futures[future] + output_loc, report_uri, audit_files = future.result() + + assert audit_files + assert output_loc + assert report_uri From 00b66a6b87da9475f2652d662b5c87faabcb0da4 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Thu, 30 Apr 2026 15:50:27 +0100 Subject: [PATCH 4/7] refactor: add in backend kwargs for readers to allow reader args not determinable at config write time to be passed --- poetry.lock | 170 +++++++++--------- .../implementations/duckdb/readers/csv.py | 7 +- .../implementations/duckdb/readers/json.py | 4 +- src/dve/pipeline/duckdb_pipeline.py | 1 + src/dve/pipeline/pipeline.py | 15 +- src/dve/pipeline/utils.py | 13 +- .../test_foundry_ddb_pipeline.py | 2 +- 7 files changed, 110 insertions(+), 102 deletions(-) diff --git a/poetry.lock b/poetry.lock index 141c41b..634f9e1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -534,14 +534,14 @@ types-awscrt = "*" [[package]] name = "certifi" -version = "2026.2.25" +version = "2026.4.22" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.7" groups = ["dev", "test"] files = [ - {file = "certifi-2026.2.25-py3-none-any.whl", hash = "sha256:027692e4402ad994f1c42e52a4997a9763c646b73e4096e4d5d6db8af1d6f0fa"}, - {file = "certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7"}, + {file = "certifi-2026.4.22-py3-none-any.whl", hash = "sha256:3cb2210c8f88ba2318d29b0388d1023c8492ff72ecdde4ebdaddbb13a31b1c4a"}, + {file = "certifi-2026.4.22.tar.gz", hash = "sha256:8d455352a37b71bf76a79caa83a3d6c25afee4a385d632127b6afb3963f1c580"}, ] [[package]] @@ -955,61 +955,61 @@ toml = ["tomli ; python_full_version <= \"3.11.0a6\""] [[package]] name = "cryptography" -version = "46.0.7" +version = "47.0.0" description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers." optional = false python-versions = "!=3.9.0,!=3.9.1,>=3.8" groups = ["dev", "test"] files = [ - {file = "cryptography-46.0.7-cp311-abi3-macosx_10_9_universal2.whl", hash = "sha256:ea42cbe97209df307fdc3b155f1b6fa2577c0defa8f1f7d3be7d31d189108ad4"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:b36a4695e29fe69215d75960b22577197aca3f7a25b9cf9d165dcfe9d80bc325"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:5ad9ef796328c5e3c4ceed237a183f5d41d21150f972455a9d926593a1dcb308"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:73510b83623e080a2c35c62c15298096e2a5dc8d51c3b4e1740211839d0dea77"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:cbd5fb06b62bd0721e1170273d3f4d5a277044c47ca27ee257025146c34cbdd1"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:420b1e4109cc95f0e5700eed79908cef9268265c773d3a66f7af1eef53d409ef"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:24402210aa54baae71d99441d15bb5a1919c195398a87b563df84468160a65de"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:8a469028a86f12eb7d2fe97162d0634026d92a21f3ae0ac87ed1c4a447886c83"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:9694078c5d44c157ef3162e3bf3946510b857df5a3955458381d1c7cfc143ddb"}, - {file = "cryptography-46.0.7-cp311-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:42a1e5f98abb6391717978baf9f90dc28a743b7d9be7f0751a6f56a75d14065b"}, - {file = "cryptography-46.0.7-cp311-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:91bbcb08347344f810cbe49065914fe048949648f6bd5c2519f34619142bbe85"}, - {file = "cryptography-46.0.7-cp311-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:5d1c02a14ceb9148cc7816249f64f623fbfee39e8c03b3650d842ad3f34d637e"}, - {file = "cryptography-46.0.7-cp311-abi3-win32.whl", hash = "sha256:d23c8ca48e44ee015cd0a54aeccdf9f09004eba9fc96f38c911011d9ff1bd457"}, - {file = "cryptography-46.0.7-cp311-abi3-win_amd64.whl", hash = "sha256:397655da831414d165029da9bc483bed2fe0e75dde6a1523ec2fe63f3c46046b"}, - {file = "cryptography-46.0.7-cp314-cp314t-macosx_10_9_universal2.whl", hash = "sha256:d151173275e1728cf7839aaa80c34fe550c04ddb27b34f48c232193df8db5842"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:db0f493b9181c7820c8134437eb8b0b4792085d37dbb24da050476ccb664e59c"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ebd6daf519b9f189f85c479427bbd6e9c9037862cf8fe89ee35503bd209ed902"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:b7b412817be92117ec5ed95f880defe9cf18a832e8cafacf0a22337dc1981b4d"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_28_ppc64le.whl", hash = "sha256:fbfd0e5f273877695cb93baf14b185f4878128b250cc9f8e617ea0c025dfb022"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:ffca7aa1d00cf7d6469b988c581598f2259e46215e0140af408966a24cf086ce"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:60627cf07e0d9274338521205899337c5d18249db56865f943cbe753aa96f40f"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_34_aarch64.whl", hash = "sha256:80406c3065e2c55d7f49a9550fe0c49b3f12e5bfff5dedb727e319e1afb9bf99"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_34_ppc64le.whl", hash = "sha256:c5b1ccd1239f48b7151a65bc6dd54bcfcc15e028c8ac126d3fada09db0e07ef1"}, - {file = "cryptography-46.0.7-cp314-cp314t-manylinux_2_34_x86_64.whl", hash = "sha256:d5f7520159cd9c2154eb61eb67548ca05c5774d39e9c2c4339fd793fe7d097b2"}, - {file = "cryptography-46.0.7-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:fcd8eac50d9138c1d7fc53a653ba60a2bee81a505f9f8850b6b2888555a45d0e"}, - {file = "cryptography-46.0.7-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:65814c60f8cc400c63131584e3e1fad01235edba2614b61fbfbfa954082db0ee"}, - {file = "cryptography-46.0.7-cp314-cp314t-win32.whl", hash = "sha256:fdd1736fed309b4300346f88f74cd120c27c56852c3838cab416e7a166f67298"}, - {file = "cryptography-46.0.7-cp314-cp314t-win_amd64.whl", hash = "sha256:e06acf3c99be55aa3b516397fe42f5855597f430add9c17fa46bf2e0fb34c9bb"}, - {file = "cryptography-46.0.7-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:462ad5cb1c148a22b2e3bcc5ad52504dff325d17daf5df8d88c17dda1f75f2a4"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:84d4cced91f0f159a7ddacad249cc077e63195c36aac40b4150e7a57e84fffe7"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:128c5edfe5e5938b86b03941e94fac9ee793a94452ad1365c9fc3f4f62216832"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:5e51be372b26ef4ba3de3c167cd3d1022934bc838ae9eaad7e644986d2a3d163"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:cdf1a610ef82abb396451862739e3fc93b071c844399e15b90726ef7470eeaf2"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:1d25aee46d0c6f1a501adcddb2d2fee4b979381346a78558ed13e50aa8a59067"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:cdfbe22376065ffcf8be74dc9a909f032df19bc58a699456a21712d6e5eabfd0"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:abad9dac36cbf55de6eb49badd4016806b3165d396f64925bf2999bcb67837ba"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:935ce7e3cfdb53e3536119a542b839bb94ec1ad081013e9ab9b7cfd478b05006"}, - {file = "cryptography-46.0.7-cp38-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:35719dc79d4730d30f1c2b6474bd6acda36ae2dfae1e3c16f2051f215df33ce0"}, - {file = "cryptography-46.0.7-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:7bbc6ccf49d05ac8f7d7b5e2e2c33830d4fe2061def88210a126d130d7f71a85"}, - {file = "cryptography-46.0.7-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a1529d614f44b863a7b480c6d000fe93b59acee9c82ffa027cfadc77521a9f5e"}, - {file = "cryptography-46.0.7-cp38-abi3-win32.whl", hash = "sha256:f247c8c1a1fb45e12586afbb436ef21ff1e80670b2861a90353d9b025583d246"}, - {file = "cryptography-46.0.7-cp38-abi3-win_amd64.whl", hash = "sha256:506c4ff91eff4f82bdac7633318a526b1d1309fc07ca76a3ad182cb5b686d6d3"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:fc9ab8856ae6cf7c9358430e49b368f3108f050031442eaeb6b9d87e4dcf4e4f"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d3b99c535a9de0adced13d159c5a9cf65c325601aa30f4be08afd680643e9c15"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:d02c738dacda7dc2a74d1b2b3177042009d5cab7c7079db74afc19e56ca1b455"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:04959522f938493042d595a736e7dbdff6eb6cc2339c11465b3ff89343b65f65"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:3986ac1dee6def53797289999eabe84798ad7817f3e97779b5061a95b0ee4968"}, - {file = "cryptography-46.0.7-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:258514877e15963bd43b558917bc9f54cf7cf866c38aa576ebf47a77ddbc43a4"}, - {file = "cryptography-46.0.7.tar.gz", hash = "sha256:e4cfd68c5f3e0bfdad0d38e023239b96a2fe84146481852dffbcca442c245aa5"}, + {file = "cryptography-47.0.0-cp311-abi3-macosx_10_9_universal2.whl", hash = "sha256:160ad728f128972d362e714054f6ba0067cab7fb350c5202a9ae8ae4ce3ef1a0"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:b9a8943e359b7615db1a3ba587994618e094ff3d6fa5a390c73d079ce18b3973"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:f5c15764f261394b22aef6b00252f5195f46f2ca300bec57149474e2538b31f8"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:9c59ab0e0fa3a180a5a9c59f3a5abe3ef90d474bc56d7fadfbe80359491b615b"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:34b4358b925a5ea3e14384ca781a2c0ef7ac219b57bb9eacc4457078e2b19f92"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:0024b87d47ae2399165a6bfb20d24888881eeab83ae2566d62467c5ff0030ce7"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:1e47422b5557bb82d3fff997e8d92cff4e28b9789576984f08c248d2b3535d93"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:6f29f36582e6151d9686235e586dd35bb67491f024767d10b842e520dc6a07ac"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:a9b761f012a943b7de0e828843c5688d0de94a0578d44d6c85a1bae32f87791f"}, + {file = "cryptography-47.0.0-cp311-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:4e1de79e047e25d6e9f8cea71c86b4a53aced64134f0f003bbcbf3655fd172c8"}, + {file = "cryptography-47.0.0-cp311-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:ef6b3634087f18d2155b1e8ce264e5345a753da2c5fa9815e7d41315c90f8318"}, + {file = "cryptography-47.0.0-cp311-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:11dbb9f50a0f1bb9757b3d8c27c1101780efb8f0bdecfb12439c22a74d64c001"}, + {file = "cryptography-47.0.0-cp311-abi3-win32.whl", hash = "sha256:7fda2f02c9015db3f42bb8a22324a454516ed10a8c29ca6ece6cdbb5efe2a203"}, + {file = "cryptography-47.0.0-cp311-abi3-win_amd64.whl", hash = "sha256:f5c3296dab66202f1b18a91fa266be93d6aa0c2806ea3d67762c69f60adc71aa"}, + {file = "cryptography-47.0.0-cp314-cp314t-macosx_10_9_universal2.whl", hash = "sha256:be12cb6a204f77ed968bcefe68086eb061695b540a3dd05edac507a3111b25f0"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:2ebd84adf0728c039a3be2700289378e1c164afc6748df1a5ed456767bef9ba7"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7f68d6fbc7fbbcfb0939fea72c3b96a9f9a6edfc0e1b1d29778a2066030418b1"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:6651d32eff255423503aa276739da98c30f26c40cbeffcc6048e0d54ef704c0c"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_28_ppc64le.whl", hash = "sha256:3fb8fa48075fad7193f2e5496135c6a76ac4b2aa5a38433df0a539296b377829"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:11438c7518132d95f354fa01a4aa2f806d172a061a7bed18cf18cbdacdb204d7"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:8c1a736bbb3288005796c3f7ccb9453360d7fed483b13b9f468aea5171432923"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_34_aarch64.whl", hash = "sha256:f1557695e5c2b86e204f6ce9470497848634100787935ab7adc5397c54abd7ab"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_34_ppc64le.whl", hash = "sha256:f9a034b642b960767fb343766ae5ba6ad653f2e890ddd82955aef288ffea8736"}, + {file = "cryptography-47.0.0-cp314-cp314t-manylinux_2_34_x86_64.whl", hash = "sha256:b1c76fca783aa7698eb21eb14f9c4aa09452248ee54a627d125025a43f83e7a7"}, + {file = "cryptography-47.0.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:4f7722c97826770bab8ae92959a2e7b20a5e9e9bf4deae68fd86c3ca457bab52"}, + {file = "cryptography-47.0.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:09f6d7bf6724f8db8b32f11eccf23efc8e759924bc5603800335cf8859a3ddbd"}, + {file = "cryptography-47.0.0-cp314-cp314t-win32.whl", hash = "sha256:6eebcaf0df1d21ce1f90605c9b432dd2c4f4ab665ac29a40d5e3fc68f51b5e63"}, + {file = "cryptography-47.0.0-cp314-cp314t-win_amd64.whl", hash = "sha256:51c9313e90bd1690ec5a75ed047c27c0b8e6c570029712943d6116ef9a90620b"}, + {file = "cryptography-47.0.0-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:14432c8a9bcb37009784f9594a62fae211a2ae9543e96c92b2a8e4c3cd5cd0c4"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:07efe86201817e7d3c18781ca9770bc0db04e1e48c994be384e4602bc38f8f27"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:2b45761c6ec22b7c726d6a829558777e32d0f1c8be7c3f3480f9c912d5ee8a10"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:edd4da498015da5b9f26d38d3bfc2e90257bfa9cbed1f6767c282a0025ae649b"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:9af828c0d5a65c70ec729cd7495a4bf1a67ecb66417b8f02ff125ab8a6326a74"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:256d07c78a04d6b276f5df935a9923275f53bd1522f214447fdf365494e2d515"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:5d0e362ff51041b0c0d219cc7d6924d7b8996f57ce5712bdcef71eb3c65a59cc"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:1581aef4219f7ca2849d0250edaa3866212fb74bf5667284f46aa92f9e65c1ca"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:a49a3eb5341b9503fa3000a9a0db033161db90d47285291f53c2a9d2cd1b7f76"}, + {file = "cryptography-47.0.0-cp38-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:2207a498b03275d0051589e326b79d4cf59985c99031b05bb292ac52631c37fe"}, + {file = "cryptography-47.0.0-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:7a02675e2fabd0c0fc04c868b8781863cbf1967691543c22f5470500ff840b31"}, + {file = "cryptography-47.0.0-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:80887c5cbd1774683cb126f0ab4184567f080071d5acf62205acb354b4b753b7"}, + {file = "cryptography-47.0.0-cp38-abi3-win32.whl", hash = "sha256:ed67ea4e0cfb5faa5bc7ecb6e2b8838f3807a03758eec239d6c21c8769355310"}, + {file = "cryptography-47.0.0-cp38-abi3-win_amd64.whl", hash = "sha256:835d2d7f47cdc53b3224e90810fb1d36ca94ea29cc1801fb4c1bc43876735769"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:7f1207974a904e005f762869996cf620e9bf79ecb4622f148550bb48e0eb35a7"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:1a405c08857258c11016777e11c02bacbe7ef596faf259305d282272a3a05cbe"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:20fdbe3e38fb67c385d233c89371fa27f9909f6ebca1cecc20c13518dae65475"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:f7db373287273d8af1414cf95dc4118b13ffdc62be521997b0f2b270771fef50"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:9fe6b7c64926c765f9dff301f9c1b867febcda5768868ca084e18589113732ab"}, + {file = "cryptography-47.0.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:cffbba3392df0fa8629bb7f43454ee2925059ee158e23c54620b9063912b86c8"}, + {file = "cryptography-47.0.0.tar.gz", hash = "sha256:9f8e55fe4e63613a5e1cc5819030f27b97742d720203a087802ce4ce9ceb52bb"}, ] [package.dependencies] @@ -1017,14 +1017,7 @@ cffi = {version = ">=2.0.0", markers = "python_full_version >= \"3.9.0\" and pla typing-extensions = {version = ">=4.13.2", markers = "python_full_version < \"3.11.0\""} [package.extras] -docs = ["sphinx (>=5.3.0)", "sphinx-inline-tabs", "sphinx-rtd-theme (>=3.0.0)"] -docstest = ["pyenchant (>=3)", "readme-renderer (>=30.0)", "sphinxcontrib-spelling (>=7.3.1)"] -nox = ["nox[uv] (>=2024.4.15)"] -pep8test = ["check-sdist", "click (>=8.0.1)", "mypy (>=1.14)", "ruff (>=0.11.11)"] -sdist = ["build (>=1.0.0)"] ssh = ["bcrypt (>=3.1.5)"] -test = ["certifi (>=2024)", "cryptography-vectors (==46.0.7)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"] -test-randomorder = ["pytest-randomly"] [[package]] name = "cucumber-expressions" @@ -1249,14 +1242,14 @@ python-dateutil = ">=2.4" [[package]] name = "filelock" -version = "3.25.2" +version = "3.29.0" description = "A platform independent file lock." optional = false python-versions = ">=3.10" groups = ["dev"] files = [ - {file = "filelock-3.25.2-py3-none-any.whl", hash = "sha256:ca8afb0da15f229774c9ad1b455ed96e85a81373065fb10446672f64444ddf70"}, - {file = "filelock-3.25.2.tar.gz", hash = "sha256:b64ece2b38f4ca29dd3e810287aa8c48182bbecd1ae6e9ae126c9b35f1382694"}, + {file = "filelock-3.29.0-py3-none-any.whl", hash = "sha256:96f5f6344709aa1572bbf631c640e4ebeeb519e08da902c39a001882f30ac258"}, + {file = "filelock-3.29.0.tar.gz", hash = "sha256:69974355e960702e789734cb4871f884ea6fe50bd8404051a3530bc07809cf90"}, ] [[package]] @@ -1294,14 +1287,14 @@ pypi = ["pip (>=24.0)", "platformdirs (>=4.2)", "wheel (>=0.42)"] [[package]] name = "identify" -version = "2.6.18" +version = "2.6.19" description = "File identification library for Python" optional = false python-versions = ">=3.10" groups = ["dev"] files = [ - {file = "identify-2.6.18-py2.py3-none-any.whl", hash = "sha256:8db9d3c8ea9079db92cafb0ebf97abdc09d52e97f4dcf773a2e694048b7cd737"}, - {file = "identify-2.6.18.tar.gz", hash = "sha256:873ac56a5e3fd63e7438a7ecbc4d91aca692eb3fefa4534db2b7913f3fc352fd"}, + {file = "identify-2.6.19-py2.py3-none-any.whl", hash = "sha256:20e6a87f786f768c092a721ad107fc9df0eb89347be9396cadf3f4abbd1fb78a"}, + {file = "identify-2.6.19.tar.gz", hash = "sha256:6be5020c38fcb07da56c53733538a3081ea5aa70d36a156f83044bfbf9173842"}, ] [package.extras] @@ -1309,18 +1302,18 @@ license = ["ukkonen"] [[package]] name = "idna" -version = "3.11" +version = "3.13" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.8" groups = ["dev", "test"] files = [ - {file = "idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea"}, - {file = "idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902"}, + {file = "idna-3.13-py3-none-any.whl", hash = "sha256:892ea0cde124a99ce773decba204c5552b69c3c67ffd5f232eb7696135bc8bb3"}, + {file = "idna-3.13.tar.gz", hash = "sha256:585ea8fe5d69b9181ec1afba340451fba6ba764af97026f92a91d4eef164a242"}, ] [package.extras] -all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] +all = ["mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] [[package]] name = "importlib-metadata" @@ -2088,14 +2081,14 @@ et-xmlfile = "*" [[package]] name = "packaging" -version = "26.0" +version = "26.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" groups = ["dev", "docs", "lint", "test"] files = [ - {file = "packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529"}, - {file = "packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4"}, + {file = "packaging-26.2-py3-none-any.whl", hash = "sha256:5fc45236b9446107ff2415ce77c807cee2862cb6fac22b8a73826d0693b0980e"}, + {file = "packaging-26.2.tar.gz", hash = "sha256:ff452ff5a3e828ce110190feff1178bb1f2ea2281fa2075aadb987c2fb221661"}, ] [[package]] @@ -2244,32 +2237,31 @@ testing = ["pytest (<5.0) ; python_version < \"3.0\"", "pytest (>=5.0) ; python_ [[package]] name = "pathspec" -version = "1.0.4" +version = "1.1.1" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.9" groups = ["dev", "docs", "lint"] files = [ - {file = "pathspec-1.0.4-py3-none-any.whl", hash = "sha256:fb6ae2fd4e7c921a165808a552060e722767cfa526f99ca5156ed2ce45a5c723"}, - {file = "pathspec-1.0.4.tar.gz", hash = "sha256:0210e2ae8a21a9137c0d470578cb0e595af87edaa6ebf12ff176f14a02e0e645"}, + {file = "pathspec-1.1.1-py3-none-any.whl", hash = "sha256:a00ce642f577bf7f473932318056212bc4f8bfdf53128c78bbd5af0b9b20b189"}, + {file = "pathspec-1.1.1.tar.gz", hash = "sha256:17db5ecd524104a120e173814c90367a96a98d07c45b2e10c2f3919fff91bf5a"}, ] [package.extras] hyperscan = ["hyperscan (>=0.7)"] optional = ["typing-extensions (>=4)"] re2 = ["google-re2 (>=1.1)"] -tests = ["pytest (>=9)", "typing-extensions (>=4.15)"] [[package]] name = "platformdirs" -version = "4.9.4" +version = "4.9.6" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." optional = false python-versions = ">=3.10" groups = ["dev", "docs", "lint"] files = [ - {file = "platformdirs-4.9.4-py3-none-any.whl", hash = "sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868"}, - {file = "platformdirs-4.9.4.tar.gz", hash = "sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934"}, + {file = "platformdirs-4.9.6-py3-none-any.whl", hash = "sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917"}, + {file = "platformdirs-4.9.6.tar.gz", hash = "sha256:3bfa75b0ad0db84096ae777218481852c0ebc6c727b3168c1b9e0118e458cf0a"}, ] [[package]] @@ -3059,14 +3051,14 @@ markers = {docs = "python_version == \"3.10\"", test = "python_version == \"3.10 [[package]] name = "tzdata" -version = "2026.1" +version = "2026.2" description = "Provider of IANA time zone data" optional = false python-versions = ">=2" groups = ["main"] files = [ - {file = "tzdata-2026.1-py2.py3-none-any.whl", hash = "sha256:4b1d2be7ac37ceafd7327b961aa3a54e467efbdb563a23655fbfe0d39cfc42a9"}, - {file = "tzdata-2026.1.tar.gz", hash = "sha256:67658a1903c75917309e753fdc349ac0efd8c27db7a0cb406a25be4840f87f98"}, + {file = "tzdata-2026.2-py2.py3-none-any.whl", hash = "sha256:bbe9af844f658da81a5f95019480da3a89415801f6cc966806612cc7169bffe7"}, + {file = "tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10"}, ] [[package]] @@ -3089,14 +3081,14 @@ zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""] [[package]] name = "virtualenv" -version = "21.2.0" +version = "21.2.1" description = "Virtual Python Environment builder" optional = false python-versions = ">=3.8" groups = ["dev"] files = [ - {file = "virtualenv-21.2.0-py3-none-any.whl", hash = "sha256:1bd755b504931164a5a496d217c014d098426cddc79363ad66ac78125f9d908f"}, - {file = "virtualenv-21.2.0.tar.gz", hash = "sha256:1720dc3a62ef5b443092e3f499228599045d7fea4c79199770499df8becf9098"}, + {file = "virtualenv-21.2.1-py3-none-any.whl", hash = "sha256:bd16b49c53562b28cf1a3ad2f36edb805ad71301dee70ddc449e5c88a9f919a2"}, + {file = "virtualenv-21.2.1.tar.gz", hash = "sha256:b66ffe81301766c0d5e2208fc3576652c59d44e7b731fc5f5ed701c9b537fa78"}, ] [package.dependencies] @@ -3331,14 +3323,14 @@ tomli = {version = ">=2.0", markers = "python_full_version < \"3.11.0\""} [[package]] name = "zipp" -version = "3.23.0" +version = "3.23.1" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"}, - {file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"}, + {file = "zipp-3.23.1-py3-none-any.whl", hash = "sha256:0b3596c50a5c700c9cb40ba8d86d9f2cc4807e9bedb06bcdf7fac85633e444dc"}, + {file = "zipp-3.23.1.tar.gz", hash = "sha256:32120e378d32cd9714ad503c1d024619063ec28aad2248dc6672ad13edfa5110"}, ] [package.extras] diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index 867f442..17d7635 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -6,12 +6,7 @@ import duckdb as ddb import polars as pl -from duckdb import ( - DuckDBPyConnection, - DuckDBPyRelation, - StarExpression, - read_csv, -) +from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv from pydantic import BaseModel from dve.core_engine.backends.base.reader import BaseFileReader, read_function diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py index dd12721..cf0fa82 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py @@ -53,5 +53,7 @@ def read_to_relation( # pylint: disable=unused-argument } return self.add_record_index( - self._connection.read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore + self._connection.read_json( + resource, columns=ddb_schema, format=self._json_format # type: ignore + ) ) diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 0370106..b6a643f 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -45,6 +45,7 @@ def __init__( submitted_files_path, job_run_id, logger, + {"connection": self._connection}, ) def init_reference_data_loader( diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 91ff2ee..fae60b6 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -9,7 +9,7 @@ from functools import lru_cache from itertools import starmap from threading import Lock -from typing import Optional, Union +from typing import Any, Optional, Union from uuid import uuid4 import polars as pl @@ -49,6 +49,7 @@ ) +# pylint: disable=R0904 class BaseDVEPipeline: """ Base class for running a DVE Pipeline either by a given step or a full e2e process. @@ -64,6 +65,7 @@ def __init__( submitted_files_path: Optional[URI], job_run_id: Optional[int] = None, logger: Optional[logging.Logger] = None, + backend_reader_kwargs: Optional[dict[str, Any]] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -76,6 +78,7 @@ def __init__( self._summary_lock = Lock() self._rec_tracking_lock = Lock() self._aggregates_lock = Lock() + self._backend_reader_kwargs = backend_reader_kwargs if self._data_contract: self._data_contract.logger = self._logger @@ -107,6 +110,12 @@ def step_implementations(self) -> Optional[BaseStepImplementations[EntityType]]: """The step implementations to apply the business rules to a given dataset""" return self._step_implementations + @property + def backend_reader_kwargs(self) -> dict[str, Any] | None: + """Important required arguments for all readers related to the specific backend + that can't be specified at time of writing config eg. duckdb connection""" + return self._backend_reader_kwargs + @staticmethod def get_entity_count(entity: EntityType) -> int: """Get a row count of an entity stored as parquet""" @@ -203,7 +212,9 @@ def write_file_to_parquet( for model_name, model in models.items(): self._logger.info(f"Transforming {model_name} to stringified parquet") - reader: BaseFileReader = load_reader(dataset, model_name, ext) + reader: BaseFileReader = load_reader( + dataset, model_name, ext, self.backend_reader_kwargs + ) try: if not entity_type: reader.write_parquet( diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index 37f0cc7..eebaa90 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -3,7 +3,7 @@ import json from threading import Lock -from typing import Optional +from typing import Any, Optional from pydantic.main import ModelMetaclass from pyspark.sql import SparkSession @@ -45,10 +45,17 @@ def load_config( return models, config, dataset -def load_reader(dataset: Dataset, model_name: str, file_extension: str): +def load_reader( + dataset: Dataset, + model_name: str, + file_extension: str, + backend_reader_kwargs: Optional[dict[str, Any]] = None, +): """Loads the readers for the diven feed, model name and file extension""" reader_config = dataset[model_name].reader_config[f".{file_extension.lower()}"] - reader = _READER_REGISTRY[reader_config.reader](**reader_config.kwargs_) + reader = _READER_REGISTRY[reader_config.reader]( + **reader_config.kwargs_, **backend_reader_kwargs if backend_reader_kwargs else {} + ) return reader diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index 9b7b60d..f90592a 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -31,7 +31,7 @@ @pytest.fixture(scope="function") def prep_multithreading_test(): sub_details: dict[str, tuple[DuckDBPyConnection, str, DDBAuditingManager]] = {} - for idx in range(1, 10): + for idx in range(1, 4): db = f"dve_{uuid4().hex}" tmp_dir = tempfile.mkdtemp(prefix="ddb_foundry_testing") db_file = Path(tmp_dir, db + ".duckdb") From 56e217fafa2ef948168fd6b620ee49a359cf9d65 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Thu, 30 Apr 2026 16:38:59 +0100 Subject: [PATCH 5/7] fix: ddb xml reader connection args consistent with other ddb readers --- .../backends/implementations/duckdb/readers/xml.py | 7 ++++--- .../test_backends/test_readers/test_ddb_xml.py | 12 ++++++------ tests/test_pipeline/test_foundry_ddb_pipeline.py | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index a10998c..4f1527f 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -3,6 +3,7 @@ from typing import Optional +import duckdb import polars as pl from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection from pydantic import BaseModel @@ -24,8 +25,8 @@ class DuckDBXMLStreamReader(XMLStreamReader): """A reader for XML files""" - def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): - self.ddb_connection = ddb_connection if ddb_connection else default_connection + def __init__(self, *, connection: Optional[DuckDBPyConnection] = None, **kwargs): + self._connection = connection if connection else duckdb.connect(":memory:") super().__init__(**kwargs) @read_function(DuckDBPyRelation) @@ -49,4 +50,4 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema ) ) - return self.ddb_connection.sql("select * from _lazy_frame") + return self._connection.sql("select * from _lazy_frame") diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py index 585f7b7..0a685ba 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py @@ -67,10 +67,10 @@ def test_ddb_xml_reader_all_str(temp_xml_file): uri, header_model, header_data, class_data_model, class_data = temp_xml_file ddb_conn = default_connection header_reader = DuckDBXMLStreamReader( - ddb_connection=ddb_conn, root_tag="root", record_tag="Header" + connection=ddb_conn, root_tag="root", record_tag="Header" ) class_reader = DuckDBXMLStreamReader( - ddb_connection=ddb_conn, root_tag="root", record_tag="ClassData" + connection=ddb_conn, root_tag="root", record_tag="ClassData" ) header_rel: DuckDBPyRelation = header_reader.read_to_relation( uri.as_uri(), "header", header_model @@ -90,10 +90,10 @@ def test_ddb_xml_reader_write_parquet(temp_xml_file): uri, header_model, header_data, class_data_model, class_data = temp_xml_file ddb_conn = default_connection header_reader = DuckDBXMLStreamReader( - ddb_connection=ddb_conn, root_tag="root", record_tag="Header" + connection=ddb_conn, root_tag="root", record_tag="Header" ) class_reader = DuckDBXMLStreamReader( - ddb_connection=ddb_conn, root_tag="root", record_tag="ClassData" + connection=ddb_conn, root_tag="root", record_tag="ClassData" ) header_rel: DuckDBPyRelation = header_reader.read_to_relation( uri.as_uri(), "header", header_model @@ -105,10 +105,10 @@ def test_ddb_xml_reader_write_parquet(temp_xml_file): target_class_loc: Path = uri.parent.joinpath("class_parquet.parquet").as_posix() header_reader.write_parquet(entity=header_rel, target_location=target_header_loc) class_reader.write_parquet(entity=class_rel, target_location=target_class_loc) - header_parquet_rel: DuckDBPyRelation = header_reader.ddb_connection.read_parquet( + header_parquet_rel: DuckDBPyRelation = header_reader._connection.read_parquet( target_header_loc ) - class_parquet_rel: DuckDBPyRelation = class_reader.ddb_connection.read_parquet(target_class_loc) + class_parquet_rel: DuckDBPyRelation = class_reader._connection.read_parquet(target_class_loc) assert header_parquet_rel.df().to_dict(orient="records") == header_rel.df().to_dict( orient="records" ) diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index f90592a..9b7b60d 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -31,7 +31,7 @@ @pytest.fixture(scope="function") def prep_multithreading_test(): sub_details: dict[str, tuple[DuckDBPyConnection, str, DDBAuditingManager]] = {} - for idx in range(1, 4): + for idx in range(1, 10): db = f"dve_{uuid4().hex}" tmp_dir = tempfile.mkdtemp(prefix="ddb_foundry_testing") db_file = Path(tmp_dir, db + ".duckdb") From 007943fd8dc1c98fe22873c3e4513f4f4333194f Mon Sep 17 00:00:00 2001 From: "george.robertson1" <50412379+georgeRobertson@users.noreply.github.com> Date: Thu, 30 Apr 2026 18:00:14 +0100 Subject: [PATCH 6/7] test: fix tests and linting and remove all default_connections --- .../implementations/duckdb/readers/xml.py | 2 +- .../test_duckdb/test_data_contract.py | 17 +++++++------- .../test_duckdb/test_rules.py | 4 ++-- .../test_readers/test_ddb_csv.py | 23 ++++++++++--------- .../test_readers/test_ddb_json.py | 13 +++++++---- .../test_readers/test_ddb_xml.py | 7 +++--- 6 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index 4f1527f..c63e464 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -5,7 +5,7 @@ import duckdb import polars as pl -from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection +from duckdb import DuckDBPyConnection, DuckDBPyRelation from pydantic import BaseModel from dve.core_engine.backends.base.reader import read_function diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py index d382ecb..74edce9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py @@ -2,8 +2,9 @@ from pathlib import Path from typing import Any, Dict, List, Tuple +import duckdb import pytest -from duckdb import DuckDBPyRelation, default_connection +from duckdb import DuckDBPyRelation from duckdb.typing import DuckDBPyType from dve.core_engine.backends.implementations.duckdb.contract import DuckDBDataContract @@ -34,7 +35,7 @@ def test_duckdb_data_contract_csv(temp_csv_file): uri, _, _, mdl = temp_csv_file - connection = default_connection + connection = duckdb.connect(":memory:") contract_meta = json.dumps( { @@ -106,7 +107,7 @@ def test_duckdb_data_contract_csv(temp_csv_file): def test_duckdb_data_contract_xml(temp_xml_file): uri, header_model, header_data, class_model, class_data = temp_xml_file - connection = default_connection + connection = duckdb.connect() contract_meta = json.dumps( { "contract": { @@ -153,10 +154,10 @@ def test_duckdb_data_contract_xml(temp_xml_file): contract_dict = json.loads(contract_meta).get("contract") entities: Dict[str, DuckDBPyRelation] = { "test_header": DuckDBXMLStreamReader( - ddb_connection=connection, root_tag="root", record_tag="Header" + connection=connection, root_tag="root", record_tag="Header" ).read_to_relation(str(uri), "header", header_model), "test_class_info": DuckDBXMLStreamReader( - ddb_connection=connection, root_tag="root", record_tag="ClassData" + connection=connection, root_tag="root", record_tag="ClassData" ).read_to_relation(str(uri), "class_info", class_model), } entity_locations: dict[str, URI] = {} @@ -218,7 +219,7 @@ def test_ddb_data_contract_read_and_write_basic_parquet( ): # can we read in a stringified parquet and run the data contract on it? # basic file - simple data structures - connection = default_connection + connection = duckdb.connect(":memory:") parquet_uri, contract_meta, _ = simple_all_string_parquet data_contract = DuckDBDataContract(connection) # check can read @@ -279,7 +280,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet): # can we read in a stringified parquet and run the data contract on it? # more complex file - nested, arrays of structs parquet_uri, contract_meta, _ = nested_all_string_parquet - connection = default_connection + connection = duckdb.connect() data_contract = DuckDBDataContract(connection) # check can read entity = data_contract.read_parquet(path=parquet_uri) @@ -337,7 +338,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet): def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors, nested_parquet_custom_dc_err_details): parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors - connection = default_connection + connection = duckdb.connect(":memory:") data_contract = DuckDBDataContract(connection) entity = data_contract.read_parquet(path=parquet_uri) diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py index 038b4e9..5d3d3a8 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Iterator, List, Optional, Set, Tuple, Type +import duckdb import numpy as np import polars as pl import pytest @@ -12,7 +13,6 @@ ConstantExpression, DuckDBPyRelation, StarExpression, - default_connection, ) from dve.core_engine.backends.base.core import EntityManager @@ -49,7 +49,7 @@ simple_typecast_parquet, ) -DUCKDB_STEP_BACKEND = DuckDBStepImplementations(default_connection) +DUCKDB_STEP_BACKEND = DuckDBStepImplementations(duckdb.connect()) """The backend for the duckdb steps.""" diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py index f364045..78974eb 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py @@ -2,9 +2,10 @@ from pathlib import Path from tempfile import TemporaryDirectory +import duckdb import polars as pl import pytest -from duckdb import DuckDBPyRelation, default_connection +from duckdb import DuckDBPyRelation from pydantic import BaseModel from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError @@ -71,7 +72,7 @@ def temp_empty_csv_file(temp_dir: Path): def test_ddb_csv_reader_all_str(temp_csv_file): uri, header, data, mdl = temp_csv_file - reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) + reader = DuckDBCSVReader(header=True, delim=",", connection=duckdb.connect()) rel: DuckDBPyRelation = reader.read_to_entity_type( DuckDBPyRelation, str(uri), "test", stringify_model(mdl) ) @@ -84,7 +85,7 @@ def test_ddb_csv_reader_all_str(temp_csv_file): def test_ddb_csv_reader_cast(temp_csv_file): uri, header, data, mdl = temp_csv_file - reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) + reader = DuckDBCSVReader(header=True, delim=",", connection=duckdb.connect()) rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, str(uri), "test", mdl) expected_dtypes = {**{ fld.name: str(get_duckdb_type_from_annotation(fld.annotation)) @@ -98,7 +99,7 @@ def test_ddb_csv_reader_cast(temp_csv_file): def test_ddb_csv_write_parquet(temp_csv_file): uri, header, data, mdl = temp_csv_file - reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) + reader = DuckDBCSVReader(header=True, delim=",", connection=duckdb.connect()) rel: DuckDBPyRelation = reader.read_to_entity_type( DuckDBPyRelation, str(uri), "test", stringify_model(mdl) ) @@ -110,7 +111,7 @@ def test_ddb_csv_write_parquet(temp_csv_file): def test_ddb_csv_read_empty_file(temp_empty_csv_file): uri, mdl = temp_empty_csv_file - reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) + reader = DuckDBCSVReader(header=True, delim=",", connection=duckdb.connect()) with pytest.raises(EmptyFileError): reader.read_to_relation(str(uri), "test", mdl) @@ -119,7 +120,7 @@ def test_ddb_csv_read_empty_file(temp_empty_csv_file): def test_polars_to_ddb_csv_reader(temp_csv_file): uri, header, data, mdl = temp_csv_file reader = PolarsToDuckDBCSVReader( - header=True, delim=",", quotechar='"', connection=default_connection + header=True, delim=",", quotechar='"', connection=duckdb.connect() ) entity = reader.read_to_relation(str(uri), "test", mdl) @@ -141,7 +142,7 @@ def test_ddb_csv_repeating_header_reader_non_duplicate(temp_dir): file_uri = temp_dir.joinpath("test_header.csv") reader = DuckDBCSVRepeatingHeaderReader( - header=True, delim=",", quotechar='"', connection=default_connection + header=True, delim=",", quotechar='"', connection=duckdb.connect() ) entity = reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel) @@ -162,7 +163,7 @@ def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_valu file_uri = temp_dir.joinpath("test_header.csv") reader = DuckDBCSVRepeatingHeaderReader( - header=True, delim=",", quotechar='"', connection=default_connection + header=True, delim=",", quotechar='"', connection=duckdb.connect() ) with pytest.raises(MessageBearingError): @@ -182,7 +183,7 @@ def test_DuckDBCSVReader_with_null_empty_strings(temp_dir): header=True, delim=",", quotechar='"', - connection=default_connection, + connection=duckdb.connect(), null_empty_strings=True, ) @@ -207,7 +208,7 @@ def test_DuckDBCSVRepeatingHeaderReader_with_null_empty_strings(temp_dir): header=True, delim=",", quotechar='"', - connection=default_connection, + connection=duckdb.connect(), null_empty_strings=True, ) @@ -230,7 +231,7 @@ def test_PolarsToDuckDBCSVReader_with_null_empty_strings(temp_dir): header=True, delim=",", quotechar='"', - connection=default_connection, + connection=duckdb.connect(), null_empty_strings=True, ) diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py index 6942c6a..87a90e8 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py @@ -4,8 +4,9 @@ from tempfile import TemporaryDirectory from typing import List +import duckdb import pytest -from duckdb import DuckDBPyRelation, default_connection +from duckdb import DuckDBPyRelation from pydantic import BaseModel from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( @@ -87,17 +88,19 @@ def test_ddb_json_write_parquet(temp_json_file): ) target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() reader.write_parquet(rel, target_loc) - parquet_rel = default_connection.read_parquet(target_loc) - assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records") + with duckdb.connect() as cnn: + parquet_rel = cnn.read_parquet(target_loc) + assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records") def test_ddb_json_write_parquet_py_iterator(temp_json_file): uri, _, mdl = temp_json_file reader = DuckDBJSONReader() + conn = duckdb.connect() data = list(reader.read_to_py_iterator(uri.as_posix(), "test", stringify_model(mdl))) target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() - reader.write_parquet(default_connection.query("select dta.* from (select unnest($data) as dta)", + reader.write_parquet(conn.query("select dta.* from (select unnest($data) as dta)", params={"data": data}), target_loc) - parquet_data = sorted(default_connection.read_parquet(target_loc).pl().iter_rows(named=True), + parquet_data = sorted(conn.read_parquet(target_loc).pl().iter_rows(named=True), key= lambda x: x.get("bigint_field")) assert parquet_data == list(data) diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py index 0a685ba..1c12a7b 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py @@ -3,8 +3,9 @@ from tempfile import TemporaryDirectory from typing import Dict, List +import duckdb import pytest -from duckdb import DuckDBPyRelation, default_connection +from duckdb import DuckDBPyRelation from lxml import etree as ET from pydantic import BaseModel @@ -65,7 +66,7 @@ class ClassDataModel(BaseModel): def test_ddb_xml_reader_all_str(temp_xml_file): uri, header_model, header_data, class_data_model, class_data = temp_xml_file - ddb_conn = default_connection + ddb_conn = duckdb.connect() header_reader = DuckDBXMLStreamReader( connection=ddb_conn, root_tag="root", record_tag="Header" ) @@ -88,7 +89,7 @@ def test_ddb_xml_reader_all_str(temp_xml_file): def test_ddb_xml_reader_write_parquet(temp_xml_file): uri, header_model, header_data, class_data_model, class_data = temp_xml_file - ddb_conn = default_connection + ddb_conn = duckdb.connect() header_reader = DuckDBXMLStreamReader( connection=ddb_conn, root_tag="root", record_tag="Header" ) From b11e7053cdba7ba172344c619eaedf5b84edaf0d Mon Sep 17 00:00:00 2001 From: "george.robertson1" <50412379+georgeRobertson@users.noreply.github.com> Date: Thu, 30 Apr 2026 18:15:05 +0100 Subject: [PATCH 7/7] =?UTF-8?q?bump:=20version=200.7.5=20=E2=86=92=200.7.6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 13 +++++++++++++ pyproject.toml | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 59b368b..b9b14f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## v0.7.6 (2026-04-30) + +### Fix + +- ddb xml reader connection args consistent with other ddb readers +- remove default connection from ddb json reader +- remove default connection in csv reader causing hanging in multithreaded setups +- configured refdata loader to be instantiated when required without class vars #99 + +### Refactor + +- add in backend kwargs for readers to allow reader args not determinable at config write time to be passed + ## v0.7.5 (2026-04-29) ### Fix diff --git a/pyproject.toml b/pyproject.toml index a6c679e..fb633c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues" Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md" [tool.poetry] -version = "0.7.5" +version = "0.7.6" packages = [ { include = "dve", from = "src" }, ]