-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.py
More file actions
82 lines (68 loc) · 2.34 KB
/
utils.py
File metadata and controls
82 lines (68 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""Utilities to support reporting"""
import json
from typing import Optional
import dve.parser.file_handling as fh
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.type_hints import URI, Messages
from dve.reporting.error_report import conditional_cast
def dump_feedback_errors(
    working_folder: URI,
    step_name: str,
    messages: Messages,
    key_fields: Optional[dict[str, list[str]]] = None,
) -> None:
    """Write out captured feedback error messages.

    Every message is converted with ``message.to_dict`` and the resulting list
    is JSON-serialised to ``<working_folder>/errors/<step_name>_errors.json``.

    Args:
        working_folder: Base URI under which the ``errors`` folder lives.
        step_name: Name of the step that produced the messages; used as the
            prefix of the output file name.
        messages: Iterable of feedback messages to write out.
        key_fields: Optional mapping of entity name to the names of the fields
            passed as ``key_field`` when converting that entity's messages.
            Entities missing from the mapping get an empty list.

    Raises:
        AttributeError: If ``working_folder`` or ``step_name`` is empty.
    """
    if not working_folder:
        raise AttributeError("processed files path not passed")
    # Same guard as dump_processing_errors: without it an empty step name
    # silently produces a file called "_errors.json".
    if not step_name:
        raise AttributeError("step name not passed")

    key_fields = key_fields or {}
    errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")

    processed = []
    for message in messages:
        # The original entity name takes precedence over the current one.
        entity = (
            message.original_entity
            if message.original_entity is not None
            else message.entity
        )
        primary_keys = [] if entity is None else key_fields.get(entity, [])

        error = message.to_dict(
            key_field=primary_keys,
            value_separator=" -- ",
            max_number_of_values=10,
            record_converter=None,
        )
        # Same separator as used in to_dict above.
        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
        processed.append(error)

    # NOTE(review): mode "a" presumably appends, so calling this more than once
    # for the same step would leave several JSON arrays back-to-back in the
    # file - confirm that readers cope with that.
    with fh.open_stream(errors, "a") as f:
        # default=str stringifies any value json cannot serialise natively.
        json.dump(processed, f, default=str)
def dump_processing_errors(
    working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
):
    """Write out critical processing errors.

    One record is built per error, holding the step name, fixed
    ``error_location`` / ``error_level`` values and the error's
    ``error_message``. The list of records is JSON-serialised to
    ``<working_folder>/errors/processing_errors.json``.

    Args:
        working_folder: Base URI under which the ``errors`` folder lives.
        step_name: Name of the step in which the errors were raised.
        errors: The critical processing errors to record.

    Raises:
        AttributeError: If any of the three arguments is empty or missing.
    """
    # Arguments are checked in declaration order; the first falsy one wins.
    required = (
        (working_folder, "processed files path not passed"),
        (step_name, "step name not passed"),
        (errors, "errors list not passed"),
    )
    for supplied, complaint in required:
        if not supplied:
            raise AttributeError(complaint)

    target = fh.joinuri(working_folder, "errors", "processing_errors.json")

    records = [
        {
            "step_name": step_name,
            "error_location": "processing",
            "error_level": "integrity",
            "error_message": failure.error_message,
        }
        for failure in errors
    ]

    with fh.open_stream(target, "a") as stream:
        # default=str stringifies any value json cannot serialise natively.
        json.dump(records, stream, default=str)