-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfoundry_ddb_pipeline.py
More file actions
92 lines (85 loc) · 4.38 KB
/
foundry_ddb_pipeline.py
File metadata and controls
92 lines (85 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""A duckdb pipeline for running on Foundry platform"""
from typing import Optional
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
from dve.core_engine.backends.utilities import dump_errors
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, Failed
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
from dve.pipeline.utils import SubmissionStatus
from dve.parser import file_handling as fh
class FoundryDDBPipeline(DDBDVEPipeline):
    """DuckDB pipeline for running on Foundry Platform.

    The stage methods inherited from ``DDBDVEPipeline`` are wrapped so that an
    exception raised by a stage is logged and turned into a "failed" result
    instead of propagating. ``run_pipeline`` then drives a single submission
    through every stage and always persists the audit records.
    """

    def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
        """Write out key audit relations to parquet for persisting to datasets.

        Args:
            submission_info: Submission whose ``submission_id`` names the folder
                the audit files are written under.

        Returns:
            The URI of the ``audit/`` location the parquet files were written to.
        """
        # NOTE(review): the file names below are appended by plain string
        # concatenation, so this relies on ``fh.joinuri`` keeping the trailing
        # "/" of "audit/" -- confirm against ``joinuri``.
        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
        self.write_parquet(
            self._audit_tables._processing_status.get_relation(),
            write_to + "processing_status.parquet",
        )
        self.write_parquet(
            self._audit_tables._submission_statistics.get_relation(),
            write_to + "submission_statistics.parquet",
        )
        return write_to

    def file_transformation(
        self, submission_info: SubmissionInfo
    ) -> SubmissionInfo | dict[str, str]:
        """Run the parent file transformation without letting exceptions escape.

        Args:
            submission_info: The submission to transform.

        Returns:
            The parent implementation's result, or ``submission_info.dict()`` if
            the parent raised. ``run_pipeline`` treats any result that is not a
            ``SubmissionInfo`` as a failed transformation.
        """
        try:
            return super().file_transformation(submission_info)
        except Exception as exc:  # pylint: disable=W0718
            self._logger.error(f"File transformation raised exception: {exc}")
            self._logger.exception(exc)
            return submission_info.dict()

    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, bool]:
        """Run the parent data contract stage without letting exceptions escape.

        Args:
            submission_info: The submission to apply the data contract to.

        Returns:
            The parent implementation's result, or ``(submission_info, True)`` if
            the parent raised. ``run_pipeline`` unpacks the second element as the
            ``failed`` flag passed on to the business rules stage.
        """
        try:
            return super().apply_data_contract(submission_info)
        except Exception as exc:  # pylint: disable=W0718
            self._logger.error(f"Apply data contract raised exception: {exc}")
            self._logger.exception(exc)
            return submission_info, True

    def apply_business_rules(
        self, submission_info: SubmissionInfo, failed: Failed
    ) -> tuple[SubmissionInfo, SubmissionStatus]:
        """Run the parent business rules stage without letting exceptions escape.

        Args:
            submission_info: The submission to apply the business rules to.
            failed: Failure flag produced by the data contract stage.

        Returns:
            The parent implementation's result, or
            ``(submission_info, SubmissionStatus(failed=True))`` if the parent
            raised.
        """
        try:
            return super().apply_business_rules(submission_info, failed)
        except Exception as exc:  # pylint: disable=W0718
            self._logger.error(f"Apply business rules raised exception: {exc}")
            self._logger.exception(exc)
            return submission_info, SubmissionStatus(failed=True)

    def run_pipeline(
        self, submission_info: SubmissionInfo
    ) -> tuple[Optional[URI], Optional[URI], URI]:
        """Sequential single submission pipeline runner.

        Runs file transformation, data contract, business rules and error
        reporting for one submission, updating the audit tables between stages.
        Any exception raised along the way is logged and the submission is
        marked as failed in the audit tables; the audit records are persisted
        whether or not an exception occurred.

        Args:
            submission_info: The submission to process.

        Returns:
            A 3-tuple of:

            * the ``business_rules`` output URI for the submission, or ``None``
              when the final status is failed;
            * the error report URI, or ``None`` when processing stopped before a
              report was produced;
            * the URI the audit parquet files were written to.
        """
        sub_id: str = submission_info.submission_id
        # Pessimistic defaults: the except clause below swallows stage errors,
        # so the return statement must be able to build a result even when a
        # step raised before these names were assigned by the pipeline itself.
        sub_status = SubmissionStatus(failed=True)
        report_uri: Optional[URI] = None
        try:
            self._audit_tables.add_new_submissions(submissions=[submission_info])
            self._audit_tables.mark_transform(submission_ids=[sub_id])
            transformed = self.file_transformation(submission_info=submission_info)
            if isinstance(transformed, SubmissionInfo):
                self._audit_tables.mark_data_contract(submission_ids=[sub_id])
                _, failed = self.apply_data_contract(submission_info=submission_info)
                self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
                _, sub_status = self.apply_business_rules(
                    submission_info=submission_info, failed=failed
                )
            else:
                # file_transformation returned its failure value (not a SubmissionInfo).
                sub_status = SubmissionStatus(failed=True)

            self._audit_tables.mark_error_report(
                submissions=[(sub_id, sub_status.submission_result)]
            )
            _, sub_status, sub_stats, report_uri = self.error_report(
                submission_info=submission_info, status=sub_status
            )
            self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
        except Exception as err:  # pylint: disable=W0718
            self._logger.error(
                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
            )
            # Record the traceback too, as the stage wrappers above do.
            self._logger.exception(err)
            self._audit_tables.mark_failed(submissions=[sub_id])
        finally:
            audit_files_uri = self.persist_audit_records(submission_info=submission_info)

        # Deliberately outside ``finally``: returning from a ``finally`` clause
        # would silently discard any exception still propagating.
        business_rules_uri = (
            None
            if sub_status.failed
            else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
        )
        return business_rules_uri, report_uri, audit_files_uri