Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/dve/core_engine/backends/base/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from pydantic import BaseModel
from typing_extensions import Protocol

from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
from dve.core_engine.backends.types import EntityName, EntityType
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
from dve.parser.file_handling.service import open_stream

T = TypeVar("T")
ET_co = TypeVar("ET_co", covariant=True)
Expand Down Expand Up @@ -115,6 +117,8 @@ def read_to_entity_type(
"""
if entity_name == Iterator[dict[str, Any]]:
return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore

self.raise_if_not_sensible_file(resource, entity_name)

try:
reader_func = self.__read_methods__[entity_type]
Expand All @@ -137,3 +141,34 @@ def write_parquet(

"""
raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")

    @staticmethod
    def _check_likely_text_file(resource: URI) -> bool:
        """Heuristically decide whether *resource* looks like a text file.

        Reads only the first 4096 bytes — not foolproof, but hopefully
        enough to weed out most non-text submissions.

        Args:
            resource: URI of the file to inspect.

        Returns:
            True if the sampled bytes look like text, False otherwise.
        """
        with open_stream(resource, "rb") as fle:
            start_chunk = fle.read(4096)
        # UTF-16 encoded text legitimately contains NUL bytes, so a
        # leading BOM means "treat as text" before the NUL check below.
        if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
            return True
        # A NUL byte in the sample is a strong indicator of binary content.
        if b"\x00" in start_chunk:
            return False
        return True

def raise_if_not_sensible_file(self, resource: URI, entity_name:str):
if not self._check_likely_text_file(resource):
raise MessageBearingError(
"The submitted file doesn't appear to be text",
messages=[
FeedbackMessage(
entity=entity_name,
record=None,
failure_type="submission",
error_location="Whole File",
error_code="MalformedFile",
error_message=f"The submitted resource doesn't seem to be a valid text file",
)
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.backends.implementations.duckdb.utilities import check_csv_header_expected
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
Expand All @@ -35,15 +36,46 @@ def __init__(
delim: str = ",",
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
field_check: bool = False,
Comment thread
georgeRobertson marked this conversation as resolved.
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
field_check_error_message: Optional[str] = "The submitted header is missing fields",
**_,
):
self.header = header
self.delim = delim
self.quotechar = quotechar
self._connection = connection if connection else default_connection
self.field_check = field_check
self.field_check_error_code = field_check_error_code
self.field_check_error_message = field_check_error_message

super().__init__()

def perform_field_check(
self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
):
if not self.header:
raise ValueError("Cannot perform field check without a CSV header")

if missing := check_csv_header_expected(
resource,
expected_schema,
self.delim
):
raise MessageBearingError(
"The CSV header doesn't match what is expected",
messages=[
FeedbackMessage(
entity=entity_name,
record=None,
failure_type="submission",
error_location="Whole File",
error_code=self.field_check_error_code,
error_message=f"{self.field_check_error_message} - missing fields: {missing}",
)
],
)

def read_to_py_iterator(
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
) -> Iterator[dict[str, Any]]:
Expand All @@ -58,6 +90,9 @@ def read_to_relation( # pylint: disable=unused-argument
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.field_check:
self.perform_field_check(resource, entity_name, schema)

reader_options: dict[str, Any] = {
"header": self.header,
"delimiter": self.delim,
Expand Down Expand Up @@ -89,6 +124,9 @@ def read_to_relation( # pylint: disable=unused-argument
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.field_check:
self.perform_field_check(resource, entity_name, schema)

reader_options: dict[str, Any] = {
"has_header": self.header,
"separator": self.delim,
Expand Down Expand Up @@ -132,6 +170,12 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
| shop1 | clothes | 2025-01-01 |
"""

    def __init__(
        self, non_unique_header_error_code: Optional[str] = "NonUniqueHeader", *args, **kwargs
    ):
        """Initialise the repeating-header CSV reader.

        Args:
            non_unique_header_error_code: error code attached to the
                feedback message raised when the repeated header fields are
                not unique.
            *args: forwarded unchanged to the parent reader's ``__init__``.
            **kwargs: forwarded unchanged to the parent reader's ``__init__``.
        """
        # NOTE(review): because this parameter precedes *args, a caller
        # passing parent-class arguments positionally would bind the first
        # one here — confirm all call sites use keyword arguments.
        self._non_unique_header_code = non_unique_header_error_code
        super().__init__(*args, **kwargs)

@read_function(DuckDBPyRelation)
def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
Expand Down Expand Up @@ -159,7 +203,7 @@ def read_to_relation( # pylint: disable=unused-argument
),
error_location=entity_name,
category="Bad file",
error_code="NonUniqueHeader",
error_code=self._non_unique_header_code,
)
],
)
Expand Down
19 changes: 19 additions & 0 deletions src/dve/core_engine/backends/implementations/duckdb/utilities.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
"""Utility objects for use with duckdb backend"""

import itertools
from typing import Optional

from pydantic import BaseModel

from dve.core_engine.backends.base.utilities import _split_multiexpr_string
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI
from dve.parser.file_handling import open_stream


def parse_multiple_expressions(expressions) -> list[str]:
Expand Down Expand Up @@ -39,3 +46,15 @@ def multiexpr_string_to_columns(expressions: str) -> list[str]:
"""
expression_list = _split_multiexpr_string(expressions)
return expr_array_to_columns(expression_list)

def check_csv_header_expected(
    resource: URI,
    expected_schema: type[BaseModel],
    delimiter: str = ",",
    quote_char: str = '"',
) -> set[str]:
    """Return the schema field names that are missing from a CSV header.

    Reads only the first line of *resource*, strips *quote_char*, splits on
    *delimiter*, and compares against the field names of *expected_schema*.

    Args:
        resource: URI of the CSV file to inspect.
        expected_schema: pydantic model whose field names are expected.
        delimiter: field separator used in the header row.
            Fix: was typed ``Optional[str]`` — a ``None`` would silently
            make ``str.split`` split on whitespace; the value is never None.
        quote_char: quote character to strip from the header before
            splitting.

    Returns:
        The set of expected field names not present in the header
        (empty when the header covers the schema).
    """
    with open_stream(resource) as fle:
        header_line = fle.readline()
    # NOTE(review): stripping quote_char everywhere also removes it from
    # inside field names — acceptable for simple headers, verify if field
    # names may legitimately contain the quote character.
    header_fields = header_line.rstrip().replace(quote_char, "").split(delimiter)
    expected_fields = expected_schema.__fields__.keys()
    return set(expected_fields).difference(header_fields)

4 changes: 3 additions & 1 deletion src/dve/pipeline/foundry_ddb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def error_report(
self._logger.exception(exc)
sub_stats = None
report_uri = None
submission_status.processing_failed = True
Comment thread
georgeRobertson marked this conversation as resolved.
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"error_report",
Expand Down Expand Up @@ -148,7 +149,8 @@ def run_pipeline(
sub_info, sub_status, sub_stats, report_uri = self.error_report(
submission_info=submission_info, submission_status=sub_status
)
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
if sub_stats:
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
except Exception as err: # pylint: disable=W0718
self._logger.error(
f"During processing of submission_id: {sub_id}, this exception was raised: {err}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from typing import Dict, List
import tempfile
import datetime as dt
from pathlib import Path
from uuid import uuid4
from pydantic import BaseModel, create_model
import pytest

from dve.core_engine.backends.implementations.duckdb.utilities import (
expr_mapping_to_columns,
expr_array_to_columns,
check_csv_header_expected,
)


Expand All @@ -16,7 +21,7 @@
),
],
)
def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]):
def test_expr_mapping_to_columns(expressions: dict[str, str], expected: list[str]):
observed = expr_mapping_to_columns(expressions)
assert observed == expected

Expand Down Expand Up @@ -51,6 +56,47 @@ def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str
),
],
)
def test_expr_array_to_columns(expressions: Dict[str, str], expected: list[str]):
def test_expr_array_to_columns(expressions: dict[str, str], expected: list[str]):
observed = expr_array_to_columns(expressions)
assert observed == expected


@pytest.mark.parametrize(
    ["header_row", "delim", "schema", "expected"],
    [
        # Exact match: no fields missing.
        (
            "field1,field2,field3",
            ",",
            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
            set(),
        ),
        # Order does not matter.
        (
            "field2,field3,field1",
            ",",
            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
            set(),
        ),
        # Wrong delimiter: every field reported missing.
        (
            "str_field|int_field|date_field|",
            ",",
            {
                "str_field": (str, ...),
                "int_field": (int, ...),
                "date_field": (dt.date, dt.date.today()),
            },
            {"str_field", "int_field", "date_field"},
        ),
        # Quoted header fields are matched after quote stripping.
        (
            '"str_field"|"int_field"|"date_field"',
            "|",
            {
                "str_field": (str, ...),
                "int_field": (int, ...),
                "date_field": (dt.date, dt.date.today()),
            },
            set(),
        ),
    ],
)
def test_check_csv_header_expected(
    header_row: str, delim: str, schema: dict, expected: set[str]
):
    """check_csv_header_expected returns the schema fields absent from the header."""
    # Fix: annotation was ``type[BaseModel]`` but the parametrized value is a
    # dict of field definitions consumed by ``create_model``.
    mdl = create_model("TestModel", **schema)
    with tempfile.TemporaryDirectory() as tmpdir:
        fle = Path(tmpdir).joinpath(f"test_file_{uuid4().hex}.csv")
        # Fix: previous ``fle.open("w+").write(...)`` leaked the handle and
        # could leave the buffer unflushed before the read-back below;
        # write_text opens, writes, flushes and closes deterministically.
        fle.write_text(header_row)
        res = check_csv_header_expected(fle.as_posix(), mdl, delim)
        assert res == expected
Loading