Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .vscode/mcp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"inputs": [
{
"id": "ado_org",
"type": "promptString",
"description": "Azure DevOps organization name (e.g. msazure)"
}
],
"servers": {
"ado": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@azure-devops/mcp", "msazure"]
}
}
}
Comment thread
andystaples marked this conversation as resolved.
Outdated
66 changes: 66 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,69 @@ with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_cha

**NOTE**
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.

### Work item filtering

By default a worker receives **all** work items from the backend,
regardless of which orchestrations, activities, or entities are
registered. Work item filtering lets you explicitly tell the backend
which work items a worker can handle so that only matching items are
dispatched. This is useful when running multiple specialized workers
against the same task hub.

Work item filtering is **opt-in**. Call `use_work_item_filters()` on
the worker before starting it.

#### Auto-generated filters

Calling `use_work_item_filters()` with no arguments builds filters
automatically from the worker's registry at start time:

```python
with DurableTaskSchedulerWorker(...) as w:
w.add_orchestrator(my_orchestrator)
w.add_activity(my_activity)
w.use_work_item_filters() # auto-generate from registry
w.start()
```

When versioning is configured with `VersionMatchStrategy.STRICT`,
the worker's version is included in every filter so the backend
only dispatches work items that match that exact version.

#### Explicit filters

Pass a `WorkItemFilters` instance for fine-grained control:

```python
from durabletask.worker import (
WorkItemFilters,
OrchestrationWorkItemFilter,
ActivityWorkItemFilter,
EntityWorkItemFilter,
)

w.use_work_item_filters(WorkItemFilters(
orchestrations=[
OrchestrationWorkItemFilter(name="my_orch", versions=["2.0.0"]),
],
activities=[
ActivityWorkItemFilter(name="my_activity"),
],
entities=[
EntityWorkItemFilter(name="my_entity"),
],
))
```

#### Clearing filters

Pass `None` to clear any previously configured filters and return
to the default behaviour of processing all work items:

```python
w.use_work_item_filters(None)
```

See the full
[work item filtering sample](../examples/work_item_filtering.py).
47 changes: 46 additions & 1 deletion docs/supported-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,49 @@ def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
return "Success"
```

See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)

### Work item filtering

When running multiple workers against the same task hub, each
worker can declare which work items it handles. The backend then
dispatches only the matching orchestrations, activities, and
entities, avoiding unnecessary round-trips. Filtering is opt-in
and supports both auto-generated and explicit filter sets.

The simplest approach auto-generates filters from the worker's
registry:

```python
with DurableTaskSchedulerWorker(...) as w:
w.add_orchestrator(greeting_orchestrator)
w.add_activity(greet)
w.use_work_item_filters() # auto-generate from registry
w.start()
```

For more control you can provide explicit filters, including
version constraints:

```python
from durabletask.worker import (
WorkItemFilters,
OrchestrationWorkItemFilter,
ActivityWorkItemFilter,
)

w.use_work_item_filters(WorkItemFilters(
orchestrations=[
OrchestrationWorkItemFilter(
name="greeting_orchestrator",
versions=["2.0.0"],
),
],
activities=[
ActivityWorkItemFilter(name="greet"),
],
))
```

See the full
[work item filtering sample](../examples/work_item_filtering.py).
4 changes: 2 additions & 2 deletions durabletask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

"""Durable Task SDK for Python"""

from durabletask.worker import ConcurrencyOptions, VersioningOptions
from durabletask.worker import ConcurrencyOptions, VersioningOptions, WorkItemFilters
Comment thread
andystaples marked this conversation as resolved.
Outdated

__all__ = ["ConcurrencyOptions", "VersioningOptions"]
__all__ = ["ConcurrencyOptions", "VersioningOptions", "WorkItemFilters"]

PACKAGE_NAME = "durabletask"
78 changes: 66 additions & 12 deletions durabletask/testing/in_memory_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.helpers as helpers
from durabletask.entities.entity_instance_id import EntityInstanceId


@dataclass
Expand Down Expand Up @@ -436,16 +437,29 @@ def RestartInstance(self, request: pb.RestartInstanceRequest, context):
f"Restarted instance '{request.instanceId}' as '{new_instance_id}'")
return pb.RestartInstanceResponse(instanceId=new_instance_id)

@staticmethod
def _parse_work_item_filters(request: "pb.GetWorkItemsRequest"):
    """Extract filter name sets from the request, or None if unfiltered.

    Returns a ``(orchestration_names, activity_names, entity_names)``
    tuple. Each element is a set of names to match against, or ``None``
    when that category is unrestricted — either because the request
    carries no ``workItemFilters`` field at all, or because that
    category's repeated field is empty.
    """
    if not request.HasField("workItemFilters"):
        # No filters supplied: every category is unrestricted.
        return None, None, None

    filters = request.workItemFilters

    def _names_or_none(entries):
        # An empty repeated field means "no restriction" for the category.
        if not entries:
            return None
        return {entry.name for entry in entries}

    return (
        _names_or_none(filters.orchestrations),
        _names_or_none(filters.activities),
        _names_or_none(filters.entities),
    )

def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
"""Streams work items to the worker (orchestration and activity work items)."""
self._logger.info("Worker connected and requesting work items")
orch_filter, activity_filter, entity_filter = self._parse_work_item_filters(request)

try:
while context.is_active() and not self._shutdown_event.is_set():
work_item = None

with self._lock:
# Check for orchestration work
skipped_orchs: list[str] = []
while self._orchestration_queue:
instance_id = self._orchestration_queue.popleft()
self._orchestration_queue_set.discard(instance_id)
Expand All @@ -454,11 +468,14 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
if not instance or not instance.pending_events:
continue

# Skip if orchestration name doesn't match filters
if orch_filter is not None and instance.name not in orch_filter:
skipped_orchs.append(instance_id)
continue

if instance_id in self._orchestration_in_flight:
# Already being processed — re-add to queue
if instance_id not in self._orchestration_queue_set:
self._orchestration_queue.append(instance_id)
self._orchestration_queue_set.add(instance_id)
skipped_orchs.append(instance_id)
break

# Move pending events to dispatched_events
Expand All @@ -485,27 +502,58 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
)
break

# Re-queue skipped orchestrations for other workers
for s in skipped_orchs:
if s not in self._orchestration_queue_set:
self._orchestration_queue.append(s)
self._orchestration_queue_set.add(s)

# Check for activity work
if not work_item and self._activity_queue:
activity = self._activity_queue.popleft()
work_item = pb.WorkItem(
completionToken=str(activity.completion_token),
activityRequest=pb.ActivityRequest(
name=activity.name,
taskId=activity.task_id,
input=wrappers_pb2.StringValue(value=activity.input) if activity.input else None,
orchestrationInstance=pb.OrchestrationInstance(instanceId=activity.instance_id)
# Scan for the first matching activity
skipped: list = []
matched_activity = None
while self._activity_queue:
candidate = self._activity_queue.popleft()
if activity_filter is not None and candidate.name not in activity_filter:
skipped.append(candidate)
continue
matched_activity = candidate
break
# Put back non-matching items
for s in skipped:
self._activity_queue.append(s)

if matched_activity is not None:
work_item = pb.WorkItem(
completionToken=str(matched_activity.completion_token),
activityRequest=pb.ActivityRequest(
name=matched_activity.name,
taskId=matched_activity.task_id,
input=wrappers_pb2.StringValue(value=matched_activity.input) if matched_activity.input else None,
orchestrationInstance=pb.OrchestrationInstance(instanceId=matched_activity.instance_id)
)
)
)

# Check for entity work
if not work_item:
skipped_entities: list[str] = []
while self._entity_queue:
entity_id = self._entity_queue.popleft()
self._entity_queue_set.discard(entity_id)
entity = self._entities.get(entity_id)

if entity and entity.pending_operations:
# Skip if entity name doesn't match filters
if entity_filter is not None:
try:
parsed = EntityInstanceId.parse(entity_id)
if parsed.entity not in entity_filter:
skipped_entities.append(entity_id)
continue
except ValueError:
pass

# Skip if this entity is already being processed
if entity_id in self._entity_in_flight:
continue
Expand All @@ -532,6 +580,12 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
)
break

# Re-queue skipped entities for other workers
for s in skipped_entities:
if s not in self._entity_queue_set:
self._entity_queue.append(s)
self._entity_queue_set.add(s)

if work_item:
yield work_item
else:
Expand Down
Loading
Loading