-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfoundry_ddb_pipeline.py
More file actions
135 lines (126 loc) · 6.94 KB
/
foundry_ddb_pipeline.py
File metadata and controls
135 lines (126 loc) · 6.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""A duckdb pipeline for running on Foundry platform"""
from typing import Optional
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, Failed
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
from dve.pipeline.utils import SubmissionStatus
from dve.parser import file_handling as fh
from dve.reporting.utils import dump_processing_errors
@duckdb_get_entity_count
@duckdb_write_parquet
class FoundryDDBPipeline(DDBDVEPipeline):
    """DuckDB pipeline for running on the Foundry Platform.

    Each pipeline stage is wrapped in a catch-all handler that records the
    failure (error log + dumped processing errors + audit ``failed`` mark)
    and returns a failed :class:`SubmissionStatus`, so one bad submission
    cannot abort a whole Foundry run.
    """

    def _record_stage_failure(
        self,
        submission_info: SubmissionInfo,
        log_label: str,
        stage: str,
        exc: Exception,
    ) -> None:
        """Log *exc*, dump it as a processing error for *stage*, and mark the submission failed.

        ``log_label`` is the human-readable stage name used in the log line;
        ``stage`` is the machine name used for the dumped error file.
        """
        self._logger.error(f"{log_label} raised exception: {exc}")
        self._logger.exception(exc)
        dump_processing_errors(
            fh.joinuri(self.processed_files_path, submission_info.submission_id),
            stage,
            [CriticalProcessingError.from_exception(exc)],
        )
        self._audit_tables.mark_failed(submissions=[submission_info.submission_id])

    def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
        """Write key audit relations to parquet for persisting to datasets.

        Writes ``processing_status.parquet`` and ``submission_statistics.parquet``
        under the submission's ``audit/`` folder and returns that folder's URI.
        """
        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
        self.write_parquet(
            self._audit_tables._processing_status.get_relation(),
            write_to + "processing_status.parquet",
        )
        self.write_parquet(
            self._audit_tables._submission_statistics.get_relation(),
            write_to + "submission_statistics.parquet",
        )
        return write_to

    def file_transformation(
        self, submission_info: SubmissionInfo
    ) -> tuple[SubmissionInfo, SubmissionStatus]:
        """Run the parent file-transformation stage, converting any exception into a failed status."""
        try:
            return super().file_transformation(submission_info)
        except Exception as exc:  # pylint: disable=W0718
            self._record_stage_failure(
                submission_info, "File transformation", "file_transformation", exc
            )
            return submission_info, SubmissionStatus(processing_failed=True)

    def apply_data_contract(
        self,
        submission_info: SubmissionInfo,
        submission_status: Optional[SubmissionStatus] = None,
        # BUG FIX: was annotated `tuple[SubmissionInfo | SubmissionStatus]`
        # (a 1-tuple of a union); the method returns a 2-tuple.
    ) -> tuple[SubmissionInfo, SubmissionStatus]:
        """Run the parent data-contract stage, converting any exception into a failed status."""
        try:
            return super().apply_data_contract(submission_info, submission_status)
        except Exception as exc:  # pylint: disable=W0718
            self._record_stage_failure(
                submission_info, "Apply data contract", "contract", exc
            )
            return submission_info, SubmissionStatus(processing_failed=True)

    def apply_business_rules(
        self,
        submission_info: SubmissionInfo,
        submission_status: Optional[SubmissionStatus] = None,
    ) -> tuple[SubmissionInfo, SubmissionStatus]:
        """Run the parent business-rules stage, converting any exception into a failed status."""
        try:
            return super().apply_business_rules(submission_info, submission_status)
        except Exception as exc:  # pylint: disable=W0718
            self._record_stage_failure(
                submission_info, "Apply business rules", "business_rules", exc
            )
            return submission_info, SubmissionStatus(processing_failed=True)

    def error_report(
        self,
        submission_info: SubmissionInfo,
        submission_status: Optional[SubmissionStatus] = None,
    ):
        """Run the parent error-report stage; on failure return ``None`` stats/report URI."""
        try:
            return super().error_report(submission_info, submission_status)
        except Exception as exc:  # pylint: disable=W0718
            self._record_stage_failure(
                submission_info, "Error reports", "error_report", exc
            )
            # No statistics or report were produced for this submission.
            return submission_info, submission_status, None, None

    def run_pipeline(
        self, submission_info: SubmissionInfo
    ) -> tuple[Optional[URI], Optional[URI], URI]:
        """Sequential single submission pipeline runner.

        Returns a triple of:
          * URI of the business-rules output (``None`` when validation or
            processing failed),
          * URI of the error report (``None`` when no report was produced),
          * URI of the persisted audit records (always written).
        """
        sub_id: str = submission_info.submission_id
        # Initialise the values referenced after the try block so a failure in
        # the very first audit call cannot leave them unbound (NameError in
        # the original code, raised from inside `finally`).
        sub_status = SubmissionStatus(processing_failed=True)
        report_uri: Optional[URI] = None
        try:
            self._audit_tables.add_new_submissions(submissions=[submission_info])
            self._audit_tables.mark_transform(submission_ids=[sub_id])
            sub_info, sub_status = self.file_transformation(submission_info=submission_info)
            if not (sub_status.validation_failed or sub_status.processing_failed):
                self._audit_tables.mark_data_contract(submission_ids=[sub_id])
                sub_info, sub_status = self.apply_data_contract(
                    submission_info=sub_info, submission_status=sub_status
                )
                self._audit_tables.mark_business_rules(
                    submissions=[(sub_id, sub_status.validation_failed)]
                )
                # CONSISTENCY FIX: thread the (possibly updated) sub_info
                # through every stage; the original passed the stale
                # submission_info to business rules and error report.
                sub_info, sub_status = self.apply_business_rules(
                    submission_info=sub_info, submission_status=sub_status
                )
            if not sub_status.processing_failed:
                self._audit_tables.mark_error_report(
                    submissions=[(sub_id, sub_status.submission_result)]
                )
                sub_info, sub_status, sub_stats, report_uri = self.error_report(
                    submission_info=sub_info, submission_status=sub_status
                )
                self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
        except Exception as err:  # pylint: disable=W0718
            self._logger.error(
                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
            )
            dump_processing_errors(
                fh.joinuri(self.processed_files_path, submission_info.submission_id),
                "run_pipeline",
                [CriticalProcessingError.from_exception(err)]
            )
            self._audit_tables.mark_failed(submissions=[sub_id])
        finally:
            # Audit records are persisted even when a stage blew up.
            audit_files_uri = self.persist_audit_records(submission_info=submission_info)
        # BUG FIX: the return was inside `finally`, which silently swallows any
        # in-flight exception (pylint W0150 lost-exception).
        return (
            (
                None
                if (sub_status.validation_failed or sub_status.processing_failed)
                else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
            ),
            report_uri,
            audit_files_uri,
        )