LangGraph Samples #272
Changes from 48 commits
# Temporal Python Samples

## Serena MCP Server

Always consult Serena memories at the start of a session using `mcp__serena__list_memories` and read relevant ones with `mcp__serena__read_memory`. Save important project-specific learnings to Serena for future sessions.

## Pre-Commit and Pre-Push Checks

**ALWAYS run `poe lint` before committing or pushing** on both repositories:

```bash
# In samples repo (langgraph_plugin)
poe lint

# In SDK repo (sdk-python langgraph-plugin branch)
cd /Users/maxim/temporal/sdk-python-root/langgraph-plugin
poe lint
```

This catches import sorting, formatting, type errors, and other style issues. Never push without confirming lint passes.

## Test Failures

**NEVER delete tests just because they fail.** Failing tests indicate real issues with the implementation that need to be fixed. If tests fail:

1. Investigate the root cause of the failure
2. Fix the implementation, not the tests
3. Only modify tests if they have incorrect assertions or are testing the wrong behavior

Tests are valuable signals - treat failures as bugs to fix, not inconveniences to remove.

## Client Initialization Pattern

Use the `ClientConfig` pattern for client initialization to support environment-based configuration:

```python
from temporalio.client import Client
from temporalio.envconfig import ClientConfig

config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)
```

This pattern allows configuration via environment variables while providing sensible defaults.
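The heart of this pattern is `dict.setdefault`: values loaded from the environment or profile win, and the hardcoded default only fills a gap. A framework-free sketch (the `apply_defaults` helper and the stub config dicts are illustrative, not part of the real `ClientConfig` API):

```python
def apply_defaults(config: dict) -> dict:
    """Fill in defaults without overriding values that were already loaded."""
    config.setdefault("target_host", "localhost:7233")
    return config

# A value loaded from the environment is preserved...
loaded = apply_defaults({"target_host": "prod.example.com:7233"})
print(loaded["target_host"])  # prod.example.com:7233

# ...while a missing key falls back to the default.
empty = apply_defaults({})
print(empty["target_host"])  # localhost:7233
```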

## Design Decisions

**NEVER jump to implementation when presenting multiple design options.** When you identify several possible approaches to solve a problem:

1. Present all options with their pros and cons
2. Wait for the user to confirm which approach to take
3. Only implement after receiving explicit confirmation

This prevents wasted effort implementing the wrong solution and ensures alignment with user preferences.

## LangGraph Guidelines

### Agent Creation

- **DO NOT** use `create_react_agent` from `langgraph.prebuilt` - it is deprecated
- **USE** `create_agent` from `langchain.agents` instead

```python
# Wrong (deprecated)
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model=model, tools=[...], prompt="...")

# Correct
from langchain.agents import create_agent
agent = create_agent(model=model, tools=[...], system_prompt="...")
```

# Temporal LangGraph Samples

These samples demonstrate the Temporal LangGraph integration - combining LangGraph's agent framework with Temporal's durable execution.

> **Note:** The LangGraph integration is currently available as a preview feature in the `langgraph-plugin` branch of the SDK repository.

## Overview

The integration combines:

- **Temporal workflows** for orchestrating agent control flow and state management
- **LangGraph** for defining agent graphs with conditional logic, cycles, and state

This approach ensures that AI agent workflows are durable, observable, and can handle failures gracefully.

## Prerequisites

- Temporal server [running locally](https://docs.temporal.io/cli/server#start-dev)
- Python 3.9+
- [uv](https://docs.astral.sh/uv/) package manager (recommended)

## Installation

Since the LangGraph integration is currently in a branch, you need to install from the branch repositories.

### Running the Samples

1. Clone this samples repository:
   ```bash
   git clone -b langgraph_plugin https://github.com/mfateev/samples-python.git
   cd samples-python
   ```

2. Install dependencies:
   ```bash
   uv sync --group langgraph
   ```

3. Install the SDK from the `langgraph-plugin` branch:
   ```bash
   uv pip install "temporalio @ git+https://github.com/mfateev/sdk-python.git@langgraph-plugin"
   ```

4. Start a local Temporal server:
   ```bash
   temporal server start-dev
   ```

5. Navigate to a sample directory and follow its README for specific instructions.

## LangGraph API Styles

LangGraph provides two API styles for defining workflows:

| Aspect | Graph API | Functional API |
|--------|-----------|----------------|
| Definition | `StateGraph` + `add_node()` + `add_edge()` | `@task` + `@entrypoint` |
| Control flow | Explicit graph edges | Python code (loops, conditionals) |
| State | Shared `TypedDict` with reducers | Function arguments/returns |
| Parallelism | `Send` API, conditional edges | Concurrent task calls |
| Compile | `compile("graph_id")` | `compile_functional("entrypoint_id")` |
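The control-flow difference in the table can be made concrete with a framework-free sketch (plain Python, no LangGraph or Temporal; all names here are hypothetical): the graph style expresses routing as data - a node table plus edges walked by a generic loop - while the functional style expresses the same pipeline directly as Python calls.

```python
# Graph style: routing is data - nodes plus edges, driven by a generic loop.
def make_upper(state: dict) -> dict:
    return {**state, "text": state["text"].upper()}

def add_excitement(state: dict) -> dict:
    return {**state, "text": state["text"] + "!"}

nodes = {"upper": make_upper, "excite": add_excitement}
edges = {"START": "upper", "upper": "excite", "excite": "END"}

def run_graph(state: dict) -> dict:
    current = edges["START"]
    while current != "END":
        state = nodes[current](state)   # run the current node
        current = edges[current]        # follow its outgoing edge
    return state

# Functional style: the same pipeline as ordinary Python control flow.
def run_functional(state: dict) -> dict:
    return add_excitement(make_upper(state))

print(run_graph({"text": "hello"})["text"])       # HELLO!
print(run_functional({"text": "hello"})["text"])  # HELLO!
```

Both styles compute the same result; they differ only in where the routing lives.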

## Examples

Examples are organized by API style:

### Graph API (`graph_api/`)

StateGraph-based examples using nodes and edges:

| Sample | Description |
|--------|-------------|
| [hello_world](./graph_api/hello_world/) | Simple starter example demonstrating basic plugin setup and graph registration |
| [activity_from_node](./graph_api/activity_from_node/) | Calling Temporal activities from a graph node using `run_in_workflow` |
| [react_agent](./graph_api/react_agent/) | ReAct agent pattern with tool calling and multi-step reasoning |
| [human_in_the_loop](./graph_api/human_in_the_loop/) | Human-in-the-loop approval workflows using two approaches |
| ↳ [approval_graph_interrupt](./graph_api/human_in_the_loop/approval_graph_interrupt/) | Uses LangGraph's `interrupt()` function |
| ↳ [approval_wait_condition](./graph_api/human_in_the_loop/approval_wait_condition/) | Uses `run_in_workflow=True` with `workflow.wait_condition()` |
| [supervisor](./graph_api/supervisor/) | Multi-agent supervisor pattern coordinating specialized agents |
| [agentic_rag](./graph_api/agentic_rag/) | Retrieval-augmented generation with document grading and query rewriting |
| [deep_research](./graph_api/deep_research/) | Multi-step research with web search and iterative refinement |
| [plan_and_execute](./graph_api/plan_and_execute/) | Plan-and-execute pattern with structured step execution |
| [reflection](./graph_api/reflection/) | Self-reflection pattern for iterative improvement |

### Functional API (`functional_api/`)

`@task` and `@entrypoint` decorator-based examples:

| Sample | Description |
|--------|-------------|
| [hello_world](./functional_api/hello_world/) | Simple starter example demonstrating basic plugin setup with `@task` and `@entrypoint` |
| [react_agent](./functional_api/react_agent/) | ReAct agent pattern with tool calling using tasks for model and tool execution |
| [human_in_the_loop](./functional_api/human_in_the_loop/) | Human-in-the-loop approval workflow using `interrupt()` for pause/resume |
| [supervisor](./functional_api/supervisor/) | Multi-agent supervisor pattern with tasks for each agent role |
| [agentic_rag](./functional_api/agentic_rag/) | RAG with document grading and query rewriting using task-based retrieval |
| [deep_research](./functional_api/deep_research/) | Multi-step research with parallel search execution via concurrent tasks |
| [plan_and_execute](./functional_api/plan_and_execute/) | Plan-and-execute pattern with step-by-step task execution |
| [reflection](./functional_api/reflection/) | Self-reflection pattern for iterative content improvement |

## Usage

### Graph API Usage

The Graph API uses `StateGraph` to define nodes and edges, with each node running as a Temporal activity:

```python
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from temporalio import workflow
from temporalio.contrib.langgraph import LangGraphPlugin, compile
from temporalio.worker import Worker


# 1. Define your graph (`model` is a chat model assumed to be defined elsewhere)
class State(TypedDict):
    messages: Annotated[list, add_messages]


def chatbot(state: State) -> State:
    response = model.invoke(state["messages"])
    return {"messages": [response]}


graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# 2. Register graph with plugin
plugin = LangGraphPlugin(graphs={"my_graph": graph.compile()})


# 3. Use in workflow
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, query: str) -> dict:
        app = compile("my_graph")  # Get runner for registered graph
        return await app.ainvoke({"messages": [("user", query)]})


# 4. Start worker with plugin
async with Worker(client, task_queue="q", workflows=[MyWorkflow], plugins=[plugin]):
    result = await client.execute_workflow(MyWorkflow.run, "Hello", ...)
```

### Functional API Usage

The Functional API uses `@task` and `@entrypoint` decorators. Tasks run as Temporal activities:

```python
from langgraph.func import entrypoint, task
from temporalio import workflow
from temporalio.contrib.langgraph import LangGraphFunctionalPlugin, compile_functional
from temporalio.worker import Worker


# 1. Define tasks (run as Temporal activities).
# `search_web`, `generate_queries`, and `model` are assumed to be defined elsewhere.
@task
def research(topic: str) -> str:
    """Each @task call becomes a Temporal activity."""
    return search_web(topic)


@task
def summarize(content: str) -> str:
    return model.invoke(f"Summarize: {content}")


# 2. Define entrypoint (orchestrates tasks)
@entrypoint()
async def research_workflow(topic: str) -> dict:
    """The entrypoint runs in the workflow, orchestrating tasks."""
    # Tasks can run in parallel
    results = [research(q) for q in generate_queries(topic)]
    content = [await r for r in results]  # Wait for all

    summary = await summarize("\n".join(content))
    return {"summary": summary}


# 3. Register entrypoint with plugin
plugin = LangGraphFunctionalPlugin(
    entrypoints={"research": research_workflow}
)


# 4. Use in workflow
@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> dict:
        app = compile_functional("research")
        return await app.ainvoke(topic)


# 5. Start worker with plugin
async with Worker(client, task_queue="q", workflows=[ResearchWorkflow], plugins=[plugin]):
    result = await client.execute_workflow(ResearchWorkflow.run, "AI trends", ...)
```

### Key Differences

| Feature | Graph API | Functional API |
|---------|-----------|----------------|
| Task definition | Graph nodes | `@task` decorator |
| Orchestration | Graph edges | Python control flow |
| Parallel execution | `Send` API | Concurrent `await` |
| State management | Shared `TypedDict` | Function arguments |
| Compile function | `compile("graph_id")` | `compile_functional("entrypoint_id")` |
| Plugin class | `LangGraphPlugin` | `LangGraphFunctionalPlugin` |
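The "Concurrent `await`" style can be illustrated with plain `asyncio`, independent of LangGraph or Temporal (the `fetch` coroutine is a hypothetical stand-in for a task): start every coroutine first, then wait for them all together, instead of awaiting each one before starting the next.

```python
import asyncio
import time


async def fetch(query: str) -> str:
    """Stand-in for a task/activity that takes real time."""
    await asyncio.sleep(0.1)
    return f"result for {query}"


async def main() -> list:
    queries = ["a", "b", "c"]
    # Start all three coroutines, then await them together, so total
    # wall time is roughly one sleep (~0.1s) rather than three (~0.3s).
    started = time.monotonic()
    results = await asyncio.gather(*(fetch(q) for q in queries))
    assert time.monotonic() - started < 0.25
    return results


print(asyncio.run(main()))  # ['result for a', 'result for b', 'result for c']
```

In the Functional API the same shape appears as "call every `@task`, then `await` the returned futures", as the deep_research samples do.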

### Configuration Options

Both APIs support activity configuration:

```python
from datetime import timedelta

# Graph API - per-node options
plugin = LangGraphPlugin(
    graphs={"my_graph": graph},
    default_start_to_close_timeout=timedelta(minutes=5),
    node_options={
        "expensive_node": {"start_to_close_timeout": timedelta(minutes=30)}
    },
)

# Functional API - per-task options
plugin = LangGraphFunctionalPlugin(
    entrypoints={"my_entrypoint": entrypoint_func},
    default_task_timeout=timedelta(minutes=5),
    task_options={
        "expensive_task": {"start_to_close_timeout": timedelta(minutes=30)}
    },
)
```

Review comments on the `node_options` example:

> **Contributor:** Worth adding a sample for this, or incorporating into a sample?
>
> **Reply:** Is the suggestion here to add a sample for an "expensive node"? What's the significance of an expensive node?

# Temporal LangGraph Samples
| """LangGraph Functional API + Temporal Integration Proposal.""" |
| """Agentic RAG - LangGraph Functional API with Temporal.""" |
| """Agentic RAG Entrypoint Definition. | ||
|
|
||
| The @entrypoint function implements an agentic RAG pattern: | ||
| 1. Retrieve documents based on query | ||
| 2. Grade documents for relevance | ||
| 3. If not relevant, rewrite query and retry | ||
| 4. Generate answer using relevant documents | ||
| """ | ||
|
|
||
| from typing import Any | ||
|
|
||
| from langgraph.func import entrypoint | ||
|
|
||
| from langgraph_plugin.functional_api.agentic_rag.tasks import ( | ||
| generate_answer, | ||
| grade_documents, | ||
| retrieve_documents, | ||
| rewrite_query, | ||
| ) | ||
|
|
||
|
|
||
| @entrypoint() | ||
| async def agentic_rag_entrypoint(question: str, max_retries: int = 2) -> dict[str, Any]: | ||
| """Run an agentic RAG system. | ||
|
|
||
| The system will: | ||
| 1. Retrieve documents | ||
| 2. Grade them for relevance | ||
| 3. Rewrite query and retry if not relevant | ||
| 4. Generate answer from relevant documents | ||
|
|
||
| Each @task call runs as a Temporal activity with automatic retries. | ||
|
|
||
| Args: | ||
| question: The user's question. | ||
| max_retries: Maximum query rewrite attempts. | ||
|
|
||
| Returns: | ||
| Dict with answer, documents used, and metadata. | ||
| """ | ||
| current_query = question | ||
| all_documents: list[dict[str, Any]] = [] | ||
|
|
||
| for attempt in range(max_retries + 1): | ||
| # Step 1: Retrieve documents | ||
| documents = await retrieve_documents(current_query) | ||
| all_documents.extend(documents) | ||
|
|
||
| # Step 2: Grade documents | ||
| grade_result = await grade_documents(question, documents) | ||
|
|
||
| if grade_result["relevant"]: | ||
| # Step 3: Generate answer with relevant documents | ||
| answer = await generate_answer(question, documents) | ||
|
|
||
| return { | ||
| "question": question, | ||
| "answer": answer, | ||
| "documents_used": len(documents), | ||
| "query_rewrites": attempt, | ||
| "final_query": current_query, | ||
| "status": "success", | ||
| } | ||
|
|
||
| # Documents not relevant - rewrite query if retries left | ||
| if attempt < max_retries: | ||
| current_query = await rewrite_query(current_query) | ||
|
|
||
| # Max retries reached - generate best-effort answer | ||
| answer = await generate_answer(question, all_documents) | ||
|
|
||
| return { | ||
| "question": question, | ||
| "answer": answer, | ||
| "documents_used": len(all_documents), | ||
| "query_rewrites": max_retries, | ||
| "final_query": current_query, | ||
| "status": "max_retries_reached", | ||
| } |
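The retry/rewrite control flow of this entrypoint can be exercised without LangGraph or Temporal by swapping in stub tasks. The following is a hypothetical simulation (the `agentic_rag_sim` function and all stub implementations are illustrations, not part of the sample): the first retrieval returns nothing relevant, the rewritten query succeeds.

```python
import asyncio
from typing import Any


# Stub tasks: only the rewritten query retrieves a relevant document.
async def retrieve_documents(query: str) -> list:
    if query.endswith("(rewritten)"):
        return [{"text": "relevant doc"}]
    return [{"text": "irrelevant doc"}]


async def grade_documents(question: str, docs: list) -> dict:
    return {"relevant": any(d["text"] == "relevant doc" for d in docs)}


async def rewrite_query(query: str) -> str:
    return query + " (rewritten)"


async def generate_answer(question: str, docs: list) -> str:
    return f"answer from {len(docs)} docs"


async def agentic_rag_sim(question: str, max_retries: int = 2) -> dict:
    """Mirror of the entrypoint's loop: retrieve, grade, rewrite, answer."""
    current_query = question
    for attempt in range(max_retries + 1):
        documents = await retrieve_documents(current_query)
        grade = await grade_documents(question, documents)
        if grade["relevant"]:
            return {"status": "success", "query_rewrites": attempt,
                    "answer": await generate_answer(question, documents)}
        if attempt < max_retries:
            current_query = await rewrite_query(current_query)
    return {"status": "max_retries_reached", "query_rewrites": max_retries,
            "answer": await generate_answer(question, [])}


result = asyncio.run(agentic_rag_sim("What is Temporal?"))
print(result["status"], result["query_rewrites"])  # success 1
```

Here attempt 0 grades as irrelevant, the query is rewritten once, and attempt 1 succeeds - the same path the real entrypoint takes when grading fails on the first pass.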