Skip to content

Commit 9777a36

Browse files
committed
Add DTS default, e2e test
1 parent 57d8b7d commit 9777a36

2 files changed

Lines changed: 71 additions & 1 deletion

File tree

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,6 @@ def __init__(self, *,
8080
log_handler=log_handler,
8181
log_formatter=log_formatter,
8282
interceptors=interceptors,
83-
concurrency_options=concurrency_options)
83+
concurrency_options=concurrency_options,
84+
maximum_timer_interval=None # DTS allows timers of indefinite length
85+
)

tests/durabletask/test_orchestration_e2e.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,3 +586,71 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
586586
assert uuid.UUID(results[0]) != uuid.UUID(results[1])
587587
assert uuid.UUID(results[0]) != uuid.UUID(results[2])
588588
assert uuid.UUID(results[1]) != uuid.UUID(results[2])
589+
590+
591+
@pytest.mark.parametrize("raise_event", [True, False])
592+
def test_when_any_cancels_timer_when_event_wins(raise_event: bool):
593+
"""Verify that the losing timer in a when_any race can be explicitly
594+
cancelled without causing errors or affecting the orchestration result."""
595+
596+
def orchestrator(ctx: task.OrchestrationContext, _):
597+
approval: task.Task[bool] = ctx.wait_for_external_event('Approval')
598+
timeout = ctx.create_timer(timedelta(seconds=3))
599+
winner = yield task.when_any([approval, timeout])
600+
if winner == approval:
601+
# Explicitly cancel the timer so it does not linger
602+
timeout.cancel()
603+
return "approved"
604+
else:
605+
return "timed out"
606+
607+
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
608+
w.add_orchestrator(orchestrator)
609+
w.start()
610+
611+
task_hub_client = client.TaskHubGrpcClient(host_address=HOST)
612+
id = task_hub_client.schedule_new_orchestration(orchestrator)
613+
if raise_event:
614+
task_hub_client.raise_orchestration_event(id, 'Approval')
615+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
616+
617+
assert state is not None
618+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
619+
assert state.failure_details is None
620+
if raise_event:
621+
assert state.serialized_output == json.dumps("approved")
622+
else:
623+
assert state.serialized_output == json.dumps("timed out")
624+
625+
626+
@pytest.mark.parametrize("winning_event", ["Approve", "Reject"])
627+
def test_when_any_cancels_competing_external_event(winning_event: str):
628+
"""Verify that the losing external-event task in a when_any race is
629+
explicitly cancelled, preventing it from consuming a late-arriving event
630+
and leaving the orchestration in a clean state."""
631+
632+
def orchestrator(ctx: task.OrchestrationContext, _):
633+
approve: task.Task = ctx.wait_for_external_event('Approve')
634+
reject: task.Task = ctx.wait_for_external_event('Reject')
635+
winner = yield task.when_any([approve, reject])
636+
if winner == approve:
637+
reject.cancel()
638+
return "approved"
639+
else:
640+
approve.cancel()
641+
return "rejected"
642+
643+
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
644+
w.add_orchestrator(orchestrator)
645+
w.start()
646+
647+
task_hub_client = client.TaskHubGrpcClient(host_address=HOST)
648+
id = task_hub_client.schedule_new_orchestration(orchestrator)
649+
task_hub_client.raise_orchestration_event(id, winning_event)
650+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
651+
652+
assert state is not None
653+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
654+
assert state.failure_details is None
655+
expected = "approved" if winning_event == "Approve" else "rejected"
656+
assert state.serialized_output == json.dumps(expected)

0 commit comments

Comments
 (0)