Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[flake8]
ignore = E501,C901
ignore = E501,C901,W503
exclude =
.git
*_pb2*
Expand Down
4 changes: 2 additions & 2 deletions docs/supported-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
# Orders of $1000 or more require manager approval
yield ctx.call_activity(send_approval_request, input=order)

# Approvals must be received within 24 hours or they will be canceled.
# Approvals must be received within 24 hours or they will be cancelled.
approval_event = ctx.wait_for_external_event("approval_received")
timeout_event = ctx.create_timer(timedelta(hours=24))
winner = yield task.when_any([approval_event, timeout_event])
if winner == timeout_event:
return "Canceled"
return "Cancelled"

# The order was approved
yield ctx.call_activity(place_order, input=order)
Expand Down
4 changes: 3 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ def __init__(self, *,
log_handler=log_handler,
log_formatter=log_formatter,
interceptors=interceptors,
concurrency_options=concurrency_options)
concurrency_options=concurrency_options,
maximum_timer_interval=None # DTS allows timers of indefinite length
)
Comment thread
berndverst marked this conversation as resolved.
79 changes: 72 additions & 7 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def set_custom_status(self, custom_status: Any) -> None:
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
def create_timer(self, fire_at: Union[datetime, timedelta]) -> CancellableTask:
"""Create a Timer Task to fire after at the specified deadline.

Parameters
Expand Down Expand Up @@ -228,10 +228,10 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
"""
pass

# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
# TOOD: Add a timeout parameter, which allows the task to be cancelled if the event is
# not received within the specified timeout. This requires support for task cancellation.
@abstractmethod
def wait_for_external_event(self, name: str) -> CompletableTask:
def wait_for_external_event(self, name: str) -> CancellableTask:
"""Wait asynchronously for an event to be raised with the name `name`.

Parameters
Expand Down Expand Up @@ -324,6 +324,10 @@ class OrchestrationStateError(Exception):
pass


class TaskCancelledError(Exception):
"""Exception type for cancelled orchestration tasks."""


class Task(ABC, Generic[T]):
"""Abstract base class for asynchronous tasks in a durable orchestration."""
_result: T
Expand Down Expand Up @@ -435,6 +439,48 @@ def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]):
self._parent.on_child_completed(self)


class CancellableTask(CompletableTask[T]):
Comment thread
andystaples marked this conversation as resolved.
"""A completable task that can be cancelled before it finishes."""

def __init__(self) -> None:
super().__init__()
self._is_cancelled = False
self._cancel_handler: Optional[Callable[[], None]] = None

@property
def is_cancelled(self) -> bool:
"""Returns True if the task was cancelled, False otherwise."""
return self._is_cancelled

def get_result(self) -> T:
if self._is_cancelled:
raise TaskCancelledError('The task was cancelled.')
return super().get_result()
Comment thread
andystaples marked this conversation as resolved.

def set_cancel_handler(self, cancel_handler: Callable[[], None]) -> None:
self._cancel_handler = cancel_handler

def cancel(self) -> bool:
"""Attempts to cancel this task.

Returns
-------
bool
True if cancellation was applied, False if the task had already completed.
"""
if self._is_complete:
return False

if self._cancel_handler is not None:
self._cancel_handler()

self._is_cancelled = True
self._is_complete = True
if self._parent is not None:
self._parent.on_child_completed(self)
return True


class RetryableTask(CompletableTask[T]):
"""A task that can be retried according to a retry policy."""

Expand Down Expand Up @@ -474,13 +520,32 @@ def compute_next_delay(self) -> Optional[timedelta]:
return None


class TimerTask(CompletableTask[T]):
class TimerTask(CancellableTask[None]):
def set_retryable_parent(self, retryable_task: RetryableTask):
self._retryable_parent = retryable_task

def complete(self, _: datetime) -> None:
Comment thread
andystaples marked this conversation as resolved.
Outdated
super().complete(None)

def __init__(self) -> None:

class LongTimerTask(TimerTask):
Comment thread
andystaples marked this conversation as resolved.
Outdated
def __init__(self, final_fire_at: datetime, maximum_timer_interval: timedelta):
super().__init__()
self._final_fire_at = final_fire_at
self._maximum_timer_interval = maximum_timer_interval

def set_retryable_parent(self, retryable_task: RetryableTask):
self._retryable_parent = retryable_task
def start(self, current_utc_datetime: datetime) -> datetime:
return self._get_next_fire_at(current_utc_datetime)

def complete(self, current_utc_datetime: datetime) -> Optional[datetime]:
if current_utc_datetime < self._final_fire_at:
return self._get_next_fire_at(current_utc_datetime)
return super().complete(current_utc_datetime)

def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at:
return current_utc_datetime + self._maximum_timer_interval
return self._final_fire_at


class WhenAnyTask(CompositeTask[Task]):
Comment thread
andystaples marked this conversation as resolved.
Expand Down
Loading
Loading