Skip to content

Commit 8fa895e

Browse files
committed
test: add foundry ddb pipeline multithreading test
1 parent 0ff64b9 commit 8fa895e

1 file changed

Lines changed: 62 additions & 0 deletions

File tree

tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22
# pylint: disable=missing-function-docstring
33
# pylint: disable=protected-access
44

5+
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
56
from datetime import datetime
67
from pathlib import Path
78
import shutil
89
import tempfile
10+
from typing import Any
911
from uuid import uuid4
1012

13+
from duckdb import DuckDBPyConnection, connect
1114
import polars as pl
15+
import pytest
1216

1317
from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
1418
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -24,6 +28,28 @@
2428
movies_test_files
2529
)
2630

31+
@pytest.fixture(scope="function")
def prep_multithreading_test():
    """Provision nine isolated DuckDB databases for the multithreading test.

    Yields:
        dict mapping ``submission_<idx>`` -> (connection, temp dir path,
        DDBAuditingManager). Each submission gets its own on-disk database
        and its own attached ``movies_refdata`` database seeded from the
        sequels parquet fixture, so concurrent pipeline runs share no
        connection state.

    Teardown releases the audit manager first (while its connection is still
    open), then closes the connection, then removes the backing directory —
    the original order (close/rmtree before ``__exit__``) tore the manager
    down against a closed connection and deleted files.
    """
    sub_details: dict[str, tuple[DuckDBPyConnection, str, DDBAuditingManager]] = {}
    for idx in range(1, 10):
        db_name = f"dve_{uuid4().hex}"
        tmp_dir = tempfile.mkdtemp(prefix="ddb_foundry_testing")
        db_file = Path(tmp_dir, db_name + ".duckdb")
        conn = connect(database=db_file, read_only=False)
        # Attach a per-submission reference-data database and seed the
        # sequels table the pipeline's rules read from.
        ref_db_file = Path(tmp_dir, "movies_refdata.duckdb").as_posix()
        conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
        conn.read_parquet(
            get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()
        ).to_table("movies_refdata.sequels")
        sub_details[f"submission_{idx}"] = (conn, tmp_dir, DDBAuditingManager(None, None, conn))

    yield sub_details
    for con, db_dir, aud in sub_details.values():
        # Exit the audit manager while the connection is still usable,
        # then close the connection, then delete its directory.
        aud.__exit__(None, None, None)
        con.close()
        shutil.rmtree(db_dir)
2753
def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
2854
db_file, conn = temp_ddb_conn
2955
processing_folder = planet_test_files
@@ -205,3 +231,39 @@ def test_foundry_runner_error_at_bi_rules(movies_test_files, temp_ddb_conn):
205231
assert len(list(fh.iter_prefix(audit_files))) == 2
206232
assert audit_manager.get_submission_status(sub_id).processing_failed
207233
assert audit_manager.get_latest_processing_records().select("submission_result").pl().to_dicts()[0]["submission_result"] == "processing_failed"
234+
235+
236+
def test_foundry_runner_multithreaded(prep_multithreading_test):
    """Run the foundry DDB pipeline for nine submissions concurrently.

    Each submission uses its own DuckDB connection and audit manager from the
    ``prep_multithreading_test`` fixture, so the threads never share a
    connection. Every run must return a truthy output location, report URI,
    and audit-files prefix, and all nine runs must complete.
    """
    sub_utils = prep_multithreading_test
    sub_infos: dict[str, SubmissionInfo] = {}
    with tempfile.TemporaryDirectory(prefix="ddb_mt_sub") as sub_dir, tempfile.TemporaryDirectory(prefix="ddb_mt_process") as proc_dir:
        # Stage one copy of the good movies file per submission.
        movie_file = get_test_file_path("movies").joinpath("good_movies.json")
        for idx in range(1, 10):
            dest = Path(sub_dir, f"good_movies_{idx}.json")
            shutil.copyfile(movie_file, dest)
            sub_infos[f"submission_{idx}"] = SubmissionInfo(
                submission_id=f"submission_{idx}",
                dataset_id="movies",
                file_name=f"good_movies_{idx}",
                file_extension="json",
                submitting_org="TEST",
                datetime_received=datetime(2025, 11, 5),
            )
        with ThreadPoolExecutor() as pool:
            futures: dict[Future, str] = {}
            for sub in sub_infos:
                # Index directly so a missing fixture entry raises KeyError
                # here, instead of unpacking None from dict.get() and failing
                # with an opaque TypeError.
                conn, _, aud = sub_utils[sub]
                target = FoundryDDBPipeline(
                    processed_files_path=proc_dir,
                    audit_tables=aud,
                    connection=conn,
                    rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
                    submitted_files_path=sub_dir,
                ).run_pipeline
                futures[pool.submit(target, sub_infos[sub])] = sub

            completed = 0
            for future in as_completed(futures):
                sub = futures[future]
                # .result() re-raises any exception from the worker thread.
                output_loc, report_uri, audit_files = future.result()

                assert audit_files, f"{sub}: no audit files"
                assert output_loc, f"{sub}: no output location"
                assert report_uri, f"{sub}: no report uri"
                completed += 1
            # Every submission must have finished — as_completed only yields
            # futures that resolved, so count them explicitly.
            assert completed == len(sub_infos)

0 commit comments

Comments (0)