Skip to content

Commit 6c83c2b

Browse files
authored
Merge pull request #82 from djw8605/look_up_cache
Only look up cache if not using CVMFS
2 parents 48fdd8f + c5d1023 commit 6c83c2b

1 file changed

Lines changed: 29 additions & 23 deletions

File tree

bin/stashcp

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ main_redirector = "root://redirector.osgstorage.org"
2020
stash_origin = "root://stash.osgconnect.net"
2121
writeback_host = "http://stash-xrd.osgconnect.net:1094"
2222

23+
# Global variable for nearest cache
24+
nearest_cache = None
25+
2326
TIMEOUT = 300
2427
DIFF = TIMEOUT * 10
2528

@@ -102,17 +105,14 @@ def doWriteBack(source, destination):
102105
return curl_exit
103106

104107

105-
def doStashCpSingle(sourceFile, destination, cache, debug=False):
108+
def doStashCpSingle(sourceFile, destination, debug=False):
106109

110+
global nearest_cache
107111

108112
# Check if the desitnation is a protocol like stash:///user/blah
109113
if destination.startswith("stash://"):
110114
# Source file exists, must be a writeback
111115
return doWriteBack(sourceFile, destination)
112-
113-
if not cache:
114-
cache = get_best_stashcache()
115-
logging.debug("Using Cache %s", cache)
116116

117117
sitename = os.environ.setdefault("OSG_SITE_NAME", "siteNotFound")
118118

@@ -130,7 +130,6 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
130130
start1 = int(time.time()*1000)
131131

132132
# First, check if the file is available in CVMFS
133-
# Really, we don't need to check for close caches before this, but oh well
134133
if sourceFile[0] == '/':
135134
cvmfs_file = os.path.join("/cvmfs/stash.osgstorage.org/", sourceFile[1:])
136135
else:
@@ -166,7 +165,12 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
166165

167166
else:
168167
logging.debug("CVMFS File does not exist")
169-
168+
169+
# If the cache is not specified by the command line, then look for the closest
170+
if not nearest_cache:
171+
nearest_cache = get_best_stashcache()
172+
logging.debug("Using Cache %s", nearest_cache)
173+
170174
# Now check the size of the file with xrootd
171175
logging.debug("Checking size of file.")
172176
(xrdfs_stdout, xrdfs_stderr) = subprocess.Popen(["xrdfs", main_redirector, "stat", sourceFile], stdout=subprocess.PIPE).communicate()
@@ -190,7 +194,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
190194

191195
start2 = int(time.time()*1000)
192196

193-
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, cache=cache, destination=destination)
197+
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, destination=destination)
194198

195199
end2=int(time.time()*1000)
196200
if os.path.exists(destination):
@@ -203,7 +207,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
203207

204208

205209
if xrd_exit=='0': #worked first try
206-
logging.debug("Transfer success using %s", cache)
210+
logging.debug("Transfer success using %s", nearest_cache)
207211
dltime=end2-start2
208212
status = 'Success'
209213
tries=2
@@ -214,14 +218,14 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
214218
payload['destination_space']=destSpace
215219
payload['status']=status
216220
payload['tries']=tries
217-
payload['cache']=cache
221+
payload['cache']=nearest_cache
218222
es_send(payload)
219223

220224
else: #pull from origin
221-
logging.warning("XrdCP from cache failed on %s, pulling from main redirector", cache)
222-
cache=main_redirector
225+
logging.warning("XrdCP from cache failed on %s, pulling from main redirector", nearest_cache)
226+
nearest_cache=main_redirector
223227
start3 = int(time.time()*1000)
224-
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, cache=cache, destination=destination)
228+
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, destination=destination)
225229
end3=int(time.time()*1000)
226230
if os.path.exists(destination):
227231
dlSz=os.stat(destination).st_size
@@ -242,7 +246,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
242246
payload['tries']=tries
243247
payload['start3']=start3
244248
payload['end3']=end3
245-
payload['cache']=cache
249+
payload['cache']=nearest_cache
246250
es_send(payload)
247251
if xrd_exit == '0':
248252
return 0
@@ -273,15 +277,16 @@ def parse_job_ad():
273277

274278
return temp_list
275279

276-
def dostashcpdirectory(sourceDir, destination, cache, debug=False):
280+
def dostashcpdirectory(sourceDir, destination, debug=False):
277281
sourceItems = subprocess.Popen(["xrdfs", stash_origin, "ls", sourceDir], stdout=subprocess.PIPE).communicate()[0].split()
282+
278283
for remote_file in sourceItems:
279284
command2 = 'xrdfs ' + stash_origin + ' stat '+ remote_file + ' | grep "IsDir" | wc -l'
280285
isdir=subprocess.Popen([command2],stdout=subprocess.PIPE,shell=True).communicate()[0].split()[0]
281286
if isdir!='0':
282-
result = dostashcpdirectory(remote_file, destination, cache, debug)
287+
result = dostashcpdirectory(remote_file, destination, debug)
283288
else:
284-
result = doStashCpSingle(remote_file,destination, cache, debug)
289+
result = doStashCpSingle(remote_file, destination, debug)
285290
# Stop transfers if something fails
286291
if result != 0:
287292
return result
@@ -312,7 +317,7 @@ def es_send(payload):
312317

313318

314319

315-
def timed_transfer(filename, cache, destination, debug=False):
320+
def timed_transfer(filename, destination, debug=False):
316321
"""
317322
Transfer the filename from the cache to the destination using xrdcp
318323
"""
@@ -326,7 +331,7 @@ def timed_transfer(filename, cache, destination, debug=False):
326331
os.environ.setdefault("XRD_CONNECTIONRETRY", "2") # How many time should we retry the TCP connection
327332
os.environ.setdefault("XRD_STREAMTIMEOUT", "30") # How long to wait for TCP activity
328333

329-
filepath=cache+":1094//"+ filename
334+
filepath=nearest_cache+":1094//"+ filename
330335
if debug:
331336
command="xrdcp -d 2 --nopbar -f " + filepath + " " + destination
332337
else:
@@ -445,6 +450,8 @@ def get_best_stashcache():
445450

446451

447452
def main():
453+
global nearest_cache
454+
448455
usage = "usage: %prog [options] source destination"
449456
parser = optparse.OptionParser(usage)
450457
parser.add_option('-d', '--debug', dest='debug', action='store_true', help='debug')
@@ -473,15 +480,14 @@ def main():
473480
source=opts[0]
474481
destination=opts[1]
475482

476-
cache = None
477483
# Check for manually entered cache to use
478484
if args.cache and len(args.cache) > 0:
479-
cache = args.cache
485+
nearest_cache = args.cache
480486

481487
if not args.recursive:
482-
result = doStashCpSingle(sourceFile=source, destination=destination, cache=cache, debug=args.debug)
488+
result = doStashCpSingle(sourceFile=source, destination=destination, debug=args.debug)
483489
else:
484-
result = dostashcpdirectory(sourceDir = source, destination = destination, cache=cache, debug=args.debug)
490+
result = dostashcpdirectory(sourceDir = source, destination = destination, debug=args.debug)
485491
# Exit with failure
486492
sys.exit(result)
487493

0 commit comments

Comments
 (0)