LangGraph 使用指南
基础概念
LangGraph 是一个用于构建有状态、多步骤 AI 工作流的框架,基于 LangChain 构建,核心概念包括:
- Graph(图):工作流的整体结构,由节点和边组成
- Node(节点):工作流中的处理步骤,可以是函数、LLM 调用或任何可执行逻辑
- Edge(边):连接节点的路径,定义执行顺序
- State(状态):在节点之间传递的共享数据对象
- Compile(编译):将图转换为可执行对象
安装方法
# 基础安装
pip install langgraph
# 如果使用 LangChain 模型
pip install langchain langchain-openai
# 可选:用于可视化
pip install matplotlib
核心功能
1. 构建基本图结构
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
# Shared state schema passed between all nodes of this example graph.
class State(TypedDict):
    messages: List[str]  # log of which nodes have executed
    count: int  # number of node executions so far
# Node functions: each takes the current state and returns the updated state.
def node1(state: State) -> State:
    """First processing step: log the execution and bump the counter.

    Returns a new state dict instead of mutating the input in place,
    matching the "avoid in-place mutation" best practice shown later
    in this guide.
    """
    return {
        "messages": state["messages"] + ["Node 1 executed"],
        "count": state["count"] + 1,
    }
def node2(state: State) -> State:
    """Second processing step: log the execution and bump the counter.

    Like node1, returns a fresh state dict rather than mutating the input.
    """
    return {
        "messages": state["messages"] + ["Node 2 executed"],
        "count": state["count"] + 1,
    }
# Assemble the workflow: step1 -> step2, then compile and run it.
builder = StateGraph(State)
builder.add_node("step1", node1)
builder.add_node("step2", node2)
builder.set_entry_point("step1")
builder.add_edge("step1", "step2")
builder.set_finish_point("step2")
# compile() turns the declarative graph into a runnable application.
app = builder.compile()

initial_state = {"messages": [], "count": 0}
result = app.invoke(initial_state)
print(result)
2. 条件边(条件路由)
from langgraph.graph import StateGraph, END
class State(TypedDict):
    input_text: str  # raw user input; rewritten by the handler nodes
    category: str  # set by `classify`: either "question" or "statement"
def classify(state: State) -> State:
    """Simulated classifier: a question mark marks the text as a question."""
    is_question = "?" in state["input_text"]
    state["category"] = "question" if is_question else "statement"
    return state
def handle_question(state: State) -> State:
    """Rewrite the input text with an answer template."""
    original_text = state["input_text"]
    state["input_text"] = f"Answer to: {original_text}"
    return state
def handle_statement(state: State) -> State:
    """Rewrite the input text with a processed-statement template."""
    original_text = state["input_text"]
    state["input_text"] = f"Processed statement: {original_text}"
    return state
# Conditional router: the returned string selects the next node.
def decide_category(state: State) -> str:
    """Route to the handler matching the category set by `classify`."""
    return "question_node" if state["category"] == "question" else "statement_node"
# Wire up the graph: classify first, then branch on the detected category.
builder = StateGraph(State)
builder.add_node("classify", classify)
builder.add_node("question_node", handle_question)
builder.add_node("statement_node", handle_statement)
builder.set_entry_point("classify")
# The router's return value is looked up in this mapping to pick the next node.
builder.add_conditional_edges(
    "classify",
    decide_category,
    {"question_node": "question_node", "statement_node": "statement_node"},
)
builder.add_edge("question_node", END)
builder.add_edge("statement_node", END)

app = builder.compile()
result = app.invoke({"input_text": "What is LangGraph?", "category": ""})
print(result)
3. 循环和递归
class State(TypedDict):
    count: int  # current loop iteration
    max_count: int  # loop stops once count reaches this value
    result: str  # label of the most recent step
def increment(state: State) -> State:
    """Advance the counter by one and record the step label."""
    new_count = state["count"] + 1
    state["count"] = new_count
    state["result"] = f"Step {new_count}"
    return state
def should_continue(state: State) -> str:
    """Keep looping through "increment" until the counter hits max_count."""
    if state["count"] >= state["max_count"]:
        return "end"
    return "increment"
# A self-loop: "increment" keeps routing back to itself until max_count.
builder = StateGraph(State)
builder.add_node("increment", increment)
builder.set_entry_point("increment")
builder.add_conditional_edges(
    "increment",
    should_continue,
    {"increment": "increment", "end": END},
)

app = builder.compile()
result = app.invoke({"count": 0, "max_count": 3, "result": ""})
print(result)
4. 集成 LLM(以 OpenAI 为例)
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
from langgraph.graph import StateGraph, END
class State(TypedDict):
    query: str  # user question sent to the LLM
    response: str  # LLM reply text, filled in by call_llm
def call_llm(state: State) -> State:
    """Send the query to the chat model and store its reply in the state."""
    model = ChatOpenAI(temperature=0)
    prompt = [HumanMessage(content=state["query"])]
    state["response"] = model.invoke(prompt).content
    return state
# Single-node graph: the LLM call is both the entry and the finish point.
builder = StateGraph(State)
builder.add_node("llm_call", call_llm)
builder.set_entry_point("llm_call")
builder.set_finish_point("llm_call")

app = builder.compile()
result = app.invoke({"query": "What is the capital of France?", "response": ""})
print(result["response"])
最佳实践
1. 状态管理最佳实践
# Use TypedDict so type checkers can verify every state access.
from typing import TypedDict, Optional, List
class ChatState(TypedDict):
    messages: List[dict]  # conversation history entries
    user_id: str  # owner of this chat session
    session_data: Optional[dict]  # per-session payload, if any
    error: Optional[str]  # last error message, or None when healthy
2. 错误处理
def safe_node(state: State) -> State:
    """Run the business logic, capturing any failure into state["error"].

    On failure the original state (with the error recorded) is returned so a
    downstream router can divert to an error-handling node.
    """
    try:
        result = process_data(state)
    except Exception as exc:
        state["error"] = str(exc)
        return state
    return {"...": result}
# Router that diverts to an error handler when the state carries an error.
def check_error(state: State) -> str:
    """Return the next node name based on whether an error was recorded."""
    if state.get("error"):
        return "error_handler"
    return "next_node"
3. 性能优化
# 使用缓存避免重复计算
from functools import lru_cache
@lru_cache(maxsize=100)
def expensive_computation(input_data: str) -> str:
    # Placeholder for a slow, pure computation; lru_cache memoizes it so
    # repeated calls with the same input are served from the cache.
    # NOTE(review): `processed_result` is not defined anywhere — this snippet
    # is illustrative and would raise NameError if called as-is.
    return processed_result
def node_with_cache(state: State) -> State:
    """Store the (memoized) computation result back onto the state."""
    cached_value = expensive_computation(state["input"])
    state["result"] = cached_value
    return state
4. 可观测性
# 添加日志记录
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def monitored_node(state: State) -> State:
    """Wrap `process` with entry/exit logging for observability."""
    logger.info(f"Processing state: {state}")
    outcome = process(state)
    logger.info(f"Result: {outcome}")
    return outcome
5. 测试策略
# Unit-test a single node function in isolation.
def test_node():
    """A node is just a function: feed it a state dict, check the output."""
    initial = {"input": "test", "output": ""}
    updated = my_node(initial)
    assert updated["output"] == "expected_output"
# Integration-test the compiled graph end to end.
def test_graph():
    """Build the full graph and verify invoke produces an output key."""
    compiled = build_graph()
    final_state = compiled.invoke({"input": "test"})
    assert "output" in final_state
6. 常见陷阱避免
避免在节点内部修改共享状态:
# ❌ Anti-pattern (shown deliberately): the node mutates shared state in place.
def bad_node(state: State) -> State:
    state["shared_data"].append("value")  # mutates the caller's list directly
    return state
# ✅ Correct: copy the state and bind a brand-new list for the updated field.
def good_node(state: State) -> State:
    """Return an updated copy; the caller's dict and list stay untouched."""
    return {**state, "shared_data": [*state["shared_data"], "value"]}
完整示例:问答系统
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
class QnAState(TypedDict):
    question: str  # the user's question
    context: Optional[str]  # retrieved context; None until retrieval runs
    answer: str  # final answer text
    confidence: float  # 0.9 with context, 0.5 without (set by generate_answer)
    needs_clarification: bool  # True when the question is empty or too short
def validate_question(state: QnAState) -> QnAState:
    """Flag questions that are empty or shorter than three characters."""
    question = state["question"]
    if not question or len(question) < 3:
        state["needs_clarification"] = True
    return state
def handle_clarification(state: QnAState) -> QnAState:
    """Reply with a request for a more specific question."""
    clarification_text = "Please provide a more specific question."
    state["answer"] = clarification_text
    return state
def retrieve_context(state: QnAState) -> QnAState:
    """Attach mock retrieval context for the question.

    A real implementation would query a vector store or database here.
    """
    question = state["question"]
    state["context"] = f"Context related to: {question}"
    return state
def generate_answer(state: QnAState) -> QnAState:
    """Generate the answer with an LLM; confidence is higher with context."""
    model = ChatOpenAI(temperature=0.7)
    context = state.get("context", "No context")
    prompt = [
        SystemMessage(content="Answer the question accurately."),
        HumanMessage(content=f"Context: {context}\n\nQuestion: {state['question']}"),
    ]
    state["answer"] = model.invoke(prompt).content
    state["confidence"] = 0.9 if state.get("context") else 0.5
    return state
def decide_path(state: QnAState) -> str:
    """Route invalid questions to clarification, valid ones to answering."""
    return "clarification" if state["needs_clarification"] else "answer_generation"
# Assemble the QnA workflow.
builder = StateGraph(QnAState)
builder.add_node("validate", validate_question)
builder.add_node("clarification", handle_clarification)
builder.add_node("retrieval", retrieve_context)
builder.add_node("answer_generation", generate_answer)
builder.set_entry_point("validate")

# Branch: invalid questions get a clarification message; valid ones go
# through retrieval before the answer is generated.
builder.add_conditional_edges(
    "validate",
    decide_path,
    {"clarification": "clarification", "answer_generation": "retrieval"},
)
# Sequential edges to the terminal node.
builder.add_edge("retrieval", "answer_generation")
builder.add_edge("clarification", END)
builder.add_edge("answer_generation", END)

app = builder.compile()

# Run with a fully-initialized state.
initial_state = {
    "question": "What is machine learning?",
    "context": None,
    "answer": "",
    "confidence": 0.0,
    "needs_clarification": False,
}
result = app.invoke(initial_state)
print(f"Answer: {result['answer']}")
print(f"Confidence: {result['confidence']}")
进阶技巧
- 并行执行:从同一节点添加多条出边(fan-out),即可让多个节点并行执行
- 子图:创建可复用的子图模块
- 状态持久化:配合 checkpointer(如数据库)实现长期状态存储
- 流式输出:使用 stream 方法实现实时输出
LangGraph 的强大之处在于将复杂的 AI 工作流抽象为有向图,使代码更清晰、可维护且易于调试。开始构建你的第一个图形化 AI 应用吧!