@@ -20,6 +20,9 @@ main_redirector = "root://redirector.osgstorage.org"
2020stash_origin = "root://stash.osgconnect.net"
2121writeback_host = "http://stash-xrd.osgconnect.net:1094"
2222
23+ # Global variable for nearest cache
24+ nearest_cache = None
25+
2326TIMEOUT = 300
2427DIFF = TIMEOUT * 10
2528
@@ -102,8 +105,9 @@ def doWriteBack(source, destination):
102105 return curl_exit
103106
104107
105- def doStashCpSingle (sourceFile , destination , cache , debug = False ):
108+ def doStashCpSingle (sourceFile , destination , debug = False ):
106109
110+ global nearest_cache
107111
108112 # Check if the desitnation is a protocol like stash:///user/blah
109113 if destination .startswith ("stash://" ):
@@ -164,9 +168,9 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
164168 logging .debug ("CVMFS File does not exist" )
165169
166170 # If the cache is not specified by the command line, then look for the closest
167- if not cache :
168- cache = get_best_stashcache ()
169- logging .debug ("Using Cache %s" , cache )
171+ if not nearest_cache :
172+ nearest_cache = get_best_stashcache ()
173+ logging .debug ("Using Cache %s" , nearest_cache )
170174
171175 # Now check the size of the file with xrootd
172176 logging .debug ("Checking size of file." )
@@ -191,7 +195,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
191195
192196 start2 = int (time .time ()* 1000 )
193197
194- xrd_exit = timed_transfer (filename = sourceFile , debug = debug , cache = cache , destination = destination )
198+ xrd_exit = timed_transfer (filename = sourceFile , debug = debug , destination = destination )
195199
196200 end2 = int (time .time ()* 1000 )
197201 if os .path .exists (destination ):
@@ -204,7 +208,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
204208
205209
206210 if xrd_exit == '0' : #worked first try
207- logging .debug ("Transfer success using %s" , cache )
211+ logging .debug ("Transfer success using %s" , nearest_cache )
208212 dltime = end2 - start2
209213 status = 'Success'
210214 tries = 2
@@ -215,14 +219,14 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
215219 payload ['destination_space' ]= destSpace
216220 payload ['status' ]= status
217221 payload ['tries' ]= tries
218- payload ['cache' ]= cache
222+ payload ['cache' ]= nearest_cache
219223 es_send (payload )
220224
221225 else : #pull from origin
222- logging .warning ("XrdCP from cache failed on %s, pulling from main redirector" , cache )
223- cache = main_redirector
226+ logging .warning ("XrdCP from cache failed on %s, pulling from main redirector" , nearest_cache )
227+ nearest_cache = main_redirector
224228 start3 = int (time .time ()* 1000 )
225- xrd_exit = timed_transfer (filename = sourceFile , debug = debug , cache = cache , destination = destination )
229+ xrd_exit = timed_transfer (filename = sourceFile , debug = debug , destination = destination )
226230 end3 = int (time .time ()* 1000 )
227231 if os .path .exists (destination ):
228232 dlSz = os .stat (destination ).st_size
@@ -243,7 +247,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
243247 payload ['tries' ]= tries
244248 payload ['start3' ]= start3
245249 payload ['end3' ]= end3
246- payload ['cache' ]= cache
250+ payload ['cache' ]= nearest_cache
247251 es_send (payload )
248252 if xrd_exit == '0' :
249253 return 0
@@ -274,16 +278,9 @@ def parse_job_ad():
274278
275279 return temp_list
276280
277- def dostashcpdirectory (sourceDir , destination , cache , debug = False ):
281+ def dostashcpdirectory (sourceDir , destination , debug = False ):
278282 sourceItems = subprocess .Popen (["xrdfs" , stash_origin , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ].split ()
279283
280- # If the cache is not specified by the command line, then look for the closest
281- # Do it here, so that we only do it once per stashcp call. Though, if everything is in
282- # CVMFS, then this is unncessary.
283- if not cache :
284- cache = get_best_stashcache ()
285- logging .debug ("Using Cache %s" , cache )
286-
287284 for remote_file in sourceItems :
288285 command2 = 'xrdfs ' + stash_origin + ' stat ' + remote_file + ' | grep "IsDir" | wc -l'
289286 isdir = subprocess .Popen ([command2 ],stdout = subprocess .PIPE ,shell = True ).communicate ()[0 ].split ()[0 ]
@@ -321,7 +318,7 @@ def es_send(payload):
321318
322319
323320
324- def timed_transfer (filename , cache , destination , debug = False ):
321+ def timed_transfer (filename , destination , debug = False ):
325322 """
326323 Transfer the filename from the cache to the destination using xrdcp
327324 """
@@ -335,7 +332,7 @@ def timed_transfer(filename, cache, destination, debug=False):
335332 os .environ .setdefault ("XRD_CONNECTIONRETRY" , "2" ) # How many time should we retry the TCP connection
336333 os .environ .setdefault ("XRD_STREAMTIMEOUT" , "30" ) # How long to wait for TCP activity
337334
338- filepath = cache + ":1094//" + filename
335+ filepath = nearest_cache + ":1094//" + filename
339336 if debug :
340337 command = "xrdcp -d 2 --nopbar -f " + filepath + " " + destination
341338 else :
@@ -454,6 +451,8 @@ def get_best_stashcache():
454451
455452
456453def main ():
454+ global nearest_cache
455+
457456 usage = "usage: %prog [options] source destination"
458457 parser = optparse .OptionParser (usage )
459458 parser .add_option ('-d' , '--debug' , dest = 'debug' , action = 'store_true' , help = 'debug' )
@@ -482,15 +481,14 @@ def main():
482481 source = opts [0 ]
483482 destination = opts [1 ]
484483
485- cache = None
486484 # Check for manually entered cache to use
487485 if args .cache and len (args .cache ) > 0 :
488- cache = args .cache
486+ nearest_cache = args .cache
489487
490488 if not args .recursive :
491- result = doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
489+ result = doStashCpSingle (sourceFile = source , destination = destination , debug = args .debug )
492490 else :
493- result = dostashcpdirectory (sourceDir = source , destination = destination , cache = cache , debug = args .debug )
491+ result = dostashcpdirectory (sourceDir = source , destination = destination , debug = args .debug )
494492 # Exit with failure
495493 sys .exit (result )
496494
0 commit comments