Skip to content

Commit 70d2399

Browse files
committed
feat: add reference-data loading of Arrow IPC files, with enhanced test coverage for the reference-data loaders
1 parent 68f5686 commit 70d2399

10 files changed

Lines changed: 260 additions & 56 deletions

File tree

src/dve/core_engine/backends/base/reference_data.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,29 @@
77
from pydantic import BaseModel, Field
88
from typing_extensions import Annotated, Literal
99

10+
import dve.parser.file_handling as fh
1011
from dve.core_engine.backends.base.core import get_entity_type
11-
from dve.core_engine.backends.exceptions import MissingRefDataEntity, RefdataLacksFileExtensionSupport
12+
from dve.core_engine.backends.exceptions import (
13+
MissingRefDataEntity,
14+
RefdataLacksFileExtensionSupport,
15+
)
1216
from dve.core_engine.backends.types import EntityType
1317
from dve.core_engine.type_hints import URI, EntityName
14-
import dve.parser.file_handling as fh
1518
from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
1619
from dve.parser.file_handling.service import _get_implementation
1720

1821
_FILE_EXTENSION_NAME: str = "_REFDATA_FILE_EXTENSION"
1922
"""Name of attribute added to methods where they relate
2023
to loading a particular reference file type."""
2124

25+
2226
def mark_refdata_file_extension(file_extension):
2327
"""Mark a method for loading a particular file extension"""
28+
2429
def wrapper(func: Callable):
2530
setattr(func, _FILE_EXTENSION_NAME, file_extension)
2631
return func
32+
2733
return wrapper
2834

2935

@@ -52,9 +58,11 @@ class ReferenceFile(BaseModel, frozen=True):
5258
"""The object type."""
5359
filename: str
5460
"""The path to the reference data relative to the contract."""
61+
5562
@property
5663
def file_extension(self) -> str:
57-
return fh.get_file_suffix(self.filename)
64+
"""The file extension of the reference file"""
65+
return fh.get_file_suffix(self.filename) # type: ignore
5866

5967

6068
class ReferenceURI(BaseModel, frozen=True):
@@ -64,9 +72,11 @@ class ReferenceURI(BaseModel, frozen=True):
6472
"""The object type."""
6573
uri: str
6674
"""The absolute URI of the reference data (as Parquet)."""
75+
6776
@property
6877
def file_extension(self) -> str:
69-
return fh.get_file_suffix(self.uri)
78+
"""The file extension of the reference uri"""
79+
return fh.get_file_suffix(self.uri) # type: ignore
7080

7181

7282
ReferenceConfig = Union[ReferenceFile, ReferenceTable, ReferenceURI]
@@ -91,7 +101,7 @@ class BaseRefDataLoader(Generic[EntityType], Mapping[EntityName, EntityType], AB
91101
A mapping between refdata config types and functions to call to load these configs
92102
into reference data entities
93103
"""
94-
104+
95105
__reader_functions__: ClassVar[dict[str, Callable]] = {}
96106
"""
97107
A mapping between file extensions and functions to load the file uris
@@ -108,6 +118,9 @@ class variable for the subclass.
108118
if cls is not BaseRefDataLoader:
109119
cls.__entity_type__ = get_entity_type(cls, "BaseRefDataLoader")
110120

121+
# ensure that dicts are specific to each subclass - redefine rather
122+
# than keep the same reference
123+
cls.__reader_functions__ = {}
111124
cls.__step_functions__ = {}
112125

113126
for method_name in dir(cls):
@@ -117,7 +130,7 @@ class variable for the subclass.
117130
method = getattr(cls, method_name, None)
118131
if method is None or not callable(method):
119132
continue
120-
133+
121134
if ext := getattr(method, _FILE_EXTENSION_NAME, None):
122135
cls.__reader_functions__[ext] = method
123136
continue
@@ -136,7 +149,7 @@ def __init__(
136149
self,
137150
reference_entity_config: dict[EntityName, ReferenceConfig],
138151
dataset_config_uri: Optional[URI] = None,
139-
**kwargs
152+
**kwargs,
140153
) -> None:
141154
self.reference_entity_config = reference_entity_config
142155
self.dataset_config_uri = dataset_config_uri
@@ -164,8 +177,8 @@ def load_file(self, config: ReferenceFile) -> EntityType:
164177
try:
165178
impl = self.__reader_functions__[config.file_extension]
166179
return impl(self, target_location)
167-
except KeyError:
168-
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension)
180+
except KeyError as exc:
181+
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
169182

170183
def load_uri(self, config: ReferenceURI) -> EntityType:
171184
"Load reference entity from an absolute URI"
@@ -176,8 +189,8 @@ def load_uri(self, config: ReferenceURI) -> EntityType:
176189
try:
177190
impl = self.__reader_functions__[config.file_extension]
178191
return impl(self, target_location)
179-
except KeyError:
180-
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension)
192+
except KeyError as exc:
193+
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
181194

182195
def load_entity(self, entity_name: EntityName, config: ReferenceConfig) -> EntityType:
183196
"""Load a reference entity given the reference config"""

src/dve/core_engine/backends/base/rules.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,11 @@ def apply_sync_filters(
406406
[
407407
CriticalProcessingError(
408408
"Issue occurred while applying filter logic",
409-
messages=[msg.error_message for msg in temp_messages if msg.error_message],
409+
messages=[
410+
msg.error_message
411+
for msg in temp_messages
412+
if msg.error_message
413+
],
410414
)
411415
],
412416
)
@@ -440,7 +444,7 @@ def apply_sync_filters(
440444
msg_writer.write_queue.put(temp_messages)
441445
self.logger.info(
442446
f"Filter {rule.reporting.code} found {len(temp_messages)} issues"
443-
)
447+
)
444448

445449
else:
446450
temp_messages, success = self.evaluate(
@@ -466,15 +470,15 @@ def apply_sync_filters(
466470
return processing_errors_uri, False
467471
if temp_messages:
468472
msg_writer.write_queue.put(temp_messages)
469-
473+
470474
self.logger.info(
471475
f"Filter {rule.reporting.code} found {len(temp_messages)} issues"
472-
)
476+
)
473477

474478
if filter_column_names:
475479
self.logger.info(
476-
f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long
477-
)
480+
f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long
481+
)
478482
success_condition = " AND ".join(
479483
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
480484
)

src/dve/core_engine/backends/exceptions.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def get_joiner(self):
108108
return "required by"
109109

110110

111-
class MissingRefDataEntity(MissingEntity): # pylint: disable=too-many-ancestors
111+
class MissingRefDataEntity(MissingEntity, BackendErrorMixin): # pylint: disable=too-many-ancestors
112112
"""An error to be emitted when a required refdata entity is missing."""
113113

114114
def get_message_preamble(self) -> str:
@@ -165,9 +165,10 @@ def __init__(self, *args: object, entity_type: Any) -> None:
165165
def get_message_preamble(self) -> EntityName:
166166
return f"Reader does not support reading directly to entity type {self.entity_type!r}"
167167

168+
168169
class RefdataLacksFileExtensionSupport(BackendError):
169170
"""An error raised when trying to load a refdata file where the loader
170-
lacks support for the given file type
171+
lacks support for the given file type
171172
172173
"""
173174

@@ -178,6 +179,7 @@ def __init__(self, *args: object, file_extension: str) -> None:
178179
refdata loader"""
179180

180181
def get_message_preamble(self) -> EntityName:
182+
"""Message for logging purposes"""
181183
return f"Refdata loader does not support reading refdata from {self.file_extension} files"
182184

183185

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,8 @@ def apply_data_contract(
169169
if msgs:
170170
msg_writer.write_queue.put(msgs)
171171
msg_count += len(msgs)
172-
173-
self.logger.info(
174-
f"Data contract found {msg_count} issues in {entity_name}"
175-
)
172+
173+
self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")
176174

177175
casting_statements = [
178176
(

src/dve/core_engine/backends/implementations/duckdb/reference_data.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,15 @@
33
from typing import Optional
44

55
from duckdb import DuckDBPyConnection, DuckDBPyRelation
6-
import pyarrow.ipc as ipc
6+
from pyarrow import ipc # type: ignore
77

8-
import dve.parser.file_handling as fh
98
from dve.core_engine.backends.base.reference_data import (
109
BaseRefDataLoader,
1110
ReferenceConfigUnion,
12-
ReferenceFile,
1311
ReferenceTable,
14-
ReferenceURI,
15-
mark_refdata_file_extension
12+
mark_refdata_file_extension,
1613
)
1714
from dve.core_engine.type_hints import EntityName
18-
from dve.parser.file_handling.implementations.file import (
19-
LocalFilesystemImplementation,
20-
file_uri_to_local_path,
21-
)
22-
from dve.parser.file_handling.service import _get_implementation
2315
from dve.parser.type_hints import URI
2416

2517

@@ -44,13 +36,14 @@ def __init__(
4436

4537
def load_table(self, config: ReferenceTable) -> DuckDBPyRelation:
4638
"""Load reference entity from a database table"""
47-
return self.connection.sql(f"select * from {config.fq_table_name}")
48-
39+
return self.connection.sql(f"select * from {config.fq_table_name}")
40+
4941
@mark_refdata_file_extension("parquet")
5042
def load_parquet_file(self, uri: str) -> DuckDBPyRelation:
43+
"""Load a parquet file into a duckdb relation"""
5144
return self.connection.read_parquet(uri)
52-
45+
5346
@mark_refdata_file_extension("arrow")
5447
def load_arrow_file(self, uri: str) -> DuckDBPyRelation:
55-
return self.connection.from_arrow(ipc.open_file(uri).read_all())
56-
48+
"""Load an arrow ipc file into a duckdb relation"""
49+
return self.connection.from_arrow(ipc.open_file(uri).read_all()) # type:ignore

src/dve/core_engine/backends/implementations/spark/reference_data.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,10 @@
55

66
from pyspark.sql import DataFrame, SparkSession
77

8-
import dve.parser.file_handling as fh
98
from dve.core_engine.backends.base.reference_data import (
109
BaseRefDataLoader,
1110
ReferenceConfig,
12-
ReferenceFile,
1311
ReferenceTable,
14-
ReferenceURI,
1512
mark_refdata_file_extension,
1613
)
1714
from dve.core_engine.type_hints import EntityName
@@ -40,5 +37,6 @@ def load_table(self, config: ReferenceTable) -> DataFrame:
4037
return self.spark.table(f"{config.fq_table_name}")
4138

4239
@mark_refdata_file_extension("parquet")
43-
def load_parquet_file(self, uri:str) -> DataFrame:
40+
def load_parquet_file(self, uri: str) -> DataFrame:
41+
"""Load a parquet file into a spark dataframe"""
4442
return self.spark.read.parquet(uri)

tests/fixtures.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,11 @@ def spark_test_database(spark: SparkSession) -> Iterator[str]:
113113

114114

115115

116-
@pytest.fixture()
116+
@pytest.fixture(scope="function")
117117
def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]:
118118
"""Temp DuckDB directory for the database"""
119-
db = uuid4().hex
119+
db = f"dve_{uuid4().hex}"
120120
with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp:
121121
db_file = Path(tmp, db + ".duckdb")
122122
conn = connect(database=db_file, read_only=False)
123-
124123
yield db_file, conn

0 commit comments

Comments
 (0)