Skip to content

Commit 71cfe09

Browse files
authored
Merge branch 'main' into andystaples/add-replay-safe-logging
2 parents 151c50a + 7c82f24 commit 71cfe09

15 files changed

Lines changed: 2289 additions & 58 deletions

File tree

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[flake8]
2-
ignore = E501,C901
2+
ignore = E501,C901,W503
33
exclude =
44
.git
55
*_pb2*

docs/features.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,69 @@ with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_cha
352352

353353
> [!NOTE]
354354
> The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.
355+
356+
### Work item filtering
357+
358+
By default a worker receives **all** work items from the backend,
359+
regardless of which orchestrations, activities, or entities are
360+
registered. Work item filtering lets you explicitly tell the backend
361+
which work items a worker can handle so that only matching items are
362+
dispatched. This is useful when running multiple specialized workers
363+
against the same task hub.
364+
365+
Work item filtering is **opt-in**. Call `use_work_item_filters()` on
366+
the worker before starting it.
367+
368+
#### Auto-generated filters
369+
370+
Calling `use_work_item_filters()` with no arguments builds filters
371+
automatically from the worker's registry at start time:
372+
373+
```python
374+
with DurableTaskSchedulerWorker(...) as w:
375+
w.add_orchestrator(my_orchestrator)
376+
w.add_activity(my_activity)
377+
w.use_work_item_filters() # auto-generate from registry
378+
w.start()
379+
```
380+
381+
When versioning is configured with `VersionMatchStrategy.STRICT`,
382+
the worker's version is included in every filter so the backend
383+
only dispatches work items that match that exact version.
384+
385+
#### Explicit filters
386+
387+
Pass a `WorkItemFilters` instance for fine-grained control:
388+
389+
```python
390+
from durabletask.worker import (
391+
WorkItemFilters,
392+
OrchestrationWorkItemFilter,
393+
ActivityWorkItemFilter,
394+
EntityWorkItemFilter,
395+
)
396+
397+
w.use_work_item_filters(WorkItemFilters(
398+
orchestrations=[
399+
OrchestrationWorkItemFilter(name="my_orch", versions=["2.0.0"]),
400+
],
401+
activities=[
402+
ActivityWorkItemFilter(name="my_activity"),
403+
],
404+
entities=[
405+
EntityWorkItemFilter(name="my_entity"),
406+
],
407+
))
408+
```
409+
410+
#### Clearing filters
411+
412+
Pass `None` to clear any previously configured filters and return
413+
to the default behaviour of processing all work items:
414+
415+
```python
416+
w.use_work_item_filters(None)
417+
```
418+
419+
See the full
420+
[work item filtering sample](../examples/work_item_filtering.py).

docs/supported-patterns.md

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
6464
# Orders of $1000 or more require manager approval
6565
yield ctx.call_activity(send_approval_request, input=order)
6666

67-
# Approvals must be received within 24 hours or they will be canceled.
67+
# Approvals must be received within 24 hours or they will be cancelled.
6868
approval_event = ctx.wait_for_external_event("approval_received")
6969
timeout_event = ctx.create_timer(timedelta(hours=24))
7070
winner = yield task.when_any([approval_event, timeout_event])
7171
if winner == timeout_event:
72-
return "Canceled"
72+
return "Cancelled"
7373

7474
# The order was approved
7575
yield ctx.call_activity(place_order, input=order)
@@ -120,6 +120,51 @@ def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
120120

121121
See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
122122

123+
### Work item filtering
124+
125+
When running multiple workers against the same task hub, each
126+
worker can declare which work items it handles. The backend then
127+
dispatches only the matching orchestrations, activities, and
128+
entities, avoiding unnecessary round-trips. Filtering is opt-in
129+
and supports both auto-generated and explicit filter sets.
130+
131+
The simplest approach auto-generates filters from the worker's
132+
registry:
133+
134+
```python
135+
with DurableTaskSchedulerWorker(...) as w:
136+
w.add_orchestrator(greeting_orchestrator)
137+
w.add_activity(greet)
138+
w.use_work_item_filters() # auto-generate from registry
139+
w.start()
140+
```
141+
142+
For more control you can provide explicit filters, including
143+
version constraints:
144+
145+
```python
146+
from durabletask.worker import (
147+
WorkItemFilters,
148+
OrchestrationWorkItemFilter,
149+
ActivityWorkItemFilter,
150+
)
151+
152+
w.use_work_item_filters(WorkItemFilters(
153+
orchestrations=[
154+
OrchestrationWorkItemFilter(
155+
name="greeting_orchestrator",
156+
versions=["2.0.0"],
157+
),
158+
],
159+
activities=[
160+
ActivityWorkItemFilter(name="greet"),
161+
],
162+
))
163+
```
164+
165+
See the full
166+
[work item filtering sample](../examples/work_item_filtering.py).
167+
123168
### Large payload externalization
124169

125170
When orchestrations work with very large inputs, outputs, or event

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,7 @@ def __init__(self, *,
8585
log_formatter=log_formatter,
8686
interceptors=interceptors,
8787
concurrency_options=concurrency_options,
88-
payload_store=payload_store)
88+
# DTS natively supports long timers so chunking is unnecessary
89+
maximum_timer_interval=None,
90+
payload_store=payload_store
91+
)

durabletask/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,24 @@
44
"""Durable Task SDK for Python"""
55

66
from durabletask.payload.store import LargePayloadStorageOptions, PayloadStore
7-
from durabletask.worker import ConcurrencyOptions, VersioningOptions
7+
from durabletask.worker import (
8+
ActivityWorkItemFilter,
9+
ConcurrencyOptions,
10+
EntityWorkItemFilter,
11+
OrchestrationWorkItemFilter,
12+
VersioningOptions,
13+
WorkItemFilters,
14+
)
815

916
__all__ = [
17+
"ActivityWorkItemFilter",
1018
"ConcurrencyOptions",
19+
"EntityWorkItemFilter",
1120
"LargePayloadStorageOptions",
21+
"OrchestrationWorkItemFilter",
1222
"PayloadStore",
1323
"VersioningOptions",
24+
"WorkItemFilters",
1425
]
1526

1627
PACKAGE_NAME = "durabletask"

durabletask/task.py

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def set_custom_status(self, custom_status: Any) -> None:
9999
pass
100100

101101
@abstractmethod
102-
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
102+
def create_timer(self, fire_at: Union[datetime, timedelta]) -> CancellableTask:
103103
"""Create a Timer Task to fire after at the specified deadline.
104104
105105
Parameters
@@ -229,10 +229,10 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
229229
"""
230230
pass
231231

232-
# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
232+
# TOOD: Add a timeout parameter, which allows the task to be cancelled if the event is
233233
# not received within the specified timeout. This requires support for task cancellation.
234234
@abstractmethod
235-
def wait_for_external_event(self, name: str) -> CompletableTask:
235+
def wait_for_external_event(self, name: str) -> CancellableTask:
236236
"""Wait asynchronously for an event to be raised with the name `name`.
237237
238238
Parameters
@@ -370,6 +370,10 @@ class OrchestrationStateError(Exception):
370370
pass
371371

372372

373+
class TaskCancelledError(Exception):
374+
"""Exception type for cancelled orchestration tasks."""
375+
376+
373377
class Task(ABC, Generic[T]):
374378
"""Abstract base class for asynchronous tasks in a durable orchestration."""
375379
_result: T
@@ -481,6 +485,48 @@ def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]):
481485
self._parent.on_child_completed(self)
482486

483487

488+
class CancellableTask(CompletableTask[T]):
489+
"""A completable task that can be cancelled before it finishes."""
490+
491+
def __init__(self) -> None:
492+
super().__init__()
493+
self._is_cancelled = False
494+
self._cancel_handler: Optional[Callable[[], None]] = None
495+
496+
@property
497+
def is_cancelled(self) -> bool:
498+
"""Returns True if the task was cancelled, False otherwise."""
499+
return self._is_cancelled
500+
501+
def get_result(self) -> T:
502+
if self._is_cancelled:
503+
raise TaskCancelledError('The task was cancelled.')
504+
return super().get_result()
505+
506+
def set_cancel_handler(self, cancel_handler: Callable[[], None]) -> None:
507+
self._cancel_handler = cancel_handler
508+
509+
def cancel(self) -> bool:
510+
"""Attempts to cancel this task.
511+
512+
Returns
513+
-------
514+
bool
515+
True if cancellation was applied, False if the task had already completed.
516+
"""
517+
if self._is_complete:
518+
return False
519+
520+
if self._cancel_handler is not None:
521+
self._cancel_handler()
522+
523+
self._is_cancelled = True
524+
self._is_complete = True
525+
if self._parent is not None:
526+
self._parent.on_child_completed(self)
527+
return True
528+
529+
484530
class RetryableTask(CompletableTask[T]):
485531
"""A task that can be retried according to a retry policy."""
486532

@@ -520,14 +566,29 @@ def compute_next_delay(self) -> Optional[timedelta]:
520566
return None
521567

522568

523-
class TimerTask(CompletableTask[T]):
524-
525-
def __init__(self) -> None:
569+
class TimerTask(CancellableTask[None]):
570+
def __init__(self, final_fire_at: Optional[datetime] = None,
571+
maximum_timer_interval: Optional[timedelta] = None):
526572
super().__init__()
573+
self._final_fire_at = final_fire_at
574+
self._maximum_timer_interval = maximum_timer_interval
527575

528576
def set_retryable_parent(self, retryable_task: RetryableTask):
529577
self._retryable_parent = retryable_task
530578

579+
def _handle_timer_fired(self, current_utc_datetime: datetime) -> Optional[datetime]:
580+
if (self._final_fire_at is not None
581+
and self._maximum_timer_interval is not None
582+
and current_utc_datetime < self._final_fire_at):
583+
return self._get_next_fire_at(current_utc_datetime)
584+
super().complete(None)
585+
return None
586+
587+
def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
588+
if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at:
589+
return current_utc_datetime + self._maximum_timer_interval
590+
return self._final_fire_at
591+
531592

532593
class WhenAnyTask(CompositeTask[Task]):
533594
"""A task that completes when any of its child tasks complete."""

0 commit comments

Comments
 (0)