Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Install extra dependencies for a python install
run: |
sudo apt-get update
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils

- name: Install asdf cli
uses: asdf-vm/actions/setup@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader):
# TODO - stringify or not
def __init__(
self,
*,
header: bool = True,
delim: str = ",",
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
**_,
):
self.header = header
self.delim = delim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
class DuckDBJSONReader(BaseFileReader):
"""A reader for JSON files"""

def __init__(self, json_format: Optional[str] = "array"):
def __init__(self, *, json_format: Optional[str] = "array", **_):
    """Set up the JSON reader.

    `json_format` is keyword-only and is stored on the instance as
    `_json_format`. Any other keyword arguments are accepted and ignored.
    """
    self._json_format = json_format

    # The parent initialiser is called with no arguments.
    super().__init__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
Expand All @@ -18,13 +19,21 @@
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""

def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
    """Create the reader and bind it to a DuckDB connection.

    `ddb_connection` is keyword-only. When it is missing (or falsy) the
    module-level `default_connection` is used instead. All remaining keyword
    arguments are forwarded unchanged to the parent initialiser.
    """
    self.ddb_connection = ddb_connection or default_connection
    super().__init__(**kwargs)

@read_function(DuckDBPyRelation)
def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]):
"""Returns a relation object from the source xml"""
if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
raise MessageBearingError(
"Submitted file failed XSD validation.",
messages=[msg],
)

polars_schema: dict[str, pl.DataType] = { # type: ignore
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in stringify_model(schema).__fields__.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
multi_line: bool = False,
encoding: str = "utf-8-sig",
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.delimiter = delimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
encoding: Optional[str] = "utf-8",
multi_line: Optional[bool] = True,
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.encoding = encoding
Expand Down
41 changes: 31 additions & 10 deletions src/dve/core_engine/backends/implementations/spark/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
from pyspark.sql.utils import AnalysisException
from typing_extensions import Literal

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
df_is_empty,
get_type_from_annotation,
spark_write_parquet,
)
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
from dve.core_engine.backends.utilities import dump_errors
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
from dve.parser.file_handling import get_content_length, get_parent
from dve.parser.file_handling.service import open_stream

SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"]
Expand Down Expand Up @@ -52,39 +53,51 @@


@spark_write_parquet
class SparkXMLReader(BaseFileReader): # pylint: disable=too-many-instance-attributes
class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes
"""A reader for XML files built atop Spark-XML."""

def __init__(
self,
*,
record_tag: str,
root_tag: Optional[str] = None,
spark_session: Optional[SparkSession] = None,
sampling_ratio: int = 1,
exclude_attribute: bool = True,
mode: SparkXMLMode = "PERMISSIVE",
infer_schema: bool = False,
ignore_namespace: bool = True,
null_values: Collection[str] = frozenset(("NULL", "null", "")),
sanitise_multiline: bool = True,
namespace=None,
trim_cells=True,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,

Check warning on line 78 in src/dve/core_engine/backends/implementations/spark/readers/xml.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method "__init__" has 17 parameters, which is greater than the 13 authorized.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZrA-ejpT-V0F1DOJ4tF&open=AZrA-ejpT-V0F1DOJ4tF&pullRequest=15
) -> None:
self.record_tag = record_tag

super().__init__(
record_tag=record_tag,
root_tag=root_tag,
trim_cells=trim_cells,
null_values=null_values,
sanitise_multiline=sanitise_multiline,
Comment thread
georgeRobertson marked this conversation as resolved.
xsd_location=xsd_location,
xsd_error_code=xsd_error_code,
xsd_error_message=xsd_error_message,
rules_location=rules_location,
)

self.spark_session = spark_session or SparkSession.builder.getOrCreate()
self.sampling_ratio = sampling_ratio
self.exclude_attribute = exclude_attribute
self.mode = mode
self.infer_schema = infer_schema
self.ignore_namespace = ignore_namespace
self.root_tag = root_tag
self.sanitise_multiline = sanitise_multiline
self.null_values = null_values
self.namespace = namespace
self.trim_cells = trim_cells
super().__init__()

def read_to_py_iterator(
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
Expand All @@ -106,6 +119,14 @@
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
working_folder = get_parent(resource)
dump_errors(
working_folder=working_folder, step_name="file_transformation", messages=[msg]
)

spark_schema: StructType = get_type_from_annotation(schema)
kwargs = {
"rowTag": self.record_tag,
Expand Down Expand Up @@ -143,7 +164,7 @@
kwargs["rowTag"] = f"{namespace}:{self.record_tag}"
df = (
self.spark_session.read.format("xml")
.options(**kwargs)
.options(**kwargs) # type: ignore
.load(resource, schema=read_schema)
)
if self.root_tag and df.columns:
Expand Down
1 change: 1 addition & 0 deletions src/dve/core_engine/backends/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
trim_cells: bool = True,
null_values: Collection[str] = frozenset({"NULL", "null", ""}),
encoding: str = "utf-8-sig",
**_,
):
"""Init function for the base CSV reader.

Expand Down
33 changes: 32 additions & 1 deletion src/dve/core_engine/backends/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from dve.core_engine.backends.base.reader import BaseFileReader
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.readers.xml_linting import run_xmllint
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream
from dve.parser.file_handling.implementations.file import (
Expand Down Expand Up @@ -101,7 +103,7 @@ def clear(self) -> None:
def __iter__(self) -> Iterator["XMLElement"]: ...


class BasicXMLFileReader(BaseFileReader):
class BasicXMLFileReader(BaseFileReader): # pylint: disable=R0902
"""A reader for XML files built atop LXML."""

def __init__(
Expand All @@ -114,6 +116,10 @@ def __init__(
sanitise_multiline: bool = True,
encoding: str = "utf-8-sig",
n_records_to_read: Optional[int] = None,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,
):
"""Init function for the base XML reader.
Expand Down Expand Up @@ -148,6 +154,15 @@ def __init__(
"""Encoding of the XML file."""
self.n_records_to_read = n_records_to_read
"""The maximum number of records to read from a document."""
if rules_location is not None and xsd_location is not None:
self.xsd_location = rules_location + xsd_location
else:
self.xsd_location = xsd_location # type: ignore
"""The URI of the xsd file if wishing to perform xsd validation."""
self.xsd_error_code = xsd_error_code
"""The error code to be reported if xsd validation fails (if xsd)"""
self.xsd_error_message = xsd_error_message
"""The error message to be reported if xsd validation fails"""
super().__init__()
self._logger = get_logger(__name__)

Expand Down Expand Up @@ -259,6 +274,22 @@ def _parse_xml(
for element in elements:
yield self._parse_element(element, template_row)

def _run_xmllint(self, file_uri: URI) -> Optional[FeedbackMessage]:
    """Validate the file at `file_uri` against the configured XSD using xmllint.

    Delegates to `run_xmllint`, passing the reader's `xsd_location`,
    `xsd_error_code` and `xsd_error_message`. Requires xmllint to be installed
    on the system to run successfully.

    Args:
        file_uri: URI of the XML file to validate.

    Returns:
        Whatever `run_xmllint` returns: a `FeedbackMessage` or `None`.
        NOTE(review): presumably `None` means the file passed validation;
        confirm against `run_xmllint`.

    Raises:
        AttributeError: If `xsd_location`, `xsd_error_code` or
            `xsd_error_message` has not been provided (is `None`).
    """
    # Fail fast with a setting-specific message rather than handing `None`
    # values to the linting helper.
    if self.xsd_location is None:
        raise AttributeError("Trying to run XML lint with no `xsd_location` provided.")
    if self.xsd_error_code is None:
        raise AttributeError("Trying to run XML lint with no `xsd_error_code` provided.")
    if self.xsd_error_message is None:
        raise AttributeError("Trying to run XML lint with no `xsd_error_message` provided.")

    return run_xmllint(
        file_uri=file_uri,
        schema_uri=self.xsd_location,
        error_code=self.xsd_error_code,
        error_message=self.xsd_error_message,
    )

def read_to_py_iterator(
self,
resource: URI,
Expand Down
Loading