utils.py
# pylint: disable=R0903
"""Utilities to be used with services to abstract away some of the config loading and threading."""
import json
from threading import Lock
from typing import Optional

from pydantic.main import ModelMetaclass
from pyspark.sql import SparkSession

import dve.core_engine.backends.implementations.duckdb  # pylint: disable=unused-import
import dve.core_engine.backends.implementations.spark  # pylint: disable=unused-import
import dve.parser.file_handling as fh
from dve.core_engine.backends.readers import _READER_REGISTRY
from dve.core_engine.configuration.v1 import SchemaName, V1EngineConfig, _ModelConfig
from dve.core_engine.loggers import get_logger
from dve.core_engine.type_hints import URI, SubmissionResult
from dve.metadata_parser.model_generator import JSONtoPyd

Dataset = dict[SchemaName, _ModelConfig]

_configs: dict[str, tuple[dict[str, ModelMetaclass], V1EngineConfig, Dataset]] = {}
locks = Lock()
logger = get_logger(__name__)

def load_config(
    dataset_id: str,
    file_uri: URI,
) -> tuple[dict[SchemaName, ModelMetaclass], V1EngineConfig, dict[SchemaName, _ModelConfig]]:
    """Loads the configuration for a given dataset."""
    if dataset_id in _configs:
        return _configs[dataset_id]
    with fh.open_stream(file_uri) as f:
        generator = JSONtoPyd(json.load(f)["contract"])
    models = generator.generate_models()
    config = V1EngineConfig.load(file_uri)
    dataset = config.contract.datasets
    with locks:
        _configs[dataset_id] = models, config, dataset
    return models, config, dataset
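
# Example usage (a minimal sketch; the dataset id and contract URI below are
# hypothetical, not values shipped with this module):
#   models, config, dataset = load_config("demo_dataset", "s3://bucket/contracts/demo.json")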

def load_reader(dataset: Dataset, model_name: str, file_extension: str):
    """Loads the reader for the given feed, model name and file extension."""
    reader_config = dataset[model_name].reader_config[f".{file_extension.lower()}"]
    reader = _READER_REGISTRY[reader_config.reader](**reader_config.kwargs_)
    return reader
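
# Example usage (illustrative only; "demo_model" and the "csv" extension are assumed
# to be present in the loaded dataset's reader_config):
#   reader = load_reader(dataset, "demo_model", "csv")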

def unpersist_all_rdds(spark: SparkSession):
    """Unpersist any checkpointed or cached RDDs to avoid memory leaks."""
    for _, rdd in spark.sparkContext._jsc.getPersistentRDDs().items():  # type: ignore # pylint: disable=protected-access
        rdd.unpersist()
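
# Typically called once a processing run has finished, e.g. (sketch):
#   spark = SparkSession.builder.getOrCreate()
#   ...  # run the pipeline
#   unpersist_all_rdds(spark)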

def deadletter_file(source_uri: URI) -> None:
    """Move files that can't be processed to a deadletter location."""
    try:
        source_parent: URI = source_uri.rsplit("/", 1)[0]
        deadletter_path: URI = fh.joinuri(source_parent.rsplit("/", 1)[0], "deadletter")
        target_uri = fh.joinuri(deadletter_path, fh.get_file_name(source_uri))
        return fh.move_resource(source_uri, target_uri)
    except TypeError:
        return None
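
# Example (hypothetical URI, and assuming fh.joinuri joins segments with "/"):
# a file at "s3://bucket/landing/inbox/file.csv" would be moved to the sibling
# location "s3://bucket/landing/deadletter/file.csv".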

class SubmissionStatus:
    """Submission status for a given submission."""

    def __init__(
        self,
        validation_failed: bool = False,
        number_of_records: Optional[int] = None,
        processing_failed: bool = False,
    ):
        self.validation_failed = validation_failed
        self.number_of_records = number_of_records
        self.processing_failed = processing_failed

    @property
    def submission_result(self) -> SubmissionResult:
        """The current submission result - assumes success if
        neither validation nor processing has failed."""
        if self.processing_failed:
            return "processing_failed"
        if self.validation_failed:
            return "validation_failed"
        return "success"