Skip to content

Commit 8aa4973

Browse files
refactor: allow passing of custom loggers into pipeline objects
1 parent 5c96472 commit 8aa4973

3 files changed

Lines changed: 14 additions & 1 deletion

File tree

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""DuckDB implementation for `Pipeline` object."""
22

3+
import logging
34
from typing import Optional
45

56
from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -30,6 +31,7 @@ def __init__(
3031
submitted_files_path: Optional[URI],
3132
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
3233
job_run_id: Optional[int] = None,
34+
logger: Optional[logging.Logger] = None,
3335
):
3436
self._connection = connection
3537
super().__init__(
@@ -41,6 +43,7 @@ def __init__(
4143
submitted_files_path,
4244
reference_data_loader,
4345
job_run_id,
46+
logger,
4447
)
4548

4649
# pylint: disable=arguments-differ

src/dve/pipeline/pipeline.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long
22
"""Generic Pipeline object to define how DVE should be interacted with."""
33
import json
4+
import logging
45
import re
56
from collections import defaultdict
67
from collections.abc import Generator, Iterable, Iterator
@@ -57,6 +58,7 @@ def __init__(
5758
submitted_files_path: Optional[URI],
5859
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
5960
job_run_id: Optional[int] = None,
61+
logger: Optional[logging.Logger] = None,
6062
):
6163
self._submitted_files_path = submitted_files_path
6264
self._processed_files_path = processed_files_path
@@ -66,11 +68,16 @@ def __init__(
6668
self._audit_tables = audit_tables
6769
self._data_contract = data_contract
6870
self._step_implementations = step_implementations
69-
self._logger = get_logger(__name__)
71+
self._logger = logger or get_logger(__name__)
7072
self._summary_lock = Lock()
7173
self._rec_tracking_lock = Lock()
7274
self._aggregates_lock = Lock()
7375

76+
if self._data_contract:
77+
self._data_contract.logger = self._logger
78+
if self._step_implementations:
79+
self._step_implementations.logger = self._logger
80+
7481
@property
7582
def job_run_id(self) -> Optional[int]:
7683
"""Unique Identifier for the job/process that is running this Pipeline."""

src/dve/pipeline/spark_pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Spark implementation for `Pipeline` object."""
22

3+
import logging
34
from concurrent.futures import Executor
45
from typing import Optional
56

@@ -32,6 +33,7 @@ def __init__(
3233
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
3334
spark: Optional[SparkSession] = None,
3435
job_run_id: Optional[int] = None,
36+
logger: Optional[logging.Logger] = None,
3537
):
3638
self._spark = spark if spark else SparkSession.builder.getOrCreate()
3739
super().__init__(
@@ -43,6 +45,7 @@ def __init__(
4345
submitted_files_path,
4446
reference_data_loader,
4547
job_run_id,
48+
logger,
4649
)
4750

4851
# pylint: disable=arguments-differ

0 commit comments

Comments (0)