-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathreader.py
More file actions
176 lines (139 loc) · 6.38 KB
/
reader.py
File metadata and controls
176 lines (139 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Abstract implementation of the file parser."""
from abc import ABC, abstractmethod
from collections.abc import Iterator
from inspect import ismethod
from typing import Any, ClassVar, Optional, TypeVar
from pydantic import BaseModel
from typing_extensions import Protocol
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
from dve.core_engine.backends.types import EntityName, EntityType
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
from dve.parser.file_handling.service import open_stream
# Type variable standing for an entity type; used by `read_function` and `_ReadFunctions`.
T = TypeVar("T")
# Covariant type variable used as the return type of the read function protocol.
ET_co = TypeVar("ET_co", covariant=True)
# This needs to be defined outside the class since otherwise mypy expects
# BaseFileReader to be generic:
_ReadFunctions = dict[type[T], "_UnboundReadFunction[T]"]
"""A convenience type indicating a mapping from type to reader function."""
# Attribute set on functions by `read_function` and looked up again in
# `BaseFileReader.__init_subclass__` to discover tagged read functions.
_ENTITY_TYPE_ATTR_NAME = "_read_func_entity_type"
"""The name of the read function's entity type annotation attribute."""
class _UnboundReadFunction(Protocol[ET_co]):  # pylint: disable=too-few-public-methods
    """Structural type that a read function must satisfy to support a new entity type.

    The callable is described in its *unbound* form: the reader instance is
    passed explicitly as the first argument, followed by the resource to read,
    the name of the entity and the schema model, and the entity is returned.
    """

    @staticmethod
    def __call__(  # pylint: disable=bad-staticmethod-argument
        self: "BaseFileReader",  # This is the protocol for an _unbound_ method.
        resource: URI,
        entity_name: EntityName,
        schema: type[BaseModel],
    ) -> ET_co: ...
def read_function(entity_type: T) -> WrapDecorator:
    """Build a decorator that marks a reader method as supporting `entity_type`.

    The returned decorator does not wrap or alter the method; it only records
    the entity type on it (under `_ENTITY_TYPE_ATTR_NAME`) so that reader
    classes can discover which entity types they implement.

    """

    def _tag_with_entity_type(func: ArbitraryFunction) -> ArbitraryFunction:
        """Attach the supported entity type to `func` and return `func` itself."""
        setattr(func, _ENTITY_TYPE_ATTR_NAME, entity_type)
        return func

    return _tag_with_entity_type
class BaseFileReader(ABC):
    """An abstract representation of a reader for some file type."""

    __read_methods__: ClassVar[_ReadFunctions] = {}
    """
    A dictionary mapping implemented entity types to their read functions.

    This enables readers to implement optimised support for specific entity
    types (rather than relying on the data contract having an optimised
    implementation, or on the 'fallback' via a Python iterator).

    This is set and populated in `__init_subclass__` by identifying methods
    decorated with the '@read_function' decorator, and is used in
    `read_to_entity_type`.

    """

    def __init_subclass__(cls, *_, **__) -> None:
        """When this class is subclassed, create and populate the `__read_methods__`
        class variable for the subclass.

        Every callable attribute visible on the subclass (inherited ones
        included, since `dir` is used) that carries the entity type tag set by
        `read_function` is registered against that entity type.

        """
        # Give each subclass its own mapping rather than sharing the base one.
        cls.__read_methods__ = {}
        for attr_name in dir(cls):
            method = getattr(cls, attr_name, None)
            if not (ismethod(method) or callable(method)):
                continue

            entity_type: Optional[type] = getattr(method, _ENTITY_TYPE_ATTR_NAME, None)
            if entity_type is None:
                # Not tagged by `read_function`, so not a read function.
                continue

            cls.__read_methods__[entity_type] = method  # type: ignore

    @abstractmethod
    def read_to_py_iterator(
        self,
        resource: URI,
        entity_name: EntityName,
        schema: type[BaseModel],
    ) -> Iterator[dict[str, Any]]:
        """Iterate through the contents of the resource, yielding dicts
        representing each record.

        NOTE: Simple types should either be returned as strings (if present) or
        `None`. Format validation, casting, and parsing should be done in the
        data contract.

        """
        raise NotImplementedError

    def read_to_entity_type(
        self,
        entity_type: type[EntityType],
        resource: URI,
        entity_name: EntityName,
        schema: type[BaseModel],
    ) -> EntityType:
        """Read to the specified entity type, if supported.

        Requests for `Iterator[dict[str, Any]]` are served by
        `read_to_py_iterator`. Any other entity type is looked up in
        `__read_methods__` after the resource has passed the text-file sense
        check.

        NOTE: Simple types should either be returned as strings (if present) or
        `None`. Format validation, casting, and parsing should be done in the
        data contract.

        Raises:
            MessageBearingError: if the resource does not look like a text file.
            ReaderLacksEntityTypeSupport: if no read function is registered for
                `entity_type`.

        """
        # Compare the requested *type* (not the entity's name) against the
        # iterator type: every concrete reader implements `read_to_py_iterator`,
        # so this representation needs no registered read function. This path
        # returns before the text-file sense check below.
        if entity_type == Iterator[dict[str, Any]]:
            return self.read_to_py_iterator(resource, entity_name, schema)  # type: ignore

        self.raise_if_not_sensible_file(resource, entity_name)

        try:
            reader_func = self.__read_methods__[entity_type]
        except KeyError as err:
            raise ReaderLacksEntityTypeSupport(entity_type=entity_type) from err

        # Registered read functions are stored unbound, so pass `self` explicitly.
        return reader_func(self, resource, entity_name, schema)

    def write_parquet(
        self,
        entity: EntityType,
        target_location: URI,
        schema: Optional[type[BaseModel]] = None,
        **kwargs,
    ) -> URI:
        """Write entity to parquet.

        The base implementation always raises; a reader has to override this
        method to provide parquet output.

        NOTE: Simple types should be cast as strings (if present) or None.
        If schema supplied then all simple types will be coerced to strings.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")

    @staticmethod
    def _check_likely_text_file(resource: URI) -> bool:
        """Quick sense check of file to see if it looks like text
        - not 100% foolproof, but hopefully enough to weed out most
        non-text files.

        Only the first 4096 bytes of the resource are inspected. Returns `True`
        when that chunk starts with a UTF-16 byte order mark or contains no
        null byte, and `False` otherwise.

        """
        with open_stream(resource, "rb") as fle:
            start_chunk = fle.read(4096)
            # check for BOM character - utf-16 can contain NULL bytes
            if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
                return True
            # if null byte in - unlikely text
            if b"\x00" in start_chunk:
                return False
            return True

    def raise_if_not_sensible_file(self, resource: URI, entity_name: str) -> None:
        """Sense check that the file is a text file. Raise error if doesn't
        appear to be the case.

        Raises:
            MessageBearingError: carrying a single "MalformedFile" submission
                feedback message for `entity_name` when the resource fails
                `_check_likely_text_file`.

        """
        if not self._check_likely_text_file(resource):
            raise MessageBearingError(
                "The submitted file doesn't appear to be text",
                messages=[
                    FeedbackMessage(
                        entity=entity_name,
                        record=None,
                        failure_type="submission",
                        error_location="Whole File",
                        error_code="MalformedFile",
                        error_message="The resource doesn't seem to be a valid text file",
                    )
                ],
            )