
Commit fc120b1

refactor: merging logging additions from release branch
2 parents: 141e1b3 + aaea473

4 files changed: 38 additions & 2 deletions

src/dve/core_engine/backends/base/rules.py

Lines changed: 16 additions & 0 deletions
@@ -378,13 +378,15 @@ def apply_sync_filters(
             logger=self.logger,
         ) as msg_writer:
             for entity_name, filter_rules in filters_by_entity.items():
+                self.logger.info(f"Applying filters to {entity_name}")
                 entity = entities[entity_name]
 
                 filter_column_names: list[str] = []
                 unmodified_entities = {entity_name: entity}
                 modified_entities = {entity_name: entity}
 
                 for rule in filter_rules:
+                    self.logger.info(f"Applying filter {rule.reporting.code}")
                     if rule.reporting.emit == "record_failure":
                         column_name = f"filter_{uuid4().hex}"
                         filter_column_names.append(column_name)
@@ -436,6 +438,9 @@ def apply_sync_filters(
                             return processing_errors_uri, False
                         if temp_messages:
                             msg_writer.write_queue.put(temp_messages)
+                            self.logger.info(
+                                f"Filter {rule.reporting.code} found {len(temp_messages)} issues"
+                            )
 
                     else:
                         temp_messages, success = self.evaluate(
@@ -461,8 +466,15 @@ def apply_sync_filters(
                             return processing_errors_uri, False
                         if temp_messages:
                             msg_writer.write_queue.put(temp_messages)
+
+                            self.logger.info(
+                                f"Filter {rule.reporting.code} found {len(temp_messages)} issues"
+                            )
 
                 if filter_column_names:
+                    self.logger.info(
+                        f"Filtering records from entity {entity_name} for error code {rule.reporting.code}"  # pylint: disable=line-too-long
+                    )
                     success_condition = " AND ".join(
                         [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
                     )
@@ -537,6 +549,7 @@ def apply_rules(
         altering the entities in-place.
 
         """
+        self.logger.info("Applying business rules")
         rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
         errors_uri = get_feedback_errors_uri(working_directory, "business_rules")
         if rule_metadata.templating_strategy == "upfront":
@@ -554,6 +567,7 @@ def apply_rules(
             rules_and_locals = rule_metadata
 
         pre_sync_messages: Messages = []
+        self.logger.info("Applying pre-sync steps")
         for rule, local_variables in rules_and_locals:
             for step in rule.pre_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
@@ -601,6 +615,8 @@ def apply_rules(
                 return errors_uri, False
 
         post_sync_messages: Messages = []
+        self.logger.info("Applying post-sync steps")
+
         for rule, local_variables in rules_and_locals:
             for step in rule.post_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
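
Aside: the pattern these hunks add is consistent: announce each rule before it runs, then report how many issues it produced. A minimal runnable sketch of that pattern using plain logging; Rule, apply, and the sample records are hypothetical stand-ins, not the DVE rule engine:

import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rules")

@dataclass
class Rule:
    code: str  # hypothetical stand-in for rule.reporting.code

def apply(rule: Rule, records: list[dict]) -> list[str]:
    # Hypothetical check: flag every record that lacks an "id" field.
    return [f"{rule.code}: missing id" for record in records if "id" not in record]

records = [{"id": 1}, {}, {"id": 3}]
for rule in (Rule("R001"), Rule("R002")):
    logger.info("Applying filter %s", rule.code)
    messages = apply(rule, records)
    if messages:
        logger.info("Filter %s found %d issues", rule.code, len(messages))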

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 6 additions & 0 deletions
@@ -163,10 +163,16 @@ def apply_data_contract(
             )
 
             batches = pq.ParquetFile(entity_locations[entity_name]).iter_batches(10000)
+            msg_count = 0
             with Pool(8) as pool:
                 for msgs in pool.imap_unordered(row_validator_helper, batches):
                     if msgs:
                         msg_writer.write_queue.put(msgs)
+                        msg_count += len(msgs)
+
+            self.logger.info(
+                f"Data contract found {msg_count} issues in {entity_name}"
+            )
 
             casting_statements = [
                 (
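
Aside: the count works because Pool.imap_unordered yields each worker's result list as soon as it completes, so the parent process can tally lengths while the writer drains the queue. A minimal sketch under that assumption, with a hypothetical validate_batch standing in for row_validator_helper:

from multiprocessing import Pool

def validate_batch(batch: list[int]) -> list[str]:
    # Hypothetical validator: report every negative value in the batch.
    return [f"negative value: {value}" for value in batch if value < 0]

if __name__ == "__main__":
    batches = [[1, -2, 3], [-4, -5], [6]]
    msg_count = 0
    with Pool(2) as pool:
        for msgs in pool.imap_unordered(validate_batch, batches):
            if msgs:
                msg_count += len(msgs)
    print(f"Data contract found {msg_count} issues")  # 3 issues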

src/dve/core_engine/backends/implementations/spark/contract.py

Lines changed: 3 additions & 0 deletions
@@ -129,11 +129,14 @@ def apply_data_contract(
             working_dir, "data_contract", key_fields, self.logger
         ) as msg_writer:
             messages = validated.flatMap(lambda row: row[1]).filter(bool).toLocalIterator()
+            msg_count = 0
             while True:
                 batch = list(islice(messages, 10000))
                 if not batch:
                     break
                 msg_writer.write_queue.put(batch)
+                msg_count += len(batch)
+            self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")
 
         try:
             record_df = record_df.select(
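
Aside: the while loop drains Spark's toLocalIterator in fixed-size batches; itertools.islice yields an empty batch once the iterator is exhausted, which ends the loop. A self-contained sketch of the same drain-and-count loop, with a plain generator in place of the Spark iterator:

from itertools import islice

messages = (f"issue {i}" for i in range(25))  # stand-in for toLocalIterator()
msg_count = 0
while True:
    batch = list(islice(messages, 10))  # the diff uses batches of 10000
    if not batch:
        break
    msg_count += len(batch)
print(f"Found {msg_count} issues")  # Found 25 issues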

src/dve/pipeline/pipeline.py

Lines changed: 13 additions & 2 deletions
@@ -195,6 +195,7 @@ def write_file_to_parquet(
         errors = []
 
         for model_name, model in models.items():
+            self._logger.info(f"Transforming {model_name} to stringified parquet")
             reader: BaseFileReader = load_reader(dataset, model_name, ext)
             try:
                 if not entity_type:
@@ -235,6 +236,7 @@ def audit_received_file_step(
         self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
     ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
         """Set files as being received and mark them for file transformation"""
+        self._logger.info("Starting audit received file service")
         audit_received_futures: list[tuple[str, FileURI, Future]] = []
         for submission_file in submitted_files:
             data_uri, metadata_uri = submission_file
@@ -296,7 +298,7 @@ def file_transformation(
         """Transform a file from its original format into a 'stringified' parquet file"""
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
-
+        self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
         errors: list[FeedbackMessage] = []
         submission_status: SubmissionStatus = SubmissionStatus()
         submission_file_uri: URI = fh.joinuri(
@@ -331,6 +333,7 @@ def file_transformation_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to transform files from their original format into parquet files"""
+        self._logger.info("Starting file transformation service")
         file_transform_futures: list[tuple[SubmissionInfo, Future]] = []
 
         for submission_info in submissions_to_process:
@@ -402,6 +405,7 @@ def apply_data_contract(
         self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
     ) -> tuple[SubmissionInfo, SubmissionStatus]:
         """Method for applying the data contract given a submission_info"""
+        self._logger.info(f"Applying data contract to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "data_contract", submission_info.submission_id
@@ -456,6 +460,7 @@ def data_contract_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to validate the types of an untyped (stringly typed) parquet file"""
+        self._logger.info("Starting data contract service")
         processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -523,6 +528,7 @@ def apply_business_rules(
         """Apply the business rules to a given submission, the submission may have failed at the
         data_contract step so this should be passed in as a bool
         """
+        self._logger.info(f"Applying business rules to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "business_rules", submission_info.submission_id
@@ -613,6 +619,7 @@ def business_rule_step(
         list[tuple[SubmissionInfo, SubmissionStatus]],
     ]:
         """Step to apply business rules (Step impl) to a typed parquet file"""
+        self._logger.info("Starting business rules service")
         future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
 
         for submission_info, submission_status in files:
@@ -754,7 +761,7 @@ def error_report(
         SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
     ]:
         """Creates the error reports given a submission info and submission status"""
-
+        self._logger.info(f"Generating error report for {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "error_report", submission_info.submission_id
@@ -763,6 +770,7 @@ def error_report(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
 
+        self._logger.info("Reading error dataframes")
         errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)
 
         if not submission_status.number_of_records:
@@ -801,9 +809,11 @@ def error_report(
             "error_reports",
             f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
         )
+        self._logger.info("Writing error report")
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
 
+        self._logger.info("Publishing error aggregates")
         self._publish_error_aggregates(submission_info.submission_id, aggregates)
 
         return submission_info, submission_status, sub_stats, report_uri
@@ -819,6 +829,7 @@ def error_report_step(
         """Step to produce error reports
         takes processed files and files that failed file transformation
         """
+        self._logger.info("Starting error reports service")
         futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
         reports: list[
             tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
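
Aside: taken together, the pipeline.py additions follow a two-level scheme: each step method logs a "Starting ... service" banner once, and each per-submission method logs the submission it is working on. A minimal sketch of that scheme; the Pipeline class and submission ids here are hypothetical:

import logging

logging.basicConfig(level=logging.INFO)

class Pipeline:
    def __init__(self) -> None:
        self._logger = logging.getLogger("pipeline")

    def apply_data_contract(self, submission_id: str) -> None:
        # Per-submission progress line, as in the diff above.
        self._logger.info("Applying data contract to %s", submission_id)

    def data_contract_step(self, submission_ids: list[str]) -> None:
        # One banner per service step, then fan out over submissions.
        self._logger.info("Starting data contract service")
        for submission_id in submission_ids:
            self.apply_data_contract(submission_id)

Pipeline().data_contract_step(["sub-001", "sub-002"])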
