Skip to content

Commit c64da59

Browse files
committed
refactor: Modified business rule step to write feedback messages in batches, increasing tolerance for large files with many validation errors
1 parent 233e18f commit c64da59

3 files changed

Lines changed: 226 additions & 104 deletions

File tree

src/dve/core_engine/backends/base/backend.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def apply(
170170
# TODO: Add stage success to 'apply_rules'
171171
# TODO: In case of large errors in business rules, write messages to jsonl file
172172
# TODO: and return uri to errors
173-
_ = self.step_implementations.apply_rules(entity_manager, rule_metadata)
173+
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)
174174

175175
for entity_name, entity in entity_manager.entities.items():
176176
entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)

src/dve/core_engine/backends/base/rules.py

Lines changed: 216 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
from typing_extensions import Literal, Protocol, get_type_hints
1111

12+
from dve.common.error_utils import (
13+
BackgroundMessageWriter,
14+
dump_feedback_errors,
15+
dump_processing_errors,
16+
get_feedback_errors_uri,
17+
)
1218
from dve.core_engine.backends.base.core import get_entity_type
1319
from dve.core_engine.backends.exceptions import render_error
1420
from dve.core_engine.backends.metadata.rules import (
@@ -37,6 +43,7 @@
3743
TableUnion,
3844
)
3945
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
46+
from dve.core_engine.exceptions import CriticalProcessingError
4047
from dve.core_engine.loggers import get_logger
4148
from dve.core_engine.type_hints import URI, EntityName, Messages, TemplateVariables
4249

@@ -343,9 +350,14 @@ def notify(self, entities: Entities, *, config: Notification) -> Messages:
343350
344351
"""
345352

353+
# pylint: disable=R0912,R0914
346354
def apply_sync_filters(
347-
self, entities: Entities, *filters: DeferredFilter
348-
) -> tuple[Messages, StageSuccessful]:
355+
self,
356+
working_directory: URI,
357+
entities: Entities,
358+
*filters: DeferredFilter,
359+
key_fields: Optional[dict[str, list[str]]] = None,
360+
) -> tuple[URI, StageSuccessful]:
349361
"""Apply the synchronised filters, emitting appropriate error messages for any
350362
records which do not meet the conditions.
351363
@@ -355,108 +367,178 @@ def apply_sync_filters(
355367
356368
"""
357369
filters_by_entity: dict[EntityName, list[DeferredFilter]] = defaultdict(list)
370+
feedback_errors_uri = get_feedback_errors_uri(working_directory, "business_rules")
358371
for rule in filters:
359372
filters_by_entity[rule.entity_name].append(rule)
360373

361-
messages: Messages = []
362-
for entity_name, filter_rules in filters_by_entity.items():
363-
entity = entities[entity_name]
364-
365-
filter_column_names: list[str] = []
366-
unmodified_entities = {entity_name: entity}
367-
modified_entities = {entity_name: entity}
368-
369-
for rule in filter_rules:
370-
if rule.reporting.emit == "record_failure":
371-
column_name = f"filter_{uuid4().hex}"
372-
filter_column_names.append(column_name)
373-
temp_messages, success = self.evaluate(
374-
modified_entities,
375-
config=ColumnAddition(
376-
entity_name=entity_name,
377-
column_name=column_name,
378-
expression=rule.expression,
379-
parent=rule.parent,
380-
),
381-
)
382-
messages.extend(temp_messages)
383-
if not success:
384-
return messages, False
385-
386-
temp_messages, success = self.evaluate(
387-
modified_entities,
388-
config=Notification(
389-
entity_name=entity_name,
390-
expression=f"NOT {column_name}",
391-
excluded_columns=filter_column_names,
392-
reporting=rule.reporting,
393-
parent=rule.parent,
394-
),
395-
)
396-
messages.extend(temp_messages)
397-
if not success:
398-
return messages, False
399-
400-
else:
401-
temp_messages, success = self.evaluate(
402-
unmodified_entities,
403-
config=Notification(
404-
entity_name=entity_name,
405-
expression=f"NOT ({rule.expression})",
406-
reporting=rule.reporting,
407-
parent=rule.parent,
408-
),
374+
with BackgroundMessageWriter(
375+
working_directory=working_directory,
376+
dve_stage="business_rules",
377+
key_fields=key_fields,
378+
logger=self.logger,
379+
) as msg_writer:
380+
for entity_name, filter_rules in filters_by_entity.items():
381+
entity = entities[entity_name]
382+
383+
filter_column_names: list[str] = []
384+
unmodified_entities = {entity_name: entity}
385+
modified_entities = {entity_name: entity}
386+
387+
for rule in filter_rules:
388+
if rule.reporting.emit == "record_failure":
389+
column_name = f"filter_{uuid4().hex}"
390+
filter_column_names.append(column_name)
391+
temp_messages, success = self.evaluate(
392+
modified_entities,
393+
config=ColumnAddition(
394+
entity_name=entity_name,
395+
column_name=column_name,
396+
expression=rule.expression,
397+
parent=rule.parent,
398+
),
399+
)
400+
if not success:
401+
processing_errors_uri = dump_processing_errors(
402+
working_directory,
403+
"business_rules",
404+
[
405+
CriticalProcessingError(
406+
"Issue occurred while applying filter logic",
407+
messages=temp_messages,
408+
)
409+
],
410+
)
411+
return processing_errors_uri, False
412+
if temp_messages:
413+
msg_writer.write_queue.put(temp_messages)
414+
415+
temp_messages, success = self.evaluate(
416+
modified_entities,
417+
config=Notification(
418+
entity_name=entity_name,
419+
expression=f"NOT {column_name}",
420+
excluded_columns=filter_column_names,
421+
reporting=rule.reporting,
422+
parent=rule.parent,
423+
),
424+
)
425+
if not success:
426+
processing_errors_uri = dump_processing_errors(
427+
working_directory,
428+
"business_rules",
429+
[
430+
CriticalProcessingError(
431+
"Issue occurred while generating FeedbackMessages",
432+
messages=temp_messages,
433+
)
434+
],
435+
)
436+
return processing_errors_uri, False
437+
if temp_messages:
438+
msg_writer.write_queue.put(temp_messages)
439+
440+
else:
441+
temp_messages, success = self.evaluate(
442+
unmodified_entities,
443+
config=Notification(
444+
entity_name=entity_name,
445+
expression=f"NOT ({rule.expression})",
446+
reporting=rule.reporting,
447+
parent=rule.parent,
448+
),
449+
)
450+
if not success:
451+
processing_errors_uri = dump_processing_errors(
452+
working_directory,
453+
"business_rules",
454+
[
455+
CriticalProcessingError(
456+
"Issue occurred while generating FeedbackMessages",
457+
messages=temp_messages,
458+
)
459+
],
460+
)
461+
return processing_errors_uri, False
462+
if temp_messages:
463+
msg_writer.write_queue.put(temp_messages)
464+
465+
if filter_column_names:
466+
success_condition = " AND ".join(
467+
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
409468
)
410-
messages.extend(temp_messages)
411-
if not success:
412-
return messages, False
413-
414-
if filter_column_names:
415-
success_condition = " AND ".join(
416-
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
417-
)
418-
temp_messages, success = self.evaluate(
419-
modified_entities,
420-
config=ImmediateFilter(
421-
entity_name=entity_name,
422-
expression=success_condition,
423-
parent=ParentMetadata(
424-
rule="FilterStageRecordLevelFilterApplication", index=0, stage="Sync"
425-
),
426-
),
427-
)
428-
messages.extend(temp_messages)
429-
if not success:
430-
return messages, False
431-
432-
for index, filter_column_name in enumerate(filter_column_names):
433469
temp_messages, success = self.evaluate(
434470
modified_entities,
435-
config=ColumnRemoval(
471+
config=ImmediateFilter(
436472
entity_name=entity_name,
437-
column_name=filter_column_name,
473+
expression=success_condition,
438474
parent=ParentMetadata(
439-
rule="FilterStageRecordLevelFilterColumnRemoval",
440-
index=index,
475+
rule="FilterStageRecordLevelFilterApplication",
476+
index=0,
441477
stage="Sync",
442478
),
443479
),
444480
)
445-
messages.extend(temp_messages)
446481
if not success:
447-
return messages, False
448-
449-
entities.update(modified_entities)
450-
451-
return messages, True
452-
453-
def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messages:
482+
processing_errors_uri = dump_processing_errors(
483+
working_directory,
484+
"business_rules",
485+
[
486+
CriticalProcessingError(
487+
"Issue occurred while filtering error records",
488+
messages=temp_messages,
489+
)
490+
],
491+
)
492+
return processing_errors_uri, False
493+
if temp_messages:
494+
msg_writer.write_queue.put(temp_messages)
495+
496+
for index, filter_column_name in enumerate(filter_column_names):
497+
temp_messages, success = self.evaluate(
498+
modified_entities,
499+
config=ColumnRemoval(
500+
entity_name=entity_name,
501+
column_name=filter_column_name,
502+
parent=ParentMetadata(
503+
rule="FilterStageRecordLevelFilterColumnRemoval",
504+
index=index,
505+
stage="Sync",
506+
),
507+
),
508+
)
509+
if not success:
510+
processing_errors_uri = dump_processing_errors(
511+
working_directory,
512+
"business_rules",
513+
[
514+
CriticalProcessingError(
515+
"Issue occurred while generating FeedbackMessages",
516+
messages=temp_messages,
517+
)
518+
],
519+
)
520+
return processing_errors_uri, False
521+
if temp_messages:
522+
msg_writer.write_queue.put(temp_messages)
523+
524+
entities.update(modified_entities)
525+
526+
return feedback_errors_uri, True
527+
528+
def apply_rules(
529+
self,
530+
working_directory: URI,
531+
entities: Entities,
532+
rule_metadata: RuleMetadata,
533+
key_fields: Optional[dict[str, list[str]]] = None,
534+
) -> tuple[URI, bool]:
454535
"""Create rule definitions from the metadata for a given dataset and evaluate
455536
the impact on the provided entities, returning a deque of messages and
456537
altering the entities in-place.
457538
458539
"""
459540
rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
541+
errors_uri = get_feedback_errors_uri(working_directory, "business_rules")
460542
if rule_metadata.templating_strategy == "upfront":
461543
rules_and_locals = []
462544
for rule, local_variables in rule_metadata:
@@ -471,7 +553,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
471553
else:
472554
rules_and_locals = rule_metadata
473555

474-
messages: Messages = []
556+
pre_sync_messages: Messages = []
475557
for rule, local_variables in rules_and_locals:
476558
for step in rule.pre_sync_steps:
477559
if rule_metadata.templating_strategy == "runtime":
@@ -480,9 +562,27 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
480562
)
481563

482564
stage_messages, success = self.evaluate(entities, config=step)
483-
messages.extend(stage_messages)
565+
# if failure, write out processing issues and all prior messages (so nothing lost)
484566
if not success:
485-
return messages
567+
processing_errors_uri = dump_processing_errors(
568+
working_directory,
569+
"business_rules",
570+
[
571+
CriticalProcessingError(
572+
"Issue occurred while applying pre filter steps",
573+
messages=stage_messages,
574+
)
575+
],
576+
)
577+
if pre_sync_messages:
578+
dump_feedback_errors(working_directory, "business_rules", pre_sync_messages)
579+
580+
return processing_errors_uri, False
581+
# if not a failure, ensure we keep track of any informational messages
582+
pre_sync_messages.extend(stage_messages)
583+
# if all successful, ensure we write out all informational messages
584+
if pre_sync_messages:
585+
dump_feedback_errors(working_directory, "business_rules", pre_sync_messages)
486586

487587
sync_steps = []
488588
for rule, local_variables in rules_and_locals:
@@ -493,11 +593,14 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
493593
)
494594
sync_steps.append(step)
495595

496-
stage_messages, success = self.apply_sync_filters(entities, *sync_steps)
497-
messages.extend(stage_messages)
596+
# error writing handled in apply_sync_filters
597+
errors_uri, success = self.apply_sync_filters(
598+
working_directory, entities, *sync_steps, key_fields=key_fields
599+
)
498600
if not success:
499-
return messages
601+
return errors_uri, False
500602

603+
post_sync_messages: Messages = []
501604
for rule, local_variables in rules_and_locals:
502605
for step in rule.post_sync_steps:
503606
if rule_metadata.templating_strategy == "runtime":
@@ -506,10 +609,29 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
506609
)
507610

508611
stage_messages, success = self.evaluate(entities, config=step)
509-
messages.extend(stage_messages)
510612
if not success:
511-
return messages
512-
return messages
613+
processing_errors_uri = dump_processing_errors(
614+
working_directory,
615+
"business_rules",
616+
[
617+
CriticalProcessingError(
618+
"Issue occurred while applying post filter steps",
619+
messages=stage_messages,
620+
)
621+
],
622+
)
623+
if post_sync_messages:
624+
dump_feedback_errors(
625+
working_directory, "business_rules", post_sync_messages
626+
)
627+
628+
return processing_errors_uri, False
629+
# if not a failure, ensure we keep track of any informational messages
630+
post_sync_messages.extend(stage_messages)
631+
# if all successful, ensure we write out all informational messages
632+
if post_sync_messages:
633+
dump_feedback_errors(working_directory, "business_rules", post_sync_messages)
634+
return errors_uri, True
513635

514636
def read_parquet(self, path: URI, **kwargs) -> EntityType:
515637
"""Method to read parquet files"""

0 commit comments

Comments
 (0)