@@ -15,31 +15,28 @@ import shutil
1515import logging
1616from urlparse import urlparse
1717
18-
18+ main_redirector = "root://redirector.osgstorage.org"
19+ stash_origin = "root://stash.osgconnect.net"
1920
2021TIMEOUT = 300
2122DIFF = TIMEOUT * 10
2223
2324
2425def doStashCpSingle (sourceFile , destination , cache , debug = False ):
2526
26- logging .debug ("Checking size of file." )
27- xrdfs = subprocess .Popen (["xrdfs" , "root://stash.osgconnect.net" , "stat" , sourceFile ], stdout = subprocess .PIPE ).communicate ()[0 ]
28- xrdcp_version = subprocess .Popen (['echo $(xrdcp -V 2>&1)' ], stdout = subprocess .PIPE , shell = True ).communicate ()[0 ][:- 1 ]
29- fileSize = int (re .findall (r"Size: \d+" , xrdfs )[0 ].split (": " )[1 ])
30- logging .debug ("Size of the file %s is %i" , sourceFile , fileSize )
3127 #cache=get_best_stashcache()
3228 logging .debug ("Using Cache %s" , cache )
3329
3430 sitename = os .environ .setdefault ("OSG_SITE_NAME" , "siteNotFound" )
3531
3632 # Fill out the payload as much as possible
3733 filename = destination + '/' + sourceFile .split ('/' )[- 1 ]
34+
3835 payload = {}
39- payload ['xrdcp_version' ] = xrdcp_version
40- payload ['filesize' ] = fileSize
36+
4137 payload ['filename' ] = sourceFile
4238 payload ['sitename' ] = sitename
39+ payload .update (parse_job_ad ())
4340
4441 # Calculate the starting time
4542 start1 = int (time .time ()* 1000 )
@@ -56,13 +53,15 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
5653 shutil .copy (cvmfs_file , destination )
5754 logging .debug ("Succesfully copied file from CVMFS!" )
5855 end1 = int (time .time ()* 1000 )
59- dlSz = os .stat (filename ).st_size
56+ dlSz = os .stat (destination ).st_size
57+ filesize = os .stat (cvmfs_file ).st_size
6058 dltime = end1 - start1
6159 destSpace = 1
6260 status = 'Success'
6361 payload ['timestamp' ]= end1
6462 payload ['host' ]= "CVMFS"
6563 payload ['download_size' ]= dlSz
64+ payload ['filesize' ] = filesize
6665 payload ['download_time' ]= dltime
6766 payload ['destination_space' ]= destSpace
6867 payload ['status' ]= status
@@ -72,14 +71,31 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
7271 payload ['cache' ]= "CVMFS"
7372 es_send (payload )
7473
75- return
74+ return 0
7675
7776 except IOError , e :
7877 logging .error ("Unable to copy with CVMFS, even though file exists: %s" , str (e ))
7978
8079 else :
8180 logging .debug ("CVMFS File does not exist" )
8281
82+ # Now check the size of the file with xrootd
83+ logging .debug ("Checking size of file." )
84+ (xrdfs_stdout , xrdfs_stderr ) = subprocess .Popen (["xrdfs" , main_redirector , "stat" , sourceFile ], stdout = subprocess .PIPE ).communicate ()
85+ xrdcp_version = subprocess .Popen (['echo $(xrdcp -V 2>&1)' ], stdout = subprocess .PIPE , shell = True ).communicate ()[0 ][:- 1 ]
86+ try :
87+ fileSize = int (re .findall (r"Size: \d+" , xrdfs_stdout )[0 ].split (": " )[1 ])
88+ except Exception as e :
89+ sys .stderr .write ("Unable to find size of file\n " )
90+ print str (xrdfs_stdout )
91+ sys .stderr .write (str (xrdfs_stderr ))
92+ sys .stderr .write ("\n " )
93+ return 1
94+ logging .debug ("Size of the file %s is %s" , sourceFile , str (fileSize ))
95+
96+ payload ['xrdcp_version' ] = xrdcp_version
97+ payload ['filesize' ] = fileSize
98+
8399 end1 = int (time .time ()* 1000 )
84100 payload ['end1' ]= end1
85101 payload ['start1' ]= start1
@@ -89,14 +105,15 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
89105 xrd_exit = timed_transfer (filename = sourceFile , debug = debug , cache = cache , destination = destination )
90106
91107 end2 = int (time .time ()* 1000 )
92- if os .path .exists (filename ):
93- dlSz = os .stat (filename ).st_size
108+ if os .path .exists (destination ):
109+ dlSz = os .stat (destination ).st_size
94110 destSpace = 1
95111
96112 payload ['xrdexit1' ]= xrd_exit
97113 payload ['start2' ]= start2
98114 payload ['end2' ]= end2
99115
116+
100117 if xrd_exit == '0' : #worked first try
101118 logging .debug ("Transfer success using %s" , cache )
102119 dltime = end2 - start2
@@ -113,13 +130,13 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
113130 es_send (payload )
114131
115132 else : #pull from origin
116- logging .warning ("XrdCP from cache failed on %s, pulling from origin " , cache )
117- cache = "root://stash.osgconnect.net"
133+ logging .warning ("XrdCP from cache failed on %s, pulling from main redirector " , cache )
134+ cache = main_redirector
118135 start3 = int (time .time ()* 1000 )
119136 xrd_exit = timed_transfer (filename = sourceFile , debug = debug , cache = cache , destination = destination )
120137 end3 = int (time .time ()* 1000 )
121- if os .path .exists (filename ):
122- dlSz = os .stat (filename ).st_size
138+ if os .path .exists (destination ):
139+ dlSz = os .stat (destination ).st_size
123140 dltime = end3 - start3
124141 if xrd_exit == '0' :
125142 logging .info ("Trunk Success" )
@@ -139,17 +156,47 @@ def doStashCpSingle(sourceFile, destination, cache, debug=False):
139156 payload ['end3' ]= end3
140157 payload ['cache' ]= cache
141158 es_send (payload )
159+ if xrd_exit == '0' :
160+ return 0
161+ else :
162+ return 1
163+ return 0
164+
165+
166+ def parse_job_ad ():
167+ """
168+ Parse the .job.ad file for the Owner (username) and ProjectName of the callee.
169+ """
170+ temp_list = {}
171+ try :
172+ if '_CONDOR_JOB_AD' in os .environ :
173+ filename = os .environ ['_CONDOR_JOB_AD' ]
174+ elif os .path .exists (".job.ad" ):
175+ filename = ".job.ad"
176+ else :
177+ return {}
178+ with open (filename ) as job_file :
179+ for line in job_file .readlines ():
180+ match = re .search ('^\s*(Owner|ProjectName)\s=\s"(.*)"' , line , re .IGNORECASE )
181+ if match :
182+ temp_list [match .group (1 )] = match .group (2 )
183+ except IOError , e :
184+ logging .error ("Unable to open the .job.ad file" )
142185
186+ return temp_list
143187
144188def dostashcpdirectory (sourceDir , destination , cache , debug = False ):
145- sourceItems = subprocess .Popen (["xrdfs" , "root://stash.osgconnect.net" , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ].split ()
189+ sourceItems = subprocess .Popen (["xrdfs" , stash_origin , "ls" , sourceDir ], stdout = subprocess .PIPE ).communicate ()[0 ].split ()
146190 for remote_file in sourceItems :
147- command2 = 'xrdfs root://stash.osgconnect.net stat ' + remote_file + ' | grep "IsDir" | wc -l'
191+ command2 = 'xrdfs ' + stash_origin + ' stat '+ remote_file + ' | grep "IsDir" | wc -l'
148192 isdir = subprocess .Popen ([command2 ],stdout = subprocess .PIPE ,shell = True ).communicate ()[0 ].split ()[0 ]
149193 if isdir != '0' :
150- dostashcpdirectory (remote_file , destination , cache , debug )
194+ result = dostashcpdirectory (remote_file , destination , cache , debug )
151195 else :
152- doStashCpSingle (remote_file ,destination , cache , debug )
196+ result = doStashCpSingle (remote_file ,destination , cache , debug )
197+ # Stop transfers if something fails
198+ if result != 0 :
199+ return result
153200
154201
155202def es_send (payload ):
@@ -168,7 +215,7 @@ def es_send(payload):
168215 f .read ()
169216 f .close ()
170217 except urllib2 .URLError , e :
171- logging .error ("Error posting to ES: %s" , str (e ))
218+ logging .warning ("Error posting to ES: %s" , str (e ))
172219
173220 p = multiprocessing .Process (target = _es_send , name = "_es_send" , args = (payload ,))
174221 p .start ()
@@ -242,7 +289,7 @@ def get_best_stashcache():
242289 geo_ip_sites = "http://cvmfs-s1fnal.opensciencegrid.org:8000/cvmfs/oasis.opensciencegrid.org;http://cvmfs-s1bnl.opensciencegrid.org:8000/cvmfs/oasis.opensciencegrid.org;http://cvmfs-egi.gridpp.rl.ac.uk:8000/cvmfs/oasis.opensciencegrid.org;http://klei.nikhef.nl:8000/cvmfs/oasis.opensciencegrid.org;http://cvmfsrep.grid.sinica.edu.tw:8000/cvmfs/oasis.opensciencegrid.org" .split (';' )
243290
244291 # Add HCC's, for good measure
245- geo_ip_sites .insert (0 ,"hcc-cvmfs.unl.edu:8000/cvmfs/config-osg.opensciencegrid.org" )
292+ geo_ip_sites .insert (0 ,"http:// hcc-cvmfs.unl.edu:8000/cvmfs/config-osg.opensciencegrid.org" )
246293
247294 # Append text before caches string
248295 append_text = "api/v1.0/geo/stashcp"
@@ -297,7 +344,11 @@ def main():
297344 parser .add_option ('-c' , '--cache' , dest = 'cache' , help = "Cache to use" )
298345 args ,opts = parser .parse_args ()
299346
347+ logging .basicConfig (format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' ,
348+ datefmt = "%Y-%m-%dT%H:%M:%S%z" )
300349 logger = logging .getLogger ()
350+
351+
301352 if args .debug :
302353 logger .setLevel (logging .DEBUG )
303354 else :
@@ -320,10 +371,17 @@ def main():
320371 else :
321372 cache = get_best_stashcache ()
322373
374+ if not source .startswith ('/' ):
375+ logging .warning ("DEPRECIATED: The source path does not begin with a '/', but it is required. This functionality will be removed in an upcoming release" )
376+ source = "/" + source
377+
378+
323379 if not args .recursive :
324- doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
380+ result = doStashCpSingle (sourceFile = source , destination = destination , cache = cache , debug = args .debug )
325381 else :
326- dostashcpdirectory (sourceDir = source , destination = destination , cache = cache , debug = args .debug )
382+ result = dostashcpdirectory (sourceDir = source , destination = destination , cache = cache , debug = args .debug )
383+ # Exit with failure
384+ sys .exit (result )
327385
328386
329387if __name__ == "__main__" :
0 commit comments