Skip to content

Commit 5d839fa

Browse files
committed
Make fetch_input count as a heartbeat, and ensure that functions that return None on purpose work.
1 parent 6aced63 commit 5d839fa

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

gridmap/job.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,12 @@
7979

8080

8181
# Set of "not running" job statuses
82-
SLEEP_STATUSES = {psutil.STATUS_SLEEPING, psutil.STATUS_DEAD,
83-
psutil.STATUS_IDLE, psutil.STATUS_STOPPED,
84-
psutil.STATUS_ZOMBIE}
82+
_SLEEP_STATUSES = {psutil.STATUS_SLEEPING, psutil.STATUS_DEAD,
83+
psutil.STATUS_IDLE, psutil.STATUS_STOPPED,
84+
psutil.STATUS_ZOMBIE}
85+
86+
# Placeholder string, since a job could potentially return None on purpose
87+
_JOB_NOT_FINISHED = '*@#%$*@#___GRIDMAP___NOT___DONE___@#%**#*$&*%'
8588

8689

8790
class JobException(Exception):
@@ -152,7 +155,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
152155
self.jobid = -1
153156
self.kwlist = kwlist if kwlist is not None else {}
154157
self.cleanup = cleanup
155-
self.ret = None
158+
self.ret = _JOB_NOT_FINISHED
156159
self.num_slots = num_slots
157160
self.mem_free = mem_free
158161
self.white_list = []
@@ -327,6 +330,7 @@ def check(self, session_id, jobs):
327330

328331
if msg["command"] == "fetch_input":
329332
return_msg = self.jobid_to_job[job_id]
333+
job.timestamp = datetime.now()
330334

331335
if msg["command"] == "store_output":
332336
# be nice
@@ -386,14 +390,14 @@ def check_if_alive(self):
386390
check if jobs are alive and determine cause of death if not
387391
"""
388392
logger = logging.getLogger(__name__)
393+
logger.debug('Checking if jobs are alive')
389394
for job in self.jobs:
390395

391396
# noting was returned yet
392-
if job.ret is None:
397+
if job.ret == _JOB_NOT_FINISHED:
393398

394399
# exclude first-timers
395400
if job.timestamp is not None:
396-
397401
# check heart-beats if there was a long delay
398402
current_time = datetime.now()
399403
time_delta = current_time - job.timestamp
@@ -402,7 +406,7 @@ def check_if_alive(self):
402406
job.cause_of_death = "unknown"
403407
elif (len(job.track_cpu) > MAX_IDLE_HEARTBEATS and
404408
all((cpu_load <= IDLE_THRESHOLD and
405-
state in SLEEP_STATUSES) for cpu_load, state in
409+
state in _SLEEP_STATUSES) for cpu_load, state in
406410
job.track_cpu[-MAX_IDLE_HEARTBEATS:])):
407411
logger.error('Job stalled for unknown reason.')
408412
job.cause_of_death = 'stalled'
@@ -416,7 +420,7 @@ def check_if_alive(self):
416420

417421
# attempt to resubmit
418422
if job.cause_of_death:
419-
logger.info("creating error report")
423+
logger.info("Creating error report")
420424

421425
# send report
422426
send_error_mail(job)

0 commit comments

Comments
 (0)