7979
8080
8181# Set of "not running" job statuses
82- _SLEEP_STATUSES = {psutil .STATUS_SLEEPING , psutil .STATUS_DEAD ,
83- psutil .STATUS_IDLE , psutil .STATUS_STOPPED ,
84- psutil .STATUS_ZOMBIE }
85-
86- # Placeholder string, since a job could potentially return None on purpose
87- _JOB_NOT_FINISHED = '*@#%$*@#___GRIDMAP___NOT___DONE___@#%**#*$&*%'
82+ SLEEP_STATUSES = {psutil .STATUS_SLEEPING , psutil .STATUS_DEAD ,
83+ psutil .STATUS_IDLE , psutil .STATUS_STOPPED ,
84+ psutil .STATUS_ZOMBIE }
8885
8986
9087class JobException (Exception ):
@@ -140,7 +137,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
140137 self .track_mem = []
141138 self .track_cpu = []
142139 self .heart_beat = None
143- self .traceback = None
140+ self .exception = None
144141 self .host_name = ''
145142 self .timestamp = None
146143 self .log_stdout_fn = ''
@@ -155,7 +152,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
155152 self .jobid = - 1
156153 self .kwlist = kwlist if kwlist is not None else {}
157154 self .cleanup = cleanup
158- self .ret = _JOB_NOT_FINISHED
155+ self .ret = None
159156 self .num_slots = num_slots
160157 self .mem_free = mem_free
161158 self .white_list = []
@@ -225,7 +222,6 @@ def execute(self):
225222 self .ret = self .function (* self .args , ** self .kwlist )
226223 except Exception as exception :
227224 self .ret = exception
228- self .traceback = traceback .format_exc ()
229225 traceback .print_exc ()
230226 del self .args
231227 del self .kwlist
@@ -331,21 +327,15 @@ def check(self, session_id, jobs):
331327
332328 if msg ["command" ] == "fetch_input" :
333329 return_msg = self .jobid_to_job [job_id ]
334- job .timestamp = datetime .now ()
335330
336331 if msg ["command" ] == "store_output" :
337332 # be nice
338333 return_msg = "thanks"
339-
340334 # store tmp job object
341- if isinstance (msg ["data" ], Job ):
342- tmp_job = msg ["data" ]
343- # copy relevant fields
344- job .ret = tmp_job .ret
345- job .traceback = tmp_job .traceback
346- # Return an exception instead of a job, so store that
347- else :
348- job .ret = msg ["data" ]
335+ tmp_job = msg ["data" ]
336+ # copy relevant fields
337+ job .ret = tmp_job .ret
338+ job .exception = tmp_job .exception
349339 # is assigned in submission process and not written back
350340 # server-side
351341 job .timestamp = datetime .now ()
@@ -396,14 +386,14 @@ def check_if_alive(self):
396386 check if jobs are alive and determine cause of death if not
397387 """
398388 logger = logging .getLogger (__name__ )
399- logger .debug ('Checking if jobs are alive' )
400389 for job in self .jobs :
401390
402391 # nothing was returned yet
403- if job .ret == _JOB_NOT_FINISHED :
392+ if job .ret is None :
404393
405394 # exclude first-timers
406395 if job .timestamp is not None :
396+
407397 # check heart-beats if there was a long delay
408398 current_time = datetime .now ()
409399 time_delta = current_time - job .timestamp
@@ -412,7 +402,7 @@ def check_if_alive(self):
412402 job .cause_of_death = "unknown"
413403 elif (len (job .track_cpu ) > MAX_IDLE_HEARTBEATS and
414404 all ((cpu_load <= IDLE_THRESHOLD and
415- state in _SLEEP_STATUSES ) for cpu_load , state in
405+ state in SLEEP_STATUSES ) for cpu_load , state in
416406 job .track_cpu [- MAX_IDLE_HEARTBEATS :])):
417407 logger .error ('Job stalled for unknown reason.' )
418408 job .cause_of_death = 'stalled'
@@ -426,7 +416,7 @@ def check_if_alive(self):
426416
427417 # attempt to resubmit
428418 if job .cause_of_death :
429- logger .info ("Creating error report" )
419+ logger .info ("creating error report" )
430420
431421 # send report
432422 send_error_mail (job )
@@ -494,7 +484,7 @@ def send_error_mail(job):
494484
495485 if isinstance (job .ret , Exception ):
496486 body_text += "job encountered exception: {}\n " .format (job .ret )
497- body_text += "stacktrace: {}\n \n " .format (job .traceback )
487+ body_text += "stacktrace: {}\n \n " .format (job .exception )
498488
499489 logger .info ('Email body: %s' , body_text )
500490
0 commit comments