Skip to content

Commit 5541408

Browse files
authored
Update-with-start: shopping cart (#156)
1 parent de73a0a commit 5541408

9 files changed

Lines changed: 281 additions & 3 deletions

File tree

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,14 @@ Some examples require extra dependencies. See each sample's directory for specif
6262
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
6363
* [gevent_async](gevent_async) - Combine gevent and Temporal.
6464
* [langchain](langchain) - Orchestrate workflows for LangChain.
65-
* [message-passing introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
65+
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
66+
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
67+
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
6668
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
6769
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
6870
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
6971
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
7072
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
71-
* [safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7273
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
7374
* [sentry](sentry) - Report errors to Sentry.
7475
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Update With Start: Lazy init
2+
3+
This sample illustrates the use of update-with-start to send Updates to a Workflow, starting the Workflow if
4+
it is not running yet ("lazy init"). The Workflow represents a Shopping Cart in an e-commerce application, and
5+
update-with-start is used to add items to the cart, receiving back the updated cart subtotal.
6+
7+
To run, first see the main [README.md](../../../README.md) for prerequisites.
8+
9+
Then run the following from this directory:
10+
11+
poetry run python worker.py
12+
13+
Then, in another terminal:
14+
15+
poetry run python starter.py
16+
17+
This will start a worker to run your workflow and activities, then simulate a backend application receiving
18+
requests to add items to a shopping cart, before finalizing the order.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
TASK_QUEUE = "update-with-start-lazy-initialization"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Optional
4+
5+
from temporalio import activity
6+
7+
8+
@dataclass
9+
class ShoppingCartItem:
10+
sku: str
11+
quantity: int
12+
13+
14+
@activity.defn
15+
async def get_price(item: ShoppingCartItem) -> Optional[int]:
16+
await asyncio.sleep(0.1)
17+
price = None if item.sku == "sku-456" else 599
18+
if price is None:
19+
return None
20+
return price * item.quantity
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional, Tuple
4+
5+
from temporalio import common
6+
from temporalio.client import (
7+
Client,
8+
WithStartWorkflowOperation,
9+
WorkflowHandle,
10+
WorkflowUpdateFailedError,
11+
)
12+
from temporalio.exceptions import ApplicationError
13+
14+
from message_passing.update_with_start.lazy_initialization import TASK_QUEUE
15+
from message_passing.update_with_start.lazy_initialization.workflows import (
16+
ShoppingCartItem,
17+
ShoppingCartWorkflow,
18+
)
19+
20+
21+
async def handle_add_item_request(
22+
session_id: str, item_id: str, quantity: int, temporal_client: Client
23+
) -> Tuple[Optional[int], WorkflowHandle]:
24+
"""
25+
Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is
26+
available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start
27+
time and is shared by all request handlers.
28+
29+
A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to
30+
add an item to the shopping cart, creating the cart if it doesn't already exist.
31+
32+
Note that the workflow handle is available, even if the Update fails.
33+
"""
34+
cart_id = f"cart-{session_id}"
35+
start_op = WithStartWorkflowOperation(
36+
ShoppingCartWorkflow.run,
37+
id=cart_id,
38+
id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
39+
task_queue=TASK_QUEUE,
40+
)
41+
try:
42+
price = await temporal_client.execute_update_with_start_workflow(
43+
ShoppingCartWorkflow.add_item,
44+
ShoppingCartItem(sku=item_id, quantity=quantity),
45+
start_workflow_operation=start_op,
46+
)
47+
except WorkflowUpdateFailedError as err:
48+
if (
49+
isinstance(err.cause, ApplicationError)
50+
and err.cause.type == "ItemUnavailableError"
51+
):
52+
price = None
53+
else:
54+
raise err
55+
56+
workflow_handle = await start_op.workflow_handle()
57+
58+
return price, workflow_handle
59+
60+
61+
async def main():
62+
print("🛒")
63+
session_id = f"session-{uuid.uuid4()}"
64+
temporal_client = await Client.connect("localhost:7233")
65+
subtotal_1, _ = await handle_add_item_request(
66+
session_id, "sku-123", 1, temporal_client
67+
)
68+
subtotal_2, wf_handle = await handle_add_item_request(
69+
session_id, "sku-456", 1, temporal_client
70+
)
71+
print(f"subtotals were, {[subtotal_1, subtotal_2]}")
72+
await wf_handle.signal(ShoppingCartWorkflow.checkout)
73+
final_order = await wf_handle.result()
74+
print(f"final order: {final_order}")
75+
76+
77+
if __name__ == "__main__":
78+
asyncio.run(main())
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from message_passing.update_with_start.lazy_initialization import TASK_QUEUE, workflows
8+
from message_passing.update_with_start.lazy_initialization.activities import get_price
9+
10+
interrupt_event = asyncio.Event()
11+
12+
13+
async def main():
14+
logging.basicConfig(level=logging.INFO)
15+
16+
client = await Client.connect("localhost:7233")
17+
18+
async with Worker(
19+
client,
20+
task_queue=TASK_QUEUE,
21+
workflows=[workflows.ShoppingCartWorkflow],
22+
activities=[get_price],
23+
):
24+
logging.info("Worker started, ctrl+c to exit")
25+
await interrupt_event.wait()
26+
logging.info("Shutting down")
27+
28+
29+
if __name__ == "__main__":
30+
loop = asyncio.new_event_loop()
31+
try:
32+
loop.run_until_complete(main())
33+
except KeyboardInterrupt:
34+
interrupt_event.set()
35+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from dataclasses import dataclass
2+
from datetime import timedelta
3+
from typing import List, Tuple
4+
5+
from temporalio import workflow
6+
from temporalio.exceptions import ApplicationError
7+
8+
with workflow.unsafe.imports_passed_through():
9+
from message_passing.update_with_start.lazy_initialization.activities import (
10+
ShoppingCartItem,
11+
get_price,
12+
)
13+
14+
15+
@dataclass
16+
class FinalizedOrder:
17+
id: str
18+
items: List[Tuple[ShoppingCartItem, int]]
19+
total: int
20+
21+
22+
@workflow.defn
23+
class ShoppingCartWorkflow:
24+
def __init__(self):
25+
self.items: List[Tuple[ShoppingCartItem, int]] = []
26+
self.order_submitted = False
27+
28+
@workflow.run
29+
async def run(self) -> FinalizedOrder:
30+
await workflow.wait_condition(
31+
lambda: workflow.all_handlers_finished() and self.order_submitted
32+
)
33+
return FinalizedOrder(
34+
id=workflow.info().workflow_id,
35+
items=self.items,
36+
total=sum(price for _, price in self.items),
37+
)
38+
39+
@workflow.update
40+
async def add_item(self, item: ShoppingCartItem) -> int:
41+
price = await workflow.execute_activity(
42+
get_price, item, start_to_close_timeout=timedelta(seconds=10)
43+
)
44+
if price is None:
45+
raise ApplicationError(
46+
f"Item unavailable: {item}",
47+
type="ItemUnavailableError",
48+
)
49+
self.items.append((item, price))
50+
51+
return sum(price for _, price in self.items)
52+
53+
@add_item.validator
54+
def validate_add_item(self, item: ShoppingCartItem) -> None:
55+
if self.order_submitted:
56+
raise ApplicationError("Order already submitted")
57+
58+
@workflow.signal
59+
def checkout(self):
60+
self.order_submitted = True

tests/conftest.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ def event_loop():
4141
async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]:
4242
env_type = request.config.getoption("--workflow-environment")
4343
if env_type == "local":
44-
env = await WorkflowEnvironment.start_local()
44+
env = await WorkflowEnvironment.start_local(
45+
dev_server_extra_args=[
46+
"--dynamic-config-value",
47+
"frontend.enableExecuteMultiOperation=true",
48+
]
49+
)
4550
elif env_type == "time-skipping":
4651
env = await WorkflowEnvironment.start_time_skipping()
4752
else:
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import pytest
2+
from temporalio import common
3+
from temporalio.client import Client, WithStartWorkflowOperation
4+
from temporalio.testing import WorkflowEnvironment
5+
from temporalio.worker import Worker
6+
7+
from message_passing.update_with_start.lazy_initialization.workflows import (
8+
ShoppingCartItem,
9+
ShoppingCartWorkflow,
10+
get_price,
11+
)
12+
13+
14+
async def test_shopping_cart_workflow(client: Client, env: WorkflowEnvironment):
15+
if env.supports_time_skipping:
16+
pytest.skip(
17+
"Java test server: https://github.com/temporalio/sdk-java/issues/1903"
18+
)
19+
async with Worker(
20+
client,
21+
task_queue="lazy-initialization-test",
22+
workflows=[ShoppingCartWorkflow],
23+
activities=[get_price],
24+
):
25+
cart_id = "cart--session-1234"
26+
make_start_op = lambda: WithStartWorkflowOperation(
27+
ShoppingCartWorkflow.run,
28+
id=cart_id,
29+
id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
30+
task_queue="lazy-initialization-test",
31+
)
32+
start_op_1 = make_start_op()
33+
price = await client.execute_update_with_start_workflow(
34+
ShoppingCartWorkflow.add_item,
35+
ShoppingCartItem(sku="item-1", quantity=2),
36+
start_workflow_operation=start_op_1,
37+
)
38+
39+
assert price == 1198
40+
41+
workflow_handle = await start_op_1.workflow_handle()
42+
43+
start_op_2 = make_start_op()
44+
price = await client.execute_update_with_start_workflow(
45+
ShoppingCartWorkflow.add_item,
46+
ShoppingCartItem(sku="item-2", quantity=1),
47+
start_workflow_operation=start_op_2,
48+
)
49+
assert price == 1797
50+
51+
workflow_handle = await start_op_2.workflow_handle()
52+
53+
await workflow_handle.signal(ShoppingCartWorkflow.checkout)
54+
55+
finalized_order = await workflow_handle.result()
56+
assert finalized_order.items == [
57+
(ShoppingCartItem(sku="item-1", quantity=2), 1198),
58+
(ShoppingCartItem(sku="item-2", quantity=1), 599),
59+
]
60+
assert finalized_order.total == 1797

0 commit comments

Comments
 (0)