
Commit efe0fa0

feat: Change how error messages are generated (by writing in batches). DuckDB no longer relies on pandas; uses pyarrow, multiprocessing, and background-thread batch writing to avoid memory pressure
1 parent 5b5a2eb commit efe0fa0

28 files changed

Lines changed: 655 additions & 482 deletions
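The core pattern behind this change, sketched minimally below: error records are enqueued in batches and a background thread appends each batch to a JSON Lines file, so the full error set never has to sit in memory. The sketch uses the stdlib `queue.Queue` for brevity, whereas the committed `BackgroundMessageWriter` pairs a `multiprocessing.Queue` with a `threading.Thread`; the file name and record shape here are illustrative only.

```python
import json
from queue import Queue
from threading import Thread

def writer_loop(write_queue: Queue, path: str) -> None:
    """Append each queued batch to a JSON Lines file; a None sentinel stops the loop."""
    while True:
        batch = write_queue.get()
        if batch is None:  # sentinel: producer is finished
            break
        with open(path, "a", encoding="utf-8") as f:
            f.write("\n".join(json.dumps(rec, default=str) for rec in batch) + "\n")

write_queue: Queue = Queue()
thread = Thread(target=writer_loop, args=(write_queue, "errors.jsonl"))
thread.start()
write_queue.put([{"error_level": "integrity", "error_message": "example"}])  # one batch
write_queue.put(None)  # flush and stop
thread.join()
```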

File tree

docs/README.md

Lines changed: 2 additions & 2 deletions
@@ -165,8 +165,8 @@ for entity in data_contract_config.schemas:
 
     # Data contract step here
     data_contract = SparkDataContract(spark_session=spark)
-    entities, validation_messages, success = data_contract.apply_data_contract(
-        entities, data_contract_config
+    entities, feedback_errors_uri, success = data_contract.apply_data_contract(
+        entities, None, data_contract_config
     )
 ```
 

src/dve/common/__init__.py

Whitespace-only changes.

src/dve/common/error_utils.py

Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
+"""Utilities to support reporting"""
+
+import datetime as dt
+from itertools import chain
+import json
+import logging
+from multiprocessing import Queue
+from threading import Thread
+from typing import Iterable, Optional, Union
+
+from dve.core_engine.message import UserMessage
+from dve.core_engine.loggers import get_logger
+import dve.parser.file_handling as fh
+from dve.core_engine.exceptions import CriticalProcessingError
+from dve.core_engine.type_hints import URI, DVEStage, Messages
+
+
+def get_feedback_errors_uri(working_folder: URI, step_name: DVEStage) -> URI:
+    """Determine the location of the JSON Lines file containing all errors generated in a step."""
+    return fh.joinuri(working_folder, "errors", f"{step_name}_errors.jsonl")
+
+def get_processing_errors_uri(working_folder: URI) -> URI:
+    """Determine the location of the JSON Lines file containing all processing
+    errors generated by a DVE run."""
+    return fh.joinuri(working_folder, "errors", "processing_errors.jsonl")
+
+
+def dump_feedback_errors(
+    working_folder: URI,
+    step_name: DVEStage,
+    messages: Messages,
+    key_fields: Optional[dict[str, list[str]]] = None,
+) -> URI:
+    """Write out captured feedback error messages."""
+    if not working_folder:
+        raise AttributeError("processed files path not passed")
+
+    if not key_fields:
+        key_fields = {}
+
+    error_file = get_feedback_errors_uri(working_folder, step_name)
+    processed = []
+
+    for message in messages:
+        if message.original_entity is not None:
+            primary_keys = key_fields.get(message.original_entity, [])
+        elif message.entity is not None:
+            primary_keys = key_fields.get(message.entity, [])
+        else:
+            primary_keys = []
+
+        error = message.to_dict(
+            key_field=primary_keys,
+            value_separator=" -- ",
+            max_number_of_values=10,
+            record_converter=None,
+        )
+        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
+        processed.append(error)
+
+    with fh.open_stream(error_file, "a") as f:
+        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")
+    return error_file
+
+
+def dump_processing_errors(
+    working_folder: URI, step_name: DVEStage, errors: list[CriticalProcessingError]
+) -> URI:
+    """Write out critical processing errors"""
+    if not working_folder:
+        raise AttributeError("processed files path not passed")
+    if not step_name:
+        raise AttributeError("step name not passed")
+    if not errors:
+        raise AttributeError("errors list not passed")
+
+    error_file: URI = get_processing_errors_uri(working_folder)
+    processed = []
+
+    for error in errors:
+        processed.append(
+            {
+                "step_name": step_name,
+                "error_location": "processing",
+                "error_level": "integrity",
+                "error_message": error.error_message,
+            }
+        )
+
+    with fh.open_stream(error_file, "a") as f:
+        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")
+
+    return error_file
+
+def load_feedback_messages(feedback_messages_uri: URI) -> Iterable[UserMessage]:
+    """Lazily load feedback messages from a JSON Lines error file, if it exists."""
+    if not fh.get_resource_exists(feedback_messages_uri):
+        return
+    with fh.open_stream(feedback_messages_uri) as errs:
+        yield from (UserMessage(**json.loads(err)) for err in errs.readlines())
+
+def load_all_error_messages(error_directory_uri: URI) -> Iterable[UserMessage]:
+    """Load feedback messages from every .jsonl error file under a directory."""
+    return chain.from_iterable(
+        load_feedback_messages(err_file)
+        for err_file, _ in fh.iter_prefix(error_directory_uri)
+        if err_file.endswith(".jsonl")
+    )
+
+class BackgroundMessageWriter:
+    """Drains batches of messages from a queue on a background thread and
+    appends them to the step's JSON Lines error file."""
+
+    def __init__(self,
+                 working_directory: URI,
+                 dve_stage: DVEStage,
+                 key_fields: Optional[dict[str, list[str]]] = None,
+                 logger: Optional[logging.Logger] = None):
+        self._working_directory = working_directory
+        self._dve_stage = dve_stage
+        self._feedback_message_uri = get_feedback_errors_uri(self._working_directory, self._dve_stage)
+        self._key_fields = key_fields
+        self.logger = logger or get_logger(type(self).__name__)
+        self._write_thread = None
+        self._queue = Queue()
+
+    @property
+    def write_queue(self):
+        return self._queue
+
+    @property
+    def write_thread(self):
+        if not self._write_thread:
+            self._write_thread = Thread(target=self._write_process_wrapper)
+        return self._write_thread
+
+    def _write_process_wrapper(self):
+        """Wrapper for dump_feedback_errors to run in a background thread"""
+        while True:
+            if msgs := self.write_queue.get():
+                dump_feedback_errors(self._working_directory, self._dve_stage, msgs, self._key_fields)
+            else:
+                break
+
+    def __enter__(self) -> "BackgroundMessageWriter":
+        self.write_thread.start()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type:
+            self.logger.exception(
+                "Issue occurred during background write process:",
+                exc_info=(exc_type, exc_value, traceback)
+            )
+        self.write_queue.put(None)  # sentinel: stop the writer thread
+        self.write_thread.join()
+
+
+def write_process_wrapper(working_directory: URI, *, queue: Queue, key_fields: Optional[dict[str, list[str]]] = None):
+    """Wrapper for dump_feedback_errors to run in a background process"""
+    while True:
+        if msgs := queue.get():
+            dump_feedback_errors(working_directory, "data_contract", msgs, key_fields)
+        else:
+            break
+
+def conditional_cast(value, primary_keys: list[str], value_separator: str) -> Union[list[str], str]:
+    """Determines what to do with a value coming back from the error list"""
+    if isinstance(value, list):
+        casts = [
+            conditional_cast(val, primary_keys, value_separator) for val in value
+        ]  # type: ignore
+        return value_separator.join(
+            [f"{pk}: {id}" if pk else "" for pk, id in zip(primary_keys, casts)]
+        )
+    if isinstance(value, dt.date):
+        return value.isoformat()
+    if isinstance(value, dict):
+        return ""
+    return str(value)
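A hedged usage sketch for the new writer: the context manager starts the drain thread on entry and, on exit, sends the `None` sentinel and joins. The working directory, stage name, and batch producer below are placeholders; real batches would be DVE message objects accepted by `dump_feedback_errors`.

```python
from dve.common.error_utils import (
    BackgroundMessageWriter,
    get_feedback_errors_uri,
    load_feedback_messages,
)

working_dir = "/tmp/dve-run"          # placeholder working directory
message_batches: list[list] = []      # placeholder: batches of DVE message objects

with BackgroundMessageWriter(working_dir, "data_contract") as writer:
    for batch in message_batches:
        writer.write_queue.put(batch)  # each batch is appended to the .jsonl file
# __exit__ put the None sentinel and joined the writer thread

# Read the messages back lazily from errors/data_contract_errors.jsonl
for msg in load_feedback_messages(get_feedback_errors_uri(working_dir, "data_contract")):
    print(msg)
```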

src/dve/core_engine/backends/base/backend.py

Lines changed: 22 additions & 20 deletions
@@ -24,6 +24,7 @@
     EntityParquetLocations,
     Messages,
 )
+from dve.parser.file_handling.service import get_parent, joinuri
 
 
 class BaseBackend(Generic[EntityType], ABC):
@@ -148,65 +149,66 @@ def convert_entities_to_spark(
 
     def apply(
         self,
+        working_dir: URI,
         entity_locations: EntityLocations,
         contract_metadata: DataContractMetadata,
         rule_metadata: RuleMetadata,
         submission_info: Optional[SubmissionInfo] = None,
-    ) -> tuple[Entities, Messages, StageSuccessful]:
+    ) -> tuple[Entities, URI, StageSuccessful, URI]:
         """Apply the data contract and the rules, returning the entities and all
         generated messages.
 
         """
         reference_data = self.load_reference_data(
             rule_metadata.reference_data_config, submission_info
         )
-        entities, messages, successful = self.contract.apply(entity_locations, contract_metadata)
+        entities, dc_feedback_errors_uri, successful, processing_errors_uri = self.contract.apply(working_dir, entity_locations, contract_metadata)
         if not successful:
-            return entities, messages, successful
+            return entities, dc_feedback_errors_uri, successful, processing_errors_uri
 
         for entity_name, entity in entities.items():
             entities[entity_name] = self.step_implementations.add_row_id(entity)
 
         # TODO: Handle entity manager creation errors.
         entity_manager = EntityManager(entities, reference_data)
         # TODO: Add stage success to 'apply_rules'
-        rule_messages = self.step_implementations.apply_rules(entity_manager, rule_metadata)
-        messages.extend(rule_messages)
+        # TODO: In case of large errors in business rules, write messages to jsonl file and return uri to errors
+        _ = self.step_implementations.apply_rules(entity_manager, rule_metadata)
 
         for entity_name, entity in entity_manager.entities.items():
             entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)
 
-        return entity_manager.entities, messages, True
+        return entity_manager.entities, get_parent(dc_feedback_errors_uri), True, processing_errors_uri
 
     def process(
         self,
+        working_dir: URI,
         entity_locations: EntityLocations,
         contract_metadata: DataContractMetadata,
         rule_metadata: RuleMetadata,
-        cache_prefix: URI,
         submission_info: Optional[SubmissionInfo] = None,
-    ) -> tuple[MutableMapping[EntityName, URI], Messages]:
+    ) -> tuple[MutableMapping[EntityName, URI], URI, URI]:
         """Apply the data contract and the rules, write the entities out to parquet
         and returning the entity locations and all generated messages.
 
         """
-        entities, messages, successful = self.apply(
-            entity_locations, contract_metadata, rule_metadata, submission_info
+        entities, feedback_errors_uri, successful, processing_errors_uri = self.apply(
+            working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
         )
         if successful:
-            parquet_locations = self.write_entities_to_parquet(entities, cache_prefix)
+            parquet_locations = self.write_entities_to_parquet(entities, joinuri(working_dir, "outputs"))
         else:
            parquet_locations = {}
-        return parquet_locations, messages
+        return parquet_locations, feedback_errors_uri, processing_errors_uri
 
     def process_legacy(
         self,
+        working_dir: URI,
         entity_locations: EntityLocations,
         contract_metadata: DataContractMetadata,
         rule_metadata: RuleMetadata,
-        cache_prefix: URI,
         submission_info: Optional[SubmissionInfo] = None,
-    ) -> tuple[MutableMapping[EntityName, DataFrame], Messages]:
+    ) -> tuple[MutableMapping[EntityName, DataFrame], URI]:
         """Apply the data contract and the rules, create Spark `DataFrame`s from the
         entities and return the Spark entities and all generated messages.
 
@@ -221,17 +223,17 @@ def process_legacy(
             category=DeprecationWarning,
         )
 
-        entities, messages, successful = self.apply(
-            entity_locations, contract_metadata, rule_metadata, submission_info
+        entities, errors_uri, successful, _ = self.apply(
+            working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
         )
 
         if not successful:
-            return {}, messages
+            return {}, errors_uri
 
         if self.__entity_type__ == DataFrame:
-            return entities, messages  # type: ignore
+            return entities, errors_uri  # type: ignore
 
         return (
-            self.convert_entities_to_spark(entities, cache_prefix, _emit_deprecation_warning=False),
-            messages,
+            self.convert_entities_to_spark(entities, joinuri(working_dir, "outputs"), _emit_deprecation_warning=False),
+            errors_uri,
         )
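For callers, the signature change looks roughly like the sketch below: `working_dir` moves to the first argument (replacing `cache_prefix`), and the step now returns URIs to the error files instead of in-memory `Messages`. The `backend` and metadata arguments are placeholders assumed from context; on the success path the feedback URI is the parent directory of the step's error file (via `get_parent`), which is the directory form that `load_all_error_messages` expects.

```python
from dve.common.error_utils import load_all_error_messages

def run_validation(backend, working_dir, entity_locations, contract_meta, rule_meta):
    # New call shape: working_dir first, URIs back instead of Messages.
    parquet_locations, feedback_errors_uri, processing_errors_uri = backend.process(
        working_dir, entity_locations, contract_meta, rule_meta
    )
    # Feedback errors are streamed back from JSONL rather than accumulated in memory.
    feedback = list(load_all_error_messages(feedback_errors_uri))
    return parquet_locations, feedback, processing_errors_uri
```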

src/dve/core_engine/backends/base/contract.py

Lines changed: 15 additions & 11 deletions
@@ -27,8 +27,10 @@
     Messages,
     WrapDecorator,
 )
-from dve.parser.file_handling import get_file_suffix, get_resource_exists
+from dve.parser.file_handling import get_file_suffix, get_resource_exists, get_parent
+from dve.parser.file_handling.service import joinuri
 from dve.parser.type_hints import Extension
+from dve.common.error_utils import dump_processing_errors, get_feedback_errors_uri, get_processing_errors_uri
 
 T = TypeVar("T")
 ExtensionConfig = dict[Extension, "ReaderConfig"]
@@ -360,8 +362,8 @@ def read_raw_entities(
 
     @abstractmethod
     def apply_data_contract(
-        self, entities: Entities, contract_metadata: DataContractMetadata
-    ) -> tuple[Entities, Messages, StageSuccessful]:
+        self, working_dir: URI, entities: Entities, entity_locations: EntityLocations, contract_metadata: DataContractMetadata, key_fields: Optional[dict[str, list[str]]] = None
+    ) -> tuple[Entities, URI, StageSuccessful]:
         """Apply the data contract to the raw entities, returning the validated entities
         and any messages.
 
@@ -371,35 +373,37 @@ def apply_data_contract(
         raise NotImplementedError()
 
     def apply(
-        self, entity_locations: EntityLocations, contract_metadata: DataContractMetadata
-    ) -> tuple[Entities, Messages, StageSuccessful]:
+        self, working_dir: URI, entity_locations: EntityLocations, contract_metadata: DataContractMetadata, key_fields: Optional[dict[str, list[str]]] = None
+    ) -> tuple[Entities, URI, StageSuccessful, URI]:
         """Read the entities from the provided locations according to the data contract,
         and return the validated entities and any messages.
 
         """
+        feedback_errors_uri = get_feedback_errors_uri(working_dir, "data_contract")
+        processing_errors_uri = get_processing_errors_uri(working_dir)
         entities, messages, successful = self.read_raw_entities(entity_locations, contract_metadata)
         if not successful:
-            return {}, messages, successful
+            dump_processing_errors(working_dir, "data_contract", messages)
+            return {}, feedback_errors_uri, successful, processing_errors_uri
 
         try:
-            entities, contract_messages, successful = self.apply_data_contract(
-                entities, contract_metadata
+            entities, feedback_errors_uri, successful = self.apply_data_contract(
+                working_dir, entities, entity_locations, contract_metadata, key_fields
             )
-            messages.extend(contract_messages)
         except Exception as err:  # pylint: disable=broad-except
             successful = False
             new_messages = render_error(
                 err,
                 "data contract",
                 self.logger,
             )
-            messages.extend(new_messages)
+            dump_processing_errors(working_dir, "data_contract", new_messages)
 
         if contract_metadata.cache_originals:
             for entity_name in list(entities):
                 entities[f"Original{entity_name}"] = entities[entity_name]
 
-        return entities, messages, successful
+        return entities, feedback_errors_uri, successful, processing_errors_uri
 
     def read_parquet(self, path: URI, **kwargs) -> EntityType:
         """Method to read parquet files from stringified parquet output
src/dve/core_engine/backends/base/utilities.py

Lines changed: 12 additions & 0 deletions
@@ -5,8 +5,12 @@
 from collections.abc import Sequence
 from typing import Optional
 
+import pyarrow
+import pyarrow.parquet as pq
+
 from dve.core_engine.message import FeedbackMessage
 from dve.core_engine.type_hints import ExpressionArray, MultiExpression
+from dve.parser.type_hints import URI
 
 BRACKETS = {"(": ")", "{": "}", "[": "]", "<": ">"}
 """A mapping of opening brackets to their closing counterpart."""
@@ -131,3 +135,11 @@ def _get_non_heterogenous_type(types: Sequence[type]) -> type:
         + f"union types (got {type_list!r}) but nullable types are okay"
     )
     return type_list[0]
+
+def check_if_parquet_file(file_location: URI) -> bool:
+    """Check if a file path is valid parquet"""
+    try:
+        pq.ParquetFile(file_location)
+        return True
+    except (pyarrow.ArrowInvalid, pyarrow.ArrowIOError):
+        return False
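A small, self-contained usage sketch for the new helper (file paths are illustrative): `pq.ParquetFile` attempts to read the file's footer, so a non-parquet file raises `ArrowInvalid` and the helper returns `False`.

```python
import pyarrow as pa
import pyarrow.parquet as pq

from dve.core_engine.backends.base.utilities import check_if_parquet_file

pq.write_table(pa.table({"id": [1, 2, 3]}), "/tmp/sample.parquet")
assert check_if_parquet_file("/tmp/sample.parquet") is True

with open("/tmp/not_parquet.txt", "w", encoding="utf-8") as f:
    f.write("plain text, no parquet magic bytes")
assert check_if_parquet_file("/tmp/not_parquet.txt") is False
```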
