Skip to content

Commit a5f1f87

Browse files
committed
Add queue_name routing to RemoteProcessThreadController
Add required queue_name parameter to continue_process, launch_process, execute_process, and task_send methods. Tasks are now routed directly to named queues via communicator.task_queue(queue_name).task_send(). This enables AiiDA's multi-queue scheduling where different process types (root workchains, nested workchains, calcjobs) can be routed to separate queues with independent prefetch settings.
1 parent 2317b6f commit a5f1f87

1 file changed

Lines changed: 33 additions & 8 deletions

File tree

src/plumpy/process_comms.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,9 @@ async def execute_process(
345345

346346
class RemoteProcessThreadController:
347347
"""
348-
A class that can be used to control and launch remote processes
348+
A class that can be used to control and launch remote processes.
349+
350+
Supports routing tasks to specific named queues via the queue_name parameter.
349351
"""
350352

351353
def __init__(self, communicator: kiwipy.Communicator):
@@ -425,10 +427,25 @@ def kill_all(self, msg_text: Optional[str]) -> None:
425427
self._communicator.broadcast_send(msg, subject=Intent.KILL)
426428

427429
def continue_process(
428-
self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False
430+
self,
431+
pid: 'PID_TYPE',
432+
tag: Optional[str] = None,
433+
nowait: bool = False,
434+
no_reply: bool = False,
435+
*,
436+
queue_name: str,
429437
) -> Union[None, PID_TYPE, ProcessResult]:
438+
"""Continue a process, routing to the specified queue.
439+
440+
:param pid: the pid of the process to continue
441+
:param tag: the checkpoint tag to continue from
442+
:param nowait: if True don't wait for the process to complete
443+
:param no_reply: if True, this call will be fire-and-forget
444+
:param queue_name: queue name to route to (required)
445+
:return: the result of continuing the process
446+
"""
430447
message = create_continue_body(pid=pid, tag=tag, nowait=nowait)
431-
return self.task_send(message, no_reply=no_reply)
448+
return self.task_send(message, no_reply=no_reply, queue_name=queue_name)
432449

433450
def launch_process(
434451
self,
@@ -439,6 +456,8 @@ def launch_process(
439456
loader: Optional[loaders.ObjectLoader] = None,
440457
nowait: bool = False,
441458
no_reply: bool = False,
459+
*,
460+
queue_name: str,
442461
) -> Union[None, PID_TYPE, ProcessResult]:
443462
"""
444463
Launch the process
@@ -450,10 +469,11 @@ def launch_process(
450469
:param loader: the class loader to use
451470
:param nowait: if True only return when the process finishes
452471
:param no_reply: don't send a reply to the sender
472+
:param queue_name: queue name to route to (required)
453473
:return: the pid of the created process or the outputs (if nowait=False)
454474
"""
455475
message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait)
456-
return self.task_send(message, no_reply=no_reply)
476+
return self.task_send(message, no_reply=no_reply, queue_name=queue_name)
457477

458478
def execute_process(
459479
self,
@@ -463,6 +483,8 @@ def execute_process(
463483
loader: Optional[loaders.ObjectLoader] = None,
464484
nowait: bool = False,
465485
no_reply: bool = False,
486+
*,
487+
queue_name: str,
466488
) -> Union[None, PID_TYPE, ProcessResult]:
467489
"""
468490
Execute a process. This call will first send a create task and then a continue task over
@@ -475,6 +497,7 @@ def execute_process(
475497
:param loader: the class loader to use
476498
:param nowait: if True, don't wait for the process to send a response
477499
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
500+
:param queue_name: queue name to route to (required)
478501
:return: the result of executing the process
479502
"""
480503

@@ -486,21 +509,23 @@ def execute_process(
486509
def on_created(_: Any) -> None:
487510
with kiwipy.capture_exceptions(execute_future):
488511
pid: 'PID_TYPE' = create_future.result()
489-
continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply)
512+
continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply, queue_name=queue_name)
490513
kiwipy.chain(continue_future, execute_future)
491514

492515
create_future.add_done_callback(on_created)
493516
return execute_future
494517

495-
def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]:
518+
def task_send(self, message: Any, no_reply: bool = False, *, queue_name: str) -> Optional[Any]:
496519
"""
497-
Send a task to be performed using the communicator
520+
Send a task to be performed using the communicator.
498521
499522
:param message: the task message
500523
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
524+
:param queue_name: queue name to route to (required)
501525
:return: the response from the remote side (if no_reply=False)
502526
"""
503-
return self._communicator.task_send(message, no_reply=no_reply)
527+
# Send directly to the queue - AiiDA's broker handles queue setup
528+
return self._communicator.task_queue(queue_name).task_send(message, no_reply=no_reply)
504529

505530

506531
class ProcessLauncher:

0 commit comments

Comments
 (0)