Skip to content

Commit a4d49b6

Browse files
committed
style: address formatting, linting and type checking issues
1 parent adb23de commit a4d49b6

10 files changed

Lines changed: 56 additions & 54 deletions

File tree

src/dve/core_engine/backends/base/backend.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
import logging
44
import warnings
55
from abc import ABC, abstractmethod
6-
from collections.abc import Mapping, MutableMapping
6+
from collections.abc import MutableMapping
77
from typing import Any, ClassVar, Generic, Optional
88

99
from pyspark.sql import DataFrame, SparkSession
@@ -67,6 +67,7 @@ def load_reference_data(
6767
reference_entity_config: dict[EntityName, ReferenceConfigUnion],
6868
submission_info: Optional[SubmissionInfo],
6969
) -> BaseRefDataLoader[EntityType]:
70+
"""Supply configured reference data loader for use with business rules"""
7071
raise NotImplementedError()
7172

7273
@abstractmethod

src/dve/core_engine/backends/base/reference_data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,8 @@ def __getitem__(self, key: EntityName) -> EntityType:
208208
try:
209209
config = self.reference_entity_config[key]
210210
return self.load_entity(entity_name=key, config=config)
211-
except TypeError:
212-
raise NoRefDataConfigSupplied()
211+
except TypeError as err:
212+
raise NoRefDataConfigSupplied() from err
213213
except Exception as err:
214214
raise MissingRefDataEntity(entity_name=key) from err
215215

src/dve/core_engine/backends/exceptions.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,10 @@ def get_message_preamble(self) -> str:
118118
"""
119119
return f"Missing reference data entity {self.entity_name!r}"
120120

121+
121122
class NoRefDataConfigSupplied(BackendError):
122123
"""An error raised when trying to load a refdata entity when no refdata
123-
config has been supplied.
124+
config has been supplied.
124125
125126
"""
126127

@@ -129,7 +130,7 @@ def __init__(self, *args: object) -> None:
129130

130131
def get_message_preamble(self) -> EntityName:
131132
"""Message for logging purposes"""
132-
return f"Refdata loader not supplied with refdata config - unable to load refdata entities"
133+
return "Refdata loader not supplied with refdata config - unable to load refdata entities"
133134

134135

135136
class ConstraintError(ValueError, BackendErrorMixin):

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ def get_duckdb_cast_statement_from_annotation(
411411
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{date_regex}') THEN TRY_CAST(TRIM({quoted_name}) as DATE) ELSE NULL END" # pylint: disable=C0301
412412
return stmt
413413
if issubclass(type_, time):
414-
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
414+
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
415415
return stmt
416416
duck_type = get_duckdb_type_from_annotation(type_)
417417
if duck_type:

src/dve/core_engine/backends/implementations/duckdb/reference_data.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
"""A reference data loader for duckdb."""
22

3-
from typing import Optional
4-
53
from duckdb import DuckDBPyConnection, DuckDBPyRelation
64
from pyarrow import ipc # type: ignore
75

86
from dve.core_engine.backends.base.reference_data import (
97
BaseRefDataLoader,
108
ReferenceConfig,
11-
ReferenceConfigUnion,
129
ReferenceTable,
1310
mark_refdata_file_extension,
1411
)
@@ -19,18 +16,18 @@
1916
# pylint: disable=too-few-public-methods
2017
class DuckDBRefDataLoader(BaseRefDataLoader[DuckDBPyRelation]):
2118
"""A reference data loader using already existing DuckDB tables.
22-
reference_entity_config and dataset_config_uri (if config uses relative paths)
23-
should be supplied using setter methods for the dataset being processed before running."""
19+
reference_entity_config and dataset_config_uri (if config uses relative paths)
20+
should be supplied using setter methods for the dataset being processed before running."""
2421

2522
def __init__(
2623
self,
2724
connection: DuckDBPyConnection,
2825
reference_data_config: dict[EntityName, ReferenceConfig],
2926
dataset_config_uri: URI,
30-
**kwargs
27+
**kwargs,
3128
) -> None:
32-
super().__init__(reference_data_config, dataset_config_uri,**kwargs)
33-
29+
super().__init__(reference_data_config, dataset_config_uri, **kwargs)
30+
3431
self.connection = connection
3532

3633
if not self.connection:

src/dve/core_engine/backends/implementations/spark/backend.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from dve.core_engine.loggers import get_child_logger, get_logger
1818
from dve.core_engine.models import SubmissionInfo
1919
from dve.core_engine.type_hints import URI, EntityName, EntityParquetLocations
20-
from dve.parser.file_handling import get_resource_exists, joinuri, get_parent
20+
from dve.parser.file_handling import get_resource_exists, joinuri
2121

2222

2323
class SparkBackend(BaseBackend[DataFrame]):
@@ -50,18 +50,22 @@ def __init__(
5050
logger=get_child_logger("SparkStepImplementations", logger)
5151
)
5252
super().__init__(contract, steps, logger, **kwargs)
53-
54-
def load_reference_data(self,
53+
54+
def load_reference_data(
55+
self,
5556
reference_entity_config: dict[EntityName, ReferenceConfigUnion],
56-
submission_info: Optional[SubmissionInfo],):
57+
submission_info: Optional[SubmissionInfo],
58+
):
5759
"""Load the reference data as specified in the reference entity config."""
58-
sub_info_entity: Optional[EntityType] = None
60+
sub_info_entity: Optional[DataFrame] = None
5961
if submission_info:
6062
sub_info_entity = self.convert_submission_info(submission_info)
6163

62-
reference_data_loader = SparkRefDataLoader(spark=self.spark_session,
63-
reference_data_config=reference_entity_config,
64-
dataset_config_uri=self.dataset_config_uri)
64+
reference_data_loader = SparkRefDataLoader(
65+
spark=self.spark_session,
66+
reference_data_config=reference_entity_config,
67+
dataset_config_uri=self.dataset_config_uri, # type: ignore
68+
)
6569
if sub_info_entity is not None:
6670
reference_data_loader.entity_cache["dve_submission_info"] = sub_info_entity
6771

src/dve/core_engine/backends/implementations/spark/reference_data.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
# pylint: disable=no-member
22
"""A reference data loader for Spark."""
33

4-
from typing import Optional
5-
64
from pyspark.sql import DataFrame, SparkSession
75

86
from dve.core_engine.backends.base.reference_data import (
@@ -18,8 +16,8 @@
1816
# pylint: disable=too-few-public-methods
1917
class SparkRefDataLoader(BaseRefDataLoader[DataFrame]):
2018
"""A reference data loader using already existing Apache Spark Tables.
21-
reference_entity_config and dataset_config_uri (if config uses relative paths)
22-
should be supplied using setter methods for the dataset being processed before running."""
19+
reference_entity_config and dataset_config_uri (if config uses relative paths)
20+
should be supplied using setter methods for the dataset being processed before running."""
2321

2422
def __init__(
2523
self,

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from duckdb import DuckDBPyConnection, DuckDBPyRelation
77

8+
import dve.parser.file_handling as fh
89
from dve.core_engine.backends.base.reference_data import BaseRefDataLoader, ReferenceConfig
910
from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
1011
from dve.core_engine.backends.implementations.duckdb.contract import DuckDBDataContract
@@ -14,7 +15,6 @@
1415
from dve.core_engine.models import SubmissionInfo
1516
from dve.core_engine.type_hints import URI
1617
from dve.pipeline.pipeline import BaseDVEPipeline
17-
import dve.parser.file_handling as fh
1818

1919

2020
# pylint: disable=abstract-method
@@ -47,13 +47,16 @@ def __init__(
4747
logger,
4848
)
4949

50-
def get_reference_data_loader(self,
51-
reference_data_config: dict[str, ReferenceConfig],
52-
**kwargs) -> BaseRefDataLoader[DuckDBPyRelation]:
53-
return DuckDBRefDataLoader(connection=self._connection,
54-
reference_data_config=reference_data_config,
55-
dataset_config_uri=fh.get_parent(self._rules_path),
56-
**kwargs)
50+
def get_reference_data_loader(
51+
self, reference_data_config: dict[str, ReferenceConfig], **kwargs
52+
) -> DuckDBRefDataLoader:
53+
return DuckDBRefDataLoader(
54+
connection=self._connection,
55+
reference_data_config=reference_data_config,
56+
dataset_config_uri=fh.get_parent(self._rules_path), # type: ignore
57+
**kwargs
58+
)
59+
5760
# pylint: disable=arguments-differ
5861
def write_file_to_parquet( # type: ignore
5962
self, submission_file_uri: URI, submission_info: SubmissionInfo, output: URI

src/dve/pipeline/pipeline.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,12 @@ def step_implementations(self) -> Optional[BaseStepImplementations[EntityType]]:
112112
def get_entity_count(entity: EntityType) -> int:
113113
"""Get a row count of an entity stored as parquet"""
114114
raise NotImplementedError()
115-
116-
def get_reference_data_loader(self,
117-
reference_data_config: dict[EntityName, ReferenceConfig],
118-
**kwargs) -> BaseRefDataLoader[EntityType]:
115+
116+
def get_reference_data_loader(
117+
self, reference_data_config: dict[EntityName, ReferenceConfig], **kwargs
118+
) -> BaseRefDataLoader:
119119
"""Get reference data loader if required for business rules"""
120120
raise NotImplementedError()
121-
122121

123122
def get_submission_status(
124123
self, step_name: DVEStageName, submission_id: str
@@ -533,7 +532,7 @@ def data_contract_step(
533532

534533
return processed_files, failed_processing
535534

536-
def apply_business_rules( # pylint: disable=R0914
535+
def apply_business_rules( # pylint: disable=R0914
537536
self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
538537
) -> tuple[SubmissionInfo, SubmissionStatus]:
539538
"""Apply the business rules to a given submission, the submission may have failed at the
@@ -559,7 +558,7 @@ def apply_business_rules( # pylint: disable=R0914
559558
self._processed_files_path, submission_info.submission_id
560559
)
561560
ref_data = config.get_reference_data_config()
562-
reference_data = self.get_reference_data_loader(reference_data_config=ref_data)
561+
reference_data: BaseRefDataLoader = self.get_reference_data_loader(reference_data_config=ref_data)
563562
rules = config.get_rule_metadata()
564563
entities = {}
565564
contract = fh.joinuri(
@@ -585,10 +584,7 @@ def apply_business_rules( # pylint: disable=R0914
585584
key_fields = {model: conf.reporting_fields for model, conf in model_config.items()}
586585

587586
_errors_uri, rules_success = self.step_implementations.apply_rules( # type: ignore
588-
working_directory,
589-
entity_manager,
590-
rules,
591-
key_fields
587+
working_directory, entity_manager, rules, key_fields
592588
)
593589

594590
rule_messages = load_feedback_messages(

src/dve/pipeline/spark_pipeline.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from pyspark.sql import DataFrame, SparkSession
88

9+
import dve.parser.file_handling as fh
910
from dve.core_engine.backends.base.reference_data import BaseRefDataLoader, ReferenceConfig
1011
from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager
1112
from dve.core_engine.backends.implementations.spark.contract import SparkDataContract
@@ -16,7 +17,6 @@
1617
from dve.core_engine.type_hints import URI
1718
from dve.pipeline.pipeline import BaseDVEPipeline
1819
from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds
19-
import dve.parser.file_handling as fh
2020

2121

2222
# pylint: disable=abstract-method
@@ -48,14 +48,16 @@ def __init__(
4848
job_run_id,
4949
logger,
5050
)
51-
52-
def get_reference_data_loader(self,
53-
reference_data_config: dict[str, ReferenceConfig],
54-
**kwargs) -> BaseRefDataLoader[DataFrame]:
55-
return SparkRefDataLoader(spark=self._spark,
56-
reference_data_config=reference_data_config,
57-
dataset_config_uri=fh.get_parent(self._rules_path),
58-
**kwargs)
51+
52+
def get_reference_data_loader(
53+
self, reference_data_config: dict[str, ReferenceConfig], **kwargs
54+
) -> SparkRefDataLoader:
55+
return SparkRefDataLoader(
56+
spark=self._spark,
57+
reference_data_config=reference_data_config,
58+
dataset_config_uri=fh.get_parent(self._rules_path), # type: ignore
59+
**kwargs
60+
)
5961

6062
# pylint: disable=arguments-differ
6163
def write_file_to_parquet( # type: ignore

0 commit comments

Comments
 (0)