-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathdeploy.py
More file actions
608 lines (490 loc) · 20.7 KB
/
deploy.py
File metadata and controls
608 lines (490 loc) · 20.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
"""Build and publish Databus datasets (JSON-LD) from provided metadata.
This module exposes helpers to create distribution strings, compute file
information (sha256 and size), construct dataset JSON-LD payloads and
publish them to a Databus instance using the Databus publish API.
"""
import hashlib
import json
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
import requests
_debug = False
class DeployError(Exception):
"""Raised if deploy fails"""
class BadArgumentException(Exception):
"""Raised if an argument does not fit its requirements"""
class DeployLogLevel(Enum):
"""Logging levels for the Databus deploy"""
error = 0
info = 1
debug = 2
def _get_content_variants(distribution_str: str) -> Optional[Dict[str, str]]:
"""Parse content-variant key/value pairs from a distribution string.
The CLI supports passing a distribution as ``url|lang=en_type=parsed|...``.
This helper extracts the ``lang``/``type`` style key/value pairs as a
dictionary.
"""
args = distribution_str.split("|")
# cv string is ALWAYS at position 1 after the URL
# if not return empty dict and handle it separately
if len(args) < 2 or args[1].strip() == "":
return {}
cv_str = args[1].strip("_")
cvs = {}
for kv in cv_str.split("_"):
if "=" not in kv:
raise BadArgumentException(
f"Invalid content variant format: '{kv}'. Expected 'key=value' format."
)
key, value = kv.split("=")
cvs[key] = value
return cvs
def _get_filetype_definition(
distribution_str: str,
) -> Tuple[Optional[str], Optional[str]]:
"""Extract an explicit file format and compression from a distribution string.
Returns (file_extension, compression) where each may be ``None`` if the
format should be inferred from the URL path.
"""
file_ext = None
compression = None
# take everything except URL
metadata_list = distribution_str.split("|")[1:]
if len(metadata_list) == 4:
# every parameter is set
file_ext = metadata_list[-3]
compression = metadata_list[-2]
elif len(metadata_list) == 3:
# when last item is shasum:length -> only file_ext set
if ":" in metadata_list[-1]:
file_ext = metadata_list[-2]
else:
# compression and format are set
file_ext = metadata_list[-2]
compression = metadata_list[-1]
elif len(metadata_list) == 2:
# if last argument is shasum:length -> both none
if ":" in metadata_list[-1]:
pass
else:
# only format -> compression is None
file_ext = metadata_list[-1]
compression = None
elif len(metadata_list) == 1:
# let them be None to be later inferred from URL path
pass
else:
# in this case only URI is given, let all be later inferred
pass
return file_ext, compression
def _get_extensions(distribution_str: str) -> Tuple[str, str, str]:
"""Return tuple `(extension_part, format_extension, compression)`.
``extension_part`` is the textual extension appended to generated
filenames (e.g. ".ttl.gz").
"""
extension_part = ""
format_extension, compression = _get_filetype_definition(distribution_str)
if format_extension is not None:
# build the format extension (only append compression if not none)
extension_part = f".{format_extension}"
if compression is not None:
extension_part += f".{compression}"
else:
compression = "none"
return extension_part, format_extension, compression
# here we go if format not explicitly set: infer it from the path
# first set default values
format_extension = "file"
compression = "none"
# get the last segment of the URL
last_segment = str(distribution_str).split("|")[0].split("/")[-1]
# cut of fragments and split by dots
dot_splits = last_segment.split("#")[0].rsplit(".", 2)
if len(dot_splits) > 1:
# if only format is given (no compression)
format_extension = dot_splits[-1]
extension_part = f".{format_extension}"
if len(dot_splits) > 2:
# if format and compression is in the filename
compression = dot_splits[-1]
format_extension = dot_splits[-2]
extension_part = f".{format_extension}.{compression}"
return extension_part, format_extension, compression
def _get_file_stats(distribution_str: str) -> Tuple[Optional[str], Optional[int]]:
"""Parse an optional ``sha256sum:length`` tuple from a distribution string.
Returns (sha256sum, content_length) or (None, None) when not provided.
"""
metadata_list = distribution_str.split("|")[1:]
# check whether there is the shasum:length tuple separated by :
if len(metadata_list) == 0 or ":" not in metadata_list[-1]:
return None, None
last_arg_split = metadata_list[-1].split(":")
if len(last_arg_split) != 2:
raise ValueError(
f"Can't parse Argument {metadata_list[-1]}. Too many values, submit shasum and "
f"content_length in the form of shasum:length"
)
sha256sum = last_arg_split[0]
content_length = int(last_arg_split[1])
return sha256sum, content_length
def _load_file_stats(url: str) -> Tuple[str, int]:
"""Download the file at ``url`` and compute its SHA-256 and length.
This is used as a fallback when the caller did not supply checksum/size
information in the CLI or metadata file.
"""
resp = requests.get(url, timeout=30)
if resp.status_code >= 400:
raise requests.exceptions.RequestException(response=resp)
sha256sum = hashlib.sha256(bytes(resp.content)).hexdigest()
content_length = len(resp.content)
return sha256sum, content_length
def get_file_info(distribution_str: str) -> Tuple[Dict[str, str], str, str, str, int]:
"""Return parsed file information for a distribution string.
Returns a tuple `(cvs, format_extension, compression, sha256sum, size)`.
"""
cvs = _get_content_variants(distribution_str)
extension_part, format_extension, compression = _get_extensions(distribution_str)
# content_variant_part = "_".join([f"{key}={value}" for key, value in cvs.items()])
if _debug:
print("DEBUG", distribution_str, extension_part)
sha256sum, content_length = _get_file_stats(distribution_str)
if sha256sum is None or content_length is None:
__url = str(distribution_str).split("|")[0]
sha256sum, content_length = _load_file_stats(__url)
return cvs, format_extension, compression, sha256sum, content_length
def _get_file_info_from_dict(dist_dict: Dict[str, any]) -> Tuple[Dict[str, str], str, str, str, int]:
"""
Extract file info from a pre-parsed distribution dictionary.
Parameters
----------
dist_dict : dict
A dictionary with keys: url, variants, formatExtension, compression
(as returned by parse_distribution_str in cli.py)
Returns
-------
Tuple containing:
- cvs: Dict of content variants
- format_extension: File format extension
- compression: Compression type
- sha256sum: SHA-256 hash of file
- content_length: File size in bytes
"""
url = dist_dict.get("url", "")
cvs = dist_dict.get("variants", {})
format_extension = dist_dict.get("formatExtension") or "file"
compression = dist_dict.get("compression") or "none"
# Check if sha256sum and content_length are provided
sha256sum = dist_dict.get("sha256sum")
content_length = dist_dict.get("byteSize")
# If not provided, load from URL
if sha256sum is None or content_length is None:
sha256sum, content_length = _load_file_stats(url)
return cvs, format_extension, compression, sha256sum, content_length
def create_distribution(
url: str,
cvs: Dict[str, str],
file_format: str = None,
compression: str = None,
sha256_length_tuple: Tuple[str, int] = None,
) -> str:
"""Creates the identifier-string for a distribution used as downloadURLs in the createDataset function.
url: is the URL of the dataset
cvs: dict of content variants identifying a certain distribution (needs to be unique for each distribution in the dataset)
file_format: identifier for the file format (e.g. json). If set to None client tries to infer it from the path
compression: identifier for the compression format (e.g. gzip). If set to None client tries to infer it from the path
sha256_length_tuple: sha256sum and content_length of the file in the form of Tuple[shasum, length].
If left out file will be downloaded extra and calculated.
"""
meta_string = "_".join([f"{key}={value}" for key, value in cvs.items()])
# check whether to add the custom file format
if file_format is not None:
meta_string += f"|{file_format}"
# check whether to add the custom compression string
if compression is not None:
meta_string += f"|{compression}"
# add shasum and length if present
if sha256_length_tuple is not None:
sha256sum, content_length = sha256_length_tuple
meta_string += f"|{sha256sum}:{content_length}"
return f"{url}|{meta_string}"
def _create_distributions_from_metadata(
metadata: List[Dict[str, Union[str, int]]],
) -> List[str]:
"""
Create distributions from metadata entries.
Parameters
----------
metadata : List[Dict[str, Union[str, int]]]
List of metadata entries, each containing:
- checksum: str - SHA-256 hex digest (64 characters)
- size: int - File size in bytes (positive integer)
- url: str - Download URL for the file
- file_format: str - File format of the file [optional]
- compression: str - Compression format of the file [optional]
Returns
-------
List[str]
List of distribution identifier strings for use with create_dataset
"""
distributions = []
counter = 0
for entry in metadata:
# Validate required keys
required_keys = ["checksum", "size", "url"]
missing_keys = [key for key in required_keys if key not in entry]
if missing_keys:
raise ValueError(f"Metadata entry missing required keys: {missing_keys}")
checksum = entry["checksum"]
size = entry["size"]
url = entry["url"]
if not isinstance(size, int) or size <= 0:
raise ValueError(
f"Invalid size for {url}: expected positive integer, got {size}"
)
# Validate SHA-256 hex digest (64 hex chars)
if (
not isinstance(checksum, str)
or len(checksum) != 64
or not all(c in "0123456789abcdefABCDEF" for c in checksum)
):
raise ValueError(f"Invalid checksum for {url}")
distributions.append(
create_distribution(
url=url,
cvs={"count": f"{counter}"},
file_format=entry.get("file_format"),
compression=entry.get("compression"),
sha256_length_tuple=(checksum, size),
)
)
counter += 1
return distributions
def create_dataset(
version_id: str,
artifact_version_title: str,
artifact_version_abstract: str,
artifact_version_description: str,
license_url: str,
distributions: Union[List[str], List[Dict]],
attribution: str = None,
derived_from: str = None,
group_title: str = None,
group_abstract: str = None,
group_description: str = None,
) -> Dict[str, Union[List[Dict[str, Union[bool, str, int, float, List]]], str]]:
"""
Creates a Databus Dataset as a python dict from distributions and submitted metadata. WARNING: If file stats (sha256sum, content length)
were not submitted, the client loads the files and calculates them. This can potentially take a lot of time, depending on the file size.
The result can be transformed to a JSON-LD by calling json.dumps(dataset).
Parameters
----------
version_id: str
The version ID representing the Dataset. Needs to be in the form of $DATABUS_BASE/$ACCOUNT/$GROUP/$ARTIFACT/$VERSION
artifact_version_title: str
Artifact & Version Title: used for BOTH artifact and version. Keep stable across releases; identifies the data series.
artifact_version_abstract: str
Artifact & Version Abstract: used for BOTH artifact and version (max 200 chars). Updating it changes both artifact and version metadata.
artifact_version_description: str
Artifact & Version Description: used for BOTH artifact and version. Supports Markdown. Updating it changes both artifact and version metadata.
license_url: str
The license of the dataset as a URI.
distributions: Union[List[str], List[Dict]]
Distribution information. Can be either:
- List[str]: Legacy format with pipe-separated strings (created by create_distribution function)
- List[Dict]: Pre-parsed dictionaries with keys: url, variants, formatExtension, compression
attribution: str
OPTIONAL! The attribution information for the Dataset
derived_from: str
OPTIONAL! Short text explain what the dataset was
group_title: str
OPTIONAL! Metadata for the Group: Title. NOTE: Is only used if all group metadata is set
group_abstract: str
OPTIONAL! Metadata for the Group: Abstract. NOTE: Is only used if all group metadata is set
group_description: str
OPTIONAL! Metadata for the Group: Description. NOTE: Is only used if all group metadata is set
"""
_versionId = str(version_id).strip("/")
parts = _versionId.rsplit("/", 4)
if len(parts) < 5:
raise BadArgumentException(
f"Invalid version_id format: '{version_id}'. "
f"Expected format: <BASE>/<ACCOUNT>/<GROUP>/<ARTIFACT>/<VERSION>"
)
_, _account_name, _group_name, _artifact_name, version = parts
# could be build from stuff above,
# was not sure if there are edge cases BASE=http://databus.example.org/"base"/...
group_id = _versionId.rsplit("/", 2)[0]
artifact_id = _versionId.rsplit("/", 1)[0]
distribution_list = []
for dst in distributions:
# Check if distribution is a pre-parsed dict or a legacy string
if isinstance(dst, dict):
# New format: pre-parsed dictionary from parse_distribution_str()
__url = dst.get("url", "")
(
cvs,
formatExtension,
compression,
sha256sum,
content_length,
) = _get_file_info_from_dict(dst)
else:
# Legacy format: pipe-separated string
__url = str(dst).split("|")[0]
(
cvs,
formatExtension,
compression,
sha256sum,
content_length,
) = get_file_info(dst)
if not cvs and len(distributions) > 1:
raise BadArgumentException(
"If there are more than one file in the dataset, the files must be annotated "
"with content variants"
)
entity = {
"@type": "Part",
"formatExtension": formatExtension,
"compression": compression,
"downloadURL": __url,
"byteSize": content_length,
"sha256sum": sha256sum,
}
# set content variants
for key, value in cvs.items():
entity[f"dcv:{key}"] = value
distribution_list.append(entity)
graphs = []
# only add the group graph if the necessary group properties are set
if None not in [group_title, group_description, group_abstract]:
group_dict = {
"@id": group_id,
"@type": "Group",
}
# add group metadata if set, else it can be left out
for k, val in [
("title", group_title),
("abstract", group_abstract),
("description", group_description),
]:
group_dict[k] = val
graphs.append(group_dict)
# add the artifact graph
artifact_graph = {
"@id": artifact_id,
"@type": "Artifact",
"title": artifact_version_title,
"abstract": artifact_version_abstract,
"description": artifact_version_description,
}
graphs.append(artifact_graph)
# add the dataset graph
dataset_graph = {
"@type": ["Version", "Dataset"],
"@id": _versionId,
"hasVersion": version,
"title": artifact_version_title,
"abstract": artifact_version_abstract,
"description": artifact_version_description,
"license": license_url,
"distribution": distribution_list,
}
def append_to_dataset_graph_if_existent(add_key: str, add_value: str):
if add_value is not None:
dataset_graph[add_key] = add_value
append_to_dataset_graph_if_existent("attribution", attribution)
append_to_dataset_graph_if_existent("wasDerivedFrom", derived_from)
graphs.append(dataset_graph)
dataset = {
"@context": "https://downloads.dbpedia.org/databus/context.jsonld",
"@graph": graphs,
}
return dataset
def deploy(
dataid: Dict[str, Union[List[Dict[str, Union[bool, str, int, float, List]]], str]],
api_key: str,
verify_parts: bool = False,
log_level: DeployLogLevel = DeployLogLevel.debug,
debug: bool = False,
) -> None:
"""Deploys a dataset to the databus. The endpoint is inferred from the DataID identifier.
Parameters
----------
dataid: Dict[str, Union[List[Dict[str, Union[bool, str, int, float, List]]], str]]
The dataid represented as a python dict. Preferably created by the creaateDataset function
api_key: str
the API key of the user noted in the Dataset identifier
verify_parts: bool
flag of the publish POST request, prevents the databus from checking shasum and content length (is already handled by the client, reduces load on the Databus). Default is False
log_level: DeployLogLevel
log level of the deploy output
debug: bool
controls whether output shold be printed to the console (stdout)
"""
headers = {"X-API-KEY": f"{api_key}", "Content-Type": "application/json"}
data = json.dumps(dataid)
try:
base = "/".join(dataid["@graph"][0]["@id"].split("/")[0:3])
except (KeyError, IndexError, TypeError) as e:
raise DeployError(f"Invalid dataid structure: {e}")
api_uri = (
base
+ f"/api/publish?verify-parts={str(verify_parts).lower()}&log-level={log_level.name}"
)
resp = requests.post(api_uri, data=data, headers=headers, timeout=30)
if debug or _debug:
try:
dataset_uri = dataid["@graph"][0]["@id"]
except (KeyError, IndexError, TypeError) as e:
raise DeployError(f"Invalid dataid structure: {e}")
print(f"Trying submitting data to {dataset_uri}:")
print(data)
if resp.status_code != 200:
raise DeployError(f"Could not deploy dataset to databus. Reason: '{resp.text}'")
if debug or _debug:
print("---------")
print(resp.text)
def deploy_from_metadata(
metadata: List[Dict[str, Union[str, int]]],
version_id: str,
artifact_version_title: str,
artifact_version_abstract: str,
artifact_version_description: str,
license_url: str,
apikey: str,
) -> None:
"""
Deploy a dataset from metadata entries.
Parameters
----------
metadata : List[Dict[str, Union[str, int]]]
List of file metadata entries (see _create_distributions_from_metadata)
version_id : str
Dataset version ID in the form $DATABUS_BASE/$ACCOUNT/$GROUP/$ARTIFACT/$VERSION
artifact_version_title : str
Artifact & Version Title: used for BOTH artifact and version.
artifact_version_abstract : str
Artifact & Version Abstract: used for BOTH artifact and version.
artifact_version_description : str
Artifact & Version Description: used for BOTH artifact and version.
license_url : str
License URI
apikey : str
API key for authentication
"""
distributions = _create_distributions_from_metadata(metadata)
dataset = create_dataset(
version_id=version_id,
artifact_version_title=artifact_version_title,
artifact_version_abstract=artifact_version_abstract,
artifact_version_description=artifact_version_description,
license_url=license_url,
distributions=distributions,
)
print(f"Deploying dataset version: {version_id}")
deploy(dataset, apikey)
print(f"Successfully deployed to {version_id}")
print(f"Deployed {len(metadata)} file(s):")
for entry in metadata:
print(f" - {entry['url']}")