Skip to content

Commit 17b96b3

Browse files
committed
style: Additional logging around file transformation, business rules and error reports
1 parent 6f6d218 commit 17b96b3

2 files changed

Lines changed: 13 additions & 1 deletion

File tree

src/dve/core_engine/backends/base/rules.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -360,13 +360,15 @@ def apply_sync_filters(
360360

361361
messages: Messages = []
362362
for entity_name, filter_rules in filters_by_entity.items():
363+
self.logger.info(f"Applying filters to {entity_name}")
363364
entity = entities[entity_name]
364365

365366
filter_column_names: list[str] = []
366367
unmodified_entities = {entity_name: entity}
367368
modified_entities = {entity_name: entity}
368369

369370
for rule in filter_rules:
371+
self.logger.info(f"Applying filter {rule.reporting.code}")
370372
if rule.reporting.emit == "record_failure":
371373
column_name = f"filter_{uuid4().hex}"
372374
filter_column_names.append(column_name)
@@ -412,6 +414,7 @@ def apply_sync_filters(
412414
return messages, False
413415

414416
if filter_column_names:
417+
self.logger.info(f"Filtering records where validation is record level")
415418
success_condition = " AND ".join(
416419
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
417420
)
@@ -456,6 +459,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
456459
altering the entities in-place.
457460
458461
"""
462+
self.logger.info("Applying business rules")
459463
rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
460464
if rule_metadata.templating_strategy == "upfront":
461465
rules_and_locals = []
@@ -472,6 +476,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
472476
rules_and_locals = rule_metadata
473477

474478
messages: Messages = []
479+
480+
self.logger.info("Applying pre-sync steps")
475481
for rule, local_variables in rules_and_locals:
476482
for step in rule.pre_sync_steps:
477483
if rule_metadata.templating_strategy == "runtime":
@@ -498,6 +504,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
498504
if not success:
499505
return messages
500506

507+
self.logger.info("Applying post-sync steps")
508+
501509
for rule, local_variables in rules_and_locals:
502510
for step in rule.post_sync_steps:
503511
if rule_metadata.templating_strategy == "runtime":

src/dve/pipeline/pipeline.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -190,6 +190,7 @@ def write_file_to_parquet(
190190
errors = []
191191

192192
for model_name, model in models.items():
193+
self._logger.info(f"Transforming {model_name} to stringly typed parquet")
193194
reader: BaseFileReader = load_reader(dataset, model_name, ext)
194195
try:
195196
if not entity_type:
@@ -747,7 +748,7 @@ def error_report(
747748
SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
748749
]:
749750
"""Creates the error reports given a submission info and submission status"""
750-
751+
self._logger.info("Generating error report")
751752
if not submission_status:
752753
submission_status = self.get_submission_status(
753754
"error_report", submission_info.submission_id
@@ -756,6 +757,7 @@ def error_report(
756757
if not self.processed_files_path:
757758
raise AttributeError("processed files path not provided")
758759

760+
self._logger.info("Reading error dataframes")
759761
errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)
760762

761763
if not submission_status.number_of_records:
@@ -794,9 +796,11 @@ def error_report(
794796
"error_reports",
795797
f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
796798
)
799+
self._logger.info("Writing error report")
797800
with fh.open_stream(report_uri, "wb") as stream:
798801
stream.write(er.ExcelFormat.convert_to_bytes(workbook))
799802

803+
self._logger.info("Publishing error aggregates")
800804
self._publish_error_aggregates(submission_info.submission_id, aggregates)
801805

802806
return submission_info, submission_status, sub_stats, report_uri

0 commit comments

Comments
 (0)