Skip to content

Commit 68f5686

Browse files
committed
feat: Add read of arrow ipc files to reference data loaders
1 parent fc120b1 commit 68f5686

7 files changed

Lines changed: 124 additions & 36 deletions

File tree

src/dve/core_engine/backends/base/reference_data.py

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,23 @@
88
from typing_extensions import Annotated, Literal
99

1010
from dve.core_engine.backends.base.core import get_entity_type
11-
from dve.core_engine.backends.exceptions import MissingRefDataEntity
11+
from dve.core_engine.backends.exceptions import MissingRefDataEntity, RefdataLacksFileExtensionSupport
1212
from dve.core_engine.backends.types import EntityType
13-
from dve.core_engine.type_hints import EntityName
13+
from dve.core_engine.type_hints import URI, EntityName
14+
import dve.parser.file_handling as fh
15+
from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
16+
from dve.parser.file_handling.service import _get_implementation
17+
18+
_FILE_EXTENSION_NAME: str = "_REFDATA_FILE_EXTENSION"
19+
"""Name of attribute added to methods where they relate
20+
to loading a particular reference file type."""
21+
22+
def mark_refdata_file_extension(file_extension):
23+
"""Mark a method for loading a particular file extension"""
24+
def wrapper(func: Callable):
25+
setattr(func, _FILE_EXTENSION_NAME, file_extension)
26+
return func
27+
return wrapper
1428

1529

1630
class ReferenceTable(BaseModel, frozen=True):
@@ -37,7 +51,10 @@ class ReferenceFile(BaseModel, frozen=True):
3751
type: Literal["filename"]
3852
"""The object type."""
3953
filename: str
40-
"""The path to the reference data (as Parquet) relative to the contract."""
54+
"""The path to the reference data relative to the contract."""
55+
@property
56+
def file_extension(self) -> str:
57+
return fh.get_file_suffix(self.filename)
4158

4259

4360
class ReferenceURI(BaseModel, frozen=True):
@@ -47,6 +64,9 @@ class ReferenceURI(BaseModel, frozen=True):
4764
"""The object type."""
4865
uri: str
4966
"""The absolute URI of the reference data (as Parquet)."""
67+
@property
68+
def file_extension(self) -> str:
69+
return fh.get_file_suffix(self.uri)
5070

5171

5272
ReferenceConfig = Union[ReferenceFile, ReferenceTable, ReferenceURI]
@@ -71,6 +91,12 @@ class BaseRefDataLoader(Generic[EntityType], Mapping[EntityName, EntityType], AB
7191
A mapping between refdata config types and functions to call to load these configs
7292
into reference data entities
7393
"""
94+
95+
__reader_functions__: ClassVar[dict[str, Callable]] = {}
96+
"""
97+
A mapping between file extensions and functions to load the file uris
98+
into reference data entities
99+
"""
74100
prefix: str = "refdata_"
75101

76102
def __init_subclass__(cls, *_, **__) -> None:
@@ -91,20 +117,29 @@ class variable for the subclass.
91117
method = getattr(cls, method_name, None)
92118
if method is None or not callable(method):
93119
continue
120+
121+
if ext := getattr(method, _FILE_EXTENSION_NAME, None):
122+
cls.__reader_functions__[ext] = method
123+
continue
94124

95125
type_hints = get_type_hints(method)
96126
if set(type_hints.keys()) != {"config", "return"}:
97127
continue
98128
config_type = type_hints["config"]
99129
if not issubclass(config_type, BaseModel):
100130
continue
131+
101132
cls.__step_functions__[config_type] = method # type: ignore
102133

103134
# pylint: disable=unused-argument
104135
def __init__(
105-
self, reference_entity_config: dict[EntityName, ReferenceConfig], **kwargs
136+
self,
137+
reference_entity_config: dict[EntityName, ReferenceConfig],
138+
dataset_config_uri: Optional[URI] = None,
139+
**kwargs
106140
) -> None:
107141
self.reference_entity_config = reference_entity_config
142+
self.dataset_config_uri = dataset_config_uri
108143
"""
109144
Configuration options for the reference data. This is likely to vary
110145
from backend to backend (e.g. might be locations and file types for
@@ -119,15 +154,30 @@ def load_table(self, config: ReferenceTable) -> EntityType:
119154
"""Load reference entity from a database table"""
120155
raise NotImplementedError()
121156

122-
@abstractmethod
123157
def load_file(self, config: ReferenceFile) -> EntityType:
124158
"Load reference entity from a relative file path"
125-
raise NotImplementedError()
159+
if not self.dataset_config_uri:
160+
raise AttributeError("dataset_config_uri must be specified if using relative paths")
161+
target_location = fh.build_relative_uri(self.dataset_config_uri, config.filename)
162+
if isinstance(_get_implementation(self.dataset_config_uri), LocalFilesystemImplementation):
163+
target_location = fh.file_uri_to_local_path(target_location).as_posix()
164+
try:
165+
impl = self.__reader_functions__[config.file_extension]
166+
return impl(self, target_location)
167+
except KeyError:
168+
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension)
126169

127-
@abstractmethod
128170
def load_uri(self, config: ReferenceURI) -> EntityType:
129171
"Load reference entity from an absolute URI"
130-
raise NotImplementedError()
172+
if isinstance(_get_implementation(config.uri), LocalFilesystemImplementation):
173+
target_location = fh.file_uri_to_local_path(config.uri).as_posix()
174+
else:
175+
target_location = config.uri
176+
try:
177+
impl = self.__reader_functions__[config.file_extension]
178+
return impl(self, target_location)
179+
except KeyError:
180+
raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension)
131181

132182
def load_entity(self, entity_name: EntityName, config: ReferenceConfig) -> EntityType:
133183
"""Load a reference entity given the reference config"""

src/dve/core_engine/backends/exceptions.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ def __init__(self, *args: object, entity_type: Any) -> None:
165165
def get_message_preamble(self) -> EntityName:
166166
return f"Reader does not support reading directly to entity type {self.entity_type!r}"
167167

168+
class RefdataLacksFileExtensionSupport(BackendError):
169+
"""An error raised when trying to load a refdata file where the loader
170+
lacks support for the given file type
171+
172+
"""
173+
174+
def __init__(self, *args: object, file_extension: str) -> None:
175+
super().__init__(*args)
176+
self.file_extension = file_extension
177+
"""The file extension that is not supported directly by the
178+
refdata loader"""
179+
180+
def get_message_preamble(self) -> EntityName:
181+
return f"Refdata loader does not support reading refdata from {self.file_extension} files"
182+
168183

169184
class EmptyFileError(ReaderErrorMixin, ValueError):
170185
"""The read file was empty."""

src/dve/core_engine/backends/implementations/duckdb/reference_data.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Optional
44

55
from duckdb import DuckDBPyConnection, DuckDBPyRelation
6+
import pyarrow.ipc as ipc
67

78
import dve.parser.file_handling as fh
89
from dve.core_engine.backends.base.reference_data import (
@@ -11,6 +12,7 @@
1112
ReferenceFile,
1213
ReferenceTable,
1314
ReferenceURI,
15+
mark_refdata_file_extension
1416
)
1517
from dve.core_engine.type_hints import EntityName
1618
from dve.parser.file_handling.implementations.file import (
@@ -35,28 +37,20 @@ def __init__(
3537
reference_entity_config: dict[EntityName, ReferenceConfigUnion],
3638
**kwargs,
3739
) -> None:
38-
super().__init__(reference_entity_config, **kwargs)
40+
super().__init__(reference_entity_config, self.dataset_config_uri, **kwargs)
3941

4042
if not self.connection:
4143
raise AttributeError("DuckDBConnection must be specified")
4244

4345
def load_table(self, config: ReferenceTable) -> DuckDBPyRelation:
4446
"""Load reference entity from a database table"""
45-
return self.connection.sql(f"select * from {config.fq_table_name}")
46-
47-
def load_file(self, config: ReferenceFile) -> DuckDBPyRelation:
48-
"Load reference entity from a relative file path"
49-
if not self.dataset_config_uri:
50-
raise AttributeError("dataset_config_uri must be specified if using relative paths")
51-
target_location = fh.build_relative_uri(self.dataset_config_uri, config.filename)
52-
if isinstance(_get_implementation(self.dataset_config_uri), LocalFilesystemImplementation):
53-
target_location = file_uri_to_local_path(target_location).as_posix()
54-
return self.connection.read_parquet(target_location)
55-
56-
def load_uri(self, config: ReferenceURI) -> DuckDBPyRelation:
57-
"Load reference entity from an absolute URI"
58-
if isinstance(_get_implementation(config.uri), LocalFilesystemImplementation):
59-
target_location = file_uri_to_local_path(config.uri).as_posix()
60-
else:
61-
target_location = config.uri
62-
return self.connection.read_parquet(target_location)
47+
return self.connection.sql(f"select * from {config.fq_table_name}")
48+
49+
@mark_refdata_file_extension("parquet")
50+
def load_parquet_file(self, uri: str) -> DuckDBPyRelation:
51+
return self.connection.read_parquet(uri)
52+
53+
@mark_refdata_file_extension("arrow")
54+
def load_arrow_file(self, uri: str) -> DuckDBPyRelation:
55+
return self.connection.from_arrow(ipc.open_file(uri).read_all())
56+

src/dve/core_engine/backends/implementations/spark/reference_data.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
ReferenceFile,
1313
ReferenceTable,
1414
ReferenceURI,
15+
mark_refdata_file_extension,
1516
)
1617
from dve.core_engine.type_hints import EntityName
1718
from dve.parser.type_hints import URI
@@ -31,18 +32,13 @@ def __init__(
3132
reference_entity_config: dict[EntityName, ReferenceConfig],
3233
**kwargs,
3334
) -> None:
34-
super().__init__(reference_entity_config, **kwargs)
35+
super().__init__(reference_entity_config, self.dataset_config_uri, **kwargs)
3536
if not self.spark:
3637
raise AttributeError("Spark session must be provided")
3738

3839
def load_table(self, config: ReferenceTable) -> DataFrame:
3940
return self.spark.table(f"{config.fq_table_name}")
4041

41-
def load_file(self, config: ReferenceFile) -> DataFrame:
42-
if not self.dataset_config_uri:
43-
raise AttributeError("dataset_config_uri must be specified if using relative paths")
44-
target_location = fh.build_relative_uri(self.dataset_config_uri, config.filename)
45-
return self.spark.read.parquet(target_location)
46-
47-
def load_uri(self, config: ReferenceURI) -> DataFrame:
48-
return self.spark.read.parquet(config.uri)
42+
@mark_refdata_file_extension("parquet")
43+
def load_parquet_file(self, uri:str) -> DataFrame:
44+
return self.spark.read.parquet(uri)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import shutil
2+
3+
import duckdb
4+
import pytest
5+
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
6+
from dve.core_engine.backends.base.reference_data import ReferenceFile
7+
from tempfile import TemporaryDirectory
8+
9+
from tests.conftest import get_test_file_path
10+
11+
@pytest.fixture(scope="module")
12+
def temp_working_dir():
13+
with TemporaryDirectory(prefix="refdata_test") as tmp:
14+
refdata_path = get_test_file_path("movies/refdata/movies_sequels.arrow")
15+
shutil.copy(refdata_path.as_posix(), tmp)
16+
yield tmp
17+
18+
@pytest.fixture(scope="module")
19+
def ddb_refdata_loader(temp_working_dir):
20+
DuckDBRefDataLoader.connection = duckdb.connect()
21+
DuckDBRefDataLoader.dataset_config_uri = temp_working_dir
22+
yield DuckDBRefDataLoader
23+
24+
def test_load_arrow_file(ddb_refdata_loader):
25+
config = {
26+
"test_refdata": ReferenceFile(type="filename",
27+
filename="./movies_sequels.arrow")
28+
}
29+
refdata_loader: DuckDBRefDataLoader = ddb_refdata_loader(config)
30+
31+
test = refdata_loader.load_file(config.get("test_refdata"))
32+
33+
assert test.shape == (2, 3)

tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_refdata.py

Whitespace-only changes.
1.06 KB
Binary file not shown.

0 commit comments

Comments
 (0)