Skip to content

Commit fbb8e36

Browse files
committed
Added gRPC worker, client, and e2e test (#3)
1 parent 3f700bb commit fbb8e36

10 files changed

Lines changed: 703 additions & 394 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ If the gRPC proto definitions need to be updated, the corresponding source code
1717
python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/protos --pyi_out=./durabletask/protos --grpc_python_out=./durabletask/protos orchestrator_service.proto
1818
```
1919

20+
### Running E2E tests
21+
The E2E (end-to-end) tests require a sidecar process to be running. You can run a sidecar process using the following `docker` command (assumes you have Docker installed on your local system).
22+
23+
```sh
24+
docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator
25+
```
26+
2027
## Contributing
2128

2229
This project welcomes contributions and suggestions. Most contributions require you to agree to a

durabletask/api/state.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
4+
import durabletask.protos.orchestrator_service_pb2 as pb
5+
from durabletask.protos.orchestrator_service_pb2 import TaskFailureDetails
6+
7+
8+
@dataclass
class OrchestrationState:
    """Point-in-time snapshot of an orchestration instance's metadata and status."""
    # The caller-supplied (or generated) ID that uniquely identifies the instance.
    instance_id: str
    # Name of the orchestrator function that the instance is running.
    name: str
    # Current runtime status (protobuf OrchestrationStatus enum value).
    runtime_status: pb.OrchestrationStatus
    # When the instance was created, as reported by the sidecar.
    created_at: datetime
    # When the instance state was last updated by the sidecar.
    last_updated_at: datetime
    # Serialized (JSON) orchestration input, or None when absent/not fetched.
    serialized_input: str | None
    # Serialized (JSON) orchestration output, or None when absent/not fetched.
    serialized_output: str | None
    # Serialized custom status payload set by the orchestrator, or None.
    serialized_custom_status: str | None
    # Failure information when the orchestration failed, otherwise None.
    failure_details: pb.TaskFailureDetails | None
19+
20+
21+
def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> OrchestrationState | None:
    """Convert a GetInstanceResponse protobuf into an OrchestrationState.

    Returns None when the queried instance does not exist (res.exists is False).
    """
    if not res.exists:
        return None

    state = res.orchestrationState

    # NOTE: proto3 message fields (including StringValue wrappers) are never
    # None in Python — an unset field yields a default instance, so an
    # `is not None` test is always true and would turn "absent" into ''.
    # HasField distinguishes unset from set-but-empty.
    serialized_input = state.input.value if state.HasField("input") else None
    serialized_output = state.output.value if state.HasField("output") else None
    serialized_custom_status = state.customStatus.value if state.HasField("customStatus") else None

    # Treat failureDetails as present only when it carries actual error
    # content; the sidecar may populate an empty message for non-failed runs.
    has_failure = state.failureDetails.errorMessage != '' or state.failureDetails.errorType != ''
    failure_details = state.failureDetails if has_failure else None

    return OrchestrationState(
        instance_id,
        state.name,
        state.orchestrationStatus,
        state.createdTimestamp.ToDatetime(),
        state.lastUpdatedTimestamp.ToDatetime(),
        serialized_input,
        serialized_output,
        serialized_custom_status,
        failure_details)

durabletask/api/task_hub_client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
import logging
4+
from typing import Any
5+
import uuid
6+
import grpc
7+
from durabletask.api.state import OrchestrationState, new_orchestration_state
8+
import durabletask.protos.orchestrator_service_pb2 as pb
9+
import durabletask.internal.shared as shared
10+
import simplejson as json
11+
12+
from google.protobuf import timestamp_pb2, wrappers_pb2
13+
14+
from durabletask.protos.orchestrator_service_pb2_grpc import TaskHubSidecarServiceStub
15+
16+
17+
class TaskHubGrpcClient:
    """Client for managing orchestration instances through a Durable Task sidecar over gRPC."""

    def __init__(self, *,
                 host_address: str | None = None,
                 log_handler=None,
                 log_formatter: logging.Formatter | None = None):
        """Connect to the sidecar at *host_address* (default address when None)."""
        channel = shared.get_grpc_channel(host_address)
        self._stub = TaskHubSidecarServiceStub(channel)
        self._logger = shared.get_logger(log_handler, log_formatter)

    def schedule_new_orchestration(self, name: str, *,
                                   input: Any = None,
                                   instance_id: str | None = None,
                                   start_at: datetime | None = None) -> str:
        """Schedule a new orchestration instance and return its instance ID.

        A random hex ID is generated when *instance_id* is None. *input* is
        JSON-serialized; *start_at* delays the start until the given time.
        """
        if instance_id is None:
            instance_id = uuid.uuid4().hex

        req = pb.CreateInstanceRequest(name=name)
        req.instanceId = instance_id

        if input is not None:
            # Composite (message) protobuf fields cannot be assigned directly:
            # `req.input = wrappers_pb2.StringValue(...)` raises AttributeError.
            # Set the wrapped value in place instead.
            req.input.value = json.dumps(input)

        if start_at is not None:
            # Same rule for the Timestamp message field: mutate it in place.
            req.scheduledStartTimestamp.FromDatetime(start_at)

        self._logger.info(f"Starting new '{name}' instance with ID = '{instance_id}'.")
        res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
        return res.instanceId

    def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> OrchestrationState | None:
        """Fetch the current state of an instance, or None if it doesn't exist."""
        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
        res: pb.GetInstanceResponse = self._stub.GetInstance(req)
        return new_orchestration_state(req.instanceId, res)

    def wait_for_orchestration_start(self, instance_id: str, *,
                                     fetch_payloads: bool = False,
                                     timeout: int = 60) -> OrchestrationState | None:
        """Block until the instance starts running, up to *timeout* seconds.

        Raises:
            TimeoutError: if the instance hasn't started within *timeout* seconds.
        """
        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
        try:
            self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to start.")
            res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=timeout)
            return new_orchestration_state(req.instanceId, res)
        except grpc.RpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
                # Replace gRPC error with the built-in TimeoutError
                raise TimeoutError("Timed-out waiting for the orchestration to start")
            else:
                raise

    def wait_for_orchestration_completion(self, instance_id: str, *,
                                          fetch_payloads: bool = True,
                                          timeout: int = 60) -> OrchestrationState | None:
        """Block until the instance reaches a terminal state, up to *timeout* seconds.

        Raises:
            TimeoutError: if the instance hasn't completed within *timeout* seconds.
        """
        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
        try:
            self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
            res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(req, timeout=timeout)
            return new_orchestration_state(req.instanceId, res)
        except grpc.RpcError as rpc_error:
            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
                # Replace gRPC error with the built-in TimeoutError
                raise TimeoutError("Timed-out waiting for the orchestration to complete")
            else:
                raise

    def terminate_orchestration(self):
        # TODO: not yet implemented
        pass

    def suspend_orchestration(self):
        # TODO: not yet implemented
        pass

    def resume_orchestration(self):
        # TODO: not yet implemented
        pass

    def raise_orchestration_event(self):
        # TODO: not yet implemented
        pass

durabletask/internal/shared.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import logging
2+
3+
import grpc
4+
5+
6+
def get_default_host_address() -> str:
    """Return the default "host:port" address of the Durable Task sidecar."""
    return "localhost:4001"
8+
9+
10+
def get_grpc_channel(host_address: str | None) -> grpc.Channel:
    """Open an insecure gRPC channel to the sidecar.

    Falls back to the default sidecar address when *host_address* is None.
    """
    address = host_address if host_address is not None else get_default_host_address()
    return grpc.insecure_channel(address)
15+
16+
17+
def get_logger(log_handler: logging.Handler | None, log_formatter: logging.Formatter | None) -> logging.Logger:
18+
logger = logging.Logger("durabletask")
19+
20+
# Add a default log handler if none is provided
21+
if log_handler is None:
22+
log_handler = logging.StreamHandler()
23+
log_handler.setLevel(logging.INFO)
24+
logger.handlers.append(log_handler)
25+
26+
# Set a default log formatter to our handler if none is provided
27+
if log_formatter is None:
28+
log_formatter = logging.Formatter(
29+
fmt="%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s",
30+
datefmt='%Y-%m-%d %H:%M:%S')
31+
log_handler.setFormatter(log_formatter)
32+
return logger

durabletask/protos/helpers.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,23 @@ def new_complete_orchestration_action(
5757
id: int,
5858
status: OrchestrationStatus,
5959
result: str | None = None,
60-
failureDetails: TaskFailureDetails | None = None) -> OrchestratorAction:
61-
resultWrapper = None
60+
failure_details: TaskFailureDetails | None = None) -> OrchestratorAction:
61+
62+
result_pb: wrappers_pb2.StringValue | None = None
6263
if result is not None:
63-
resultWrapper = wrappers_pb2.StringValue(value=result)
64-
return OrchestratorAction(id=id, completeOrchestration=CompleteOrchestrationAction(
64+
result_pb = wrappers_pb2.StringValue(value=result)
65+
66+
completeOrchestrationAction = CompleteOrchestrationAction(
6567
orchestrationStatus=status,
66-
result=resultWrapper,
67-
carryoverEvents=None, # TODO: Populate carryoverEvents
68-
failureDetails=failureDetails,
69-
))
68+
result=result_pb,
69+
failureDetails=failure_details)
70+
71+
# TODO: CarryoverEvents
72+
73+
return OrchestratorAction(id=id, completeOrchestration=completeOrchestrationAction)
7074

7175

72-
def create_timer_action(id: int, fire_at: datetime) -> OrchestratorAction:
76+
def new_create_timer_action(id: int, fire_at: datetime) -> OrchestratorAction:
    """Build an OrchestratorAction that creates a durable timer firing at *fire_at*."""
    fire_at_pb = timestamp_pb2.Timestamp()
    fire_at_pb.FromDatetime(fire_at)
    return OrchestratorAction(id=id, createTimer=CreateTimerAction(fireAt=fire_at_pb))

0 commit comments

Comments
 (0)