@@ -24,9 +24,16 @@ DIFF = TIMEOUT * 10
2424def doStashCpSingle (sourceFile , destination , cache , debug = False ):
2525
2626 logging .debug ("Checking size of file." )
27- xrdfs = subprocess .Popen (["xrdfs" , "root://stash.osgconnect.net" , "stat" , sourceFile ], stdout = subprocess .PIPE ).communicate ()[ 0 ]
27+ ( xrdfs_stdout , xrdfs_stderr ) = subprocess .Popen (["xrdfs" , "root://stash.osgconnect.net" , "stat" , sourceFile ], stdout = subprocess .PIPE ).communicate ()
2828 xrdcp_version = subprocess .Popen (['echo $(xrdcp -V 2>&1)' ], stdout = subprocess .PIPE , shell = True ).communicate ()[0 ][:- 1 ]
29- fileSize = int (re .findall (r"Size: \d+" , xrdfs )[0 ].split (": " )[1 ])
29+ try :
30+ fileSize = int (re .findall (r"Size: \d+" , xrdfs_stdout )[0 ].split (": " )[1 ])
31+ except Exception as e :
32+ sys .stderr .write ("Unable to find size of file\n " )
33+ print str (xrdfs_stdout )
34+ sys .stderr .write (str (xrdfs_stderr ))
35+ sys .stderr .write ("\n " )
36+ return 1
3037 logging .debug ("Size of the file %s is %s" , sourceFile , str (fileSize ))
3138 #cache=get_best_stashcache()
3239 logging .debug ("Using Cache %s" , cache )
@@ -35,11 +42,13 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
3542
3643 # Fill out the payload as much as possible
3744 filename = destination + '/' + sourceFile .split ('/' )[- 1 ]
45+
3846 payload = {}
3947 payload ['xrdcp_version' ] = xrdcp_version
4048 payload ['filesize' ] = fileSize
4149 payload ['filename' ] = sourceFile
4250 payload ['sitename' ] = sitename
51+ payload .update (parse_job_ad ())
4352
4453 # Calculate the starting time
4554 start1 = int (time .time ()* 1000 )
@@ -72,7 +81,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
7281 payload ['cache' ]= "CVMFS"
7382 es_send (payload )
7483
75- return
84+ return 0
7685
7786 except IOError , e :
7887 logging .error ("Unable to copy with CVMFS, even though file exists: %s" , str (e ))
@@ -97,6 +106,7 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
97106 payload ['start2' ]= start2
98107 payload ['end2' ]= end2
99108
109+
100110 if xrd_exit == '0' : #worked first try
101111 logging .debug ("Transfer success using %s" , cache )
102112 dltime = end2 - start2
@@ -139,17 +149,47 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
139149 payload ['end3' ]= end3
140150 payload ['cache' ]= cache
141151 es_send (payload )
152+ if xrd_exit == '0' :
153+ return 0
154+ else :
155+ return 1
156+ return 0
142157
143158
159+ def parse_job_ad ():
160+ """
161+ Parse the .job.ad file for the Owner (username) and ProjectName of the callee.
162+ """
163+ temp_list = {}
164+ try :
165+ if '_CONDOR_JOB_AD' in os .environ :
166+ filename = os .environ ['_CONDOR_JOB_AD' ]
167+ elif os .path .exists (".job.ad" ):
168+ filename = ".job.ad"
169+ else :
170+ return {}
171+ with open (filename ) as job_file :
172+ for line in job_file .readlines ():
173+ match = re .search ('^\s*(Owner|ProjectName)\s=\s"(.*)"' , line , re .IGNORECASE )
174+ if match :
175+ temp_list [match .group ()] = line
176+ except IOError , e :
177+ logging .error ("Unable to open the .job.ad file" )
178+
179+ return temp_list
180+
144181def dostashcpdirectory (sourceDir , destination , cache , debug = False ):
145182 sourceItems = subprocess .Popen (["xrdfs" , "root://stash.osgconnect.net" , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ].split ()
146183 for remote_file in sourceItems :
147184 command2 = 'xrdfs root://stash.osgconnect.net stat ' + remote_file + ' | grep "IsDir" | wc -l'
148185 isdir = subprocess .Popen ([command2 ],stdout = subprocess .PIPE ,shell = True ).communicate ()[0 ].split ()[0 ]
149186 if isdir != '0' :
150- dostashcpdirectory (remote_file , destination , cache , debug )
187+ result = dostashcpdirectory (remote_file , destination , cache , debug )
151188 else :
152- doStashCpSingle (remote_file ,destination , cache , debug )
189+ result = doStashCpSingle (remote_file ,destination , cache , debug )
190+ # Stop transfers if something fails
191+ if result != 0 :
192+ return result
153193
154194
155195def es_send (payload ):
@@ -323,10 +363,13 @@ def main():
323363 logging .warning ("DEPRECIATED: The source path does not begin with a '/', but it is required. This functionality will be removed in an upcoming release" )
324364 source = "/" + source
325365
366+
326367 if not args .recursive :
327- doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
368+ result = doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
328369 else :
329- dostashcpdirectory (sourceDir = source , destination = destination , cache = cache , debug = args .debug )
370+ result = dostashcpdirectory (sourceDir = source , destination = destination , cache = cache , debug = args .debug )
371+ # Exit with failure
372+ sys .exit (result )
330373
331374
332375if __name__ == "__main__" :
0 commit comments