Skip to content

Commit cdd5fc8

Browse files
refactor: improve the logging around dve processing errors and align reporting to module name rather than legacy name
1 parent 4bbdf02 commit cdd5fc8

6 files changed

Lines changed: 94 additions & 18 deletions

File tree

src/dve/core_engine/exceptions.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
"""Exceptions emitted by the pipeline."""
22

3-
from collections.abc import Iterator
3+
import traceback
44
from typing import Optional
55

6-
from dve.core_engine.backends.implementations.spark.types import SparkEntities
7-
from dve.core_engine.message import FeedbackMessage
8-
from dve.core_engine.type_hints import Messages
9-
106

117
class CriticalProcessingError(ValueError):
128
"""An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
1511
self,
1612
error_message: str,
1713
*args: object,
18-
messages: Optional[Messages],
19-
entities: Optional[SparkEntities] = None
14+
messages: Optional[list[str]] = None,
2015
) -> None:
2116
super().__init__(error_message, *args)
2217
self.error_message = error_message
2318
"""The error message explaining the critical processing error."""
2419
self.messages = messages
25-
"""The messages gathered at the time the error was emitted."""
26-
self.entities = entities
27-
"""The entities as they exist at the time the error was emitted."""
28-
29-
@property
30-
def critical_messages(self) -> Iterator[FeedbackMessage]:
31-
"""Critical messages which caused the processing error."""
32-
yield from filter(lambda message: message.is_critical, self.messages) # type: ignore
20+
"""Formatted traceback lines for the error, if available (populated by `from_exception`)."""
3321

3422
@classmethod
3523
def from_exception(cls, exc: Exception):
3624
"""Create from a broader exception, for recording in processing errors."""
37-
return cls(error_message=repr(exc), entities=None, messages=[])
25+
return cls(error_message=repr(exc), messages=traceback.format_exception(exc))
3826

3927

4028
class EntityTypeMismatch(TypeError):

src/dve/reporting/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def dump_processing_errors(
6161
if not errors:
6262
raise AttributeError("errors list not passed")
6363

64-
error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json")
64+
error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
6565
processed = []
6666

6767
for error in errors:
@@ -71,6 +71,7 @@ def dump_processing_errors(
7171
"error_location": "processing",
7272
"error_level": "integrity",
7373
"error_message": error.error_message,
74+
"error_traceback": error.messages,
7475
}
7576
)
7677

tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import tempfile
99
from uuid import uuid4
1010

11-
import pytest
11+
import polars as pl
1212

1313
from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
1414
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
116116
output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
117117
assert not fh.get_resource_exists(report_uri)
118118
assert not output_loc
119+
120+
perror_path = Path(
121+
processing_folder,
122+
sub_info.submission_id,
123+
"processing_errors",
124+
"processing_errors.json"
125+
)
126+
assert perror_path.exists()
127+
perror_schema = {
128+
"step_name": pl.Utf8(),
129+
"error_location": pl.Utf8(),
130+
"error_level": pl.Utf8(),
131+
"error_message": pl.Utf8(),
132+
"error_traceback": pl.List(pl.Utf8()),
133+
}
134+
expected_error_df = (
135+
pl.DataFrame(
136+
[
137+
{
138+
"step_name": "file_transformation",
139+
"error_location": "processing",
140+
"error_level": "integrity",
141+
"error_message": "ReaderLacksEntityTypeSupport()",
142+
"error_traceback": None,
143+
},
144+
],
145+
perror_schema
146+
)
147+
.select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
148+
)
149+
actual_error_df = (
150+
pl.read_json(perror_path, schema=perror_schema)
151+
.select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
152+
)
153+
assert actual_error_df.equals(expected_error_df)
154+
119155
assert len(list(fh.iter_prefix(audit_files))) == 2
120156

121157

File renamed without changes.

tests/test_reporting/test_utils.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Test utility functions and objects in the dve.reporting module."""
2+
3+
import tempfile
4+
from pathlib import Path
5+
6+
import polars as pl
7+
8+
from dve.core_engine.exceptions import CriticalProcessingError
9+
from dve.reporting.utils import dump_processing_errors
10+
11+
# pylint: disable=C0116
12+
13+
14+
def test_dump_processing_errors():
15+
perror_schema = {
16+
"step_name": pl.Utf8(),
17+
"error_location": pl.Utf8(),
18+
"error_level": pl.Utf8(),
19+
"error_message": pl.Utf8(),
20+
"error_stacktrace": pl.List(pl.Utf8()),
21+
}
22+
with tempfile.TemporaryDirectory() as temp_dir:
23+
dump_processing_errors(
24+
temp_dir,
25+
"test_step",
26+
[CriticalProcessingError("test error message")]
27+
)
28+
29+
output_path = Path(temp_dir, "processing_errors")
30+
31+
assert output_path.exists()
32+
assert len(list(output_path.iterdir())) == 1
33+
34+
expected_df = pl.DataFrame(
35+
[
36+
{
37+
"step_name": "test_step",
38+
"error_location": "processing",
39+
"error_level": "integrity",
40+
"error_message": "test error message",
41+
"error_stacktrace": None,
42+
},
43+
],
44+
perror_schema
45+
)
46+
error_df = pl.read_json(
47+
Path(output_path, "processing_errors.json")
48+
)
49+
cols_to_check = ["step_name", "error_location", "error_level", "error_message"]
50+
51+
assert error_df.select(pl.col(k) for k in cols_to_check).equals(expected_df.select(pl.col(k) for k in cols_to_check))

0 commit comments

Comments
 (0)