7 changes: 7 additions & 0 deletions .github/copilot-instructions.md
@@ -89,6 +89,13 @@ pytest
## Project Structure

- `durabletask/` — core SDK source
  - `payload/` — public payload externalization API (`PayloadStore` ABC, `LargePayloadStorageOptions`, helper functions)
  - `extensions/azure_blob_payloads/` — Azure Blob Storage payload store (installed via `pip install durabletask[azure-blob-payloads]`)
  - `entities/` — durable entity support
  - `testing/` — in-memory backend for testing without a sidecar
  - `internal/` — protobuf definitions, gRPC helpers, tracing (not public API)
- `durabletask-azuremanaged/` — Azure managed provider source
- `examples/` — example orchestrations (see `examples/README.md`)
- `tests/` — test suite
26 changes: 25 additions & 1 deletion .github/workflows/durabletask.yml
@@ -47,16 +47,40 @@ jobs:
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Set up Node.js (needed for Azurite)
        uses: actions/setup-node@v4
        with:
          node-version: '20.x'

      - name: Cache npm
        uses: actions/cache@v3
        with:
          path: ~/.npm
          key: ${{ runner.os }}-npm-azurite

      - name: Install Azurite
        run: npm install -g azurite

      - name: Start Azurite
        shell: bash
        run: |
          azurite-blob --silent --blobPort 10000 &
          sleep 2

      - name: Install durabletask dependencies and the library itself
        run: |
          python -m pip install --upgrade pip
          pip install flake8 pytest
          pip install -r requirements.txt
          pip install .
          pip install ".[azure-blob-payloads]"
          pip install aiohttp

      - name: Pytest unit tests
        working-directory: tests/durabletask
        run: |
16 changes: 16 additions & 0 deletions .vscode/mcp.json
@@ -0,0 +1,16 @@
{
  "inputs": [
    {
      "id": "ado_org",
      "type": "promptString",
      "description": "msazure"
    }
  ],
  "servers": {
    "ado": {
      "type": "stdio",
      "command": "npx",
      "args": ["-y", "@azure-devops/mcp", "msazure"]
    }
  }
}
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

ADDED

- Added large payload externalization support for automatically
offloading oversized orchestration payloads to Azure Blob Storage.
Install with `pip install durabletask[azure-blob-payloads]`.
Pass a `BlobPayloadStore` to the worker and client via the
`payload_store` parameter.
- Added `durabletask.extensions.azure_blob_payloads` extension
package with `BlobPayloadStore` and `BlobPayloadStoreOptions`
- Added `PayloadStore` abstract base class in
`durabletask.payload` for custom storage backends
- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
- Added `AsyncTaskHubGrpcClient` for asyncio-based applications using `grpc.aio`
- Added `DefaultAsyncClientInterceptorImpl` for async gRPC metadata interceptors
14 changes: 14 additions & 0 deletions README.md
@@ -15,6 +15,20 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](
- [Development Guide](./docs/development.md)
- [Contributing Guide](./CONTRIBUTING.md)

## Optional Features

### Large Payload Externalization

Install the `azure-blob-payloads` extra to automatically offload
oversized orchestration payloads to Azure Blob Storage:

```bash
pip install durabletask[azure-blob-payloads]
```

See the [feature documentation](./docs/features.md#large-payload-externalization)
and the [example](./examples/large_payload/) for usage details.

## Trademarks
This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
trademarks or logos is subject to and must follow
145 changes: 145 additions & 0 deletions docs/features.md
@@ -150,6 +150,151 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.

### Large payload externalization

Orchestration inputs, outputs, and event data are transmitted through
gRPC messages. When these payloads grow very large, they can exceed
gRPC message-size limits or degrade performance. Large payload
externalization solves this by transparently offloading oversized
payloads to an external store (such as Azure Blob Storage) and
replacing them with compact reference tokens in the gRPC messages.

This feature is **opt-in** and requires installing an optional
dependency:

```bash
pip install durabletask[azure-blob-payloads]
```

#### How it works

1. When the worker or client sends a payload that exceeds the
configured threshold (default 900 KB), the payload is
compressed (GZip, enabled by default) and uploaded to the
external store.
2. The original payload in the gRPC message is replaced with a
compact token (e.g. `blob:v1:<container>:<blobName>`).
3. When the worker or client receives a message containing a token,
it downloads and decompresses the original payload automatically.

This process is fully transparent to orchestrator and activity code —
no changes are needed in your workflow logic.
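The flow above can be sketched with a stdlib-only model. This is illustrative only — `InMemoryBlobStore` and its methods are hypothetical stand-ins for the real store, which uploads to Azure Blob Storage:

```python
import gzip
import uuid

THRESHOLD_BYTES = 900_000  # default externalization threshold


class InMemoryBlobStore:
    """Hypothetical stand-in for an external blob store."""

    def __init__(self, container: str = "durabletask-payloads"):
        self.container = container
        self._blobs: dict[str, bytes] = {}

    def maybe_externalize(self, payload: bytes) -> bytes:
        # Small payloads pass through the gRPC message unchanged.
        if len(payload) <= THRESHOLD_BYTES:
            return payload
        blob_name = str(uuid.uuid4())
        # Compression is on by default before upload.
        self._blobs[blob_name] = gzip.compress(payload)
        # Replace the payload with a compact reference token.
        return f"blob:v1:{self.container}:{blob_name}".encode()

    def maybe_resolve(self, payload: bytes) -> bytes:
        # On receipt, a token is downloaded and decompressed transparently.
        if payload.startswith(b"blob:v1:"):
            _, _, container, blob_name = payload.decode().split(":", 3)
            return gzip.decompress(self._blobs[blob_name])
        return payload


store = InMemoryBlobStore()
big = b"x" * 2_000_000                    # exceeds the 900 KB threshold
token = store.maybe_externalize(big)      # a short token, not the raw bytes
assert len(token) < 100
assert store.maybe_resolve(token) == big  # round-trips transparently
```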

#### Configuring the blob payload store

The built-in `BlobPayloadStore` uses Azure Blob Storage. Create a
store instance and pass it to both the worker and client:

```python
from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions

options = BlobPayloadStoreOptions(
    connection_string="DefaultEndpointsProtocol=https;...",
    container_name="durabletask-payloads",  # default
    threshold_bytes=900_000,                # default (900 KB)
    max_stored_payload_bytes=10_485_760,    # default (10 MB)
    enable_compression=True,                # default
)
store = BlobPayloadStore(options)
```

Then pass the store to the worker and client:

```python
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub_name,
    token_credential=credential,
    payload_store=store,
) as w:
    # ... register orchestrators and activities ...
    w.start()

c = DurableTaskSchedulerClient(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub_name,
    token_credential=credential,
    payload_store=store,
)
```

You can also authenticate using `account_url` and a
`TokenCredential` instead of a connection string:

```python
from azure.identity import DefaultAzureCredential

options = BlobPayloadStoreOptions(
    account_url="https://<account>.blob.core.windows.net",
    credential=DefaultAzureCredential(),
)
store = BlobPayloadStore(options)
```

#### Configuration options

| Option | Default | Description |
|---|---|---|
| `threshold_bytes` | 900,000 (900 KB) | Payloads larger than this are externalized |
| `max_stored_payload_bytes` | 10,485,760 (10 MB) | Maximum size for externalized payloads |
| `enable_compression` | `True` | GZip-compress payloads before uploading |
| `container_name` | `"durabletask-payloads"` | Azure Blob container name |
| `connection_string` | `None` | Azure Storage connection string |
| `account_url` | `None` | Azure Storage account URL (use with `credential`) |
| `credential` | `None` | `TokenCredential` for token-based auth |
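Roughly, the first three options combine as follows. This is an illustrative sketch, not SDK source: whether the size cap applies before or after compression, and the exact failure mode for oversized payloads, are assumptions:

```python
import gzip


def plan_payload(payload: bytes,
                 threshold_bytes: int = 900_000,
                 max_stored_payload_bytes: int = 10_485_760,
                 enable_compression: bool = True) -> str:
    """Decide what happens to a payload under the options above (sketch)."""
    if len(payload) <= threshold_bytes:
        return "inline"        # sent in the gRPC message as-is
    stored = gzip.compress(payload) if enable_compression else payload
    if len(stored) > max_stored_payload_bytes:
        # Assumed behavior: payloads over the cap are rejected.
        raise ValueError("payload too large to externalize")
    return "externalize"       # uploaded; a token is sent instead


assert plan_payload(b"x" * 100) == "inline"
assert plan_payload(b"x" * 1_000_000) == "externalize"
```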

#### Cross-SDK compatibility

The blob token format (`blob:v1:<container>:<blobName>`) is
compatible with the .NET Durable Task SDK, enabling
interoperability between Python and .NET workers sharing the same
task hub and storage account. Note that message serialization strategies
may differ for complex objects and custom types.
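For reference, a token in this format can be split with a few lines of code. `parse_blob_token` is a hypothetical helper written against the documented format, not part of the SDK:

```python
def parse_blob_token(token: str) -> tuple[str, str]:
    """Split a 'blob:v1:<container>:<blobName>' token into its parts."""
    prefix, version, container, blob_name = token.split(":", 3)
    if (prefix, version) != ("blob", "v1"):
        raise ValueError(f"not a v1 blob payload token: {token!r}")
    # maxsplit=3 keeps any ':' inside the blob name intact.
    return container, blob_name


# "3f2a9c" is a made-up blob name for illustration.
container, name = parse_blob_token("blob:v1:durabletask-payloads:3f2a9c")
assert container == "durabletask-payloads"
assert name == "3f2a9c"
```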

#### Custom payload stores

You can implement a custom payload store by subclassing
`PayloadStore` from `durabletask.payload` and implementing
the `upload`, `upload_async`, `download`, `download_async`, and
`is_known_token` methods:

```python
from durabletask.payload import PayloadStore, LargePayloadStorageOptions


class MyPayloadStore(PayloadStore):

    def __init__(self, options: LargePayloadStorageOptions):
        self._options = options

    @property
    def options(self) -> LargePayloadStorageOptions:
        return self._options

    def upload(self, data: bytes) -> str:
        # Store data and return a unique token string
        ...

    async def upload_async(self, data: bytes) -> str:
        ...

    def download(self, token: str) -> bytes:
        # Retrieve data by token
        ...

    async def download_async(self, token: str) -> bytes:
        ...

    def is_known_token(self, value: str) -> bool:
        # Return True if the value looks like a token from this store
        ...
```
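A filled-in version of that skeleton might look like the following in-memory store. This is a sketch: a real implementation would subclass `PayloadStore` from `durabletask.payload`, which is omitted here so the example is self-contained, and the `mem:v1:` token prefix is invented:

```python
import asyncio
import uuid


class InMemoryPayloadStore:  # in real code: InMemoryPayloadStore(PayloadStore)
    _PREFIX = "mem:v1:"

    def __init__(self):
        self._data: dict[str, bytes] = {}

    def upload(self, data: bytes) -> str:
        # Mint a unique token and remember the payload under it.
        token = f"{self._PREFIX}{uuid.uuid4()}"
        self._data[token] = data
        return token

    async def upload_async(self, data: bytes) -> str:
        # Nothing to await for an in-memory dict; delegate to the sync path.
        return self.upload(data)

    def download(self, token: str) -> bytes:
        return self._data[token]

    async def download_async(self, token: str) -> bytes:
        return self.download(token)

    def is_known_token(self, value: str) -> bool:
        # Only values minted by this store carry the invented prefix.
        return value.startswith(self._PREFIX)


mem_store = InMemoryPayloadStore()
tok = mem_store.upload(b"hello")
assert mem_store.is_known_token(tok)
assert mem_store.download(tok) == b"hello"
assert asyncio.run(mem_store.download_async(tok)) == b"hello"
```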

See the [large payload example](../examples/large_payload/) for a
complete working sample.

### Logging configuration

Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept a log_handler and log_formatter object from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources.