Skip to content

Commit b5bdf84

Browse files
committed
[_746] a reasonable default sort function for data replicas
1 parent f3a4fa7 commit b5bdf84

2 files changed

Lines changed: 36 additions & 16 deletions

File tree

irods/data_object.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
import ast
2+
import datetime
13
import io
2-
import sys
34
import logging
45
import os
5-
import ast
6+
import sys
67

78
from irods.models import DataObject
89
from irods.meta import iRODSMetaCollection
@@ -41,8 +42,22 @@ def __repr__(self):
4142
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)
4243

4344

45+
_REPL_STATUSES = (1, 0, 2, 3, 4)
46+
_REFERENCE_DATETIME = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
47+
48+
def _DEFAULT_SORT_KEY_FN(row):
49+
repl_status = int(row[DataObject.replica_status])
50+
51+
repl_status_rank = (
52+
_REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status)
53+
else sys.maxsize
54+
)
55+
56+
return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])
57+
58+
4459
class iRODSDataObject:
45-
def __init__(self, manager, parent=None, results=None):
60+
def __init__(self, manager, parent=None, results=None, replica_sort_function=None):
4661
self.manager = manager
4762
if parent and results:
4863
self.collection = parent
@@ -54,7 +69,7 @@ def __init__(self, manager, parent=None, results=None):
5469
# backward compatibility with older schema versions
5570
pass
5671
self.path = self.collection.path + "/" + self.name
57-
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])
72+
replicas = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
5873

5974
# The status quo before iRODS 5
6075

irods/manager/data_object_manager.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,12 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
233233
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
234234

235235
data_open_returned_values_ = {}
236-
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
236+
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
237237
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
238238
error = RuntimeError("parallel get failed")
239239
try:
240240
if not self.parallel_get(
241-
(obj, o),
241+
(obj_path, o),
242242
local_file,
243243
num_threads=num_threads,
244244
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
@@ -265,6 +265,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
265265
"""
266266
parent = self.sess.collections.get(irods_dirname(path))
267267

268+
replica_sort_function = options.pop('replica_sort_function', None)
269+
268270
# TODO: optimize
269271
if local_path:
270272
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
@@ -284,7 +286,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
284286
results = query.all() # get up to max_rows replicas
285287
if len(results) <= 0:
286288
raise ex.DataObjectDoesNotExist()
287-
return iRODSDataObject(self, parent, results)
289+
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)
288290

289291
@staticmethod
290292
def _resolve_force_put_option(options, default_setting=None, true_value=""):
@@ -317,23 +319,25 @@ def put(
317319
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)
318320

319321
if self.sess.collections.exists(irods_path):
320-
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
322+
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
321323
else:
322-
obj = irods_path
323-
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
324+
obj_path = irods_path
325+
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
324326
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
325327
options.pop(kw.FORCE_FLAG_KW, None)
326328

329+
replica_sort_function = options.pop('replica_sort_function',None)
330+
327331
with open(local_path, "rb") as f:
328332
sizelist = []
329333
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
330-
o = deferred_call(self.open, (obj, "w"), options)
334+
o = deferred_call(self.open, (obj_path, "w"), options)
331335
f.close()
332336
error = RuntimeError("parallel put failed")
333337
try:
334338
if not self.parallel_put(
335339
local_path,
336-
(obj, o),
340+
(obj_path, o),
337341
total_bytes=sizelist[0],
338342
num_threads=num_threads,
339343
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
@@ -346,7 +350,7 @@ def put(
346350
except BaseException as e:
347351
raise error from e
348352
else:
349-
with self.open(obj, "w", **options) as o:
353+
with self.open(obj_path, "w", **options) as o:
350354
# Set operation type to trigger acPostProcForPut
351355
if kw.OPR_TYPE_KW not in options:
352356
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
@@ -360,10 +364,10 @@ def put(
360364
# Requested to register checksum without verifying, but source replica has a checksum. This can result
361365
# in multiple replicas being marked good with different checksums, which is an inconsistency.
362366
del repl_options[kw.REG_CHKSUM_KW]
363-
self.replicate(obj, **repl_options)
367+
self.replicate(obj_path, **repl_options)
364368

365369
if return_data_object:
366-
return self.get(obj)
370+
return self.get(obj_path, replica_sort_function=replica_sort_function)
367371

368372
def chksum(self, path, **options):
369373
"""
@@ -480,6 +484,7 @@ def create(
480484
raise ex.DataObjectExistsAtLogicalPath
481485

482486
options = {**options, kw.DATA_TYPE_KW: "generic"}
487+
replica_sort_function = options.pop('replica_sort_function',None)
483488

484489
if resource:
485490
options[kw.DEST_RESC_NAME_KW] = resource
@@ -508,7 +513,7 @@ def create(
508513
desc = response.int_info
509514
conn.close_file(desc)
510515

511-
return self.get(path)
516+
return self.get(path, replica_sort_function=replica_sort_function)
512517

513518
def open_with_FileRaw(self, *arg, **kw_options):
514519
holder = []

0 commit comments

Comments
 (0)