Skip to content

Commit ac8b13f

Browse files
committed
Major change in stashcp to allow for methods
1 parent 1b02bc1 commit ac8b13f

2 files changed

Lines changed: 140 additions & 119 deletions

File tree

bin/stashcp

Lines changed: 107 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ def doWriteBack(source, destination):
114114
return curl_exit
115115

116116

117-
def doStashCpSingle(sourceFile, destination, debug=False):
117+
def doStashCpSingle(sourceFile, destination, methods, debug=False):
118+
"""
119+
Perform a single copy from StashCache federation
120+
"""
118121

119122
global nearest_cache
120123

@@ -137,7 +140,56 @@ def doStashCpSingle(sourceFile, destination, debug=False):
137140

138141
# Calculate the starting time
139142
start1 = int(time.time()*1000)
140-
143+
144+
# Go through the download methods
145+
cur_method = methods[0]
146+
success = False
147+
for method in methods:
148+
cur_method = method
149+
if method == "cvmfs":
150+
logging.info("Trying CVMFS...")
151+
if download_cvmfs(sourceFile, destination, debug, payload):
152+
success = True
153+
break
154+
elif method == "xrootd":
155+
logging.info("Trying XrootD...")
156+
if download_xrootd(sourceFile, destination, debug, payload):
157+
success = True
158+
break
159+
elif method == "http":
160+
logging.info("Trying HTTP...")
161+
if download_http(sourceFile, destination, debug, payload):
162+
success = True
163+
break
164+
165+
end1 = int(time.time()*1000)
166+
payload['start1']=start1
167+
payload['end1']=end1
168+
payload['timestamp']=end1
169+
payload['download_time']=end1-start1
170+
if success:
171+
payload['status'] = 'Success'
172+
173+
# Get the final size of the downloaded file
174+
if os.path.isdir(destination):
175+
destination += "/"
176+
dest_dir, dest_filename = os.path.split(destination)
177+
178+
if dest_filename:
179+
final_destination = destination
180+
else:
181+
final_destination = os.path.join(dest_dir, os.path.basename(sourceFile))
182+
payload['filesize'] = os.stat(final_destination).st_size
183+
payload['download_size'] = payload['filesize']
184+
else:
185+
payload['status'] = 'Fail'
186+
187+
es_send(payload)
188+
return success
189+
190+
191+
def download_cvmfs(sourceFile, destination, debug, payload):
192+
141193
# First, check if the file is available in CVMFS
142194
if sourceFile[0] == '/':
143195
cvmfs_file = os.path.join("/cvmfs/stash.osgstorage.org/", sourceFile[1:])
@@ -149,127 +201,71 @@ def doStashCpSingle(sourceFile, destination, debug=False):
149201
shutil.copy(cvmfs_file, destination)
150202
logging.debug("Succesfully copied file from CVMFS!")
151203
end1 = int(time.time()*1000)
152-
dlSz=os.stat(destination).st_size
153-
filesize = os.stat(cvmfs_file).st_size
154-
dltime=end1-start1
155-
destSpace=1
156-
status = 'Success'
157-
payload['timestamp']=end1
158-
payload['host']="CVMFS"
159-
payload['download_size']=dlSz
160-
payload['filesize'] = filesize
161-
payload['download_time']=dltime
162-
payload['destination_space']=destSpace
163-
payload['status']=status
164204
payload['tries']=1
165-
payload['start1']=start1
166-
payload['end1']=end1
167205
payload['cache']="CVMFS"
168-
es_send(payload)
169-
170-
return 0
206+
payload['host']="CVMFS"
207+
return True
171208

172209
except IOError as e:
173210
logging.error("Unable to copy with CVMFS, even though file exists: %s", str(e))
174-
211+
return False
212+
175213
else:
176214
logging.debug("CVMFS File does not exist")
177215

216+
return False
217+
218+
def download_xrootd(sourceFile, destination, debug, payload):
219+
"""
220+
Download from the nearest cache, if that fails, fallback to
221+
the stash origin.
222+
"""
223+
global nearest_cache
224+
global nearest_cache_list
225+
226+
# Check for xrootd, return quickly if it's not available
178227
if not check_for_xrootd():
179-
return download_with_http(sourceFile, destination, debug)
228+
return False
180229

181230
# If the cache is not specified by the command line, then look for the closest
182231
if not nearest_cache:
183232
nearest_cache = get_best_stashcache()
233+
cache = nearest_cache
184234
logging.debug("Using Cache %s", nearest_cache)
185-
186-
# Now check the size of the file with xrootd
187-
logging.debug("Checking size of file.")
188-
(xrdfs_stdout, xrdfs_stderr) = subprocess.Popen(["xrdfs", main_redirector, "stat", sourceFile], stdout=subprocess.PIPE).communicate()
189-
xrdcp_version = subprocess.Popen(['echo $(xrdcp -V 2>&1)'], stdout=subprocess.PIPE, shell=True).communicate()[0][:-1]
190-
try:
191-
fileSize = int(re.findall(r"Size: \d+", xrdfs_stdout)[0].split(": ")[1])
192-
logging.debug("Size of the file %s is %s", sourceFile, fileSize)
193-
payload['filesize'] = fileSize
194-
except (ValueError, IndexError) as e:
195-
sys.stderr.write("Unable to find size of file from the origin\n")
196-
print str(xrdfs_stdout)
197-
sys.stderr.write(str(xrdfs_stderr))
198-
sys.stderr.write("\n")
199-
200-
payload['xrdcp_version'] = xrdcp_version
201-
202-
end1=int(time.time()*1000)
203-
payload['end1']=end1
204-
payload['start1']=start1
205-
206-
start2 = int(time.time()*1000)
207235

208-
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, destination=destination)
209-
210-
end2=int(time.time()*1000)
211-
212-
dlSz=0
213-
if os.path.exists(destination):
214-
dlSz=os.stat(destination).st_size
215-
destSpace=1
236+
xrd_exit = timed_transfer(filename=sourceFile, debug=debug, cache=cache, destination=destination)
216237

217238
payload['xrdexit1']=xrd_exit
218-
payload['start2']=start2
219-
payload['end2']=end2
220-
221239

222240
if xrd_exit=='0': #worked first try
223241
logging.debug("Transfer success using %s", nearest_cache)
224-
dltime=end2-start2
225-
status = 'Success'
226-
tries=2
227-
228-
payload['download_size']=dlSz
229-
payload['download_time']=dltime
230-
payload['sitename']=sitename
231-
payload['destination_space']=destSpace
232-
payload['status']=status
233-
payload['tries']=tries
234-
payload['cache']=nearest_cache
235-
if 'filesize' not in payload:
236-
payload['filesize']=dlSz
237-
es_send(payload)
242+
payload['tries'] = 1
243+
payload['cache'] = cache
238244

239245
else: #pull from origin
240246
logging.warning("XrdCP from cache failed on %s, pulling from main redirector", nearest_cache)
241-
nearest_cache=main_redirector
242-
start3 = int(time.time()*1000)
243-
xrd_exit=timed_transfer(filename=sourceFile, debug=debug, destination=destination)
244-
end3=int(time.time()*1000)
245-
if os.path.exists(destination):
246-
dlSz=os.stat(destination).st_size
247-
dltime=end3-start3
247+
cache = main_redirector
248+
xrd_exit=timed_transfer(filename=sourceFile, cache=cache, debug=debug, destination=destination)
249+
248250
if xrd_exit=='0':
249251
logging.info("Trunk Success")
250252
status = 'Trunk Sucess'
251-
tries=3
253+
tries=2
252254
else:
253-
logging.error("stashcp failed after 3 attempts")
255+
logging.info("stashcp failed after 2 xrootd attempts")
254256
status = 'Timeout'
255-
tries = 3
256-
payload['download_size']=dlSz
257-
payload['download_time']=dltime
258-
payload['destination_space']=destSpace
257+
tries = 2
258+
259259
payload['status']=status
260260
payload['xrdexit2']=xrd_exit
261261
payload['tries']=tries
262-
payload['start3']=start3
263-
payload['end3']=end3
264-
payload['cache']=nearest_cache
265-
if 'filesize' not in payload:
266-
payload['filesize']=dlSz
267-
es_send(payload)
262+
payload['cache'] = cache
263+
268264
if xrd_exit == '0':
269-
return 0
265+
return True
270266
else:
271-
return 1
272-
return 0
267+
return False
268+
return True
273269

274270
def check_for_xrootd():
275271
"""
@@ -288,19 +284,17 @@ def check_for_xrootd():
288284
return False
289285

290286

291-
def download_with_http(source, destination, debug):
287+
def download_http(source, destination, debug, payload):
292288
"""
293289
Download from the nearest cache with HTTP
294290
"""
295291
global nearest_cache
296292
global nearest_cache_list
297-
logging.debug("Downloading with HTTP")
298293

299-
payload = {}
300-
payload['filename'] = source
301-
sitename = os.environ.setdefault("OSG_SITE_NAME", "siteNotFound")
302-
payload['sitename'] = sitename
303-
payload.update(parse_job_ad())
294+
if not nearest_cache:
295+
nearest_cache = get_best_stashcache()
296+
297+
logging.debug("Downloading with HTTP")
304298

305299
if not nearest_cache:
306300
nearest_cache = get_best_stashcache()
@@ -359,24 +353,14 @@ def download_with_http(source, destination, debug):
359353
status = 'Success'
360354
payload['download_size']=dlSz
361355
payload['filesize'] = filesize
362-
else:
363-
status = 'Failure'
364-
dltime=end-start
365-
destSpace=1
366-
payload['timestamp']=end
356+
367357
payload['host']=tried_cache
368-
payload['download_time']=dltime
369-
payload['destination_space']=destSpace
370-
payload['status']=status
371358
payload['tries']=1
372-
payload['start1']=start
373-
payload['end1']=end
374359
payload['cache']=tried_cache
375-
es_send(payload)
376360
if success:
377-
return 0
361+
return True
378362
else:
379-
return 1
363+
return False
380364

381365

382366
def parse_job_ad():
@@ -401,7 +385,7 @@ def parse_job_ad():
401385

402386
return temp_list
403387

404-
def dostashcpdirectory(sourceDir, destination, debug=False):
388+
def dostashcpdirectory(sourceDir, destination, methods, debug=False):
405389
sourceItems = subprocess.Popen(["xrdfs", stash_origin, "ls", sourceDir], stdout=subprocess.PIPE).communicate()[0].split()
406390

407391
for remote_file in sourceItems:
@@ -410,7 +394,7 @@ def dostashcpdirectory(sourceDir, destination, debug=False):
410394
if isdir!='0':
411395
result = dostashcpdirectory(remote_file, destination, debug)
412396
else:
413-
result = doStashCpSingle(remote_file, destination, debug)
397+
result = doStashCpSingle(remote_file, destination, methods, debug)
414398
# Stop transfers if something fails
415399
if result != 0:
416400
return result
@@ -420,7 +404,7 @@ def es_send(payload):
420404

421405
# Calculate the curernt timestamp
422406
payload['timestamp'] = int(time.time()*1000)
423-
payload['host'] = payload['cache']
407+
#payload['host'] = payload['cache']
424408

425409
def _es_send(payload):
426410
data = payload
@@ -433,15 +417,15 @@ def es_send(payload):
433417
f.close()
434418
except urllib2.URLError, e:
435419
logging.warning("Error posting to ES: %s", str(e))
436-
420+
print payload
437421
p = multiprocessing.Process(target=_es_send, name="_es_send", args=(payload,))
438422
p.start()
439423
p.join(5)
440424
p.terminate()
441425

442426

443427

444-
def timed_transfer(filename, destination, debug=False):
428+
def timed_transfer(filename, destination, cache, debug=False, ):
445429
"""
446430
Transfer the filename from the cache to the destination using xrdcp
447431
"""
@@ -455,7 +439,7 @@ def timed_transfer(filename, destination, debug=False):
455439
os.environ.setdefault("XRD_CONNECTIONRETRY", "2") # How many time should we retry the TCP connection
456440
os.environ.setdefault("XRD_STREAMTIMEOUT", "30") # How long to wait for TCP activity
457441

458-
filepath=nearest_cache+":1094//"+ filename
442+
filepath=cache+":1094//"+ filename
459443
if debug:
460444
command="xrdcp -d 2 --nopbar -f " + filepath + " " + destination
461445
else:
@@ -602,6 +586,7 @@ def main():
602586
parser.add_option('-c', '--cache', dest='cache', help="Cache to use")
603587
parser.add_option('-j', '--caches-json', dest='caches_json', help="The JSON file containing the list of caches",
604588
default=None)
589+
parser.add_option('--methods', dest='methods', help="Comma separated list of methods to try, in order. Default: cvmfs,xrootd,http", default="cvmfs,xrootd,http")
605590
args,opts=parser.parse_args()
606591

607592
logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
@@ -630,12 +615,15 @@ def main():
630615
nearest_cache = args.cache
631616
nearest_cache_list = [ args.cache ]
632617

618+
# Convert the methods
619+
methods = args.methods.split(',')
620+
633621
if not args.recursive:
634-
result = doStashCpSingle(sourceFile=source, destination=destination, debug=args.debug)
622+
result = doStashCpSingle(sourceFile=source, destination=destination, methods = methods, debug=args.debug)
635623
else:
636-
result = dostashcpdirectory(sourceDir = source, destination = destination, debug=args.debug)
624+
result = dostashcpdirectory(sourceDir = source, destination = destination, methods = methods, debug=args.debug)
637625
# Exit with failure
638-
sys.exit(result)
626+
sys.exit(0 if result else 1)
639627

640628

641629
if __name__ == "__main__":

0 commit comments

Comments
 (0)