Commit 65cade1

Suspend and resume support (#4)
1 parent 9954469 commit 65cade1

6 files changed

Lines changed: 186 additions & 26 deletions

README.md

Lines changed: 32 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -9,15 +9,6 @@ This repo contains a Python client SDK for use with the [Durable Task Framework
99

1010
> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
1111
12-
## Features
13-
14-
- [x] Orchestrations
15-
- [x] Activities
16-
- [x] Durable timers
17-
- [x] Sub-orchestrations
18-
- [ ] External events
19-
- [ ] Suspend and resume
20-
- [ ] Retry policies
2112

2213
## Supported patterns
2314

@@ -102,6 +93,38 @@ As an aside, you'll also notice that the example orchestration above works with
10293

10394
You can find the full sample [here](./examples/human_interaction.py).
10495

96+
## Feature overview
97+
98+
The following features are currently supported:
99+
100+
### Orchestrations
101+
102+
Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable: after a failure they recover automatically by replaying their recorded history to rebuild their local execution state. For replay to work, orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.
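The recovery behavior described above can be illustrated with a plain Python generator. This is a minimal sketch, not the SDK's implementation: `replay`, `history`, and the `("call", name)` request tuples are hypothetical names used only for illustration.

```python
from typing import Any, Dict, Generator, List, Optional, Tuple

# Hypothetical request type for this sketch: ("call", activity_name).
Request = Tuple[str, str]


def orchestrator() -> Generator[Request, Any, str]:
    # Deterministic: the same recorded results always lead to the same
    # sequence of requests and the same return value.
    a = yield ("call", "get_a")
    b = yield ("call", "get_b")
    return f"{a}+{b}"


def replay(history: Dict[str, Any]) -> Tuple[Optional[str], List[str]]:
    """Re-run the orchestrator from the start, feeding it recorded results.

    Returns (output, pending): output is None until every request has a
    recorded result; pending lists the activity still awaiting a result.
    """
    gen = orchestrator()
    try:
        request = next(gen)
        while True:
            _, name = request
            if name not in history:
                return None, [name]  # no result recorded yet: stop and wait
            request = gen.send(history[name])
    except StopIteration as done:
        return done.value, []


# First execution: nothing is recorded, so the first activity is pending.
first = replay({})
# After a simulated crash, local state is rebuilt purely from history.
second = replay({"get_a": 1})
final = replay({"get_a": 1, "get_b": 2})
```

Because the function is re-run from the top on every replay, anything non-deterministic inside it (random numbers, wall-clock reads) could send a replay down a different path than the original execution took.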
103+
104+
### Activities
105+
106+
Activities are implemented using ordinary Python functions that take an `ActivityContext` as their first parameter. Activity functions are scheduled by orchestrations and have at-least-once execution guarantees, meaning that they will be executed at least once but may be executed multiple times in the event of a transient failure. Activity functions are where the real "work" of any orchestration is done.
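Since an activity may run more than once, it helps to make its side effects idempotent. The following is a minimal sketch of that idea using plain Python; `charge`, `balances`, `applied_keys`, and the idempotency key are hypothetical and are not part of this SDK. A real activity would also take an `ActivityContext` as its first parameter.

```python
from typing import Dict, Set

# Hypothetical in-memory stand-ins for an external system.
balances: Dict[str, int] = {"acct-1": 100}
applied_keys: Set[str] = set()


def charge(account: str, amount: int, idempotency_key: str) -> int:
    """Debit an account at most once per idempotency key."""
    if idempotency_key in applied_keys:
        # Duplicate delivery: the effect was already applied, so skip it.
        return balances[account]
    balances[account] -= amount
    applied_keys.add(idempotency_key)
    return balances[account]


# The same work item delivered twice, for example after a transient failure.
first = charge("acct-1", 30, idempotency_key="order-42")
second = charge("acct-1", 30, idempotency_key="order-42")
```

Both calls return the same balance, so a duplicate execution leaves the account unchanged.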
107+
108+
### Durable timers
109+
110+
Orchestrations can schedule durable timers using the `create_timer` API. These timers are durable, meaning that they will survive orchestrator restarts and will fire even if the orchestrator is not actively in memory. Durable timers can be of any duration, from milliseconds to months.
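One way to picture why such a timer survives restarts is to persist an absolute due time instead of sleeping in memory. The sketch below assumes that model purely for illustration; this commit does not describe how timers are actually stored, and `create_timer_record` and `is_due` are hypothetical helpers.

```python
import json
from datetime import datetime, timedelta, timezone


def create_timer_record(now: datetime, delay: timedelta) -> str:
    """Persist the absolute due time rather than sleeping in memory."""
    return json.dumps({"fire_at": (now + delay).isoformat()})


def is_due(record: str, now: datetime) -> bool:
    """Decide whether the timer has fired using only the persisted record."""
    fire_at = datetime.fromisoformat(json.loads(record)["fire_at"])
    return now >= fire_at


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
record = create_timer_record(start, timedelta(days=30))

# Any process that can read the record can evaluate it, even after a restart.
due_after_one_day = is_due(record, start + timedelta(days=1))
due_after_thirty_one_days = is_due(record, start + timedelta(days=31))
```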
111+
112+
### Sub-orchestrations
113+
114+
Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces.
115+
116+
### External events
117+
118+
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
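An event can arrive before or after the orchestration starts waiting for it, and the worker logs a message when it buffers an event that has no waiter. The sketch below models that matching in plain Python; `EventInbox` and its methods are hypothetical and only approximate what the worker does.

```python
from collections import defaultdict, deque
from typing import Any, Callable, DefaultDict, Deque


class EventInbox:
    """Toy model of matching raised events to waiters (not the SDK class)."""

    def __init__(self) -> None:
        # Payloads that arrived before anything was waiting for them.
        self._buffered: DefaultDict[str, Deque[Any]] = defaultdict(deque)
        # Callbacks waiting for an event that has not arrived yet.
        self._waiters: DefaultDict[str, Deque[Callable[[Any], None]]] = defaultdict(deque)

    def raise_event(self, name: str, payload: Any) -> None:
        if self._waiters[name]:
            self._waiters[name].popleft()(payload)
        else:
            self._buffered[name].append(payload)

    def wait_for(self, name: str, on_event: Callable[[Any], None]) -> None:
        if self._buffered[name]:
            on_event(self._buffered[name].popleft())
        else:
            self._waiters[name].append(on_event)


received = []
inbox = EventInbox()

# The event arrives first and is buffered until something waits for it.
inbox.raise_event("approval", "approved")
inbox.wait_for("approval", received.append)

# The waiter registers first and is completed when the event arrives.
inbox.wait_for("approval", received.append)
inbox.raise_event("approval", "rejected")
```

In both orderings the waiter receives the payload, which is what lets a human-approval step work regardless of when the approval is submitted.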
119+
120+
### Suspend and resume
121+
122+
Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration stops processing new events but buffers any that arrive while it is suspended. The buffered events are processed in arrival order once the orchestration is resumed, so no events are lost.
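The buffering behavior can be sketched with a small state machine. This is a simplified model, not the worker code from this commit: `SuspendableProcessor` and the string event names are hypothetical, and the worker's `_is_suspendable` check additionally lets `executionTerminated` events through while suspended.

```python
from typing import List


class SuspendableProcessor:
    """Toy model of buffering events while suspended (not the SDK class)."""

    def __init__(self) -> None:
        self.is_suspended = False
        self.buffered: List[str] = []
        self.processed: List[str] = []

    def process(self, event: str) -> None:
        if self.is_suspended and event != "resume":
            # Hold the event until the orchestration is resumed.
            self.buffered.append(event)
            return
        if event == "suspend":
            self.is_suspended = True
        elif event == "resume":
            if not self.is_suspended:
                return  # nothing to do if we were never suspended
            self.is_suspended = False
            # Detach the buffer before draining so that anything re-buffered
            # during the drain lands in a fresh list.
            pending, self.buffered = self.buffered, []
            for held in pending:
                self.process(held)
        else:
            self.processed.append(event)


p = SuspendableProcessor()
for event in ["a", "suspend", "b", "c", "resume", "d"]:
    p.process(event)
```

Events `b` and `c` are held while suspended and then processed in their original order on resume. Detaching the buffer before draining is a design choice of this sketch: it avoids appending to a list while iterating over it if a held event puts the processor back into the suspended state.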
123+
124+
### Retry policies (TODO)
125+
126+
Not yet implemented. Once supported, orchestrations will be able to specify retry policies for activities and sub-orchestrations. These policies will control how many times and how frequently an activity or sub-orchestration is retried in the event of a transient error.
127+
105128
## Getting Started
106129

107130
### Prerequisites

durabletask/client.py

Lines changed: 9 additions & 5 deletions
@@ -162,10 +162,14 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
162162
self._stub.RaiseEvent(req)
163163

164164
def terminate_orchestration(self):
165-
pass
165+
raise NotImplementedError()
166166

167-
def suspend_orchestration(self):
168-
pass
167+
def suspend_orchestration(self, instance_id: str):
168+
req = pb.SuspendRequest(instanceId=instance_id)
169+
self._logger.info(f"Suspending instance '{instance_id}'.")
170+
self._stub.SuspendInstance(req)
169171

170-
def resume_orchestration(self):
171-
pass
172+
def resume_orchestration(self, instance_id: str):
173+
req = pb.ResumeRequest(instanceId=instance_id)
174+
self._logger.info(f"Resuming instance '{instance_id}'.")
175+
self._stub.ResumeInstance(req)

durabletask/internal/helpers.py

Lines changed: 16 additions & 0 deletions
@@ -123,6 +123,22 @@ def new_event_raised_event(name: str, encoded_input: str | None = None) -> pb.Hi
123123
)
124124

125125

126+
def new_suspend_event() -> pb.HistoryEvent:
127+
return pb.HistoryEvent(
128+
eventId=-1,
129+
timestamp=timestamp_pb2.Timestamp(),
130+
executionSuspended=pb.ExecutionSuspendedEvent()
131+
)
132+
133+
134+
def new_resume_event() -> pb.HistoryEvent:
135+
return pb.HistoryEvent(
136+
eventId=-1,
137+
timestamp=timestamp_pb2.Timestamp(),
138+
executionResumed=pb.ExecutionResumedEvent()
139+
)
140+
141+
126142
def get_string_value(val: str | None) -> wrappers_pb2.StringValue | None:
127143
if val is None:
128144
return None

durabletask/worker.py

Lines changed: 54 additions & 12 deletions
@@ -93,6 +93,7 @@ def __init__(self, *,
9393
self._logger = shared.get_logger(log_handler, log_formatter)
9494
self._shutdown = Event()
9595
self._response_stream = None
96+
self._is_running = False
9697

9798
def __enter__(self):
9899
return self
@@ -102,17 +103,24 @@ def __exit__(self, type, value, traceback):
102103

103104
def add_orchestrator(self, fn: task.Orchestrator) -> str:
104105
"""Registers an orchestrator function with the worker."""
106+
if self._is_running:
107+
raise RuntimeError('Orchestrators cannot be added while the worker is running.')
105108
return self._registry.add_orchestrator(fn)
106109

107110
def add_activity(self, fn: task.Activity) -> str:
108111
"""Registers an activity function with the worker."""
112+
if self._is_running:
113+
raise RuntimeError('Activities cannot be added while the worker is running.')
109114
return self._registry.add_activity(fn)
110115

111116
def start(self):
112117
"""Starts the worker on a background thread and begins listening for work items."""
113118
channel = shared.get_grpc_channel(self._host_address)
114119
stub = stubs.TaskHubSidecarServiceStub(channel)
115120

121+
if self._is_running:
122+
raise RuntimeError('The worker is already running.')
123+
116124
def run_loop():
117125
# TODO: Investigate whether asyncio could be used to enable greater concurrency for async activity
118126
# functions. We'd need to know ahead of time whether a function is async or not.
@@ -158,16 +166,21 @@ def run_loop():
158166
self._logger.info(f"starting gRPC worker that connects to {self._host_address}")
159167
self._runLoop = Thread(target=run_loop)
160168
self._runLoop.start()
169+
self._is_running = True
161170

162171
def stop(self):
163172
"""Stops the worker and waits for any pending work items to complete."""
173+
if not self._is_running:
174+
return
175+
164176
self._logger.info("Stopping gRPC worker...")
165177
self._shutdown.set()
166178
if self._response_stream is not None:
167179
self._response_stream.cancel()
168180
if self._runLoop is not None:
169181
self._runLoop.join(timeout=30)
170182
self._logger.info("Worker shutdown completed")
183+
self._is_running = False
171184

172185
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
173186
try:
@@ -374,6 +387,8 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
374387
self._registry = registry
375388
self._logger = logger
376389
self._generator = None
390+
self._is_suspended = False
391+
self._suspended_events: List[pb.HistoryEvent] = []
377392

378393
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
379394
if not new_events:
@@ -408,6 +423,12 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
408423
return actions
409424

410425
def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
426+
if self._is_suspended and _is_suspendable(event):
427+
# We are suspended, so we need to buffer this event until we are resumed
428+
self._suspended_events.append(event)
429+
return
430+
431+
# CONSIDER: change to a switch statement with event.WhichOneof("eventType")
411432
try:
412433
if event.HasField("orchestratorStarted"):
413434
ctx.current_utc_datetime = event.timestamp.ToDatetime()
@@ -445,8 +466,9 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
445466
timer_task = ctx._pending_tasks.pop(timer_id, None)
446467
if not timer_task:
447468
# TODO: Should this be an error? When would it ever happen?
448-
self._logger.warning(
449-
f"Ignoring unexpected timerFired event for '{ctx.instance_id}' with ID = {timer_id}.")
469+
if not ctx.is_replaying:
470+
self._logger.warning(
471+
f"{ctx.instance_id}: Ignoring unexpected timerFired event with ID = {timer_id}.")
450472
return
451473
timer_task.complete(None)
452474
ctx.resume()
@@ -472,8 +494,9 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
472494
activity_task = ctx._pending_tasks.pop(task_id, None)
473495
if not activity_task:
474496
# TODO: Should this be an error? When would it ever happen?
475-
self._logger.warning(
476-
f"Ignoring unexpected taskCompleted event for '{ctx.instance_id}' with ID = {task_id}.")
497+
if not ctx.is_replaying:
498+
self._logger.warning(
499+
f"{ctx.instance_id}: Ignoring unexpected taskCompleted event with ID = {task_id}.")
477500
return
478501
result = None
479502
if not ph.is_empty(event.taskCompleted.result):
@@ -485,11 +508,12 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
485508
activity_task = ctx._pending_tasks.pop(task_id, None)
486509
if not activity_task:
487510
# TODO: Should this be an error? When would it ever happen?
488-
self._logger.warning(
489-
f"Ignoring unexpected taskFailed event for '{ctx.instance_id}' with ID = {task_id}.")
511+
if not ctx.is_replaying:
512+
self._logger.warning(
513+
f"{ctx.instance_id}: Ignoring unexpected taskFailed event with ID = {task_id}.")
490514
return
491515
activity_task.fail(
492-
f"Activity task #{task_id} failed: {event.taskFailed.failureDetails.errorMessage}",
516+
f"{ctx.instance_id}: Activity task #{task_id} failed: {event.taskFailed.failureDetails.errorMessage}",
493517
event.taskFailed.failureDetails)
494518
ctx.resume()
495519
elif event.HasField("subOrchestrationInstanceCreated"):
@@ -513,8 +537,9 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
513537
sub_orch_task = ctx._pending_tasks.pop(task_id, None)
514538
if not sub_orch_task:
515539
# TODO: Should this be an error? When would it ever happen?
516-
self._logger.warning(
517-
f"Ignoring unexpected subOrchestrationInstanceCompleted event for '{ctx.instance_id}' with ID = {task_id}.")
540+
if not ctx.is_replaying:
541+
self._logger.warning(
542+
f"{ctx.instance_id}: Ignoring unexpected subOrchestrationInstanceCompleted event with ID = {task_id}.")
518543
return
519544
result = None
520545
if not ph.is_empty(event.subOrchestrationInstanceCompleted.result):
@@ -527,8 +552,9 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
527552
sub_orch_task = ctx._pending_tasks.pop(task_id, None)
528553
if not sub_orch_task:
529554
# TODO: Should this be an error? When would it ever happen?
530-
self._logger.warning(
531-
f"Ignoring unexpected subOrchestrationInstanceFailed event for '{ctx.instance_id}' with ID = {task_id}.")
555+
if not ctx.is_replaying:
556+
self._logger.warning(
557+
f"{ctx.instance_id}: Ignoring unexpected subOrchestrationInstanceFailed event with ID = {task_id}.")
532558
return
533559
sub_orch_task.fail(
534560
f"Sub-orchestration task #{task_id} failed: {failedEvent.failureDetails.errorMessage}",
@@ -559,7 +585,18 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
559585
decoded_result = shared.from_json(event.eventRaised.input.value)
560586
event_list.append(_ExternalEvent(event.eventRaised.name, decoded_result))
561587
if not ctx.is_replaying:
562-
self._logger.info(f"Event '{event_name}' has been buffered as there are no tasks waiting for it.")
588+
self._logger.info(f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it.")
589+
elif event.HasField("executionSuspended"):
590+
if not self._is_suspended and not ctx.is_replaying:
591+
self._logger.info(f"{ctx.instance_id}: Execution suspended.")
592+
self._is_suspended = True
593+
elif event.HasField("executionResumed") and self._is_suspended:
594+
if not ctx.is_replaying:
595+
self._logger.info(f"{ctx.instance_id}: Resuming execution.")
596+
self._is_suspended = False
597+
for e in self._suspended_events:
598+
self.process_event(ctx, e)
599+
self._suspended_events = []
563600
else:
564601
eventType = event.WhichOneof("eventType")
565602
raise task.OrchestrationStateError(f"Don't know how to handle event of type '{eventType}'")
@@ -667,3 +704,8 @@ def _get_action_summary(new_actions: Sequence[pb.OrchestratorAction]) -> str:
667704
action_type = action.WhichOneof('orchestratorActionType')
668705
counts[action_type] = counts.get(action_type, 0) + 1
669706
return f"[{', '.join(f'{name}={count}' for name, count in counts.items())}]"
707+
708+
709+
def _is_suspendable(event: pb.HistoryEvent) -> bool:
710+
"""Returns True if the event should be buffered while the orchestration is suspended."""
711+
return event.WhichOneof("eventType") not in ["executionResumed", "executionTerminated"]

tests/test_orchestration_e2e.py

Lines changed: 40 additions & 0 deletions
@@ -3,6 +3,7 @@
33

44
import json
55
import threading
6+
import time
67
from datetime import timedelta
78

89
import pytest
@@ -169,3 +170,42 @@ def orchestrator(ctx: task.OrchestrationContext, _):
169170
assert state.serialized_output == json.dumps("approved")
170171
else:
171172
assert state.serialized_output == json.dumps("timed out")
173+
174+
175+
def test_suspend_and_resume():
176+
def orchestrator(ctx: task.OrchestrationContext, _):
177+
result = yield ctx.wait_for_external_event("my_event")
178+
return result
179+
180+
# Start a worker, which will connect to the sidecar in a background thread
181+
with worker.TaskHubGrpcWorker() as w:
182+
w.add_orchestrator(orchestrator)
183+
w.start()
184+
185+
task_hub_client = client.TaskHubGrpcClient()
186+
id = task_hub_client.schedule_new_orchestration(orchestrator)
187+
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
188+
assert state is not None
189+
190+
# Suspend the orchestration and wait for it to go into the SUSPENDED state
191+
task_hub_client.suspend_orchestration(id)
192+
while state.runtime_status == client.OrchestrationStatus.RUNNING:
193+
time.sleep(0.1)
194+
state = task_hub_client.get_orchestration_state(id)
195+
assert state is not None
196+
assert state.runtime_status == client.OrchestrationStatus.SUSPENDED
197+
198+
# Raise an event to the orchestration and confirm that it does NOT complete
199+
task_hub_client.raise_orchestration_event(id, "my_event", data=42)
200+
try:
201+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=3)
202+
assert False, "Orchestration should not have completed"
203+
except TimeoutError:
204+
pass
205+
206+
# Resume the orchestration and wait for it to complete
207+
task_hub_client.resume_orchestration(id)
208+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
209+
assert state is not None
210+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
211+
assert state.serialized_output == json.dumps(42)

tests/test_orchestration_executor.py

Lines changed: 35 additions & 0 deletions
@@ -486,6 +486,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
486486
# the orchestration should complete.
487487
old_events = new_events
488488
new_events = [helpers.new_event_raised_event("my_event", encoded_input="42")]
489+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
489490
actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
490491
complete_action = get_and_validate_single_complete_orchestration_action(actions)
491492
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
@@ -519,6 +520,40 @@ def orchestrator(ctx: task.OrchestrationContext, _):
519520
timer_due_time = datetime.utcnow() + timedelta(days=1)
520521
old_events = new_events + [helpers.new_timer_created_event(1, timer_due_time)]
521522
new_events = [helpers.new_timer_fired_event(1, timer_due_time)]
523+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
524+
actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
525+
complete_action = get_and_validate_single_complete_orchestration_action(actions)
526+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
527+
assert complete_action.result.value == "42"
528+
529+
530+
def test_suspend_resume():
531+
"""Tests that an orchestration can be suspended and resumed"""
532+
533+
def orchestrator(ctx: task.OrchestrationContext, _):
534+
result = yield ctx.wait_for_external_event("my_event")
535+
return result
536+
537+
registry = worker._Registry()
538+
orchestrator_name = registry.add_orchestrator(orchestrator)
539+
540+
old_events = [
541+
helpers.new_orchestrator_started_event(),
542+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID)]
543+
new_events = [
544+
helpers.new_suspend_event(),
545+
helpers.new_event_raised_event("my_event", encoded_input="42")]
546+
547+
# Execute the orchestration. It should remain in a running state because it was suspended prior
548+
# to processing the eventRaised event.
549+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
550+
actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
551+
assert len(actions) == 0
552+
553+
# Resume the orchestration. It should complete successfully.
554+
old_events = old_events + new_events
555+
new_events = [helpers.new_resume_event()]
556+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
522557
actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
523558
complete_action = get_and_validate_single_complete_orchestration_action(actions)
524559
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
