@@ -233,12 +233,12 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
233233 raise ex .OVERWRITE_WITHOUT_FORCE_FLAG
234234
235235 data_open_returned_values_ = {}
236- with self .open (obj , "r" , returned_values = data_open_returned_values_ , ** options ) as o :
236+ with self .open (obj_path , "r" , returned_values = data_open_returned_values_ , ** options ) as o :
237237 if self .should_parallelize_transfer (num_threads , o , open_options = options .items ()):
238238 error = RuntimeError ("parallel get failed" )
239239 try :
240240 if not self .parallel_get (
241- (obj , o ),
241+ (obj_path , o ),
242242 local_file ,
243243 num_threads = num_threads ,
244244 target_resource_name = options .get (kw .RESC_NAME_KW , "" ),
@@ -265,6 +265,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
265265 """
266266 parent = self .sess .collections .get (irods_dirname (path ))
267267
268+ replica_sort_function = options .pop ('replica_sort_function' , None )
269+
268270 # TODO: optimize
269271 if local_path :
270272 self ._download (path , local_path , num_threads = num_threads , updatables = updatables , ** options )
@@ -284,7 +286,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
284286 results = query .all () # get up to max_rows replicas
285287 if len (results ) <= 0 :
286288 raise ex .DataObjectDoesNotExist ()
287- return iRODSDataObject (self , parent , results )
289+ return iRODSDataObject (self , parent , results , replica_sort_function = replica_sort_function )
288290
289291 @staticmethod
290292 def _resolve_force_put_option (options , default_setting = None , true_value = "" ):
@@ -317,23 +319,25 @@ def put(
317319 self ._resolve_force_put_option (options , default_setting = client_config .data_objects .force_put_by_default )
318320
319321 if self .sess .collections .exists (irods_path ):
320- obj = iRODSCollection .normalize_path (irods_path , os .path .basename (local_path ))
322+ obj_path = iRODSCollection .normalize_path (irods_path , os .path .basename (local_path ))
321323 else :
322- obj = irods_path
323- if kw .FORCE_FLAG_KW not in options and self .exists (obj ):
324+ obj_path = irods_path
325+ if kw .FORCE_FLAG_KW not in options and self .exists (obj_path ):
324326 raise ex .OVERWRITE_WITHOUT_FORCE_FLAG
325327 options .pop (kw .FORCE_FLAG_KW , None )
326328
329+ replica_sort_function = options .pop ('replica_sort_function' ,None )
330+
327331 with open (local_path , "rb" ) as f :
328332 sizelist = []
329333 if self .should_parallelize_transfer (num_threads , f , measured_obj_size = sizelist , open_options = options ):
330- o = deferred_call (self .open , (obj , "w" ), options )
334+ o = deferred_call (self .open , (obj_path , "w" ), options )
331335 f .close ()
332336 error = RuntimeError ("parallel put failed" )
333337 try :
334338 if not self .parallel_put (
335339 local_path ,
336- (obj , o ),
340+ (obj_path , o ),
337341 total_bytes = sizelist [0 ],
338342 num_threads = num_threads ,
339343 target_resource_name = options .get (kw .RESC_NAME_KW , "" ) or options .get (kw .DEST_RESC_NAME_KW , "" ),
@@ -346,7 +350,7 @@ def put(
346350 except BaseException as e :
347351 raise error from e
348352 else :
349- with self .open (obj , "w" , ** options ) as o :
353+ with self .open (obj_path , "w" , ** options ) as o :
350354 # Set operation type to trigger acPostProcForPut
351355 if kw .OPR_TYPE_KW not in options :
352356 options [kw .OPR_TYPE_KW ] = 1 # PUT_OPR
@@ -360,10 +364,10 @@ def put(
360364 # Requested to register checksum without verifying, but source replica has a checksum. This can result
361365 # in multiple replicas being marked good with different checksums, which is an inconsistency.
362366 del repl_options [kw .REG_CHKSUM_KW ]
363- self .replicate (obj , ** repl_options )
367+ self .replicate (obj_path , ** repl_options )
364368
365369 if return_data_object :
366- return self .get (obj )
370+ return self .get (obj_path , replica_sort_function = replica_sort_function )
367371
368372 def chksum (self , path , ** options ):
369373 """
@@ -480,6 +484,7 @@ def create(
480484 raise ex .DataObjectExistsAtLogicalPath
481485
482486 options = {** options , kw .DATA_TYPE_KW : "generic" }
487+ replica_sort_function = options .pop ('replica_sort_function' ,None )
483488
484489 if resource :
485490 options [kw .DEST_RESC_NAME_KW ] = resource
@@ -508,7 +513,7 @@ def create(
508513 desc = response .int_info
509514 conn .close_file (desc )
510515
511- return self .get (path )
516+ return self .get (path , replica_sort_function = replica_sort_function )
512517
513518 def open_with_FileRaw (self , * arg , ** kw_options ):
514519 holder = []
0 commit comments