6262TIMEOUT = 300
6363DIFF = TIMEOUT * 10
6464
65+
def to_str(strlike, encoding="latin-1", errors="backslashreplace"):
    """
    Coerce a bytes-like or unicode-like value to the interpreter's native str.

    On Python 3 (str is text) a non-str value is decoded; on Python 2
    (str is bytes) a non-str value is encoded.  Values that are already
    native str are returned unchanged.

    :param strlike: value to convert; must provide .decode() (Python 3)
        or .encode() (Python 2) when it is not already a str
    :param encoding: codec used for the conversion
    :param errors: codec error handler.  The default must be a registered
        handler name: "backslashreplace" escapes unconvertible data instead
        of raising.  (An unregistered name makes the codec machinery raise
        LookupError as soon as a conversion error actually occurs.)
    :returns: the value as a native str
    """
    if isinstance(strlike, str):
        return strlike
    if str is bytes:
        # Python 2: native str is bytes, so a non-str input is unicode text.
        return strlike.encode(encoding, errors)
    # Python 3: native str is text, so a non-str input is bytes-like.
    return strlike.decode(encoding, errors)
73+
74+
6575def doWriteBack (source , destination , debug = False ):
6676 """
6777 Do a write back to Stash using SciTokens
@@ -95,6 +105,7 @@ def doWriteBack(source, destination, debug=False):
95105 logging .debug ("curl command: %s" % command )
96106 curl = subprocess .Popen ([command ],shell = True ,stdout = subprocess .PIPE ,stderr = subprocess .PIPE )
97107 (stdout , stderr ) = curl .communicate ()
108+ (stdout , stderr ) = (to_str (stdout ), to_str (stderr ))
98109 curl_exit = curl .returncode
99110 if statinfo .st_size == 0 and curl_exit == 28 :
100111 logging .debug ("Got curl exit code 28, but that's ok for zero-length files. This doesn't capture connection timeouts" )
@@ -306,6 +317,9 @@ def download_xrootd(sourceFile, destination, debug, payload):
306317 # If the cache is not specified by the command line, then look for the closest
307318 if not nearest_cache :
308319 nearest_cache = get_best_stashcache ()
320+ if not nearest_cache :
321+ logging .error ("No cache found" )
322+ return False
309323 cache = nearest_cache
310324 logging .debug ("Using Cache %s" , nearest_cache )
311325
@@ -351,7 +365,7 @@ def check_for_xrootd():
351365 check_command = "xrdcp -V 2>&1"
352366 logging .debug ("Running the command to check of xrdcp existance: %s" , check_command )
353367 command_object = subprocess .Popen ([check_command ], stdout = subprocess .PIPE , shell = True )
354- xrdcp_version = command_object .communicate ()[0 ]
368+ xrdcp_version = to_str ( command_object .communicate ()[0 ])
355369 if command_object .returncode == 0 :
356370 logging .debug ("xrdcp version: %s" , xrdcp_version )
357371 return xrdcp_version
@@ -473,11 +487,11 @@ def parse_job_ad():
473487 return temp_list
474488
475489def dostashcpdirectory (sourceDir , destination , methods , debug = False ):
476- sourceItems = subprocess .Popen (["xrdfs" , stash_origin , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ].split ()
490+ sourceItems = to_str ( subprocess .Popen (["xrdfs" , stash_origin , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ]) .split ()
477491
478492 for remote_file in sourceItems :
479493 command2 = 'xrdfs ' + stash_origin + ' stat ' + remote_file + ' | grep "IsDir" | wc -l'
480- isdir = subprocess .Popen ([command2 ],stdout = subprocess .PIPE ,shell = True ).communicate ()[0 ].split ()[0 ]
494+ isdir = to_str ( subprocess .Popen ([command2 ],stdout = subprocess .PIPE ,shell = True ).communicate ()[0 ].split ()[0 ])
481495 if isdir != '0' :
482496 result = dostashcpdirectory (remote_file , destination , debug )
483497 else :
@@ -499,11 +513,11 @@ def _es_send(payload):
499513 data = json .dumps (data )
500514 try :
501515 url = "http://uct2-collectd.mwt2.org:9951"
502- req = Request (url , data = data , headers = {'Content-Type' : 'application/json' })
516+ req = Request (url , data = data . encode ( "utf-8" ) , headers = {'Content-Type' : 'application/json' })
503517 f = urlopen (req )
504518 f .read ()
505519 f .close ()
506- except URLError as e :
520+ except ( URLError , UnicodeError ) as e :
507521 logging .warning ("Error posting to ES: %s" , str (e ))
508522
509523 p = multiprocessing .Process (target = _es_send , name = "_es_send" , args = (payload ,))
@@ -600,7 +614,7 @@ def get_json_caches(caches_json_location):
600614
601615
602616# Return list of caches as root:// URLs
603- def get_stashservers_caches (responselines ):
617+ def get_stashservers_caches (responselines_b ):
604618
605619 # After the geo order of the selected server list on line zero,
606620 # the rest of the response is in .cvmfswhitelist format.
@@ -630,18 +644,19 @@ def get_stashservers_caches(responselines):
630644 # which would have caused it to have been split into multiple
631645 # response "lines".
632646
633- if len (responselines ) < 8 :
647+ if len (responselines_b ) < 8 :
634648 logging .error ("stashservers response too short, less than 8 lines" )
635649 return None
636- hashname = responselines [4 ][- 5 :]
637- if hashname != "-sha1" :
638- logging .error ("stashservers response does not have sha1 hash: %s" , hashname )
650+ assert isinstance (responselines_b [4 ], bytes )
651+ hashname_b = responselines_b [4 ][- 5 :]
652+ if hashname_b != b"-sha1" :
653+ logging .error ("stashservers response does not have sha1 hash: %s" , to_str (hashname_b ))
639654 return None
640- hashedtext = '\n ' .join (responselines [1 :5 ]) + '\n '
641- hash = hashlib .sha1 (hashedtext ).hexdigest ()
642- if responselines [6 ] != hash :
643- logging .debug ("stashservers hash %s does not match expected hash %s" , responselines [6 ], hash )
644- logging .debug ("hashed text:\n %s" , hashedtext )
655+ hashedtext_b = b '\n ' .join (responselines_b [1 :5 ]) + b '\n '
656+ hash_str = hashlib .sha1 (hashedtext_b ).hexdigest ()
657+ if to_str ( responselines_b [6 ]) != hash_str :
658+ logging .debug ("stashservers hash %s does not match expected hash %s" , to_str ( responselines_b [6 ]), hash_str )
659+ logging .debug ("hashed text:\n %s" , to_str ( hashedtext_b ) )
645660 logging .error ("stashservers response hash does not match expected hash" )
646661 return None
647662
@@ -653,7 +668,7 @@ def get_stashservers_caches(responselines):
653668 # investigated. Usually openssl is present.
654669 logging .debug ("openssl not installed, skipping signature check" )
655670 else :
656- sig = '\n ' .join (responselines [7 :])
671+ sig = b '\n ' .join (responselines_b [7 :])
657672
658673 # Look for the OSG cvmfs public key to verify signature
659674 prefix = os .environ .get ("OSG_LOCATION" , "/" )
@@ -676,19 +691,20 @@ def get_stashservers_caches(responselines):
676691
677692 cmd = "/usr/bin/openssl rsautl -verify -pubin -inkey " + pubkey_file
678693 logging .debug ("Running %s" , cmd )
694+ assert isinstance (sig , bytes )
679695 p = subprocess .Popen (cmd , shell = True ,
680696 stdin = subprocess .PIPE , stdout = subprocess .PIPE )
681697 p .stdin .write (sig )
682698 p .stdin .close ()
683- decryptedhash = p .stdout .read ()
699+ decryptedhash = to_str ( p .stdout .read () )
684700 p .stdout .close ()
685- if hash != decryptedhash :
686- logging .debug ("stashservers hash %s does not match decrypted signature %s" , hash , decryptedhash )
701+ if hash_str != decryptedhash :
702+ logging .debug ("stashservers hash %s does not match decrypted signature %s" , hash_str , decryptedhash )
687703 logging .error ("stashservers signature does not verify" )
688704 return None
689705 logging .debug ("Signature matched" )
690706
691- lists = responselines [4 ].split (';' )
707+ lists = to_str ( responselines_b [4 ]) .split (';' )
692708 logging .debug ("Cache lists: %s" , lists )
693709
694710 if print_cache_list_names :
@@ -752,9 +768,9 @@ def get_best_stashcache():
752768 if cache_list_name != None :
753769 api_text += "?list=" + cache_list_name
754770
755- responselines = []
771+ responselines_b = []
756772 i = 0
757- while len (responselines ) == 0 and i < len (geo_ip_sites ):
773+ while len (responselines_b ) == 0 and i < len (geo_ip_sites ):
758774 cur_site = geo_ip_sites [i ]
759775 headers ['Host' ] = cur_site
760776 logging .debug ("Trying server site of %s" , cur_site )
@@ -767,7 +783,8 @@ def get_best_stashcache():
767783 response = urlopen (req , timeout = 10 )
768784 if response .getcode () == 200 :
769785 logging .debug ("Got OK code 200 from %s" , cur_site )
770- responselines = response .read ().split ('\n ' )
786+ responsetext_b = response .read () # type: bytes
787+ responselines_b = responsetext_b .split (b'\n ' )
771788 response .close ()
772789 break
773790 response .close ()
@@ -778,8 +795,9 @@ def get_best_stashcache():
778795 i += 1
779796
780797 order_str = ""
781- if len (responselines ) > 0 :
782- order_str = responselines [0 ]
798+ if len (responselines_b ) > 0 :
799+ assert isinstance (responselines_b [0 ], bytes )
800+ order_str = to_str (responselines_b [0 ])
783801
784802 if order_str == "" :
785803 if len (caches_list ) == 0 :
@@ -800,7 +818,7 @@ def get_best_stashcache():
800818
801819 if len (caches_list ) == 0 :
802820 # Used the stashservers.dat api
803- caches_list = get_stashservers_caches (responselines )
821+ caches_list = get_stashservers_caches (responselines_b )
804822 if caches_list is None :
805823 return None
806824
0 commit comments