Skip to content

Commit 5d04559

Browse files
committed
Always terminate local heartbeat when there's an error.
1 parent 8d6703a commit 5d04559

1 file changed

Lines changed: 78 additions & 79 deletions

File tree

gridmap/job.py

Lines changed: 78 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -331,88 +331,87 @@ def check(self, session_id, jobs):
331331
args=(-1, self.home_address, -1,
332332
"", CHECK_FREQUENCY))
333333
local_heart.start()
334-
self.logger.debug("Starting ZMQ event loop")
335-
# main loop
336-
while not self.all_jobs_done():
337-
self.logger.debug('Waiting for message')
338-
msg_str = self.socket.recv()
339-
msg = zloads(msg_str)
340-
self.logger.debug('Received message: %s', msg)
341-
return_msg = ""
342-
343-
job_id = msg["job_id"]
344-
345-
# only if its not the local beat
346-
if job_id != -1:
347-
# If message is from a valid job, process that message
348-
if job_id in self.jobid_to_job:
349-
job = self.jobid_to_job[job_id]
350-
351-
if msg["command"] == "fetch_input":
352-
return_msg = self.jobid_to_job[job_id]
353-
job.timestamp = datetime.now()
354-
355-
if msg["command"] == "store_output":
356-
# be nice
357-
return_msg = "thanks"
358-
359-
# store tmp job object
360-
if isinstance(msg["data"], Job):
361-
tmp_job = msg["data"]
362-
# copy relevant fields
363-
job.ret = tmp_job.ret
364-
job.traceback = tmp_job.traceback
365-
# Return an exception instead of a job, so store that
366-
elif isinstance(msg["data"], tuple):
367-
job.ret, job.traceback = msg["data"]
334+
try:
335+
self.logger.debug("Starting ZMQ event loop")
336+
# main loop
337+
while not self.all_jobs_done():
338+
self.logger.debug('Waiting for message')
339+
msg_str = self.socket.recv()
340+
msg = zloads(msg_str)
341+
self.logger.debug('Received message: %s', msg)
342+
return_msg = ""
343+
344+
job_id = msg["job_id"]
345+
346+
# only if it's not the local beat
347+
if job_id != -1:
348+
# If message is from a valid job, process that message
349+
if job_id in self.jobid_to_job:
350+
job = self.jobid_to_job[job_id]
351+
352+
if msg["command"] == "fetch_input":
353+
return_msg = self.jobid_to_job[job_id]
354+
job.timestamp = datetime.now()
355+
356+
if msg["command"] == "store_output":
357+
# be nice
358+
return_msg = "thanks"
359+
360+
# store tmp job object
361+
if isinstance(msg["data"], Job):
362+
tmp_job = msg["data"]
363+
# copy relevant fields
364+
job.ret = tmp_job.ret
365+
job.traceback = tmp_job.traceback
366+
# An exception was returned instead of a job, so store that
367+
elif isinstance(msg["data"], tuple):
368+
job.ret, job.traceback = msg["data"]
369+
else:
370+
self.logger.error(("Received message with " +
371+
"invalid data: %s"), msg)
372+
job.ret = msg["data"]
373+
job.timestamp = datetime.now()
374+
375+
if msg["command"] == "heart_beat":
376+
job.heart_beat = msg["data"]
377+
378+
# keep track of mem and cpu
379+
try:
380+
job.track_mem.append(job.heart_beat["memory"])
381+
job.track_cpu.append(job.heart_beat["cpu_load"])
382+
except (ValueError, TypeError):
383+
self.logger.error("Error decoding heart-beat",
384+
exc_info=True)
385+
return_msg = "all good"
386+
job.timestamp = datetime.now()
387+
388+
if msg["command"] == "get_job":
389+
# serve job for display
390+
return_msg = job
368391
else:
369-
self.logger.error(("Received message with invalid" +
370-
" data: %s"), msg)
371-
job.ret = msg["data"]
372-
# is assigned in submission process and not written back
373-
# server-side
374-
job.timestamp = datetime.now()
375-
376-
if msg["command"] == "heart_beat":
377-
job.heart_beat = msg["data"]
378-
379-
# keep track of mem and cpu
380-
try:
381-
job.track_mem.append(job.heart_beat["memory"])
382-
job.track_cpu.append(job.heart_beat["cpu_load"])
383-
except (ValueError, TypeError):
384-
self.logger.error("Error decoding heart-beat",
385-
exc_info=True)
386-
return_msg = "all good"
387-
job.timestamp = datetime.now()
388-
389-
if msg["command"] == "get_job":
390-
# serve job for display
391-
return_msg = job
392+
# update host name
393+
job.host_name = msg["host_name"]
394+
# If this is an unknown job, report it and reply
392395
else:
393-
# update host name
394-
job.host_name = msg["host_name"]
395-
# If this is an unknown job, report it and reply
396+
self.logger.error(('Received message from unknown job' +
397+
' with ID %s. Known job IDs are: ' +
398+
'%s'), job_id,
399+
list(self.jobid_to_job.keys()))
400+
return_msg = 'thanks, but no thanks'
396401
else:
397-
self.logger.error(('Received message from unknown job ' +
398-
'with ID %s. Known job IDs are: %s'),
399-
job_id,
400-
list(self.jobid_to_job.keys()))
401-
return_msg = 'thanks, but no thanks'
402-
else:
403-
# run check
404-
self.check_if_alive()
405-
406-
if msg["command"] == "get_jobs":
407-
# serve list of jobs for display
408-
return_msg = self.jobs
409-
410-
# send back compressed response
411-
self.logger.debug('Sending reply: %s', return_msg)
412-
self.socket.send(zdumps(return_msg))
413-
414-
# Kill child processes that we don't need anymore
415-
local_heart.terminate()
402+
# run check
403+
self.check_if_alive()
404+
405+
if msg["command"] == "get_jobs":
406+
# serve list of jobs for display
407+
return_msg = self.jobs
408+
409+
# send back compressed response
410+
self.logger.debug('Sending reply: %s', return_msg)
411+
self.socket.send(zdumps(return_msg))
412+
finally:
413+
# Kill child processes that we don't need anymore
414+
local_heart.terminate()
416415

417416
def check_if_alive(self):
418417
"""

0 commit comments

Comments
 (0)