Skip to content

Commit 600fefe

Browse files
d-w-moore authored and trel committed
[#452] implement client redirection to resource server
Also refined the session clone() method in order to implement the redirect.

Note: Instead of GET_HOST_FOR_{GET|PUT}, we're now using the newer GET_RESOURCE_INFO_FOR_OPERATION API. This is primarily for discovery of the proper resource hierarchy string to apply to the OPEN operation once we've redirected to the proper server. As a result of using the new API, the client redirect feature will be available only in iRODS >= 4.3.1.

Slight corrections were made to eliminate errors introduced into older tests by these changes:
- In test_that_connection_timeout_works__issue_377, we update the 'sess' variable reference to make sure the network timeout change is applied to the proper session.
- enableLogging() has moved to the irods.test.helpers module for use in tests generally.
1 parent 45e8ff0 commit 600fefe

11 files changed

Lines changed: 390 additions & 74 deletions

irods/account.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ def __init__(self, irods_host, irods_port, irods_user_name, irods_zone_name,
55
password=None, client_user=None,
66
server_dn=None, client_zone=None, **kwargs):
77

8+
# Allowed overrides when cloning sessions. (Currently hostname only.)
9+
for k,v in kwargs.pop('_overrides',{}).items():
10+
if k =='irods_host':
11+
irods_host = v
12+
813
self.authentication_scheme = irods_authentication_scheme.lower()
914
self.host = irods_host
1015
self.port = int(irods_port)

irods/api_number.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
# 1100 - 1200 - SSL API calls
177177
"SSL_START_AN": 1100,
178178
"SSL_END_AN": 1101,
179+
"GET_RESOURCE_INFO_FOR_OPERATION_AN":10220,
179180
"ATOMIC_APPLY_METADATA_OPERATIONS_APN": 20002,
180181
"GET_FILE_DESCRIPTOR_INFO_APN": 20000,
181182
"REPLICA_CLOSE_APN": 20004

irods/data_object.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ class iRODSDataObjectFileRaw(io.RawIOBase):
124124
"""The raw object supporting file-like operations (read/write/seek) for the
125125
iRODSDataObject."""
126126

127+
session = None # codacy
128+
127129
def __init__(self, conn, descriptor, finalize_on_close = True, **options):
128130
"""
129131
Constructor needs a connection and an iRODS data object descriptor. If the

irods/keywords.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
TICKET_KW = "ticket" # for ticket-based-access #
101101
PURGE_CACHE_KW = "purgeCache" # purge the cache copy right after the operation JMC - backport 4537
102102
EMPTY_BUNDLE_ONLY_KW = "emptyBundleOnly" # delete emptyBundleOnly # # JMC - backport 4552
103+
GET_RESOURCE_INFO_OP_TYPE_KW = "getResourceInfoOpType"
103104

104105
# =-=-=-=-=-=-=-
105106
# JMC - backport 4599
@@ -247,6 +248,7 @@
247248
REPL_REQUESTED_KW = "repl_requested"
248249
# borrowed IN_PDMO_KW
249250

251+
250252
# =-=-=-=-=-=-=-
251253
# irods structured_object keyword definitions
252254
HOST_ADDR_KW = "host_addr"

irods/manager/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ def server_version(self):
1212
return tuple( self.__server_version )
1313

1414
def __init__(self, sess):
15+
self._set_manager_session(sess)
16+
17+
def _set_manager_session(self, sess):
1518
self.sess = sess

irods/manager/data_object_manager.py

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from irods.manager import Manager
66
from irods.message import (
77
iRODSMessage, FileOpenRequest, ObjCopyRequest, StringStringMap, DataObjInfo, ModDataObjMeta,
8-
DataObjChksumRequest, DataObjChksumResponse, RErrorStack)
8+
DataObjChksumRequest, DataObjChksumResponse, RErrorStack, STR_PI
9+
)
910
import irods.exception as ex
1011
from irods.api_number import api_number
1112
from irods.collection import iRODSCollection
@@ -19,14 +20,14 @@
1920
import json
2021
import logging
2122

22-
2323
MAXIMUM_SINGLE_THREADED_TRANSFER_SIZE = 32 * ( 1024 ** 2)
2424

2525
DEFAULT_NUMBER_OF_THREADS = 0 # Defaults for reasonable number of threads -- optimized to be
2626
# performant but allow no more worker threads than available CPUs.
2727

2828
DEFAULT_QUEUE_DEPTH = 32
2929

30+
logger = logging.getLogger(__name__)
3031

3132
class Server_Checksum_Warning(Exception):
3233
"""Error from iRODS server indicating some replica checksums are missing or incorrect."""
@@ -96,16 +97,18 @@ def _download(self, obj, local_path, num_threads, **options):
9697
if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options:
9798
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
9899

99-
with open(local_file, 'wb') as f, self.open(obj, 'r', **options) as o:
100-
101-
if self.should_parallelize_transfer (num_threads, o):
102-
f.close()
103-
if not self.parallel_get( (obj,o), local_path, num_threads = num_threads,
104-
target_resource_name = options.get(kw.RESC_NAME_KW,'')):
105-
raise RuntimeError("parallel get failed")
106-
else:
107-
for chunk in chunks(o, self.READ_BUFFER_SIZE):
108-
f.write(chunk)
100+
data_open_returned_values_ = {}
101+
with open(local_file, 'wb') as f:
102+
with self.open(obj, 'r', returned_values = data_open_returned_values_, **options) as o:
103+
if self.should_parallelize_transfer (num_threads, o):
104+
f.close()
105+
if not self.parallel_get( (obj,o), local_path, num_threads = num_threads,
106+
target_resource_name = options.get(kw.RESC_NAME_KW,''),
107+
data_open_returned_values = data_open_returned_values_):
108+
raise RuntimeError("parallel get failed")
109+
else:
110+
for chunk in chunks(o, self.READ_BUFFER_SIZE):
111+
f.write(chunk)
109112

110113

111114
def get(self, path, local_path = None, num_threads = DEFAULT_NUMBER_OF_THREADS, **options):
@@ -214,6 +217,7 @@ def parallel_get(self,
214217
async_ = False,
215218
num_threads = 0,
216219
target_resource_name = '',
220+
data_open_returned_values = None,
217221
progressQueue = False):
218222
"""Call into the irods.parallel library for multi-1247 GET.
219223
@@ -224,6 +228,7 @@ def parallel_get(self,
224228
"""
225229
return parallel.io_main( self.sess, data_or_path_, parallel.Oper.GET | (parallel.Oper.NONBLOCKING if async_ else 0), file_,
226230
num_threads = num_threads, target_resource_name = target_resource_name,
231+
data_open_returned_values = data_open_returned_values,
227232
queueLength = (DEFAULT_QUEUE_DEPTH if progressQueue else 0))
228233

229234
def parallel_put(self,
@@ -296,7 +301,9 @@ def open_with_FileRaw(self, *arg, **kw_options):
296301
kw.RESC_HIER_STR_KW
297302
))
298303

299-
def open(self, path, mode, create = True, finalize_on_close = True, **options):
304+
305+
def open(self, path, mode, create = True, finalize_on_close = True, returned_values = None, allow_redirect = True, **options):
306+
300307
_raw_fd_holder = options.get('_raw_fd_holder',[])
301308
# If no keywords are used that would influence the server as to the choice of a storage resource,
302309
# then use the default resource in the client configuration.
@@ -317,29 +324,79 @@ def open(self, path, mode, create = True, finalize_on_close = True, **options):
317324
}[mode]
318325
# TODO: Use seek_to_end
319326

327+
if not isinstance(returned_values, dict):
328+
returned_values = {}
329+
320330
try:
321331
oprType = options[kw.OPR_TYPE_KW]
322332
except KeyError:
323333
oprType = 0
324334

325-
message_body = FileOpenRequest(
326-
objPath=path,
327-
createMode=0,
328-
openFlags=flags,
329-
offset=0,
330-
dataSize=-1,
331-
numThreads=self.sess.numThreads,
332-
oprType=oprType,
333-
KeyValPair_PI=StringStringMap(options),
334-
)
335-
message = iRODSMessage('RODS_API_REQ', msg=message_body,
336-
int_info=api_number['DATA_OBJ_OPEN_AN'])
335+
def make_FileOpenRequest(**extra_opts):
336+
options_ = dict(options) if extra_opts else options
337+
options_.update(extra_opts)
338+
return FileOpenRequest(
339+
objPath=path,
340+
createMode=0,
341+
openFlags=flags,
342+
offset=0,
343+
dataSize=-1,
344+
numThreads=self.sess.numThreads,
345+
oprType=oprType,
346+
KeyValPair_PI=StringStringMap(options_),
347+
)
348+
349+
requested_hierarchy = options.get(kw.RESC_HIER_STR_KW, None)
337350

338351
conn = self.sess.pool.get_connection()
352+
redirected_host = ''
353+
354+
use_get_rescinfo_apis = False
355+
356+
if allow_redirect and conn.server_version >= (4,3,1):
357+
key = 'CREATE' if mode[0] in ('w','a') else 'OPEN'
358+
message = iRODSMessage('RODS_API_REQ',
359+
msg=make_FileOpenRequest(**{kw.GET_RESOURCE_INFO_OP_TYPE_KW:key}),
360+
int_info=api_number['GET_RESOURCE_INFO_FOR_OPERATION_AN'])
361+
conn.send(message)
362+
response = conn.recv()
363+
msg = response.get_main_message( STR_PI )
364+
use_get_rescinfo_apis = True
365+
366+
# Get the information needed for the redirect
367+
_ = json.loads(msg.myStr)
368+
redirected_host = _["host"]
369+
requested_hierarchy = _["resource_hierarchy"]
370+
371+
target_zone = list(filter(None, path.split('/')))
372+
if target_zone:
373+
target_zone = target_zone[0]
374+
375+
directed_sess = self.sess
376+
377+
if redirected_host and use_get_rescinfo_apis:
378+
# Redirect only if the local zone is being targeted, and if the hostname is changed from the original.
379+
if target_zone == self.sess.zone and (self.sess.host != redirected_host):
380+
# This is the actual redirect.
381+
directed_sess = self.sess.clone(host = redirected_host)
382+
returned_values['session'] = directed_sess
383+
conn = directed_sess.pool.get_connection()
384+
logger.debug('redirect_to_host = %s', redirected_host)
385+
386+
# Restore RESC HIER for DATA_OBJ_OPEN call
387+
if requested_hierarchy is not None:
388+
options[kw.RESC_HIER_STR_KW] = requested_hierarchy
389+
message_body = make_FileOpenRequest()
390+
391+
# Perform DATA_OBJ_OPEN call
392+
message = iRODSMessage('RODS_API_REQ', msg=message_body,
393+
int_info=api_number['DATA_OBJ_OPEN_AN'])
339394
conn.send(message)
340395
desc = conn.recv().int_info
341396

342397
raw = iRODSDataObjectFileRaw(conn, desc, finalize_on_close = finalize_on_close, **options)
398+
raw.session = directed_sess
399+
343400
(_raw_fd_holder).append(raw)
344401
return io.BufferedRandom(raw)
345402

irods/parallel.py

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,6 @@ def wait(self):
7474
self.barrier.release()
7575
return count - 1
7676

77-
@contextlib.contextmanager
78-
def enableLogging(handlerType,args,level_ = logging.INFO):
79-
"""Context manager for temporarily enabling a logger. For debug or test.
80-
81-
Usage Example -
82-
with irods.parallel.enableLogging(logging.FileHandler,('/tmp/logfile.txt',)):
83-
# parallel put/get code here
84-
"""
85-
h = None
86-
saveLevel = logger.level
87-
try:
88-
logger.setLevel(level_)
89-
h = handlerType(*args)
90-
h.setLevel( level_ )
91-
logger.addHandler(h)
92-
yield
93-
finally:
94-
logger.setLevel(saveLevel)
95-
if h in logger.handlers:
96-
logger.removeHandler(h)
97-
98-
9977
RECOMMENDED_NUM_THREADS_PER_TRANSFER = 3
10078

10179
verboseConnection = False
@@ -380,12 +358,13 @@ def bytes_range_for_thread( i, num_threads, total_bytes, chunk ):
380358
for byte_range in ranges:
381359
if Io is None:
382360
Io = session.data_objects.open( Data_object.path, Operation.data_object_mode(initial_open = False),
383-
create = False, finalize_on_close = False,
361+
create = False, finalize_on_close = False, allow_redirect = False,
384362
**{ kw.NUM_THREADS_KW: str(num_threads),
385363
kw.DATA_SIZE_KW: str(total_size),
386364
kw.RESC_HIER_STR_KW: hier_str,
387365
kw.REPLICA_TOKEN_KW: replica_token })
388366
mgr.add_io( Io )
367+
logger.debug('target_host = %s', Io.raw.session.pool.account.host)
389368
if File is None: File = gen_file_handle()
390369
futures.append(executor.submit( _io_part, Io, byte_range, File, Operation, mgr, str(counter), queueObject))
391370
counter += 1
@@ -446,21 +425,29 @@ def io_main( session, Data, opr_, fname, R='', **kwopt):
446425
open_options[kw.NUM_THREADS_KW] = str(num_threads)
447426
open_options[kw.DATA_SIZE_KW] = str(total_bytes)
448427

428+
output_values = {}
449429
if (not Io):
450430
(Io, rawfile) = session.data_objects.open_with_FileRaw( (d_path or Data.path),
451431
Operation.data_object_mode(initial_open = True),
452-
finalize_on_close = True, **open_options )
432+
finalize_on_close = True, returned_values = output_values, **open_options )
453433
else:
454434
if type(Io) is deferred_call:
455435
Io[kw.NUM_THREADS_KW] = str(num_threads)
456-
Io[kw.DATA_SIZE_KW] = str(total_bytes)
436+
Io[kw.DATA_SIZE_KW] = str(total_bytes)
437+
Io['returned_values'] = output_values
457438
Io = Io()
458439
rawfile = Io.raw
459440

441+
if not output_values:
442+
output_values = kwopt.get('data_open_returned_values',{})
443+
444+
if 'session' in output_values:
445+
session = output_values['session']
446+
460447
# At this point, the data object's existence in the catalog is guaranteed,
461448
# whether the Operation is a GET or PUT.
462449

463-
if not isinstance(Data,iRODSDataObject):
450+
if not isinstance(Data,iRODSDataObject) or 'session' in output_values:
464451
Data = session.data_objects.get(d_path)
465452

466453
# Determine total number of bytes for transfer.

irods/session.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,17 @@ def __del__(self):
168168
def clone(self, **kwargs):
169169
other = copy.copy(self)
170170
other.pool = None
171-
for k,v in vars(other).items():
172-
setter = getattr(v,'_set_manager_session',None)
173-
if setter:
174-
setter(other)
171+
for k,v in vars(self).items():
172+
if getattr(v,'_set_manager_session',None) is not None:
173+
vcopy = copy.copy(v)
174+
# Give the cloned session its own (shallow) copy of the manager object, and set that copy's
# parent session reference to correspond to the clone.
# reference to correspond to the clone.
176+
setattr(other,k,vcopy)
177+
vcopy._set_manager_session(other)
178+
elif isinstance(v,iRODSAccount):
179+
# Give the clone its own (shallow) copy of the iRODSAccount subobject, since we might be setting the hostname on that object.
180+
setattr(other,k,copy.copy(v))
181+
175182
other.cleanup(new_host = kwargs.pop('host',''))
176183
other.ticket__ = kwargs.pop('ticket',self.ticket__)
177184
self.ticket_applied = weakref.WeakKeyDictionary() # conn -> ticket applied
@@ -255,7 +262,7 @@ def configure(self, **kwargs):
255262
account = self._configure_account(**kwargs)
256263
connection_refresh_time = self.get_connection_refresh_time(**kwargs)
257264
logger.debug("In iRODSSession's configure(). connection_refresh_time set to {}".format(connection_refresh_time))
258-
self.pool = Pool(account, application_name=kwargs.pop('application_name',''), connection_refresh_time=connection_refresh_time)
265+
self.pool = Pool(account, application_name=kwargs.pop('application_name',''), connection_refresh_time=connection_refresh_time, session = self)
259266
conn_timeout = getattr(self,'_cached_connection_timeout',None)
260267
self.pool.connection_timeout = conn_timeout
261268
return account

irods/test/connection_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ def test_that_connection_timeout_works__issue_377(self):
8383
# Set a very short socket timeout and remove all pre-existing socket connections.
8484
# This forces a new connection to be made for any ensuing connections to the iRODS server.
8585

86+
sess = obj.manager.sess # Because of client-redirect it is possible that self.sess and
87+
# obj.manager.sess do not refer to the same object. In any case,
88+
# it is the latter of the two iRODSSession objects that is
89+
# involved in the data PUT connection.
8690
sess.connection_timeout = timeout = 0.01
8791
sess.cleanup()
8892

0 commit comments

Comments
 (0)