Skip to content

Commit b5b24c7

Browse files
authored
Add termination support (#5)
1 parent 65cade1 commit b5b24c7

7 files changed

Lines changed: 94 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ Initial release, which includes the following features:
55
- Orchestrations and activities
66
- Durable timers
77
- Sub-orchestrations
8-
8+
- Suspend, resume, and terminate client operations

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,13 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`
117117

118118
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
119119

120-
### Suspend and resume
120+
### Continue-as-new (TODO)
121121

122-
Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost.
122+
Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
123+
124+
### Suspend, resume, and terminate
125+
126+
Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
123127

124128
### Retry policies (TODO)
125129

durabletask/client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,14 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
161161
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
162162
self._stub.RaiseEvent(req)
163163

164-
def terminate_orchestration(self):
165-
raise NotImplementedError()
164+
def terminate_orchestration(self, instance_id: str, *,
165+
output: Any | None = None):
166+
req = pb.TerminateRequest(
167+
instanceId=instance_id,
168+
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None)
169+
170+
self._logger.info(f"Terminating instance '{instance_id}'.")
171+
self._stub.TerminateInstance(req)
166172

167173
def suspend_orchestration(self, instance_id: str):
168174
req = pb.SuspendRequest(instanceId=instance_id)

durabletask/internal/helpers.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ def new_resume_event() -> pb.HistoryEvent:
139139
)
140140

141141

142+
def new_terminated_event(*, encoded_output: str | None = None) -> pb.HistoryEvent:
143+
return pb.HistoryEvent(
144+
eventId=-1,
145+
timestamp=timestamp_pb2.Timestamp(),
146+
executionTerminated=pb.ExecutionTerminatedEvent(
147+
input=get_string_value(encoded_output)
148+
)
149+
)
150+
151+
142152
def get_string_value(val: str | None) -> wrappers_pb2.StringValue | None:
143153
if val is None:
144154
return None

durabletask/worker.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,17 +277,23 @@ def resume(self):
277277
if not self._previous_task.is_complete:
278278
break
279279

280-
def set_complete(self, result: Any):
280+
def set_complete(self, result: Any, status: pb.OrchestrationStatus, is_result_encoded: bool = False):
281+
if self._is_complete:
282+
return
283+
281284
self._is_complete = True
282285
self._result = result
283286
result_json: str | None = None
284287
if result is not None:
285-
result_json = shared.to_json(result)
288+
result_json = result if is_result_encoded else shared.to_json(result)
286289
action = ph.new_complete_orchestration_action(
287-
self.next_sequence_number(), pb.ORCHESTRATION_STATUS_COMPLETED, result_json)
290+
self.next_sequence_number(), status, result_json)
288291
self._pending_actions[action.id] = action
289292

290293
def set_failed(self, ex: Exception):
294+
if self._is_complete:
295+
return
296+
291297
self._is_complete = True
292298
self._pending_actions.clear() # Cancel any pending actions
293299
action = ph.new_complete_orchestration_action(
@@ -409,6 +415,8 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
409415
ctx._is_replaying = False
410416
for new_event in new_events:
411417
self.process_event(ctx, new_event)
418+
if ctx._is_complete:
419+
break
412420
except Exception as ex:
413421
# Unhandled exceptions fail the orchestration
414422
ctx.set_failed(ex)
@@ -450,7 +458,7 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
450458
ctx.run(result)
451459
else:
452460
# This is an orchestrator that doesn't schedule any tasks
453-
ctx.set_complete(result)
461+
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED)
454462
elif event.HasField("timerCreated"):
455463
# This history event confirms that the timer was successfully scheduled.
456464
# Remove the timerCreated event from the pending action list so we don't schedule it again.
@@ -597,12 +605,17 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
597605
for e in self._suspended_events:
598606
self.process_event(ctx, e)
599607
self._suspended_events = []
608+
elif event.HasField("executionTerminated"):
609+
if not ctx.is_replaying:
610+
self._logger.info(f"{ctx.instance_id}: Execution terminating.")
611+
encoded_output = event.executionTerminated.input.value if not ph.is_empty(event.executionTerminated.input) else None
612+
ctx.set_complete(encoded_output, pb.ORCHESTRATION_STATUS_TERMINATED, is_result_encoded=True)
600613
else:
601614
eventType = event.WhichOneof("eventType")
602615
raise task.OrchestrationStateError(f"Don't know how to handle event of type '{eventType}'")
603616
except StopIteration as generatorStopped:
604617
# The orchestrator generator function completed
605-
ctx.set_complete(generatorStopped.value)
618+
ctx.set_complete(generatorStopped.value, pb.ORCHESTRATION_STATUS_COMPLETED)
606619

607620

608621
class _ActivityExecutor:

tests/test_orchestration_e2e.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
101101

102102
# Start a worker, which will connect to the sidecar in a background thread
103103
with worker.TaskHubGrpcWorker() as w:
104-
w.start()
105104
w.add_activity(increment)
106105
w.add_orchestrator(orchestrator_child)
107106
w.add_orchestrator(parent_orchestrator)
107+
w.start()
108108

109109
task_hub_client = client.TaskHubGrpcClient()
110110
id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=10)
@@ -125,8 +125,8 @@ def orchestrator(ctx: task.OrchestrationContext, _):
125125

126126
# Start a worker, which will connect to the sidecar in a background thread
127127
with worker.TaskHubGrpcWorker() as w:
128-
w.start()
129128
w.add_orchestrator(orchestrator)
129+
w.start()
130130

131131
# Start the orchestration and immediately raise events to it.
132132
task_hub_client = client.TaskHubGrpcClient()
@@ -154,8 +154,8 @@ def orchestrator(ctx: task.OrchestrationContext, _):
154154

155155
# Start a worker, which will connect to the sidecar in a background thread
156156
with worker.TaskHubGrpcWorker() as w:
157-
w.start()
158157
w.add_orchestrator(orchestrator)
158+
w.start()
159159

160160
# Start the orchestration and immediately raise events to it.
161161
task_hub_client = client.TaskHubGrpcClient()
@@ -209,3 +209,26 @@ def orchestrator(ctx: task.OrchestrationContext, _):
209209
assert state is not None
210210
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
211211
assert state.serialized_output == json.dumps(42)
212+
213+
214+
def test_terminate():
215+
def orchestrator(ctx: task.OrchestrationContext, _):
216+
result = yield ctx.wait_for_external_event("my_event")
217+
return result
218+
219+
# Start a worker, which will connect to the sidecar in a background thread
220+
with worker.TaskHubGrpcWorker() as w:
221+
w.add_orchestrator(orchestrator)
222+
w.start()
223+
224+
task_hub_client = client.TaskHubGrpcClient()
225+
id = task_hub_client.schedule_new_orchestration(orchestrator)
226+
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
227+
assert state is not None
228+
assert state.runtime_status == client.OrchestrationStatus.RUNNING
229+
230+
task_hub_client.terminate_orchestration(id, output="some reason for termination")
231+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
232+
assert state is not None
233+
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
234+
assert state.serialized_output == json.dumps("some reason for termination")

tests/test_orchestration_executor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,31 @@ def orchestrator(ctx: task.OrchestrationContext, _):
560560
assert complete_action.result.value == "42"
561561

562562

563+
def test_terminate():
564+
"""Tests that an orchestration can be terminated before it completes"""
565+
566+
def orchestrator(ctx: task.OrchestrationContext, _):
567+
result = yield ctx.wait_for_external_event("my_event")
568+
return result
569+
570+
registry = worker._Registry()
571+
orchestrator_name = registry.add_orchestrator(orchestrator)
572+
573+
old_events = [
574+
helpers.new_orchestrator_started_event(),
575+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID)]
576+
new_events = [
577+
helpers.new_terminated_event(encoded_output=json.dumps("terminated!")),
578+
helpers.new_event_raised_event("my_event", encoded_input="42")]
579+
580+
# Execute the orchestration. It should be in a running state waiting for an external event
581+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
582+
actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
583+
complete_action = get_and_validate_single_complete_orchestration_action(actions)
584+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_TERMINATED
585+
assert complete_action.result.value == json.dumps("terminated!")
586+
587+
563588
def test_fan_out():
564589
"""Tests that a fan-out pattern correctly schedules N tasks"""
565590
def hello(_, name: str):

0 commit comments

Comments
 (0)