Skip to content

Commit f922f0a

Browse files
committed
feat: add pass-through of submission status to provide context to services processing files
1 parent 8b787cb commit f922f0a

13 files changed

Lines changed: 328 additions & 214 deletions

File tree

src/dve/core_engine/backends/base/auditing.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
QueueType,
3232
SubmissionResult,
3333
)
34+
from dve.pipeline.utils import SubmissionStatus
3435

3536
AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name
3637

@@ -493,6 +494,21 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
493494
)
494495
except StopIteration:
495496
return None
497+
def get_submission_status(self, submission_id: str) -> SubmissionStatus:
498+
"""Get the latest submission status for a submission"""
499+
sub_status = SubmissionStatus()
500+
processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(self._processing_status.get_most_recent_records(order_criteria=[OrderCriteria("time_updated", True)],
501+
pre_filter_criteria=[FilterCriteria("submission_id",
502+
submission_id)])))
503+
sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
504+
if processing_rec.processing_status == "failed":
505+
sub_status.processing_failed = True
506+
if processing_rec.submission_result == "failed":
507+
sub_status.validation_failed = True
508+
if sub_stats_rec:
509+
sub_status.number_of_records = sub_stats_rec.record_count
510+
511+
return sub_status
496512

497513
def __enter__(self):
498514
"""Use audit table as context manager"""

src/dve/core_engine/backends/utilities.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import dve.parser.file_handling as fh
1616
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
1717
from dve.core_engine.type_hints import URI, Messages
18-
from dve.reporting.error_report import conditional_cast
1918

2019
# We need to rely on a Python typing implementation detail in Python <= 3.7.
2120
if sys.version_info[:2] <= (3, 7):
@@ -179,38 +178,3 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
179178
if polars_type:
180179
return polars_type
181180
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
182-
183-
184-
def dump_errors(
185-
working_folder: URI,
186-
step_name: str,
187-
messages: Messages,
188-
key_fields: Optional[dict[str, list[str]]] = None,
189-
):
190-
"""Write out to disk captured feedback error messages."""
191-
if not working_folder:
192-
raise AttributeError("processed files path not passed")
193-
194-
if not key_fields:
195-
key_fields = {}
196-
197-
errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
198-
processed = []
199-
200-
for message in messages:
201-
primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
202-
error = message.to_dict(
203-
key_field=primary_keys,
204-
value_separator=" -- ",
205-
max_number_of_values=10,
206-
record_converter=None,
207-
)
208-
error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
209-
processed.append(error)
210-
211-
with fh.open_stream(errors, "a") as f:
212-
json.dump(
213-
processed,
214-
f,
215-
default=str,
216-
)

src/dve/core_engine/exceptions.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,12 @@ def __init__(
2525
def critical_messages(self) -> Iterator[FeedbackMessage]:
2626
"""Critical messages which caused the processing error."""
2727
yield from filter(lambda message: message.is_critical, self.messages)
28-
29-
def to_feedback_message(self) -> FeedbackMessage:
30-
"Convert to feedback message to write to json file"
31-
return FeedbackMessage(
32-
entity=None,
33-
record=None,
34-
failure_type="integrity",
35-
error_type="processing",
36-
error_location="Whole File",
37-
error_message=self.error_message,
38-
)
39-
28+
29+
@classmethod
30+
def from_exception(cls, exc:Exception):
31+
return cls(error_message = repr(exc),
32+
entities=None,
33+
messages=[])
4034

4135
class EntityTypeMismatch(TypeError):
4236
"""An exception emitted if entity type outputs from two collaborative objects are different."""

src/dve/core_engine/type_hints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@
236236
PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus)))
237237
"""List of all possible DVE submission statuses"""
238238

239-
SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"]
239+
SubmissionResult = Literal["success", "failed", "archived", "processing_failed"]
240240
"""Allowed DVE submission results"""
241241

242242
SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult)))

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
from typing import Optional
44
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
5-
from dve.core_engine.backends.utilities import dump_errors
5+
from dve.core_engine.exceptions import CriticalProcessingError
66
from dve.core_engine.models import SubmissionInfo
77
from dve.core_engine.type_hints import URI, Failed
88
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
99
from dve.pipeline.utils import SubmissionStatus
1010
from dve.parser import file_handling as fh
11+
from dve.reporting.utils import dump_processing_errors
1112

13+
@duckdb_get_entity_count
14+
@duckdb_write_parquet
1215
class FoundryDDBPipeline(DDBDVEPipeline):
1316
"""DuckDB pipeline for running on Foundry Platform"""
1417

@@ -27,46 +30,75 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
2730

2831
def file_transformation(
2932
self, submission_info: SubmissionInfo
30-
) -> SubmissionInfo | dict[str, str]:
33+
) -> tuple[SubmissionInfo, SubmissionStatus]:
3134
try:
3235
return super().file_transformation(submission_info)
3336
except Exception as exc: # pylint: disable=W0718
3437
self._logger.error(f"File transformation raised exception: {exc}")
3538
self._logger.exception(exc)
36-
return submission_info.dict()
39+
dump_processing_errors(
40+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
41+
"file_transformation",
42+
[CriticalProcessingError.from_exception(exc)]
43+
)
44+
return submission_info, SubmissionStatus(processing_failed=True)
3745

38-
def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]:
46+
def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo | SubmissionStatus]:
3947
try:
40-
return super().apply_data_contract(submission_info)
48+
return super().apply_data_contract(submission_info, submission_status)
4149
except Exception as exc: # pylint: disable=W0718
4250
self._logger.error(f"Apply data contract raised exception: {exc}")
4351
self._logger.exception(exc)
44-
return submission_info, True
52+
dump_processing_errors(
53+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
54+
"contract",
55+
[CriticalProcessingError.from_exception(exc)]
56+
)
57+
return submission_info, SubmissionStatus(processing_failed=True)
4558

46-
def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
59+
def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
4760
try:
48-
return super().apply_business_rules(submission_info, failed)
61+
return super().apply_business_rules(submission_info, submission_status)
4962
except Exception as exc: # pylint: disable=W0718
5063
self._logger.error(f"Apply business rules raised exception: {exc}")
5164
self._logger.exception(exc)
52-
return submission_info, SubmissionStatus(failed=True)
65+
dump_processing_errors(
66+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
67+
"business_rules",
68+
[CriticalProcessingError.from_exception(exc)]
69+
)
70+
return submission_info, SubmissionStatus(processing_failed=True)
71+
72+
def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
73+
try:
74+
return super().error_report(submission_info, submission_status)
75+
except Exception as exc: # pylint: disable=W0718
76+
self._logger.error(f"Error reports raised exception: {exc}")
77+
self._logger.exception(exc)
78+
sub_stats = None
79+
report_uri = None
80+
dump_processing_errors(
81+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
82+
"error_report",
83+
[CriticalProcessingError.from_exception(exc)]
84+
)
85+
return submission_info, submission_status, sub_stats, report_uri
5386

5487
def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
5588
"""Sequential single submission pipeline runner"""
5689
try:
5790
sub_id: str = submission_info.submission_id
5891
self._audit_tables.add_new_submissions(submissions=[submission_info])
5992
self._audit_tables.mark_transform(submission_ids=[sub_id])
60-
sub_info = self.file_transformation(submission_info=submission_info)
61-
if isinstance(sub_info, SubmissionInfo):
93+
sub_info, sub_status = self.file_transformation(submission_info=submission_info)
94+
if not (sub_status.validation_failed or sub_status.processing_failed):
6295
self._audit_tables.mark_data_contract(submission_ids=[sub_id])
63-
sub_info, failed = self.apply_data_contract(submission_info=submission_info)
64-
self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
96+
sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status)
97+
self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)])
6598
sub_info, sub_status = self.apply_business_rules(
66-
submission_info=submission_info, failed=failed
99+
submission_info=submission_info, submission_status=sub_status
67100
)
68-
else:
69-
sub_status = SubmissionStatus(failed=True)
101+
70102
self._audit_tables.mark_error_report(
71103
submissions=[(sub_id, sub_status.submission_result)]
72104
)
@@ -78,15 +110,20 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI],
78110
self._logger.error(
79111
f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
80112
)
113+
dump_processing_errors(
114+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
115+
"run_pipeline",
116+
[CriticalProcessingError.from_exception(err)]
117+
)
81118
self._audit_tables.mark_failed(submissions=[sub_id])
82119
finally:
83120
audit_files_uri = self.persist_audit_records(submission_info=submission_info)
84-
return (
85-
(
86-
None
87-
if sub_status.failed
88-
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
89-
),
90-
report_uri,
91-
audit_files_uri,
92-
)
121+
return (
122+
(
123+
None
124+
if (sub_status.validation_failed or sub_status.processing_failed)
125+
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
126+
),
127+
report_uri,
128+
audit_files_uri,
129+
)

0 commit comments

Comments
 (0)