@@ -401,6 +401,8 @@ async def ExporterStream(self, request_iterator, context):
401401 command_queue = asyncio .Queue ()
402402 pending_commands = []
403403
404+ startup_done = asyncio .Event ()
405+
404406 out_msg = labgrid_coordinator_pb2 .ExporterOutMessage ()
405407 out_msg .hello .version = labgrid_version ()
406408 yield out_msg
@@ -420,10 +422,15 @@ async def request_task():
420422 elif kind == "startup" :
421423 version = in_msg .startup .version
422424 name = in_msg .startup .name
425+ if existing := self .get_exporter_by_name (name ):
426+ raise ExporterError (
427+ f"exporter with name '{ name } ' is already connected from { existing .peer } "
428+ )
423429 session = self .exporters [peer ] = ExporterSession (self , peer , name , command_queue , version )
424430 logging .debug ("Exporters: %s" , self .exporters )
425431 logging .debug ("Received startup from %s with %s" , name , version )
426432 asyncio .current_task ().set_name (f"exporter-{ peer } -rx/started-{ name } " )
433+ startup_done .set ()
427434 elif kind == "resource" :
428435 logging .debug ("Received resource from %s with %s" , name , in_msg .resource )
429436 action , _ = session .set_resource (
@@ -439,10 +446,32 @@ async def request_task():
439446 logging .debug ("exporter request_task done: %s" , context .done ())
440447 except Exception :
441448 logging .exception ("error in exporter message handler" )
449+ raise
442450
443451 asyncio .current_task ().set_name (f"exporter-{ peer } -tx" )
444452 running_request_task = self .loop .create_task (request_task (), name = f"exporter-{ peer } -rx/init" )
445453
454+ startup_done_task = self .loop .create_task (startup_done .wait ())
455+ done , _ = await asyncio .wait (
456+ {startup_done_task , running_request_task },
457+ timeout = 3 ,
458+ return_when = asyncio .FIRST_COMPLETED ,
459+ )
460+ # clean up event task
461+ startup_done .set ()
462+ await startup_done_task
463+ if running_request_task in done :
464+ # we probably had an exception during startup
465+ try :
466+ await running_request_task
467+ except ExporterError as e :
468+ await context .abort (grpc .StatusCode .ALREADY_EXISTS , f"startup failed: { e } " )
469+ raise
470+ elif startup_done_task in done :
471+ await startup_done_task
472+ else :
473+ raise ExporterError (f"exporter connection from { peer } timed out during startup" )
474+
446475 try :
447476 async for cmd in queue_as_aiter (command_queue ):
448477 logging .debug ("exporter cmd %s" , cmd )
0 commit comments