Skip to content

Commit dc0b68d

Browse files
committed
feat: add row index at file transform, if not using file transform add during dataa contract or business rules (but likely non-deterministic)
1 parent 5e358a0 commit dc0b68d

39 files changed

Lines changed: 845 additions & 141 deletions

poetry.lock

Lines changed: 597 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ include-groups = [
4242
[tool.poetry.group.dev.dependencies]
4343
commitizen = "4.9.1"
4444
pre-commit = "4.3.0"
45+
ipykernel = "^7.2.0"
4546

4647
[tool.poetry.group.test]
4748
optional = true

src/dve/core_engine/backends/base/backend.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def apply(
163163
return entities, get_parent(processing_errors_uri), successful
164164

165165
for entity_name, entity in entities.items():
166-
entities[entity_name] = self.step_implementations.add_row_id(entity)
166+
entities[entity_name] = self.step_implementations.add_record_index(entity)
167167

168168
# TODO: Handle entity manager creation errors.
169169
entity_manager = EntityManager(entities, reference_data)
@@ -172,9 +172,6 @@ def apply(
172172
# TODO: and return uri to errors
173173
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)
174174

175-
for entity_name, entity in entity_manager.entities.items():
176-
entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)
177-
178175
return entity_manager.entities, get_parent(dc_feedback_errors_uri), True
179176

180177
def process(

src/dve/core_engine/backends/base/contract.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,14 @@ def read_raw_entities(
368368
messages.extend(new_messages)
369369

370370
return entities, dedup_messages(messages), successful
371+
372+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
373+
"""Add a record index to the entity"""
374+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
375+
376+
def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
377+
"""Drop a record index from the entity"""
378+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
371379

372380
@abstractmethod
373381
def apply_data_contract(

src/dve/core_engine/backends/base/reader.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def read_to_entity_type(
126126
raise ReaderLacksEntityTypeSupport(entity_type=entity_type) from err
127127

128128
return reader_func(self, resource, entity_name, schema)
129+
130+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
131+
"""Add a record index to the entity"""
132+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
133+
134+
def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
135+
"""Drop a record index to the entity"""
136+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
129137

130138
def write_parquet(
131139
self,

src/dve/core_engine/backends/base/rules.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,13 @@ def register_udfs(cls, **kwargs):
135135
"""Method to register all custom dve functions for use during business rules application"""
136136
raise NotImplementedError()
137137

138-
@staticmethod
139-
def add_row_id(entity: EntityType) -> EntityType:
140-
"""Add a unique row id field to an entity"""
141-
raise NotImplementedError()
138+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
139+
"""Add a record index to the entity"""
140+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
142141

143-
@staticmethod
144-
def drop_row_id(entity: EntityType) -> EntityType:
145-
"""Add a unique row id field to an entity"""
146-
raise NotImplementedError()
142+
def drop_record_index(self, entity: EntityType) -> EntityType:
143+
"""Drop a unique row id field to an entity"""
144+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
147145

148146
@classmethod
149147
def _raise_notimplemented_error(

src/dve/core_engine/backends/base/utilities.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
from dve.core_engine.type_hints import ExpressionArray, MultiExpression
1313
from dve.parser.type_hints import URI
1414

15+
import polars as pl
16+
17+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
18+
1519
BRACKETS = {"(": ")", "{": "}", "[": "]", "<": ">"}
1620
"""A mapping of opening brackets to their closing counterpart."""
1721
STRING_START_CHARS = {'"', "'"}

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from pydantic import BaseModel
1717
from pydantic.fields import ModelField
1818

19+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
1920
import dve.parser.file_handling as fh
2021
from dve.common.error_utils import (
2122
BackgroundMessageWriter,
@@ -29,6 +30,7 @@
2930
)
3031
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
3132
duckdb_read_parquet,
33+
duckdb_record_index,
3234
duckdb_write_parquet,
3335
get_duckdb_type_from_annotation,
3436
relation_is_empty,
@@ -53,7 +55,7 @@ def __call__(self, row: pd.Series):
5355
self.errors.extend(self.row_validator(row.to_dict())[1]) # type: ignore
5456
return row # no op
5557

56-
58+
@duckdb_record_index
5759
@duckdb_write_parquet
5860
@duckdb_read_parquet
5961
class DuckDBDataContract(BaseDataContract[DuckDBPyRelation]):
@@ -144,10 +146,12 @@ def apply_data_contract(
144146
fld.name: get_duckdb_type_from_annotation(fld.annotation)
145147
for fld in entity_fields.values()
146148
}
149+
ddb_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
147150
polars_schema: dict[str, PolarsType] = {
148151
fld.name: get_polars_type_from_annotation(fld.annotation)
149152
for fld in entity_fields.values()
150153
}
154+
polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
151155
if relation_is_empty(relation):
152156
self.logger.warning(f"+ Empty relation for {entity_name}")
153157
empty_df = pl.DataFrame([], schema=polars_schema) # type: ignore # pylint: disable=W0612
@@ -169,6 +173,9 @@ def apply_data_contract(
169173
msg_count += len(msgs)
170174

171175
self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")
176+
177+
if not RECORD_INDEX_COLUMN_NAME in relation.columns:
178+
relation = self.add_record_index(relation)
172179

173180
casting_statements = [
174181
(

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212

1313
import duckdb.typing as ddbtyp
1414
import numpy as np
15-
from duckdb import DuckDBPyConnection, DuckDBPyRelation
15+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression
1616
from duckdb.typing import DuckDBPyType
1717
from pandas import DataFrame
1818
from pydantic import BaseModel
1919
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
2020

2121
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
22+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
2223
from dve.core_engine.type_hints import URI
2324
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation
2425

@@ -286,3 +287,19 @@ def duckdb_rel_to_dictionaries(
286287
cols: tuple[str] = tuple(entity.columns) # type: ignore
287288
while rows := entity.fetchmany(batch_size):
288289
yield from (dict(zip(cols, rw)) for rw in rows)
290+
291+
def _add_duckdb_record_index(self, entity: DuckDBPyRelation) -> DuckDBPyRelation:
292+
if RECORD_INDEX_COLUMN_NAME in entity.columns:
293+
return entity
294+
295+
return entity.select(f"*, row_number() OVER () as {RECORD_INDEX_COLUMN_NAME}")
296+
297+
def _drop_duckdb_record_index(self, entity: DuckDBPyRelation) -> DuckDBPyRelation:
298+
if RECORD_INDEX_COLUMN_NAME not in entity.columns:
299+
return entity
300+
return entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME]))
301+
302+
def duckdb_record_index(cls):
303+
setattr(cls, "add_record_index", _add_duckdb_record_index)
304+
setattr(cls, "drop_record_index", _drop_duckdb_record_index)
305+
return cls

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,25 @@
66

77
import duckdb as ddb
88
import polars as pl
9-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
9+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, default_connection, read_csv
1010
from pydantic import BaseModel
1111

1212
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
1313
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
1414
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
15+
duckdb_record_index,
1516
duckdb_write_parquet,
1617
get_duckdb_type_from_annotation,
1718
)
1819
from dve.core_engine.backends.implementations.duckdb.types import SQLType
1920
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
20-
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
21+
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, polars_record_index
22+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
2123
from dve.core_engine.message import FeedbackMessage
2224
from dve.core_engine.type_hints import URI, EntityName
2325
from dve.parser.file_handling import get_content_length
2426

25-
27+
@duckdb_record_index
2628
@duckdb_write_parquet
2729
class DuckDBCSVReader(BaseFileReader):
2830
"""A reader for CSV files including the ability to compare the passed model
@@ -109,9 +111,9 @@ def read_to_relation( # pylint: disable=unused-argument
109111
}
110112

111113
reader_options["columns"] = ddb_schema
112-
return read_csv(resource, **reader_options)
113-
114+
return self.add_record_index(read_csv(resource, **reader_options, parallel=False))
114115

116+
@polars_record_index
115117
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
116118
"""
117119
Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.
@@ -142,10 +144,11 @@ def read_to_relation( # pylint: disable=unused-argument
142144
for fld in schema.__fields__.values()
143145
}
144146
reader_options["dtypes"] = polars_types
147+
145148

146149
# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
147150
# redundant
148-
df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612
151+
df = self.add_record_index(pl.scan_csv(resource, **reader_options).select(list(polars_types.keys()))) # type: ignore # pylint: disable=W0612
149152

150153
return ddb.sql("SELECT * FROM df")
151154

@@ -189,8 +192,8 @@ def __init__(
189192
def read_to_relation( # pylint: disable=unused-argument
190193
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
191194
) -> DuckDBPyRelation:
192-
entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
193-
entity = entity.distinct()
195+
entity: DuckDBPyRelation = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
196+
entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
194197
no_records = entity.shape[0]
195198

196199
if no_records != 1:
@@ -219,4 +222,4 @@ def read_to_relation( # pylint: disable=unused-argument
219222
],
220223
)
221224

222-
return entity
225+
return entity.select(f"*, 1 as {RECORD_INDEX_COLUMN_NAME}")

0 commit comments

Comments
 (0)