普通视图

发现新文章,点击刷新页面。
昨天以前首页

多智能体协作 - 使用 LangGraph 子图实现

2026年3月31日 17:39

多智能体协作系统 - 使用 LangGraph 子图实现 功能:并行执行两个智能体(直播文案 + 小红书文案)

未命名绘图.png

import os
from typing import TypedDict, Any, Annotated

import dotenv
from langchain_community.tools import GoogleSerperRun
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from pydantic.v1 import Field, BaseModel

dotenv.load_dotenv()

# ==================== 初始化 LLM ====================
llm = ChatOpenAI(
    model="qwen3-max-2026-01-23",
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_API_BASE_URL")
)


# ==================== 工具定义 ====================
class GoogleSerperArgsSchema(BaseModel):
    query: str = Field(description="执行谷歌搜索的查询语句")


google_serper = GoogleSerperRun(
    api_wrapper=GoogleSerperAPIWrapper(),
    args_schema=GoogleSerperArgsSchema,
)


# ==================== 状态归约函数 ====================
def reduce_str(left: str | None, right: str | None) -> str:
    """
    字符串归约函数:用于合并状态字段
    逻辑:如果新值存在且非空则使用新值,否则保留旧值
    !! 如果为空 传递给llm 会造成无限循环
    """
    if right is not None and right != "":
        return right
    return left


# ==================== 状态定义 ====================
class AgentState(TypedDict):
    """主图状态 - 包含所有共享数据"""
    query: Annotated[str, reduce_str]  # 原始问题/商品名
    live_content: Annotated[str, reduce_str]  # 直播文案
    xhs_content: Annotated[str, reduce_str]  # 小红书文案
    messages: Annotated[list, add_messages]  # 对话历史(自动追加消息)


class LiveAgentState(TypedDict):
    """直播智能体状态 - 继承主图所有字段 + messages"""
    query: Annotated[str, reduce_str]
    live_content: Annotated[str, reduce_str]
    xhs_content: Annotated[str, reduce_str]
    messages: Annotated[list, add_messages]


class XHSAgentState(TypedDict):
    """小红书智能体状态 - 继承主图所有字段 + messages"""
    query: Annotated[str, reduce_str]
    live_content: Annotated[str, reduce_str]
    xhs_content: Annotated[str, reduce_str]
    messages: Annotated[list, add_messages]


# ==================== 子图 1: 直播文案智能体 ====================
def chatbot_live(state: LiveAgentState, config: RunnableConfig) -> Any:
    """
    直播文案生成节点
    功能:根据商品名生成直播带货脚本文案,支持调用搜索工具
    """
    # 创建提示模板 + 绑定工具
    prompt = ChatPromptTemplate.from_messages([
        (
            "system",
            "你是一个拥有 10 年经验的直播文案专家,请根据用户提供的产品整理一篇直播带货脚本文案,如果在你的知识库内找不到关于该产品的信息,可以使用搜索工具。"
        ),
        ("human", "{query}"),
        ("placeholder", "{chat_history}"),
    ])
    chain = prompt | llm.bind_tools([google_serper])

    # 调用链生成回复
    ai_message = chain.invoke({"query": state["query"], "chat_history": state["messages"]})

    # 返回更新的状态
    return {
        "messages": [ai_message],  # 追加到消息历史
        "live_content": ai_message.content,  # 更新直播文案
    }


# 创建子图 1 结构
live_agent_graph = StateGraph(LiveAgentState)

# 添加节点
live_agent_graph.add_node("chatbot_live", chatbot_live)  # LLM 聊天节点
live_agent_graph.add_node("tools", ToolNode([google_serper]))  # 工具执行节点

# 添加边(控制流)
live_agent_graph.set_entry_point("chatbot_live")  # 入口点
live_agent_graph.add_conditional_edges(
    "chatbot_live", 
    tools_condition  # 动态路由:如果 LLM 决定调用工具 → tools 节点,否则 → 结束
)
live_agent_graph.add_edge("tools", "chatbot_live")  # 工具执行后返回 LLM

"""
子图 1 流程:
┌─────────┐
│  START  │
└────┬────┘
     │
     ▼
┌─────────────┐
│ chatbot_live│ ←───┐
└──────┬──────┘     │
       │            │
       ├─[需要工具]─→│ tools │
       │            └──────┘
       │
       └─[无需工具]─→ END
"""


# ==================== 子图 2: 小红书文案智能体 ====================
def chatbot_xhs(state: XHSAgentState, config: RunnableConfig) -> Any:
    """
    小红书文案生成节点
    功能:根据商品名生成小红书笔记文案(风格活泼,带 emoji)
    """
    # 创建提示模板 + 解析器
    prompt = ChatPromptTemplate.from_messages([
        ("system",
         "你是一个小红书文案大师,请根据用户传递的商品名,生成一篇关于该商品的小红书笔记文案,注意风格活泼,多使用 emoji 表情。"),
        ("human", "{query}"),
    ])
    chain = prompt | llm | StrOutputParser()

    # 调用链生成文案
    return {"xhs_content": chain.invoke({"query": state["query"]})}


# 创建子图 2 结构
xhs_agent_graph = StateGraph(XHSAgentState)

# 添加节点
xhs_agent_graph.add_node("chatbot_xhs", chatbot_xhs)

# 添加边
xhs_agent_graph.set_entry_point("chatbot_xhs")  # 入口
xhs_agent_graph.set_finish_point("chatbot_xhs")  # 出口


子图 2 流程:
┌─────────┐
│  START  │
└────┬────┘
     │
     ▼
┌────────────┐
│ chatbot_xhs│
└──────┬─────┘
       │
       ▼
    END



# ==================== 主图:编排两个子图 ====================
def parallel_node(state: AgentState, config: RunnableConfig) -> Any:
    """
    并行分发节点
    功能:透传状态,将请求分发给两个子智能体
    """
    return state


# 创建主图结构
agent_graph = StateGraph(AgentState)

# 添加节点(关键:添加的是编译后的子图)
agent_graph.add_node("parallel_node", parallel_node)  # 分发节点
agent_graph.add_node("live_agent", live_agent_graph.compile())  # 直播智能体(子图)
agent_graph.add_node("xhs_agent", xhs_agent_graph.compile())  # 小红书智能体(子图)

# 添加边(控制流)
agent_graph.set_entry_point("parallel_node")  # 从分发节点开始
agent_graph.add_edge("parallel_node", "live_agent")  # 并行执行直播智能体
agent_graph.add_edge("parallel_node", "xhs_agent")  # 并行执行小红书智能体

# 设置结束点(两个子图都完成后结束)
agent_graph.set_finish_point("live_agent")
agent_graph.set_finish_point("xhs_agent")


# 编译主图
agent = agent_graph.compile()

# 打印图的 ASCII 结构
print(agent.get_graph().print_ascii())

# 执行并获取结果
print("\n=== 执行结果 ===")
result = agent.invoke({"query": "潮汕牛肉丸"})
print(f"商品:{result['query']}")
print(f"\n直播文案:\n{result['live_content']}")
print(f"\n小红书文案:\n{result['xhs_content']}")

ReACT Agent 开发知识点总结

2026年3月26日 12:41

📚 ReACT Agent 开发知识点总结

python langchain QW 文生图

一、核心概念

1. Agent 的两种创建方式

方法 create_react_agent create_tool_calling_agent
原理 文本解析(Thought/Action/Observation) 原生工具调用(Function Calling)
提示词 严格格式要求 简单通用
可靠性 ⭐⭐⭐(易解析失败) ⭐⭐⭐⭐⭐(结构化调用)
模型要求 任意 LLM 支持 Function Calling 的模型
推荐度 旧项目兼容 强烈推荐

2. 工作流程对比

ReAct Agent:
用户输入 → LLM 生成文本 → 正则解析 → 提取 Action → 执行工具 → 返回 Observation
          ↑                                                        ↓
          └─────────────────── 循环多次 ───────────────────────────┘

Tool Calling Agent:
用户输入 → LLM 结构化输出 → 自动匹配工具 → 执行 → 返回结果
          (JSON 格式)                              ↓
                                            自动生成回答

二、关键代码模式

正确的 Tool Calling Agent 实现

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI

# 1. 定义工具
def search_func(query: str) -> str:
    return f"搜索结果:{query}"

search_tool = Tool(
    name="search",
    description="搜索信息",
    func=search_func,
)

tools = [search_tool]

# 2. 初始化 LLM
llm = ChatOpenAI(model="qwen3-max", api_key="...", base_url="...")

# 3. 🔑 关键步骤:绑定工具
llm_with_tools = llm.bind_tools(tools)

# 4. 创建 Agent
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是助手,可用工具:{tools}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(
    llm=llm_with_tools,  # ✅ 使用绑定工具的 LLM
    tools=tools,          # ✅ 工具列表
    prompt=prompt
)

# 5. 创建执行器
executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True,
    max_iterations=5
)

# 6. ✅ 正确调用方式
result = executor.invoke({"input": "用户问题"})
print(result["output"])

三、常见错误与修复

错误 1:直接传字符串给 invoke

# ❌ 错误
executor.invoke("用户问题")

# ✅ 正确
executor.invoke({"input": "用户问题"})

错误 2:未绑定工具

# ❌ 错误
agent = create_tool_calling_agent(llm=llm, tools=tools, ...)

# ✅ 正确
llm_with_tools = llm.bind_tools(tools)
agent = create_tool_calling_agent(llm=llm_with_tools, tools=tools, ...)

错误 3:使用非标准 Tool 类

# ❌ 容易出错
google_serper = GoogleSerperRun(...)

# ✅ 推荐
google_serper = Tool(
    name="google_serper",
    func=google_serper_api.run,
    description="..."
)

错误 4:Placeholder 格式错误

# ❌ 错误
("placeholder", "chat_history")

# ✅ 正确
MessagesPlaceholder(variable_name="chat_history", optional=True)
# 或
("placeholder", "{chat_history}")  # 注意花括号

四、工具定义最佳实践

1. 简单工具 - 使用 Tool 包装

tool = Tool(
    name="tool_name",
    func=function,
    description="清晰的用途说明",
)

2. 复杂参数 - 添加 args_schema

from pydantic.v1 import BaseModel, Field

class ArgsSchema(BaseModel):
    query: str = Field(..., description="参数描述")

tool = Tool(
    name="tool_name",
    func=function,
    args_schema=ArgsSchema,
    description="..."
)

3. 自定义 API - 封装函数

def generate_image(prompt: str) -> str:
    """使用阿里云生成图片"""
    try:
        response = MultiModalConversation.call(...)
        
        # ✅ 正确解析嵌套结构
        if response.output and response.output.get('choices'):
            content = response.output['choices'][0]['message']['content']
            for item in content:
                if 'image' in item:
                    return f"图片链接:{item['image']}"
        
        return "生成失败"
    except Exception as e:
        return f"出错:{str(e)}"

image_tool = Tool(
    name="image_generator",
    func=generate_image,
    description="根据描述生成图片",
    args_schema=DallEArgsSchema
)

五、API 响应解析技巧

阿里云通义万相响应结构

{
  "output": {
    "choices": [
      {
        "message": {
          "content": [
            {"image": "https://..."}  // 或 {"text": "..."}
          ]
        }
      }
    ]
  }
}

安全解析代码

def parse_response(response):
    try:
        # 多层防御式解析
        if response.output:
            choices = response.output.get('choices', [])
            if choices:
                content = choices[0].get('message', {}).get('content', [])
                for item in content:
                    if isinstance(item, dict) and 'image' in item:
                        return item['image']
        
        # 备用方案
        if response.output and response.output.get('text'):
            return response.output['text']
            
        return "无法解析响应"
    except Exception as e:
        return f"解析出错:{str(e)}"

六、提示词设计要点

优秀提示词模板

prompt = ChatPromptTemplate.from_messages([
    ("system", 
     "你是智能助手,可使用工具帮助用户。\n"
     "\n"
     "可用工具:\n"
     "{tools}\n"
     "\n"
     "使用格式:\n"
     "- Thought: 分析问题\n"
     "- Action: 选择工具\n"
     "- Action Input: 参数\n"
     "- Observation: 结果\n"
     "- Final Answer: 最终回答\n"
     "\n"
     "注意:\n"
     "- 生成图片用 openai_dalle\n"
     "- 搜索信息用 google_serper\n"
     "- 英文描述图片效果更好"
    ),
    MessagesPlaceholder("chat_history", optional=True),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

七、调试技巧

1. 启用详细日志

executor = AgentExecutor(verbose=True, ...)

2. 限制迭代次数

executor = AgentExecutor(max_iterations=5, ...)  # 防止无限循环

3. 错误处理

executor = AgentExecutor(handle_parsing_errors=True, ...)  # 自动修复

4. 打印中间结果

def generate_image(prompt):
    print(f"收到请求:{prompt}")
    response = ...
    print(f"API 响应:{response}")
    return result

八、完整流程图

用户提问
   ↓
┌─────────────────────────────────────┐
│  AgentExecutor.invoke({"input": ...}) │
└─────────────────────────────────────┘
   ↓
┌─────────────────────────────────────┐
│  create_tool_calling_agent           │
│  - 分析是否需要工具                  │
│  - 输出结构化 JSON                   │
└─────────────────────────────────────┘
   ↓
┌─────────────────────────────────────┐
│  判断 Action                          │
│  - None → 直接回答                   │
│  - tool_name → 执行工具              │
└─────────────────────────────────────┘
   ↓
┌─────────────────────────────────────┐
│  Tool.run()                          │
│  - 调用实际函数                      │
│  - 返回 Observation                  │
└─────────────────────────────────────┘
   ↓
┌─────────────────────────────────────┐
│  循环或生成最终回答                  │
└─────────────────────────────────────┘

九、核心要点速记

必须做的:

  1. 使用 bind_tools() 绑定工具
  2. invoke() 传入字典 {"input": "..."}
  3. 使用 Tool 包装自定义函数
  4. 添加 handle_parsing_errors=True
  5. 设置 max_iterations 防止死循环

避免做的:

  1. 不要直接传字符串给 invoke()
  2. 不要使用非标准 Tool 类(如 GoogleSerperRun
  3. 不要忘记绑定工具就创建 Agent
  4. 不要使用错误的 Placeholder 格式
  5. 不要假设 API 响应结构固定

十、扩展学习

下一步可以学习:

  • 🔄 添加对话历史记忆(Memory)
  • 📊 多工具协同工作
  • 🎯 自定义 Agent 路由逻辑
  • 📝 更复杂的提示词工程
  • 🔐 生产环境的错误处理和重试机制

十、核心代码

import os
from dashscope import MultiModalConversation

import dotenv
from langchain_classic.agents import AgentExecutor, create_react_agent, create_tool_calling_agent
from langchain_community.tools import GoogleSerperRun
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import render_text_description_and_args, Tool
from langchain_openai import ChatOpenAI
from pydantic.v1 import Field, BaseModel
from langchain_community.utilities import GoogleSerperAPIWrapper

dotenv.load_dotenv()


class GoogleSerperArgsSchema(BaseModel):
    query: str = Field(..., description="执行谷歌搜索的查询语句")


class DallEArgsSchema(BaseModel):
    query: str = Field(..., description="输入是生成图像的文本提示(prompt)")




def generate_image(prompt: str) -> str:
    """使用阿里云通义万相生成图片"""
    messages = [
        {
            "role": "user",
            "content": [
                {"text": prompt}
            ]
        }
    ]

    try:
        response = MultiModalConversation.call(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="qwen-image-2.0-pro",
            messages=messages,
            result_format='message',
            stream=False,
            watermark=False,
            prompt_extend=True,
            negative_prompt="低分辨率,低画质,肢体畸形,手指畸形,画面过饱和,蜡像感,人脸无细节,过度光滑,画面具有 AI 感。构图混乱。文字模糊,扭曲。",
            size='2048*2048'
        )

        print("API 响应:", response)

        # 从 output.choices 中获取图片
        # {"status_code": 200, "request_id": "6ab051b0-6c8f-4675-974f-919143e91d98", "code": "", "message": "", "output": {"text": null, "finish_reason": null, "choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": [{"image": "https://dashscope-7c2c.oss-accelerate.aliyuncs.com/7d/c3/20260326/e74037e8/2d748881-31a4-4e0a-b2c3-1508ee17a3f1.png?Expires=1775104234&OSSAccessKeyId=LTAI5tPxpiCM2hjmWrFXrym1&Signature=4t53r7bOV3%2Fg01L%2FUPvouMgqKeQ%3D"}]}}], "audio": null}, "usage": {"input_tokens": 0, "output_tokens": 0, "characters": 0, "height": 2048, "image_count": 1, "width": 2048}}
        if response.output and response.output.get('choices'):
            choices = response.output['choices']
            if len(choices) > 0:
                message = choices[0].get('message', {})
                content = message.get('content', [])

                if isinstance(content, list) and len(content) > 0:
                    # 查找包含 image 的内容
                    for item in content:
                        if isinstance(item, dict) and 'image' in item:
                            image_url = item['image']
                            return f"图片已生成成功!\n图片链接:{image_url}"

                    # 如果没有找到 image,返回文本内容
                    text_content = content[0].get('text', '')
                    if text_content:
                        return f"生成结果:{text_content}"

        # 备用方案:尝试从 output.text 获取
        if response.output and response.output.get('text'):
            return f"生成结果:{response.output['text']}"

        return "图片生成失败,无法解析响应数据"

    except Exception as e:
        print(f"异常详情:{str(e)}")
        return f"图片生成出错:{str(e)}"





google_serper = GoogleSerperRun(
    name="google_serper",
    description=(
        "根据传入的搜索内容,返回搜索结果"
        "谷歌搜索工具"
    ),
    args_schema=GoogleSerperArgsSchema,
    api_wrapper=GoogleSerperAPIWrapper(),
)

dalle = Tool(
    name="openai_dalle",
    description="根据传入的描述生成图片。当用户要求生成图像、创建图片、画图时使用此工具。输入应该是详细的图像描述。",
    func=generate_image,
    args_schema=DallEArgsSchema,
)
tools = [
    google_serper,
    dalle
]

prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the following questions as best you can. You have access to the following tools:"),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

llm = ChatOpenAI(
    model="qwen3-max-2026-01-23",
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_API_BASE_URL")
)
# 绑定工具到 LLM
agent = create_tool_calling_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

print(agent_executor.invoke({"input": "帮我生成一张老爷爷爬山的图片, 要求是 亚洲脸"}))
❌
❌