# test_foundry_ddb_pipeline.py
"""Test DuckDBPipeline object methods"""
# pylint: disable=missing-function-docstring
# pylint: disable=protected-access
from datetime import datetime
from pathlib import Path
import shutil
import tempfile
from uuid import uuid4
import pytest
from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
from dve.core_engine.models import SubmissionInfo
import dve.parser.file_handling as fh
from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline
from ..conftest import get_test_file_path
from ..fixtures import temp_ddb_conn # pylint: disable=unused-import
from .pipeline_helpers import ( # pylint: disable=unused-import
PLANETS_RULES_PATH,
planet_test_files,
movies_test_files
)


def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
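    """A planets submission that fails validation yields a report, no output, and three audit files."""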
    db_file, conn = temp_ddb_conn
    processing_folder = planet_test_files
    sub_id = uuid4().hex
    sub_info = SubmissionInfo.from_metadata_file(
        submission_id=sub_id,
        metadata_uri=processing_folder + "/planets_demo.metadata.json",
    )
    sub_folder = processing_folder + f"/{sub_id}"
    shutil.copytree(planet_test_files, sub_folder)
    DuckDBRefDataLoader.connection = conn
    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
        dve_pipeline = FoundryDDBPipeline(
            processed_files_path=processing_folder,
            audit_tables=audit_manager,
            connection=conn,
            rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(),
            submitted_files_path=None,
            reference_data_loader=DuckDBRefDataLoader,
        )
        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert fh.get_resource_exists(report_uri)
    assert not output_loc
    assert len(list(fh.iter_prefix(audit_files))) == 3


def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
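    """A movies submission that passes validation yields a report, output data, and audit files."""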
    db_file, conn = temp_ddb_conn
    # Attach a reference-data database and load the sequels lookup table into it
    ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix()
    conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
    conn.read_parquet(
        get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()
    ).to_table("movies_refdata.sequels")
    processing_folder = movies_test_files
    sub_id = uuid4().hex
    sub_info = SubmissionInfo(
        submission_id=sub_id,
        dataset_id="movies",
        file_name="good_movies",
        file_extension="json",
        submitting_org="TEST",
        datetime_received=datetime(2025, 11, 5),
    )
    sub_folder = processing_folder + f"/{sub_id}"
    shutil.copytree(movies_test_files, sub_folder)
    DuckDBRefDataLoader.connection = conn
    DuckDBRefDataLoader.dataset_config_uri = None
    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
        dve_pipeline = FoundryDDBPipeline(
            processed_files_path=processing_folder,
            audit_tables=audit_manager,
            connection=conn,
            rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
            submitted_files_path=None,
            reference_data_loader=DuckDBRefDataLoader,
        )
        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert fh.get_resource_exists(report_uri)
    assert len(list(fh.iter_prefix(output_loc))) == 2
    assert len(list(fh.iter_prefix(audit_files))) == 3


def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
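    """A pipeline error is handled gracefully: no report, no output, two audit files."""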
    # Uses the Spark reader config, which should error during file transformation;
    # check that the failure is handled gracefully
    db_file, conn = temp_ddb_conn
    processing_folder = planet_test_files
    sub_id = uuid4().hex
    sub_info = SubmissionInfo.from_metadata_file(
        submission_id=sub_id,
        metadata_uri=processing_folder + "/planets_demo.metadata.json",
    )
    sub_folder = processing_folder + f"/{sub_id}"
    shutil.copytree(planet_test_files, sub_folder)
    DuckDBRefDataLoader.connection = conn
    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
        dve_pipeline = FoundryDDBPipeline(
            processed_files_path=processing_folder,
            audit_tables=audit_manager,
            connection=conn,
            rules_path=get_test_file_path("planets/planets.dischema.json").as_posix(),
            submitted_files_path=None,
            reference_data_loader=DuckDBRefDataLoader,
        )
        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert not fh.get_resource_exists(report_uri)
    assert not output_loc
    assert len(list(fh.iter_prefix(audit_files))) == 2


def test_foundry_runner_with_submitted_files_path(movies_test_files, temp_ddb_conn):
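    """With submitted_files_path set, files are copied into the processing folder before the run."""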
    db_file, conn = temp_ddb_conn
    ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix()
    conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
    conn.read_parquet(
        get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()
    ).to_table("movies_refdata.sequels")
    processing_folder = Path(tempfile.mkdtemp()).as_posix()
    submitted_files_path = Path(movies_test_files).as_posix()
    sub_id = uuid4().hex
    sub_info = SubmissionInfo(
        submission_id=sub_id,
        dataset_id="movies",
        file_name="good_movies",
        file_extension="json",
        submitting_org="TEST",
        datetime_received=datetime(2025, 11, 5),
    )
    DuckDBRefDataLoader.connection = conn
    DuckDBRefDataLoader.dataset_config_uri = None
    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
        dve_pipeline = FoundryDDBPipeline(
            processed_files_path=processing_folder,
            audit_tables=audit_manager,
            connection=conn,
            rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
            submitted_files_path=submitted_files_path,
            reference_data_loader=DuckDBRefDataLoader,
        )
        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert Path(processing_folder, sub_id, sub_info.file_name_with_ext).exists()
    assert fh.get_resource_exists(report_uri)
    assert len(list(fh.iter_prefix(output_loc))) == 2
    assert len(list(fh.iter_prefix(audit_files))) == 3