Skip to content
Closed
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
662053d
Add LangGraph Hello World sample
mfateev Dec 27, 2025
18b995a
Add README for react_agent sample
mfateev Dec 27, 2025
f8f51a0
LangGraph: Use ClientConfig for client initialization in all samples
mfateev Dec 27, 2025
6b96e09
LangGraph: Add react_agent and approval_workflow sample files
mfateev Dec 27, 2025
2e5c82e
LangGraph: Add README for approval_workflow and update react_agent RE…
mfateev Dec 27, 2025
0432967
LangGraph: Add notification activity and CLI response tool to approva…
mfateev Dec 27, 2025
ca793bf
LangGraph: Wrap model with temporal_model for durable LLM calls in re…
mfateev Dec 27, 2025
5adef7d
LangGraph: Update react_agent sample to use create_durable_react_agent
mfateev Dec 27, 2025
3b62683
LangGraph: Update react_agent to use new activity_options API
mfateev Dec 27, 2025
1fb4682
LangGraph: Use native create_react_agent instead of wrappers
mfateev Dec 27, 2025
c18f9cb
LangGraph: Update SAMPLES_PROPOSAL.md to reflect simplified API
mfateev Dec 27, 2025
d3fe116
LangGraph: Use create_agent from langchain.agents (create_react_agent…
mfateev Dec 27, 2025
771b46a
LangGraph: Update react_agent sample to demonstrate multi-step agenti…
mfateev Dec 28, 2025
f579c0d
LangGraph: Add RAG sample and update documentation
mfateev Dec 28, 2025
57110cf
LangGraph: Update README with branch installation instructions
mfateev Dec 28, 2025
0302320
LangGraph: Remove unnecessary hasattr check in agentic_rag
mfateev Dec 28, 2025
e82d0a5
LangGraph samples: Use create_agent instead of deprecated create_reac…
mfateev Dec 29, 2025
3475e6f
langgraph-supervisor dependency added
mfateev Dec 29, 2025
7db14fe
LangGraph: Add deep research, plan-and-execute, and reflection samples
mfateev Dec 29, 2025
0beeb3c
LangGraph samples: Standardize usage instructions in READMEs
mfateev Dec 29, 2025
3e6df8f
LangGraph: Deep research sample uses real DuckDuckGo web search
mfateev Dec 29, 2025
d772f0a
LangGraph samples: Flatten directory structure to single level
mfateev Dec 29, 2025
fe7134d
LangGraph samples: Update README run commands for flattened structure
mfateev Dec 29, 2025
58516c1
Fix lint errors in langgraph samples
mfateev Dec 30, 2025
9b17b34
Add unit tests for hello_world LangGraph sample
mfateev Dec 30, 2025
6e320fa
Add tests for all LangGraph samples and fix serialization bugs
mfateev Dec 30, 2025
3b54795
Add activity_from_node sample demonstrating run_in_workflow
mfateev Dec 30, 2025
d097a07
Split approval workflow into two samples demonstrating different appr…
mfateev Dec 30, 2025
982f75a
Move approval workflow samples to human_in_the_loop directory
mfateev Dec 30, 2025
76162db
Rename human_in_the_loop samples for clarity
mfateev Dec 30, 2025
b083bfe
Update samples README with human_in_the_loop samples
mfateev Dec 30, 2025
cc9480c
Add README for human_in_the_loop samples
mfateev Dec 30, 2025
fa7f1bb
Add tests for human_in_the_loop samples and fix wait_condition graph
mfateev Dec 30, 2025
faf4dce
Add Serena consultation instruction to CLAUDE.md
mfateev Dec 30, 2025
c7f240a
Rename langgraph_samples to langgraph_plugin
mfateev Dec 31, 2025
c95ddac
Fix pyproject.toml reference to langgraph_plugin
mfateev Dec 31, 2025
bb536f0
Add graph visualization queries to approval workflow samples
mfateev Dec 31, 2025
2a313c1
Add get_graph_state query to approval workflow samples
mfateev Dec 31, 2025
a68e14c
Return typed GraphStateResponse from get_graph_state query
mfateev Dec 31, 2025
bb741b3
Add Available Queries section to human_in_the_loop README
mfateev Dec 31, 2025
d69f011
Add LangGraph Functional API Temporal integration proposal
mfateev Jan 1, 2026
4013b7b
LangGraph Functional API: Entrypoints run in workflow sandbox
mfateev Jan 2, 2026
f06c6a1
LangGraph: Reorganize samples into graph_api and functional_api
mfateev Jan 2, 2026
d3ce31e
Add functional API samples matching graph API examples
mfateev Jan 2, 2026
7030c84
Fix test isolation: clear functional entrypoint registry between tests
mfateev Jan 2, 2026
4788ea7
Add functional API tests and update CLAUDE.md with lint and test guid…
mfateev Jan 2, 2026
67b0e51
Fix functional API test assertions to match entrypoint return keys
mfateev Jan 2, 2026
c15f496
Add comprehensive usage documentation for Graph and Functional APIs
mfateev Jan 2, 2026
4fc7d31
Add migration guide for adapting LangGraph Functional API to Temporal
mfateev Jan 2, 2026
de57b8b
Add README documentation for each Functional API sample
mfateev Jan 3, 2026
9df7258
Fix configuration options documentation to use correct SDK parameters
mfateev Jan 3, 2026
d435497
Add activity_options to all graph_api samples
mfateev Jan 3, 2026
dde6e5b
LangGraph Functional API: Add activity_options() to all samples
mfateev Jan 3, 2026
54a557b
LangGraph: Update samples to use unified LangGraphPlugin API
mfateev Jan 3, 2026
d864d2a
Add continue-as-new sample for Functional API
mfateev Jan 5, 2026
e3c5257
Add continue_as_new sample to README
mfateev Jan 5, 2026
1b57043
Update Functional API continue-as-new to use should_continue callback
mfateev Jan 8, 2026
f421461
Update Graph API README paths to new directory structure
mfateev Jan 9, 2026
6118979
Add architecture diagrams for Temporal + LangGraph integration
mfateev Jan 9, 2026
94ac0c0
remove approval_timeout from run to allow dataclass deserialization
brianstrauch Mar 26, 2026
bf7d7ff
remove reference to optional timeout
brianstrauch Mar 26, 2026
75db484
use concrete types instead of any
brianstrauch Mar 26, 2026
75d28fb
move graph compilation to init function and remove defensive checks f…
brianstrauch Mar 26, 2026
42161bd
Merge branch 'main' into langgraph_plugin
brianstrauch Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Temporal Python Samples

## Serena MCP Server

Always consult Serena memories at the start of a session using `mcp__serena__list_memories` and read relevant ones with `mcp__serena__read_memory`. Save important project-specific learnings to Serena for future sessions.

## Pre-Commit and Pre-Push Checks

**ALWAYS run `poe lint` before committing or pushing** on both repositories:

```bash
# In samples repo (langgraph_plugin)
poe lint

# In SDK repo (sdk-python langgraph-plugin branch)
cd /Users/maxim/temporal/sdk-python-root/langgraph-plugin
poe lint
```

This catches import sorting, formatting, type errors, and other style issues. Never push without confirming lint passes.

## Test Failures

**NEVER delete tests just because they fail.** Failing tests indicate real issues with the implementation that need to be fixed. If tests fail:

1. Investigate the root cause of the failure
2. Fix the implementation, not the tests
3. Only modify tests if they have incorrect assertions or are testing the wrong behavior

Tests are valuable signals - treat failures as bugs to fix, not inconveniences to remove.

## Client Initialization Pattern

Use the `ClientConfig` pattern for client initialization to support environment-based configuration:

```python
from temporalio.client import Client
from temporalio.envconfig import ClientConfig

config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)
```

This pattern allows configuration via environment variables while providing sensible defaults.

## Design Decisions

**NEVER jump to implementation when presenting multiple design options.** When you identify several possible approaches to solve a problem:

1. Present all options with their pros and cons
2. Wait for the user to confirm which approach to take
3. Only implement after receiving explicit confirmation

This prevents wasted effort implementing the wrong solution and ensures alignment with user preferences.

## LangGraph Guidelines

### Agent Creation

- **DO NOT** use `create_react_agent` from `langgraph.prebuilt` - it is deprecated
- **USE** `create_agent` from `langchain.agents` instead

```python
# Wrong (deprecated)
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model=model, tools=[...], prompt="...")

# Correct
from langchain.agents import create_agent
agent = create_agent(model=model, tools=[...], system_prompt="...")
```
220 changes: 220 additions & 0 deletions langgraph_plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
# Temporal LangGraph Samples

These samples demonstrate the Temporal LangGraph integration - combining LangGraph's agent framework with Temporal's durable execution.

> **Note:** The LangGraph integration is currently available as a preview feature in the `langgraph_plugin` branch of the SDK repository.

## Overview

The integration combines:
- **Temporal workflows** for orchestrating agent control flow and state management
- **LangGraph** for defining agent graphs with conditional logic, cycles, and state

This approach ensures that AI agent workflows are durable, observable, and can handle failures gracefully.

## Prerequisites

- Temporal server [running locally](https://docs.temporal.io/cli/server#start-dev)
- Python 3.9+
- [uv](https://docs.astral.sh/uv/) package manager (recommended)

## Installation

Since the LangGraph integration is currently in a branch, you need to install from the branch repositories.

### Running the Samples

1. Clone this samples repository:
```bash
git clone -b langgraph_plugin https://github.com/mfateev/samples-python.git
cd samples-python
```

2. Install dependencies:
```bash
uv sync --group langgraph
```

3. Install the SDK from the `langgraph-plugin` branch:
```bash
uv pip install "temporalio @ git+https://github.com/mfateev/sdk-python.git@langgraph-plugin"
```

4. Start a local Temporal server:
```bash
temporal server start-dev
```

5. Navigate to a sample directory and follow its README for specific instructions

## LangGraph API Styles

LangGraph provides two API styles for defining workflows:

| Aspect | Graph API | Functional API |
|--------|-----------|----------------|
| Definition | `StateGraph` + `add_node()` + `add_edge()` | `@task` + `@entrypoint` |
| Control flow | Explicit graph edges | Python code (loops, conditionals) |
| State | Shared TypedDict with reducers | Function arguments/returns |
| Parallelism | Send API, conditional edges | Concurrent task calls |
| Compile | `compile(graph, "id")` | `compile_functional("id")` |

## Examples

Examples are organized by API style:

### Graph API (`graph_api/`)

StateGraph-based examples using nodes and edges:

| Sample | Description |
|--------|-------------|
| [hello_world](./graph_api/hello_world/) | Simple starter example demonstrating basic plugin setup and graph registration |
| [activity_from_node](./graph_api/activity_from_node/) | Calling Temporal activities from a graph node using run_in_workflow |
| [react_agent](./graph_api/react_agent/) | ReAct agent pattern with tool calling and multi-step reasoning |
| [human_in_the_loop](./graph_api/human_in_the_loop/) | Human-in-the-loop approval workflows using two approaches |
| ↳ [approval_graph_interrupt](./graph_api/human_in_the_loop/approval_graph_interrupt/) | Uses LangGraph's `interrupt()` function |
| ↳ [approval_wait_condition](./graph_api/human_in_the_loop/approval_wait_condition/) | Uses `run_in_workflow=True` with `workflow.wait_condition()` |
| [supervisor](./graph_api/supervisor/) | Multi-agent supervisor pattern coordinating specialized agents |
| [agentic_rag](./graph_api/agentic_rag/) | Retrieval-augmented generation with document grading and query rewriting |
| [deep_research](./graph_api/deep_research/) | Multi-step research with web search and iterative refinement |
| [plan_and_execute](./graph_api/plan_and_execute/) | Plan-and-execute pattern with structured step execution |
| [reflection](./graph_api/reflection/) | Self-reflection pattern for iterative improvement |

### Functional API (`functional_api/`)

`@task` and `@entrypoint` decorator-based examples:

| Sample | Description |
|--------|-------------|
| [hello_world](./functional_api/hello_world/) | Simple starter example demonstrating basic plugin setup with `@task` and `@entrypoint` |
| [react_agent](./functional_api/react_agent/) | ReAct agent pattern with tool calling using tasks for model and tool execution |
| [human_in_the_loop](./functional_api/human_in_the_loop/) | Human-in-the-loop approval workflow using `interrupt()` for pause/resume |
| [supervisor](./functional_api/supervisor/) | Multi-agent supervisor pattern with tasks for each agent role |
| [agentic_rag](./functional_api/agentic_rag/) | RAG with document grading and query rewriting using task-based retrieval |
| [deep_research](./functional_api/deep_research/) | Multi-step research with parallel search execution via concurrent tasks |
| [plan_and_execute](./functional_api/plan_and_execute/) | Plan-and-execute pattern with step-by-step task execution |
| [reflection](./functional_api/reflection/) | Self-reflection pattern for iterative content improvement |

## Usage

### Graph API Usage

The Graph API uses `StateGraph` to define nodes and edges, with each node running as a Temporal activity:

```python
from langgraph.graph import StateGraph, START, END
from temporalio import workflow
from temporalio.contrib.langgraph import LangGraphPlugin, compile

# 1. Define your graph
class State(TypedDict):
messages: Annotated[list, add_messages]

def chatbot(state: State) -> State:
response = model.invoke(state["messages"])
return {"messages": [response]}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# 2. Register graph with plugin
plugin = LangGraphPlugin(graphs={"my_graph": graph.compile()})

# 3. Use in workflow
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, query: str) -> dict:
app = compile("my_graph") # Get runner for registered graph
return await app.ainvoke({"messages": [("user", query)]})

# 4. Start worker with plugin
async with Worker(client, task_queue="q", workflows=[MyWorkflow], plugins=[plugin]):
result = await client.execute_workflow(MyWorkflow.run, "Hello", ...)
```

### Functional API Usage

The Functional API uses `@task` and `@entrypoint` decorators. Tasks run as Temporal activities:

```python
from langgraph.func import task, entrypoint
from temporalio import workflow
from temporalio.contrib.langgraph import LangGraphFunctionalPlugin, compile_functional

# 1. Define tasks (run as Temporal activities)
@task
def research(topic: str) -> str:
"""Each @task call becomes a Temporal activity."""
return search_web(topic)

@task
def summarize(content: str) -> str:
return model.invoke(f"Summarize: {content}")

# 2. Define entrypoint (orchestrates tasks)
@entrypoint()
async def research_workflow(topic: str) -> dict:
"""The entrypoint runs in the workflow, orchestrating tasks."""
# Tasks can run in parallel
results = [research(q) for q in generate_queries(topic)]
content = [await r for r in results] # Wait for all

summary = await summarize("\n".join(content))
return {"summary": summary}

# 3. Register entrypoint with plugin
plugin = LangGraphFunctionalPlugin(
entrypoints={"research": research_workflow}
)

# 4. Use in workflow
@workflow.defn
class ResearchWorkflow:
@workflow.run
async def run(self, topic: str) -> dict:
app = compile_functional("research")
return await app.ainvoke(topic)

# 5. Start worker with plugin
async with Worker(client, task_queue="q", workflows=[ResearchWorkflow], plugins=[plugin]):
result = await client.execute_workflow(ResearchWorkflow.run, "AI trends", ...)
```

### Key Differences

| Feature | Graph API | Functional API |
|---------|-----------|----------------|
| Task definition | Graph nodes | `@task` decorator |
| Orchestration | Graph edges | Python control flow |
| Parallel execution | `Send` API | Concurrent `await` |
| State management | Shared `TypedDict` | Function arguments |
| Compile function | `compile("graph_id")` | `compile_functional("entrypoint_id")` |
| Plugin class | `LangGraphPlugin` | `LangGraphFunctionalPlugin` |

### Configuration Options

Both APIs support activity configuration:

```python
# Graph API - per-node options
plugin = LangGraphPlugin(
graphs={"my_graph": graph},
default_start_to_close_timeout=timedelta(minutes=5),
node_options={
Comment thread
brianstrauch marked this conversation as resolved.
Outdated
"expensive_node": {"start_to_close_timeout": timedelta(minutes=30)}
Copy link
Copy Markdown
Contributor

@drewhoskins-temporal drewhoskins-temporal Jan 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding a sample for this, or incorporating into a sample?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the suggestion here to add a sample for an "expensive node"? What's the significance of an expensive node?

}
)

# Functional API - per-task options
plugin = LangGraphFunctionalPlugin(
entrypoints={"my_entrypoint": entrypoint_func},
default_task_timeout=timedelta(minutes=5),
task_options={
"expensive_task": {"start_to_close_timeout": timedelta(minutes=30)}
}
)
```
1 change: 1 addition & 0 deletions langgraph_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Temporal LangGraph Samples
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""LangGraph Functional API + Temporal Integration Proposal."""
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/agentic_rag/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Agentic RAG - LangGraph Functional API with Temporal."""
79 changes: 79 additions & 0 deletions langgraph_plugin/functional_api/agentic_rag/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Agentic RAG Entrypoint Definition.

The @entrypoint function implements an agentic RAG pattern:
1. Retrieve documents based on query
2. Grade documents for relevance
3. If not relevant, rewrite query and retry
4. Generate answer using relevant documents
"""

from typing import Any

from langgraph.func import entrypoint

from langgraph_plugin.functional_api.agentic_rag.tasks import (
generate_answer,
grade_documents,
retrieve_documents,
rewrite_query,
)


@entrypoint()
async def agentic_rag_entrypoint(question: str, max_retries: int = 2) -> dict[str, Any]:
"""Run an agentic RAG system.

The system will:
1. Retrieve documents
2. Grade them for relevance
3. Rewrite query and retry if not relevant
4. Generate answer from relevant documents

Each @task call runs as a Temporal activity with automatic retries.

Args:
question: The user's question.
max_retries: Maximum query rewrite attempts.

Returns:
Dict with answer, documents used, and metadata.
"""
current_query = question
all_documents: list[dict[str, Any]] = []

for attempt in range(max_retries + 1):
# Step 1: Retrieve documents
documents = await retrieve_documents(current_query)
all_documents.extend(documents)

# Step 2: Grade documents
grade_result = await grade_documents(question, documents)

if grade_result["relevant"]:
# Step 3: Generate answer with relevant documents
answer = await generate_answer(question, documents)

return {
"question": question,
"answer": answer,
"documents_used": len(documents),
"query_rewrites": attempt,
"final_query": current_query,
"status": "success",
}

# Documents not relevant - rewrite query if retries left
if attempt < max_retries:
current_query = await rewrite_query(current_query)

# Max retries reached - generate best-effort answer
answer = await generate_answer(question, all_documents)

return {
"question": question,
"answer": answer,
"documents_used": len(all_documents),
"query_rewrites": max_retries,
"final_query": current_query,
"status": "max_retries_reached",
}
Loading
Loading