|
6 | 6 |
|
7 | 7 | import duckdb as ddb |
8 | 8 | import polars as pl |
9 | | -from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, default_connection, read_csv |
| 9 | +from duckdb import ( |
| 10 | + DuckDBPyConnection, |
| 11 | + DuckDBPyRelation, |
| 12 | + StarExpression, |
| 13 | + default_connection, |
| 14 | + read_csv, |
| 15 | +) |
10 | 16 | from pydantic import BaseModel |
11 | 17 |
|
12 | 18 | from dve.core_engine.backends.base.reader import BaseFileReader, read_function |
|
24 | 30 | from dve.core_engine.type_hints import URI, EntityName |
25 | 31 | from dve.parser.file_handling import get_content_length |
26 | 32 |
|
| 33 | + |
27 | 34 | @duckdb_record_index |
28 | 35 | @duckdb_write_parquet |
29 | 36 | class DuckDBCSVReader(BaseFileReader): |
@@ -113,6 +120,7 @@ def read_to_relation( # pylint: disable=unused-argument |
113 | 120 | reader_options["columns"] = ddb_schema |
114 | 121 | return self.add_record_index(read_csv(resource, **reader_options, parallel=False)) |
115 | 122 |
|
| 123 | + |
116 | 124 | @polars_record_index |
117 | 125 | class PolarsToDuckDBCSVReader(DuckDBCSVReader): |
118 | 126 | """ |
@@ -144,11 +152,14 @@ def read_to_relation( # pylint: disable=unused-argument |
144 | 152 | for fld in schema.__fields__.values() |
145 | 153 | } |
146 | 154 | reader_options["dtypes"] = polars_types |
147 | | - |
148 | 155 |
|
149 | 156 | # there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85 |
150 | 157 | # redundant |
151 | | - df = self.add_record_index(pl.scan_csv(resource, **reader_options).select(list(polars_types.keys()))) # type: ignore # pylint: disable=W0612 |
| 158 | + df = self.add_record_index( # pylint: disable=W0612 |
| 159 | + pl.scan_csv(resource, **reader_options).select( # type: ignore |
| 160 | + list(polars_types.keys()) |
| 161 | + ) |
| 162 | + ) |
152 | 163 |
|
153 | 164 | return ddb.sql("SELECT * FROM df") |
154 | 165 |
|
@@ -192,7 +203,9 @@ def __init__( |
192 | 203 | def read_to_relation( # pylint: disable=unused-argument |
193 | 204 | self, resource: URI, entity_name: EntityName, schema: type[BaseModel] |
194 | 205 | ) -> DuckDBPyRelation: |
195 | | - entity: DuckDBPyRelation = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema) |
| 206 | + entity: DuckDBPyRelation = super().read_to_relation( |
| 207 | + resource=resource, entity_name=entity_name, schema=schema |
| 208 | + ) |
196 | 209 | entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct() |
197 | 210 | no_records = entity.shape[0] |
198 | 211 |
|
|
0 commit comments