Commit eecc56a

Package reorganization, setup, and samples (#2)
1 parent 4e4651c commit eecc56a

29 files changed

Lines changed: 1089 additions & 816 deletions

.flake8

Lines changed: 6 additions & 0 deletions
```diff
@@ -0,0 +1,6 @@
+[flake8]
+ignore = E501,C901
+exclude =
+    .git
+    *_pb2*
+    __pycache__
```
.github/workflows/pr-validation.yml

Lines changed: 1 addition & 3 deletions
```diff
@@ -31,9 +31,7 @@ jobs:
         pip install -r requirements.txt
     - name: Lint with flake8
       run: |
-        # flake8 settings are in setup.cfg
         flake8 . --count --show-source --statistics --exit-zero
-    - name: Test with pytest
+    - name: Pytest unit tests
      run: |
-        # pytest markers are defined in setup.cfg
        pytest -m "not e2e" --verbose
```

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
```diff
@@ -0,0 +1,8 @@
+## v0.1.0
+
+Initial release, which includes the following features:
+
+- Orchestrations and activities
+- Durable timers
+- Sub-orchestrations
+
```

Makefile

Lines changed: 17 additions & 0 deletions
```diff
@@ -0,0 +1,17 @@
+init:
+	pip3 install -r requirements.txt
+
+test-unit:
+	pytest -m "not e2e" --verbose
+
+test-e2e:
+	pytest -m e2e --verbose
+
+install:
+	python3 -m pip install .
+
+gen-proto:
+	# NOTE: There is currently a hand-edit that we make to the generated orchestrator_service_pb2.py file after it's generated to help resolve import problems.
+	python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/internal --pyi_out=./durabletask/internal --grpc_python_out=./durabletask/internal orchestrator_service.proto
+
+.PHONY: init test-unit test-e2e gen-proto install
```

README.md

Lines changed: 99 additions & 14 deletions
````diff
@@ -9,46 +9,131 @@ This repo contains a Python client SDK for use with the [Durable Task Framework
 
 > Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
 
+## Features
+
+- [x] Orchestrations
+- [x] Activities
+- [x] Durable timers
+- [x] Sub-orchestrations
+- [ ] External events
+- [ ] Suspend and resume
+- [ ] Retry policies
+
+## Supported patterns
+
+The following orchestration patterns are currently supported.
+
+### Function chaining
+
+An orchestration can chain a sequence of function calls using the following syntax:
+
+```python
+# simple activity function that returns a greeting
+def hello(ctx: task.ActivityContext, name: str) -> str:
+    return f'Hello {name}!'
+
+# orchestrator function that sequences the activity calls
+def sequence(ctx: task.OrchestrationContext, _):
+    result1 = yield ctx.call_activity(hello, input='Tokyo')
+    result2 = yield ctx.call_activity(hello, input='Seattle')
+    result3 = yield ctx.call_activity(hello, input='London')
+
+    return [result1, result2, result3]
+```
+
+You can find the full sample [here](./examples/activity_sequence.py).
+
````
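Orchestrator functions like `sequence` above are plain Python generators: each `yield ctx.call_activity(...)` suspends the function until the runtime has a result to resume it with. The following self-contained sketch is a toy driver, not the SDK's actual replay engine — `drive` and the tuple-based task encoding are invented for illustration:

```python
# Toy driver for a generator-style orchestrator. The real Durable Task
# runtime schedules activities through a sidecar and replays history;
# here, yielded calls are executed immediately and fed back via send().

def hello(name: str) -> str:
    # stand-in activity that runs inline instead of being scheduled
    return f'Hello {name}!'

def sequence():
    # mirrors the README sample: each yield suspends until a result arrives
    result1 = yield ('hello', 'Tokyo')
    result2 = yield ('hello', 'Seattle')
    result3 = yield ('hello', 'London')
    return [result1, result2, result3]

def drive(orchestrator):
    """Run the generator to completion, resolving each yielded call."""
    gen = orchestrator()
    try:
        _, arg = next(gen)                 # first pending activity call
        while True:
            _, arg = gen.send(hello(arg))  # resume with the activity result
    except StopIteration as stop:
        return stop.value                  # the orchestrator's return value

print(drive(sequence))  # ['Hello Tokyo!', 'Hello Seattle!', 'Hello London!']
```

The real runtime additionally checkpoints each result so that a replayed generator takes the same path deterministically.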
````diff
+### Fan-out/fan-in
+
+An orchestration can fan-out a dynamic number of function calls in parallel and then fan-in the results using the following syntax:
+
+```python
+# activity function for getting the list of work items
+def get_work_items(ctx: task.ActivityContext, _) -> List[str]:
+    # ...
+
+# activity function for processing a single work item
+def process_work_item(ctx: task.ActivityContext, item: str) -> int:
+    # ...
+
+# orchestrator function that fans-out the work items and then fans-in the results
+def orchestrator(ctx: task.OrchestrationContext, _):
+    # the number of work-items is unknown in advance
+    work_items = yield ctx.call_activity(get_work_items)
+
+    # fan-out: schedule the work items in parallel and wait for all of them to complete
+    tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items]
+    results = yield task.when_all(tasks)
+
+    # fan-in: summarize and return the results
+    return {'work_items': work_items, 'results': results, 'total': sum(results)}
+```
+
+You can find the full sample [here](./examples/fanout_fanin.py).
+
````
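In the fan-out/fan-in sample, `task.when_all` returns a single task that completes once every scheduled activity has finished. A self-contained toy version (the driver and the `'all'` request encoding below are invented stand-ins, not the SDK implementation) makes the aggregation explicit:

```python
# Toy fan-out/fan-in driver: yielded requests are executed inline
# rather than being scheduled in parallel by a real runtime.

def get_work_items() -> list[str]:
    return ['item1', 'item2', 'item3']

def process_work_item(item: str) -> int:
    return len(item)  # pretend processing produces a number per item

def orchestrator():
    work_items = yield ('get_work_items', None)
    # fan-out: one pending call per work item, awaited as a batch
    results = yield ('all', [('process_work_item', item) for item in work_items])
    # fan-in: summarize and return the results
    return {'work_items': work_items, 'results': results, 'total': sum(results)}

def drive(gen_fn):
    gen = gen_fn()
    op, arg = next(gen)
    try:
        while True:
            if op == 'get_work_items':
                result = get_work_items()
            else:  # 'all': resolve every pending call, preserving order
                result = [process_work_item(item) for _, item in arg]
            op, arg = gen.send(result)
    except StopIteration as stop:
        return stop.value

print(drive(orchestrator)['total'])  # 15
```

Because results come back in the same order the tasks were scheduled, the orchestrator can zip them against `work_items` deterministically.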
````diff
 ## Getting Started
 
 ### Prerequisites
 
 - Python 3.10 or higher
+- A Durable Task-compatible sidecar, like [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/)
+
+### Installing the Durable Task Python client SDK
+
+Installation is currently only supported from source. Ensure pip, setuptools, and wheel are up-to-date.
+
+```sh
+python3 -m pip install --upgrade pip setuptools wheel
+```
 
-### Installing
+To install this package from source, clone this repository and run the following command from the project root:
 
-#### Install from PyPI
-This package is not yet published to [PyPI](https://pypi.org/).
+```sh
+python3 -m pip install .
+```
 
-#### Install from source
-TODO
+### Run the samples
+
+See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them.
 
 ## Development
-The following is more information about how to develop this project.
+
+The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL.
 
 ### Generating protobufs
-If the gRPC proto definitions need to be updated, the corresponding source code can be regenerated using the following command from the project root:
 
-```bash
-python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/protos --pyi_out=./durabletask/protos --grpc_python_out=./durabletask/protos orchestrator_service.proto
+Protobuf definitions are stored in the [./submodules/durabletask-protobuf](./submodules/durabletask-protobuf) directory, which is a submodule. To update the submodule, run the following command from the project root:
+
+```sh
+git submodule update --init
 ```
 
-### Linting and running unit tests
+Once the submodule is available, the corresponding source code can be regenerated using the following command from the project root:
+
+```sh
+make gen-proto
+```
 
-See the [pr-validation.yml](.github/workflows/pr-validation.yml) workflow for the full list of commands that are run as part of the CI/CD pipeline.
+### Running unit tests
+
+Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running.
+
+```sh
+make test-unit
+```
 
 ### Running E2E tests
 
-The E2E (end-to-end) tests require a sidecar process to be running. You can run a sidecar process using the following `docker` command (assumes you have Docker installed on your local system.)
+The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command:
 
 ```sh
 docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator
 ```
 
-To run the E2E tests, run the following command from the project root (we assume you have `pytest` installed locally):
+To run the E2E tests, run the following command from the project root:
 
 ```sh
-pytest -m e2e --verbose
+make test-e2e
 ```
 
 ## Contributing
````

durabletask/__init__.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -3,4 +3,5 @@
 
 """Durable Task SDK for Python"""
 
+
 PACKAGE_NAME = "durabletask"
```

durabletask/api/state.py

Lines changed: 0 additions & 38 deletions
This file was deleted.
Lines changed: 55 additions & 12 deletions
```diff
@@ -3,50 +3,93 @@
 
 import logging
 import uuid
+from dataclasses import dataclass
 from datetime import datetime
+from enum import Enum
 from typing import TypeVar
 
 import grpc
 import simplejson as json
 from google.protobuf import wrappers_pb2
 
+import durabletask.internal.helpers as helpers
+import durabletask.internal.orchestrator_service_pb2 as pb
+import durabletask.internal.orchestrator_service_pb2_grpc as stubs
 import durabletask.internal.shared as shared
-import durabletask.protos.helpers as helpers
-import durabletask.protos.orchestrator_service_pb2 as pb
-import durabletask.task.registry as registry
-from durabletask.api.state import OrchestrationState, new_orchestration_state
-from durabletask.protos.orchestrator_service_pb2_grpc import \
-    TaskHubSidecarServiceStub
-from durabletask.task.orchestration import Orchestrator
+from durabletask import task
 
 TInput = TypeVar('TInput')
 TOutput = TypeVar('TOutput')
 
 
+class OrchestrationStatus(Enum):
+    """The status of an orchestration instance."""
+    RUNNING = pb.ORCHESTRATION_STATUS_RUNNING
+    COMPLETED = pb.ORCHESTRATION_STATUS_COMPLETED
+    FAILED = pb.ORCHESTRATION_STATUS_FAILED
+    TERMINATED = pb.ORCHESTRATION_STATUS_TERMINATED
+    CONTINUED_AS_NEW = pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
+    PENDING = pb.ORCHESTRATION_STATUS_PENDING
+    SUSPENDED = pb.ORCHESTRATION_STATUS_SUSPENDED
+
+    def __str__(self):
+        return helpers.get_orchestration_status_str(self.value)
+
+
+@dataclass
+class OrchestrationState:
+    instance_id: str
+    name: str
+    runtime_status: OrchestrationStatus
+    created_at: datetime
+    last_updated_at: datetime
+    serialized_input: str | None
+    serialized_output: str | None
+    serialized_custom_status: str | None
+    failure_details: pb.TaskFailureDetails | None
+
+
+def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> OrchestrationState | None:
+    if not res.exists:
+        return None
+
+    state = res.orchestrationState
+    return OrchestrationState(
+        instance_id,
+        state.name,
+        OrchestrationStatus(state.orchestrationStatus),
+        state.createdTimestamp.ToDatetime(),
+        state.lastUpdatedTimestamp.ToDatetime(),
+        state.input.value if not helpers.is_empty(state.input) else None,
+        state.output.value if not helpers.is_empty(state.output) else None,
+        state.customStatus.value if not helpers.is_empty(state.customStatus) else None,
+        state.failureDetails if state.failureDetails.errorMessage != '' or state.failureDetails.errorType != '' else None)
+
+
 class TaskHubGrpcClient:
 
     def __init__(self, *,
                  host_address: str | None = None,
                  log_handler=None,
                  log_formatter: logging.Formatter | None = None):
         channel = shared.get_grpc_channel(host_address)
-        self._stub = TaskHubSidecarServiceStub(channel)
+        self._stub = stubs.TaskHubSidecarServiceStub(channel)
         self._logger = shared.get_logger(log_handler, log_formatter)
 
-    def schedule_new_orchestration(self, orchestrator: Orchestrator[TInput, TOutput], *,
+    def schedule_new_orchestration(self, orchestrator: task.Orchestrator[TInput, TOutput], *,
                                    input: TInput | None = None,
                                    instance_id: str | None = None,
                                    start_at: datetime | None = None) -> str:
 
-        name = registry.get_name(orchestrator)
+        name = task.get_name(orchestrator)
 
         req = pb.CreateInstanceRequest(
             name=name,
             instanceId=instance_id if instance_id else uuid.uuid4().hex,
             input=wrappers_pb2.StringValue(value=json.dumps(input)) if input else None,
             scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None)
 
-        self._logger.info(f"Starting new '{name}' instance with ID = '{instance_id}'.")
+        self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
         res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
         return res.instanceId
 
@@ -60,7 +103,7 @@ def wait_for_orchestration_start(self, instance_id: str, *,
                                      timeout: int = 60) -> OrchestrationState | None:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
         try:
-            self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to start.")
+            self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
             res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=timeout)
             return new_orchestration_state(req.instanceId, res)
         except grpc.RpcError as rpc_error:
```

durabletask/internal/__init__.py

Whitespace-only changes.
Lines changed: 10 additions & 1 deletion
```diff
@@ -8,7 +8,7 @@
 import simplejson as json
 from google.protobuf import timestamp_pb2, wrappers_pb2
 
-import durabletask.protos.orchestrator_service_pb2 as pb
+import durabletask.internal.orchestrator_service_pb2 as pb
 
 # TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere
 
@@ -175,3 +175,12 @@ def new_create_sub_orchestration_action(
 
 def is_empty(v: wrappers_pb2.StringValue):
     return v is None or v.value == ''
+
+
+def get_orchestration_status_str(status: pb.OrchestrationStatus):
+    try:
+        const_name = pb.OrchestrationStatus.Name(status)
+        if const_name.startswith('ORCHESTRATION_STATUS_'):
+            return const_name[len('ORCHESTRATION_STATUS_'):]
+    except Exception:
+        return "UNKNOWN"
```
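The new `get_orchestration_status_str` helper exists so that `OrchestrationStatus.__str__` can print `RUNNING` rather than the protobuf constant name `ORCHESTRATION_STATUS_RUNNING`. The core string logic, reproduced standalone without the generated `pb` module (`status_str` is a hypothetical name, and this sketch returns `"UNKNOWN"` for names that lack the expected prefix):

```python
_PREFIX = 'ORCHESTRATION_STATUS_'

def status_str(const_name: str) -> str:
    # same prefix-stripping as the helper, applied to the protobuf
    # enum constant name (e.g. 'ORCHESTRATION_STATUS_RUNNING')
    if const_name.startswith(_PREFIX):
        return const_name[len(_PREFIX):]
    return "UNKNOWN"

print(status_str('ORCHESTRATION_STATUS_RUNNING'))  # RUNNING
```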
