Skip to content

Commit 3f700bb

Browse files
cgillumdavidmrdavid
andcommitted
Initial orchestrator functionality (#2)
- Includes gRPC and protobuf classes - APIs for registering orchestrators as generator functions - Orchestration API support for durable timers - Unit test setup, including tests for orchestrators with timers Co-authored-by: David Justo <david.justo.1996@gmail.com>
1 parent 6d9f149 commit 3f700bb

18 files changed

Lines changed: 543 additions & 47 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,5 @@ dmypy.json
127127

128128
# Pyre type checker
129129
.pyre/
130+
131+
coverage.lcov

.vscode/launch.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"version": "0.2.0",
3+
"configurations": [
4+
{
5+
"name": "Python: Debug Tests",
6+
"type": "python",
7+
"request": "launch",
8+
"program": "${file}",
9+
"purpose": [
10+
"debug-test"
11+
],
12+
"env": {
13+
// pytest-cov breaks debugging, so we have to disable it during debug sessions
14+
"PYTEST_ADDOPTS": "--no-cov"
15+
},
16+
"console": "integratedTerminal",
17+
"justMyCode": false
18+
}
19+
]
20+
}

.vscode/settings.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"editor.formatOnSave": true,
3+
"python.analysis.typeCheckingMode": "basic",
4+
"python.formatting.autopep8Args": [
5+
"--max-line-length=120"
6+
],
7+
"python.testing.pytestArgs": [
8+
"-v",
9+
"--cov=durabletask/",
10+
"--cov-report=lcov",
11+
"tests/"
12+
],
13+
"python.testing.unittestEnabled": false,
14+
"python.testing.pytestEnabled": true,
15+
"coverage-gutters.showLineCoverage": true,
16+
"coverage-gutters.coverageFileNames": [
17+
"coverage.lcov",
18+
"lcov.info",
19+
"cov.xml",
20+
"coverage.xml",
21+
"jacoco.xml",
22+
"coverage.cobertura.xml"
23+
]
24+
}

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ As the maintainer of this project, please make a few updates:
1111
- Remove this section from the README
1212

1313
### Generating protobufs
14-
If the gRPC proto definitions need to be updated, the corresponding source code can be regenerated using the following command from the `src/durabletask` directory:
14+
If the gRPC proto definitions need to be updated, the corresponding source code can be regenerated using the following command from the project root:
1515

1616
```bash
17-
python3 -m grpc_tools.protoc --proto_path=../../submodules/durabletask-protobuf/protos --python_out=. --pyi_out=. --grpc_python_out=. orchestrator_service.proto
17+
python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/protos --pyi_out=./durabletask/protos --grpc_python_out=./durabletask/protos orchestrator_service.proto
1818
```
1919

2020
## Contributing

durabletask/protos/helpers.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import traceback
2+
3+
from datetime import datetime
4+
from google.protobuf import timestamp_pb2, wrappers_pb2
5+
6+
from durabletask.protos.orchestrator_service_pb2 import *
7+
8+
9+
def new_orchestrator_started_event(timestamp: datetime | None = None) -> HistoryEvent:
10+
ts = timestamp_pb2.Timestamp()
11+
if timestamp is not None:
12+
ts.FromDatetime(timestamp)
13+
return HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=OrchestratorStartedEvent())
14+
15+
16+
def new_execution_started_event(name: str, instance_id: str, input: str | None = None) -> HistoryEvent:
17+
input_: wrappers_pb2.StringValue | None = None
18+
if input is not None:
19+
input_ = wrappers_pb2.StringValue(value=input)
20+
21+
return HistoryEvent(
22+
eventId=-1,
23+
timestamp=timestamp_pb2.Timestamp(),
24+
executionStarted=ExecutionStartedEvent(
25+
name=name, input=input_, orchestrationInstance=OrchestrationInstance(instanceId=instance_id)))
26+
27+
28+
def new_timer_created_event(timer_id: int, fire_at: datetime):
29+
ts = timestamp_pb2.Timestamp()
30+
ts.FromDatetime(fire_at)
31+
return HistoryEvent(
32+
eventId=timer_id,
33+
timestamp=timestamp_pb2.Timestamp(),
34+
timerCreated=TimerCreatedEvent(fireAt=ts)
35+
)
36+
37+
38+
def new_timer_fired_event(timer_id: int, fire_at: datetime):
39+
ts = timestamp_pb2.Timestamp()
40+
ts.FromDatetime(fire_at)
41+
return HistoryEvent(
42+
eventId=-1,
43+
timestamp=timestamp_pb2.Timestamp(),
44+
timerFired=TimerFiredEvent(fireAt=ts, timerId=timer_id)
45+
)
46+
47+
48+
def new_failure_details(ex: Exception) -> TaskFailureDetails:
49+
return TaskFailureDetails(
50+
errorType=type(ex).__name__,
51+
errorMessage=str(ex),
52+
stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__)))
53+
)
54+
55+
56+
def new_complete_orchestration_action(
57+
id: int,
58+
status: OrchestrationStatus,
59+
result: str | None = None,
60+
failureDetails: TaskFailureDetails | None = None) -> OrchestratorAction:
61+
resultWrapper = None
62+
if result is not None:
63+
resultWrapper = wrappers_pb2.StringValue(value=result)
64+
return OrchestratorAction(id=id, completeOrchestration=CompleteOrchestrationAction(
65+
orchestrationStatus=status,
66+
result=resultWrapper,
67+
carryoverEvents=None, # TODO: Populate carryoverEvents
68+
failureDetails=failureDetails,
69+
))
70+
71+
72+
def create_timer_action(id: int, fire_at: datetime) -> OrchestratorAction:
73+
timestamp = timestamp_pb2.Timestamp()
74+
timestamp.FromDatetime(fire_at)
75+
return OrchestratorAction(id=id, createTimer=CreateTimerAction(fireAt=timestamp))
File renamed without changes.
File renamed without changes.
File renamed without changes.
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import inspect
2+
import simplejson as json
3+
4+
from datetime import datetime
5+
from types import GeneratorType
6+
from typing import Any, Generator, List
7+
8+
import durabletask.protos.helpers as ph
9+
import durabletask.protos.orchestrator_service_pb2 as pb
10+
import durabletask.task.task as task
11+
12+
from durabletask.task.orchestrator import OrchestrationContext, Orchestrator
13+
from durabletask.task.registry import Registry
14+
from durabletask.task.task import Task
15+
16+
17+
class NonDeterminismError(Exception):
18+
pass
19+
20+
21+
class OrchestratorNotFound(ValueError):
22+
pass
23+
24+
25+
class OrchestrationStateError(Exception):
26+
pass
27+
28+
29+
class RuntimeOrchestrationContext(OrchestrationContext):
30+
_generator: Generator[Task, Any, Any] | None
31+
_previous_task: Task | None
32+
33+
def __init__(self):
34+
self._generator = None
35+
self._is_replaying = True
36+
self._is_complete = False
37+
self._result = None
38+
self._pending_actions = dict[int, pb.OrchestratorAction]()
39+
self._pending_tasks = dict[int, task.CompletableTask]()
40+
self._sequence_number = 0
41+
self._current_utc_datetime = datetime(1000, 1, 1)
42+
43+
def run(self, generator: Generator[task.Task, Any, Any]):
44+
self._generator = generator
45+
# TODO: Do something with this task
46+
task = next(generator) # this starts the generator
47+
# TODO: Check if the task is null?
48+
self._previous_task = task
49+
50+
def resume(self):
51+
if self._generator is None:
52+
# This is never expected unless maybe there's an issue with the history
53+
raise TypeError("The orchestrator generator is not initialized! Was the orchestration history corrupted?")
54+
55+
# We can resume the generator only if the previously yielded task
56+
# has reached a completed state. The only time this won't be the
57+
# case is if the user yielded on a WhenAll task and there are still
58+
# outstanding child tasks that need to be completed.
59+
if self._previous_task is not None and self._previous_task.is_complete():
60+
# Resume the generator. This will either return a Task or raise StopIteration if it's done.
61+
next_task = self._generator.send(self._previous_task.get_result())
62+
# TODO: Validate the return value
63+
self._previous_task = next_task
64+
65+
def set_complete(self, result: Any):
66+
self._is_complete = True
67+
self._result = result
68+
result_json: str | None = None
69+
if result != None:
70+
result_json = json.dumps(result)
71+
action = ph.new_complete_orchestration_action(
72+
self.next_sequence_number(), pb.ORCHESTRATION_STATUS_COMPLETED, result_json)
73+
self._pending_actions[action.id] = action
74+
75+
def set_failed(self, ex: Exception):
76+
self._is_complete = True
77+
self._pending_actions.clear() # Cancel any pending actions
78+
action = ph.new_complete_orchestration_action(
79+
self.next_sequence_number(), pb.ORCHESTRATION_STATUS_FAILED, None, ph.new_failure_details(ex)
80+
)
81+
self._pending_actions[action.id] = action
82+
83+
def get_actions(self) -> List[pb.OrchestratorAction]:
84+
return list(self._pending_actions.values())
85+
86+
def next_sequence_number(self) -> int:
87+
self._sequence_number += 1
88+
return self._sequence_number
89+
90+
@property
91+
def current_utc_datetime(self) -> datetime:
92+
return self._current_utc_datetime
93+
94+
@current_utc_datetime.setter
95+
def current_utc_datetime(self, value: datetime):
96+
self._current_utc_datetime = value
97+
98+
def create_timer(self, fire_at: datetime) -> task.Task:
99+
id = self.next_sequence_number()
100+
action = ph.create_timer_action(id, fire_at)
101+
self._pending_actions[id] = action
102+
103+
timer_task = task.CompletableTask()
104+
self._pending_tasks[id] = timer_task
105+
return timer_task
106+
107+
108+
class OrchestrationExecutor:
109+
generator: Orchestrator | None
110+
111+
def __init__(self, registry: Registry):
112+
self.registry = registry
113+
self.generator = None
114+
115+
def execute(self, instance_id: str, old_Events: List[pb.HistoryEvent], new_events: List[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
116+
if new_events is None or len(new_events) == 0:
117+
raise OrchestrationStateError(
118+
"The new history event list must have at least one event in it.")
119+
120+
ctx = RuntimeOrchestrationContext()
121+
122+
try:
123+
# Rebuild local state by replaying old history into the orchestrator function
124+
ctx._is_replaying = True
125+
for old_event in old_Events:
126+
self.process_event(ctx, old_event)
127+
128+
# Get new actions by executing newly received events into the orchestrator function
129+
ctx._is_replaying = False
130+
for new_event in new_events:
131+
self.process_event(ctx, new_event)
132+
except Exception as ex:
133+
# Unhandled exceptions fail the orchestration
134+
ctx.set_failed(ex)
135+
136+
return ctx.get_actions()
137+
138+
def process_event(self, ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
139+
try:
140+
if event.HasField("orchestratorStarted"):
141+
ctx.current_utc_datetime = event.timestamp.ToDatetime()
142+
elif event.HasField("executionStarted"):
143+
# TODO: Check if we already started the orchestration
144+
fn = self.registry.get_orchestrator(event.executionStarted.name)
145+
if fn is None:
146+
raise OrchestratorNotFound(f"A '{event.executionStarted.name}' orchestrator was not registered.")
147+
result = fn(ctx) # this does not execute the generator, only creates it
148+
if isinstance(result, GeneratorType):
149+
# Start the orchestrator's generator function
150+
ctx.run(result)
151+
else:
152+
# This is an orchestrator that doesn't schedule any tasks
153+
ctx.set_complete(result)
154+
elif event.HasField("timerCreated"):
155+
id = event.eventId
156+
if ctx._pending_actions.pop(id, None) is None:
157+
raise NonDeterminismError(inspect.cleandoc(f"""
158+
A previous execution called create_timer with sequence number {id}, but the current
159+
execution doesn't have this action with this sequence number. This problem occurs
160+
when either the orchestration has non-deterministic logic or if the code was changed
161+
after an instance of this orchestration already started running."""))
162+
elif event.HasField("timerFired"):
163+
id = event.timerFired.timerId
164+
timer_task = ctx._pending_tasks.pop(id, None)
165+
if timer_task is None:
166+
# TODO: This could be a duplicate event or it could be a non-deterministic orchestration.
167+
# Duplicate events should be handled gracefully with a warning. Otherwise, the
168+
# orchestration should probably fail with an error.
169+
return
170+
timer_task.complete(None)
171+
ctx.resume()
172+
else:
173+
eventType = event.WhichOneof("eventType")
174+
raise OrchestrationStateError(f"Don't know how to handle event of type '{eventType}'")
175+
except StopIteration as generatorStopped:
176+
# The orchestrator generator function completed
177+
ctx.set_complete(generatorStopped.value)

0 commit comments

Comments
 (0)