1212import socket
1313import random
1414import shutil
15+ from urlparse import urlparse
1516
1617try :
1718 from pkg_resources import resource_string
2324from urlparse import urlparse
2425
2526# Version information for user-agent
26- VERSION = "5.2.1 "
27+ VERSION = "5.3.0 "
2728
2829main_redirector = "root://redirector.osgstorage.org"
2930stash_origin = "root://stash.osgconnect.net"
3839# Global variable for the location of the caches.json file
3940caches_json_location = None
4041
42+ # Global variable for the location of the token to use for reading / writing
43+ token_location = None
44+
4145TIMEOUT = 300
4246DIFF = TIMEOUT * 10
4347
@@ -49,29 +53,11 @@ def doWriteBack(source, destination):
4953 :param str destination: The location of the remote file, in stash:// format
5054 """
5155 start1 = int (time .time ()* 1000 )
52-
53- # Get the scitoken content
54- scitoken_file = None
55- if '_CONDOR_CREDS' in os .environ :
56- # First, look for the scitokens.use file
57- # Format: _CONDOR_CREDS=/var/lib/condor/execute/dir_908/.condor_creds
58- scitoken_file = os .path .join (os .environ ['_CONDOR_CREDS' ], 'scitokens.use' )
59- if not os .path .exists (scitoken_file ):
60- scitoken_file = None
61-
62- if not scitoken_file and os .path .exists (".condor_creds/scitokens.use" ):
63- scitoken_file = ".condor_creds/scitokens.use"
64-
65- if not scitoken_file :
56+
57+ scitoken_contents = getToken ()
58+ if scitoken_contents is None :
6659 logging .error ("Unable to find scitokens.use file" )
6760 return 1
68-
69-
70- with open (scitoken_file , 'r' ) as scitoken_obj :
71- scitoken_contents = scitoken_obj .read ().strip ()
72-
73- # Remove the stash:// at the beginning, don't need it
74- destination = destination .replace ("stash://" , "" )
7561
7662 # Check if the source file is zero-length
7763 statinfo = os .stat (source )
@@ -119,6 +105,48 @@ def doWriteBack(source, destination):
119105 es_send (payload )
120106 return curl_exit
121107
108+ def getToken ():
109+ """
110+ Get the token / scitoken from the environment in order to read / write
111+ """
112+ # Get the scitoken content
113+ scitoken_file = None
114+
115+ # Command line
116+ if token_location :
117+ scitoken_file = token_location
118+ # Environ
119+ if 'TOKEN' in os .environ :
120+ scitoken_file = os .environ ['TOKEN' ]
121+
122+ # Backwards compatibility for getting scitokens
123+ if not scitoken_file and "_CONDOR_CREDS" in os .environ :
124+ # Token wasn't specified on the command line, try the default scitokens.use
125+ if os .path .exists (os .path .join (os .environ ["_CONDOR_CREDS" ], "scitokens.use" )):
126+ scitoken_file = os .path .join (os .environ ["_CONDOR_CREDS" ], "scitokens.use" )
127+ elif os .path .exists (".condor_creds/scitokens.use" ):
128+ scitoken_file = os .path .join (os .path .abspath (".condor_creds/scitokens.use" ))
129+
130+ if not scitoken_file or not os .path .exists (scitoken_file ):
131+ logging .info ("Unable to find token file" )
132+ return None
133+
134+ # If the scitoken file is relative, then assume it's relative
135+ # to the _CONDOR_CREDS directory.
136+ if not os .path .isabs (scitoken_file ) and "_CONDOR_CREDS" in os .environ :
137+ os .path .join (os .environ ['_CONDOR_CREDS' ], scitoken_file )
138+
139+ # Read in the JSON
140+ with open (scitoken_file , 'r' ) as scitoken_obj :
141+ try :
142+ token_json = json .load (scitoken_obj )
143+ scitoken_contents = token_json ['access_token' ]
144+ except ValueError as jsonfail :
145+ logging .info ("Falling back to old style scitoken parsing" )
146+ scitoken_obj .seek (0 )
147+ scitoken_contents = scitoken_obj .read ()
148+
149+ return scitoken_contents
122150
123151def doStashCpSingle (sourceFile , destination , methods , debug = False ):
124152 """
@@ -127,13 +155,32 @@ def doStashCpSingle(sourceFile, destination, methods, debug=False):
127155
128156 global nearest_cache
129157
130- # Check if the desitnation is a protocol like stash:///user/blah
131- if destination .startswith ("stash://" ):
132- # Source file exists, must be a writeback
133- return doWriteBack (sourceFile , destination )
134-
158+ # Parse the source and destination with urlparse
159+ source_url = urlparse (sourceFile )
160+ dest_url = urlparse (destination )
161+ understoodSchemes = ["stash" , "file" , "" ]
162+ if source_url .scheme not in understoodSchemes :
163+ logging .error ("Do not understand scheme: %s" , source_url .scheme )
164+ return 1
165+
166+ if dest_url .scheme not in understoodSchemes :
167+ logging .error ("Do not understand scheme: %s" , dest_url .scheme )
168+ return 1
169+
170+ if dest_url .scheme == "stash" :
171+ return doWriteBack (source_url .path , dest_url .path )
172+
173+ if dest_url .scheme == "file" :
174+ destination = dest_url .path
175+
176+ if source_url .scheme == "stash" :
177+ sourceFile = source_url .path
178+
179+ if not sourceFile .startswith ("/" ):
180+ sourceFile = "/" + sourceFile
181+
135182 sitename = os .environ .setdefault ("OSG_SITE_NAME" , "siteNotFound" )
136-
183+
137184 # Fill out the payload as much as possible
138185 filename = destination + '/' + sourceFile .split ('/' )[- 1 ]
139186
@@ -299,11 +346,11 @@ def download_http(source, destination, debug, payload):
299346 global nearest_cache
300347 global nearest_cache_list
301348
302- if not nearest_cache :
303- nearest_cache = get_best_stashcache ()
304-
305349 logging .debug ("Downloading with HTTP" )
306350
351+ #scitoken_contents = getToken()
352+ scitoken_contents = None
353+
307354 if not nearest_cache :
308355 nearest_cache = get_best_stashcache ()
309356
@@ -340,11 +387,17 @@ def download_http(source, destination, debug, payload):
340387 cache = cache .replace ('root://' , 'http://' )
341388
342389 # Append port 8000, which is just a convention for now, not set in stone
343- cache += ":8000"
390+ # Check if the cache already has a port attached to it
391+ parsed_url = urlparse (cache )
392+ if not parsed_url .port :
393+ cache += ":8000"
344394
345395 # Quote the source URL, which may have weird, dangerous characters
346396 quoted_source = urllib2 .quote (source )
347- curl_command = "curl %s -L --connect-timeout 30 --speed-limit 1024 %s --fail %s%s" % (output_mode , download_output , cache , quoted_source )
397+ if scitoken_contents :
398+ curl_command = "curl %s -L --connect-timeout 30 --speed-limit 1024 %s --fail -H \" Authorization: Bearer %s\" %s%s" % (output_mode , download_output , scitoken_contents , cache , quoted_source )
399+ else :
400+ curl_command = "curl %s -L --connect-timeout 30 --speed-limit 1024 %s --fail %s%s" % (output_mode , download_output , cache , quoted_source )
348401 logging .debug ("About to run curl command: %s" , curl_command )
349402 start = int (time .time ()* 1000 )
350403 command_object = subprocess .Popen ([curl_command ], shell = True , cwd = dest_dir )
@@ -447,7 +500,10 @@ def timed_transfer(filename, destination, cache, debug=False, ):
447500 os .environ .setdefault ("XRD_CONNECTIONRETRY" , "2" ) # How many time should we retry the TCP connection
448501 os .environ .setdefault ("XRD_STREAMTIMEOUT" , "30" ) # How long to wait for TCP activity
449502
450- filepath = cache + ":1094//" + filename
503+ if not filename .startswith ("/" ):
504+ filepath = cache + ":1094//" + filename
505+ else :
506+ filepath = cache + ":1094/" + filename
451507 if debug :
452508 command = "xrdcp -d 2 --nopbar -f " + filepath + " " + destination
453509 else :
@@ -591,6 +647,7 @@ def main():
591647 global nearest_cache
592648 global nearest_cache_list
593649 global caches_json_location
650+ global token_location
594651
595652 usage = "usage: %prog [options] source destination"
596653 parser = optparse .OptionParser (usage )
@@ -601,6 +658,7 @@ def main():
601658 parser .add_option ('-j' , '--caches-json' , dest = 'caches_json' , help = "The JSON file containing the list of caches" ,
602659 default = None )
603660 parser .add_option ('--methods' , dest = 'methods' , help = "Comma separated list of methods to try, in order. Default: cvmfs,xrootd,http" , default = "cvmfs,xrootd,http" )
661+ parser .add_option ('-t' , '--token' , dest = 'token' , help = "Token file to use for reading and/or writing" )
604662 args ,opts = parser .parse_args ()
605663
606664 logging .basicConfig (format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' ,
@@ -613,7 +671,10 @@ def main():
613671 else :
614672 logger .setLevel (logging .WARNING )
615673
616- caches_json_location = args .caches_json
674+ if 'CACHES_JSON' in os .environ :
675+ caches_json_location = os .environ ['CACHES_JSON' ]
676+ else :
677+ caches_json_location = args .caches_json
617678 if args .closest :
618679 print get_best_stashcache ()
619680 sys .exit (0 )
@@ -625,9 +686,15 @@ def main():
625686 destination = opts [1 ]
626687
627688 # Check for manually entered cache to use
628- if args .cache and len (args .cache ) > 0 :
689+ if 'NEAREST_CACHE' in os .environ :
690+ nearest_cache = os .environ ['NEAREST_CACHE' ]
691+ nearest_cache_list = [nearest_cache ]
692+ elif args .cache and len (args .cache ) > 0 :
629693 nearest_cache = args .cache
630- nearest_cache_list = [ args .cache ]
694+ nearest_cache_list = [args .cache ]
695+
696+ if args .token :
697+ token_location = args .token
631698
632699 # Convert the methods
633700 methods = args .methods .split (',' )
0 commit comments