阅读视图

发现新文章,点击刷新页面。

LangGraph 使用指南

LangGraph 使用指南

基础概念

LangGraph 是一个用于构建有状态、多步骤 AI 工作流的框架,基于 LangChain 构建,核心概念包括:

  • Graph(图):工作流的整体结构,由节点和边组成
  • Node(节点):工作流中的处理步骤,可以是函数、LLM 调用或任何可执行逻辑
  • Edge(边):连接节点的路径,定义执行顺序
  • State(状态):在节点之间传递的共享数据对象
  • Compile(编译):将图转换为可执行对象

安装方法

# 基础安装
pip install langgraph

# 如果使用 LangChain 模型
pip install langchain langchain-openai

# 可选:用于可视化
pip install matplotlib

核心功能

1. 构建基本图结构

from typing import TypedDict, List
from langgraph.graph import StateGraph, END

# 定义状态类型
class State(TypedDict):
    messages: List[str]
    count: int

# 定义节点函数
def node1(state: State) -> State:
    state["messages"].append("Node 1 executed")
    state["count"] += 1
    return state

def node2(state: State) -> State:
    state["messages"].append("Node 2 executed")
    state["count"] += 1
    return state

# 创建图
graph = StateGraph(State)

# 添加节点
graph.add_node("step1", node1)
graph.add_node("step2", node2)

# 添加边
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.set_finish_point("step2")

# 编译图
app = graph.compile()

# 执行
result = app.invoke({"messages": [], "count": 0})
print(result)

2. 条件边(条件路由)

from langgraph.graph import StateGraph, END

class State(TypedDict):
    input_text: str
    category: str

def classify(state: State) -> State:
    # 模拟分类逻辑
    if "?" in state["input_text"]:
        state["category"] = "question"
    else:
        state["category"] = "statement"
    return state

def handle_question(state: State) -> State:
    state["input_text"] = f"Answer to: {state['input_text']}"
    return state

def handle_statement(state: State) -> State:
    state["input_text"] = f"Processed statement: {state['input_text']}"
    return state

# 条件路由函数
def decide_category(state: State) -> str:
    if state["category"] == "question":
        return "question_node"
    return "statement_node"

# 构建图
graph = StateGraph(State)
graph.add_node("classify", classify)
graph.add_node("question_node", handle_question)
graph.add_node("statement_node", handle_statement)

graph.set_entry_point("classify")
graph.add_conditional_edges(
    "classify",
    decide_category,
    {
        "question_node": "question_node",
        "statement_node": "statement_node"
    }
)
graph.add_edge("question_node", END)
graph.add_edge("statement_node", END)

app = graph.compile()

result = app.invoke({"input_text": "What is LangGraph?", "category": ""})
print(result)

3. 循环和递归

class State(TypedDict):
    count: int
    max_count: int
    result: str

def increment(state: State) -> State:
    state["count"] += 1
    state["result"] = f"Step {state['count']}"
    return state

def should_continue(state: State) -> str:
    if state["count"] < state["max_count"]:
        return "increment"
    return "end"

graph = StateGraph(State)
graph.add_node("increment", increment)

graph.set_entry_point("increment")
graph.add_conditional_edges(
    "increment",
    should_continue,
    {"increment": "increment", "end": END}
)

app = graph.compile()
result = app.invoke({"count": 0, "max_count": 3, "result": ""})
print(result)

4. 集成 LLM(以 OpenAI 为例)

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
from langgraph.graph import StateGraph, END

class State(TypedDict):
    query: str
    response: str

def call_llm(state: State) -> State:
    llm = ChatOpenAI(temperature=0)
    messages = [HumanMessage(content=state["query"])]
    state["response"] = llm.invoke(messages).content
    return state

graph = StateGraph(State)
graph.add_node("llm_call", call_llm)
graph.set_entry_point("llm_call")
graph.set_finish_point("llm_call")

app = graph.compile()
result = app.invoke({"query": "What is the capital of France?", "response": ""})
print(result["response"])

最佳实践

1. 状态管理最佳实践

# 使用 TypedDict 确保类型安全
from typing import TypedDict, Optional, List

class ChatState(TypedDict):
    messages: List[dict]
    user_id: str
    session_data: Optional[dict]
    error: Optional[str]

2. 错误处理

def safe_node(state: State) -> State:
    try:
        # 业务逻辑
        result = process_data(state)
        return {"...": result}
    except Exception as e:
        state["error"] = str(e)
        return state

# 添加错误处理路径
def check_error(state: State) -> str:
    return "error_handler" if state.get("error") else "next_node"

3. 性能优化

# 使用缓存避免重复计算
from functools import lru_cache

@lru_cache(maxsize=100)
def expensive_computation(input_data: str) -> str:
    # 耗时操作
    return processed_result

def node_with_cache(state: State) -> State:
    state["result"] = expensive_computation(state["input"])
    return state

4. 可观测性

# 添加日志记录
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def monitored_node(state: State) -> State:
    logger.info(f"Processing state: {state}")
    result = process(state)
    logger.info(f"Result: {result}")
    return result

5. 测试策略

# 单元测试节点
def test_node():
    state = {"input": "test", "output": ""}
    result = my_node(state)
    assert result["output"] == "expected_output"
    
# 集成测试整个图
def test_graph():
    app = build_graph()
    result = app.invoke({"input": "test"})
    assert "output" in result

6. 常见陷阱避免

避免在节点内部修改共享状态

# ❌ 错误做法
def bad_node(state: State) -> State:
    state["shared_data"].append("value")  # 直接修改
    return state

# ✅ 正确做法
def good_node(state: State) -> State:
    new_state = state.copy()
    new_state["shared_data"] = state["shared_data"] + ["value"]
    return new_state

完整示例:问答系统

from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

class QnAState(TypedDict):
    question: str
    context: Optional[str]
    answer: str
    confidence: float
    needs_clarification: bool

def validate_question(state: QnAState) -> QnAState:
    """验证问题是否有效"""
    if not state["question"] or len(state["question"]) < 3:
        state["needs_clarification"] = True
    return state

def handle_clarification(state: QnAState) -> QnAState:
    """处理需要澄清的问题"""
    state["answer"] = "Please provide a more specific question."
    return state

def retrieve_context(state: QnAState) -> QnAState:
    """检索相关上下文(模拟)"""
    # 实际中会从数据库或文档中检索
    state["context"] = f"Context related to: {state['question']}"
    return state

def generate_answer(state: QnAState) -> QnAState:
    """使用 LLM 生成答案"""
    llm = ChatOpenAI(temperature=0.7)
    system_msg = SystemMessage(content="Answer the question accurately.")
    human_msg = HumanMessage(content=f"Context: {state.get('context', 'No context')}\n\nQuestion: {state['question']}")
    response = llm.invoke([system_msg, human_msg])
    state["answer"] = response.content
    state["confidence"] = 0.9 if state.get("context") else 0.5
    return state

def decide_path(state: QnAState) -> str:
    """条件路由决策"""
    if state["needs_clarification"]:
        return "clarification"
    return "answer_generation"

# 构建图
graph = StateGraph(QnAState)
graph.add_node("validate", validate_question)
graph.add_node("clarification", handle_clarification)
graph.add_node("retrieval", retrieve_context)
graph.add_node("answer_generation", generate_answer)

graph.set_entry_point("validate")

# 条件边
graph.add_conditional_edges(
    "validate",
    decide_path,
    {
        "clarification": "clarification",
        "answer_generation": "retrieval"
    }
)

# 顺序边
graph.add_edge("retrieval", "answer_generation")
graph.add_edge("clarification", END)
graph.add_edge("answer_generation", END)

app = graph.compile()

# 执行
result = app.invoke({
    "question": "What is machine learning?",
    "context": None,
    "answer": "",
    "confidence": 0.0,
    "needs_clarification": False
})

print(f"Answer: {result['answer']}")
print(f"Confidence: {result['confidence']}")

进阶技巧

  1. 并行执行:使用 add_parallel_edges 实现并行节点
  2. 子图:创建可复用的子图模块
  3. 状态持久化:配合数据库实现长期状态存储
  4. 流式输出:使用 stream 方法实现实时输出

LangGraph 的强大之处在于将复杂的 AI 工作流抽象为有向图,使代码更清晰、可维护且易于调试。开始构建你的第一个图形化 AI 应用吧!

❌