阅读视图

发现新文章,点击刷新页面。

用 LangChain 把大模型串起来:一个前端开发者的 AI 入门笔记

从零开始LangChain:构建你的第一个AI应用工作流

引言

2022年ChatGPT横空出世,让全世界见识了大型语言模型(LLM)的魔力。但你知道吗?有一个叫LangChain的框架其实比ChatGPT还早,最近它发布了1.0+版本,成为了AI应用开发的“明星框架”。

LangChain是什么?拆开名字:Lang(语言) + Chain(链)。它把大语言模型和一系列任务节点像链条一样连接起来,形成一个工作流。就像n8n、Coze这些可视化工具把节点串起来一样,LangChain用代码的方式帮你搭建AI应用。

这篇文章我会带你从零开始,一步步用LangChain写几个小例子,从最简单的模型调用,到用“链”组合复杂的任务流程。所有代码都基于ES Moduletype: "module"),你可以直接复制运行。

环境准备:先跑通一个最简单的例子

在开始之前,确保你安装了Node.js(18+版本),然后创建一个新项目,安装必要的依赖:

npm init -y
npm install dotenv langchain @langchain/deepseek

package.json中加入 "type": "module",这样我们就可以使用import语法。

创建一个.env文件,放你的DeepSeek API密钥(如果没有可以去platform.deepseek.com申请):

DEEPSEEK_API_KEY=你的密钥

现在,写第一个脚本main.js

import 'dotenv/config'
import { ChatDeepSeek } from '@langchain/deepseek'

// 初始化模型
const model = new ChatDeepSeek({
  model: 'deepseek-reasoner',
  temperature: 0,  // 控制随机性,0表示最确定
})

// 调用模型
const res = await model.invoke('用一句话解释什么是RAG')
console.log(res.content)

运行node main.js,你应该能看到模型输出的回答。

image.png

这段代码做了什么?

  • ChatDeepSeek是一个“适配器”,LangChain用它来统一不同大模型的接口。以后换GPT、Claude,只需要改一下import和配置,其余代码几乎不用动。
  • model.invoke是最核心的方法,把问题传给模型,然后得到回答。
  • API密钥从环境变量自动读取,不用写在代码里,安全又方便。

这就是LangChain最基础的用法:把大模型当成一个可调用的函数。

第一章:更灵活的提问——提示词模板

直接写死的提问太死板了。如果我想让模型扮演不同角色、限制回答长度,每次都拼接字符串会很麻烦。LangChain提供了PromptTemplate,像填空一样生成提示词。

新建1.js

import 'dotenv/config'
import { ChatDeepSeek } from '@langchain/deepseek'
import { PromptTemplate } from '@langchain/core/prompts'

// 定义一个模板
const prompt = PromptTemplate.fromTemplate(`
你是一个{role}。
请用不超过{limit}字回答以下问题:
{question}
`)

// 填入具体内容
const promptStr = await prompt.format({
  role: '前端面试官',
  limit: '50',
  question: '什么是闭包'
})

console.log('生成的提示词:', promptStr)

// 调用模型
const model = new ChatDeepSeek({
  model: 'deepseek-reasoner',
  temperature: 0.7,
})

const res = await model.invoke(promptStr)
console.log('回答:', res.content)

运行后,你会看到模型根据“前端面试官”的身份,用不超过50字解释了闭包。

  • 如图

image.pngPromptTemplate让我们把提示词的结构和内容分离,方便复用。比如你可以换一个角色问同样的问题,只需改format的参数即可。

第二章:什么是“链”?用pipe连接节点

上面的例子还是两步走:先生成提示词,再调用模型。LangChain的核心理念是“链”(Chain),它可以把多个步骤像管道一样连接起来,成为一个可执行的单元。

新建2.js

import 'dotenv/config'
import { ChatDeepSeek } from '@langchain/deepseek'
import { PromptTemplate } from '@langchain/core/prompts'

const model = new ChatDeepSeek({
  model: 'deepseek-reasoner',
  temperature: 0.7,
})

const prompt = PromptTemplate.fromTemplate(`
你是一个前端专家,用一句话解释:{topic}
`)

// 用 pipe 把 prompt 和 model 连接成一个链
const chain = prompt.pipe(model)
//打印chain可以看到它的类型为RunnableSequence,和它的节点
console.log(chain)
// 直接调用链,传入变量
const res = await chain.invoke({
  topic: '闭包'
})

console.log(res.content)
  • 执行效果图
  1. 执行打印chain

image.png

  1. 执行打印输出结果

image.png

注意看,prompt.pipe(model)返回了一个新的对象,它也是一个“可运行”的链。我们调用chain.invoke({ topic: '闭包' }),内部会自动执行:

  1. 用传入的{topic: '闭包'}填充prompt模板,生成提示词。
  2. 把提示词传给model,得到回答。
  3. 返回最终结果。

整个过程就像工厂流水线:原材料(topic)进入第一道工序(prompt模板),产物(提示词)直接传给下一道工序(模型),最后产出成品(回答)。

这就是LangChain最基础的链:RunnableSequence(可运行序列)。你不需要手动调用两次,代码更简洁,逻辑更清晰。

第三章:组合多个链——复杂任务的工作流

现实中的AI任务往往不止一步。比如我想让模型先详细解释一个概念,然后把这个解释总结成三个要点。这需要两个步骤,而且第二步要用到第一步的输出。

LangChain提供了RunnableSequence来组合多个链。我们新建3.js

import { ChatDeepSeek } from '@langchain/deepseek'
import { PromptTemplate } from '@langchain/core/prompts'
import { RunnableSequence } from '@langchain/core/runnables'
import 'dotenv/config'

const model = new ChatDeepSeek({
  model: 'deepseek-reasoner',
  temperature: 0.7,
})

// 第一步:详细解释概念
const explainPrompt = PromptTemplate.fromTemplate(`
你是一个前端专家,请详细介绍以下概念:{topic}
要求:覆盖定义、原理、使用方式,不超过300字。
`)

// 第二步:总结要点
const summaryPrompt = PromptTemplate.fromTemplate(`
请将以下前端概念解释总结为3个核心要点(每个要点不超过20字):
{explaination}
`)

// 分别构建两个链
const explainChain = explainPrompt.pipe(model)
//我们打印explainChain也可以看到它的类型和节点
console.log(explainChain)
const summaryChain = summaryPrompt.pipe(model)

// 用 RunnableSequence 组合它们
const fullChain = RunnableSequence.from([
  // 第一步:输入 topic -> 得到详细解释
  async (input) => {
    const res = await explainChain.invoke({ topic: input.topic })
    return res.content  // 将解释传给下一步
  },
  // 第二步:拿到上一步的解释 -> 生成总结
  async (explaination) => {
    const res = await summaryChain.invoke({ explaination })
    return `知识点:${explaination}\n总结:${res.content}`
  },
])

// 执行完整链
const res = await fullChain.invoke({
  topic: '闭包'
})
console.log(res)
  • 效果图
  1. 打印explainChain

image.png 2.打印输出结果

image.png

这段代码稍微复杂一点,但逻辑很清晰:

  • explainChain:输入topic,输出详细解释。
  • summaryChain:输入explaination(详细解释),输出总结。
  • 我们用RunnableSequence.from([...])把两个步骤串起来。数组里的每个元素是一个函数,接收上一步的输出,返回下一步的输入。
  • 最后调用fullChain.invoke({ topic: '闭包' }),内部自动执行两步,并把最终结果返回。

运行后你会看到模型先给出了关于闭包的详细解释(不超过300字),然后给出了三个要点总结。整个流程自动化完成,无需人工介入。

深入理解:LangChain的适配器模式与可拔插设计

你可能注意到,所有代码中我们只引用了@langchain/deepseek这一个具体模型包。如果我想换成OpenAI的GPT,该怎么做?只需要:

npm install @langchain/openai

然后把import { ChatDeepSeek }改成import { ChatOpenAI },model参数稍作调整即可,其余代码几乎不用动。

这就是LangChain的“适配器模式”。它定义了一套统一的接口(如invokestream等),各个模型厂商通过适配器实现这些接口。这样一来,你的业务逻辑和具体模型解耦,大模型更新换代再快,你只需换一个适配器,不用重写应用。

总结

通过这几个小例子,我们走过了LangChain的入门之路:

  1. 基础调用:用适配器连接大模型,执行最简单的问答。
  2. 提示词模板:用PromptTemplate动态构造输入,让提示词更灵活。
  3. 简单链:用pipe把模板和模型连接起来,形成可复用单元。
  4. 复杂链:用RunnableSequence组合多个链,实现多步骤工作流。

LangChain不仅仅是“链”,它还是一个完整的AI应用开发框架,提供了记忆、工具调用、代理(Agent)等高级功能。但无论多复杂的功能,底层都离不开我们今天学到的核心思想:把任务拆分成节点,用链条连接,让流程自动化

现在你已经掌握了LangChain的基本功,可以尝试用它搭建更酷的应用了,比如文档问答机器人、自动化报告生成器等等。如果在实践中遇到问题,欢迎在评论区留言交流。

从零搭建多 Agent 智能编排平台:NestJS + LangGraph 全栈实战

从零搭建多 Agent 智能编排平台:NestJS + LangGraph 全栈实战

本文分享一个开源项目 Nest-Agent——基于 NestJS + LangGraph 构建的多 Agent 编排平台,支持 Supervisor 自动路由、DAG 自定义工作流、RAG 知识库检索,遵循 AG-UI 标准协议实现 token 级流式输出。如果对你有帮助,欢迎 GitHub Star 支持 ⭐


为什么做这个项目?

当前 AI Agent 框架层出不穷,Python 生态的 LangChain、CrewAI 已经很成熟。但在 Node.js / TypeScript 生态,企业级的 Agent 编排后端方案相对匮乏。作为一个 NestJS 爱好者,我希望用最熟悉的技术栈打造一个:

  • 生产级的多 Agent 协同平台,不是 demo
  • 支持 Supervisor 自动路由DAG 自定义工作流 两种编排模式
  • 遵循 AG-UI 标准协议,前后端解耦,可对接 CopilotKit 等主流前端 SDK
  • 内置 RAG 知识库多 LLM 供应商多租户隔离,开箱即用

于是有了 Nest-Agent


技术栈一览

层次 技术
后端框架 NestJS 11
Agent 编排 LangChain + LangGraph
数据库 MySQL 8.0 (TypeORM)
缓存 Redis 7 (ioredis)
向量库 Milvus 2.3
认证 Passport JWT
前端 React 18 + shadcn/ui + Vite
包管理 pnpm workspace (monorepo)
部署 Docker Compose 一键启动

核心功能展示

1. AI 对话 — 流式输出 + 工具调用可视化

对话是整个平台的核心。输入一个问题,Supervisor 自动判断是否需要搜索、检索知识库还是直接回答,整个过程实时流式展示。

对话首页

流式输出

亮点:

  • Token 级别的流式输出,逐字显示 AI 回复
  • 实时展示 Agent 执行步骤(researcher → 搜索 → 生成回答)
  • 工具调用全程可视化——工具名、参数、返回结果一目了然
  • Markdown 富文本渲染(代码块、表格、列表、链接等)
  • 对话标题自动生成,不再是千篇一律的 "New Conversation"

对话详情

2. 工作流编排 — 可视化 DAG 工作流

除了 Supervisor 自动模式,你还可以自定义 DAG 工作流,精确控制 Agent 的执行流程。

工作流列表

创建工作流

支持的节点类型:

  • agent — 可调用工具的 AI Agent,支持自定义 system prompt
  • tool — 直接调用工具节点
  • condition — 条件分支,根据关键词路由到不同分支
  • start / end — 起止标记

3. RAG 知识库 — 语义检索

上传文档到知识库,Agent 对话时自动检索相关知识片段,实现基于私有数据的问答。

知识库列表

语义检索

RAG Pipeline:

文档 → 分块(1000/200) → BAAI/bge-m3 Embedding → Milvus 向量存储
查询 → Embedding → Milvus ANN 检索 → Top-K 结果

4. 认证系统 — 多租户隔离

登录注册

JWT 认证 + TenantGuard,所有数据按 tenantId 隔离,天然支持多团队使用。


架构设计

整体分层

┌──────────────────────────────────────────────────────────────┐
│                     React + shadcn/ui                         │
│                (SSE 客户端 + AG-UI 事件解析)                   │
└────────────────────────┬─────────────────────────────────────┘
                         │ HTTP POST + SSE 流
┌────────────────────────▼─────────────────────────────────────┐
│                     NestJS 应用层                              │
│  ┌──────┐ ┌──────┐ ┌───────┐ ┌──────┐                       │
│  │ Auth │ │ Chat │ │ Agent │ │ RAG  │  Controller 层          │
│  └──┬───┘ └──┬───┘ └───┬───┘ └──┬───┘                       │
│     │        │         │        │                             │
│  ┌──▼───┐ ┌──▼───┐ ┌───▼───┐ ┌──▼───┐                       │
│  │ Auth │ │ Chat │ │Agent  │ │ RAG  │  Service 层            │
│  └──────┘ └──────┘ └───┬───┘ └──────┘                       │
│                        │                                      │
│           ┌────────────┼────────────┐                        │
│     ┌─────▼─────┐ ┌────▼────┐ ┌────▼────┐                   │
│     │ Supervisor │ │   DAG   │ │  Tool   │  核心编排层        │
│     │  Factory   │ │ Engine  │ │Registry │                   │
│     └───────────┘ └─────────┘ └─────────┘                   │
│                                                               │
│     ┌─────────┐ ┌────────┐ ┌────────┐                       │
│     │   LLM   │ │ Milvus │ │ Redis  │  基础设施层            │
│     │ Service  │ │Service │ │Service │                       │
│     └─────────┘ └────────┘ └────────┘                       │
└──────────────────────────────────────────────────────────────┘
         │             │           │
    ┌────▼────┐  ┌─────▼────┐ ┌───▼──┐
    │OpenAI/  │  │ Milvus   │ │Redis │  存储层
    │Anthropic│  │ 2.3      │ │  7   │
    │/Qwen    │  └──────────┘ └──────┘
    └─────────┘

两种编排模式对比

Supervisor 模式(默认)

适合通用对话场景。LLM 充当"主管",根据用户意图动态路由到合适的 Agent:

用户: "搜索一下 NestJS 11 新功能"
  → Supervisor 判断: 需要搜索,路由到 researcher
  → researcher 调用 web_search 工具
  → researcher 根据搜索结果生成回答
  → Supervisor 判断: 任务完成,路由到 __end__

核心实现:Supervisor 使用 LLM + withStructuredOutput(zod schema) 做结构化输出,返回 { next: "researcher" | "responder" | "__end__" }

DAG 模式(自定义工作流)

适合复杂业务流程。用户预定义有向无环图,精确控制执行链路:

Start → 研究员Agent(调用搜索) → 条件判断 → 写手Agent(生成报告) → End
                                    ↘ 工具节点(直接检索) ↗

核心实现:将 JSON 格式的 nodes[] + edges[] 编译为 LangGraph 的 StateGraph,通过 Command({ goto, update }) 控制状态转移。


关键技术实现细节

1. AG-UI 流式事件协议

这是项目中最复杂也最有价值的部分。系统遵循 AG-UI 标准协议,通过 SSE 推送细粒度事件:

event: RUN_STARTED
data: {"type":"RUN_STARTED","threadId":"xxx","runId":"xxx"}

event: STEP_STARTED
data: {"type":"STEP_STARTED","stepName":"researcher"}

event: TOOL_CALL_START
data: {"type":"TOOL_CALL_START","toolCallId":"xxx","toolCallName":"web_search"}

event: TEXT_MESSAGE_CONTENT
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"xxx","delta":"逐字输出..."}

event: RUN_FINISHED
data: {"type":"RUN_FINISHED","threadId":"xxx","runId":"xxx"}

三段式文本消息TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT(增量) → TEXT_MESSAGE_END

四段式工具调用TOOL_CALL_START → TOOL_CALL_ARGS → TOOL_CALL_END → TOOL_CALL_RESULT

遵循标准协议意味着可以直接对接 CopilotKit 等前端 SDK,无需自定义解析。

2. LangGraph streamEvents → AG-UI 的精细转换

AgentService.processStreamEvents() 方法实现了 LangGraph 底层事件到 AG-UI 协议的映射,需要处理几个棘手的问题:

问题 1:Supervisor 路由节点的输出不应暴露给用户

Supervisor 的结构化输出({ next: "researcher" })是内部路由决策,不应该推送给前端。通过节点名过滤解决。

问题 2:嵌套子 Agent 的"思考"文本

createReactAgent 内部的 LLM 在调用工具前会输出一些"思考"文本(通常是复述 prompt),这些不应显示给用户。解决方案是通过 langgraph_checkpoint_ns 区分嵌套层级:

const isNestedAgent = parentNode && langgraphNode !== parentNode;
if (isNestedAgent && !nodeToolsDone.get(nodeKey)) {
  // 暂存到 pendingTextPerNode,工具完成后才释放
}

问题 3:XML 工具调用格式兼容

某些模型(如 qwen)会以 <tool_call> XML 标签输出工具调用。代码使用正则检测并过滤,避免 XML 标签泄露到前端。

3. 智能记忆管理

对话记忆采用 滑动窗口 + LLM 自动摘要 的混合策略:

消息数 ≤ 10        → 全量返回,无压缩
10 < 消息数 ≤ 20   → 滑动窗口,取最近 10 条
消息数 > 20        → LLM 对早期消息生成摘要,摘要 + 最近 10 条

摘要存储在 Conversation.summary 字段,支持增量摘要(新对话内容追加到现有摘要上)。会话消息缓存在 Redis,TTL 3600 秒,减少数据库查询。

4. 多 LLM 供应商抽象

统一的模型工厂,切换供应商只需一个参数:

// 用户请求时指定
{ llmOptions: { provider: "openai", model: "gpt-4o" } }
{ llmOptions: { provider: "dashscope", model: "qwen-max" } }
{ llmOptions: { provider: "anthropic", model: "claude-sonnet-4-20250514" } }

DashScope(通义千问)通过 OpenAI 兼容协议接入,自定义 baseURL 即可。同样的方式可以接入 SiliconFlow、Deepseek、vLLM 等任何兼容 OpenAI API 的服务。

5. 动态工具 + 多租户隔离

工具系统采用注册表模式。web_search 是全局静态工具,而 rag_retrieval 因为需要 tenantId 上下文,采用工厂函数动态创建

// 每次请求动态创建,确保 tenantId 隔离
const ragTool = createRagRetrievalTool(ragService, tenantId);

这样不同租户只能检索到自己的知识库数据。


项目结构

nest-agent/
├── src/                       # 后端(NestJS)
│   ├── auth/                  # JWT 认证 + 多租户守卫
│   ├── chat/                  # 对话管理 + SSE 流式接口 + 记忆策略
│   ├── agent/                 # 核心:Supervisor 路由 + DAG 引擎
│   │   ├── agent.service.ts   # 编排入口 + AG-UI 事件转换
│   │   ├── supervisor.factory.ts  # Supervisor 有向图构建
│   │   └── dag-engine.ts      # DAG 编译执行引擎
│   ├── rag/                   # RAG 知识库(Milvus 向量检索)
│   ├── llm/                   # 多 LLM 供应商抽象
│   ├── tools/                 # 工具注册中心
│   ├── redis/                 # Redis 缓存
│   ├── entities/              # TypeORM 实体
│   └── common/                # AG-UI 协议定义、配置、异常过滤器
├── web/                       # 前端(React + shadcn/ui + Vite)
│   └── src/
│       ├── pages/             # 对话、工作流、知识库、登录
│       ├── components/        # 布局 + shadcn/ui 组件
│       └── lib/               # API 封装、认证上下文
├── Dockerfile                 # 多阶段构建
├── docker-compose.yml         # MySQL + Redis + Milvus 一键启动
└── pnpm-workspace.yaml        # monorepo 配置

快速启动

Docker 一键部署

git clone https://github.com/peng-yin/nest-agent.git
cd nest-agent
cp .env.example .env
# 编辑 .env,填入 OPENAI_API_KEY 等配置
docker-compose up -d
# 访问 http://localhost:3000

本地开发

# 1. 启动基础设施
docker-compose up -d mysql redis etcd minio milvus-standalone

# 2. 安装依赖
pnpm install

# 3. 启动后端(热重载)
pnpm dev            # http://localhost:3000

# 4. 启动前端(另开终端)
pnpm dev:web        # http://localhost:5173

环境变量

变量 必填 说明
OPENAI_API_KEY OpenAI API Key
OPENAI_BASE_URL 自定义 API 地址(兼容 SiliconFlow/Deepseek)
TAVILY_API_KEY 网页搜索功能需要
JWT_SECRET 生产必填 JWT 签名密钥

踩过的坑

分享几个开发过程中遇到的典型问题:

1. createReactAgent 子图的"思考"文本泄露

使用 LangGraph 的 createReactAgent 时,内部 LLM 在决定调用工具前会输出一段"思考"文本。这些文本通过 streamEvents 被捕获并发送到前端,用户看到的是一堆 prompt 模板文字而非实际回答。

解决方案:通过 langgraph_checkpoint_ns 中的 langgraphNodeparentNode 区分顶层节点和子图内部调用。子图内部 LLM 的 langgraphNode"agent",而 parentNode"researcher",两者不同;普通顶层节点两者相同。利用 langgraphNode !== parentNode 识别嵌套调用,将工具调用前的文本暂存丢弃。

2. StateGraph 节点的 checkpointNs 理解偏差

最初以为只有 createReactAgent 构建的子图才有 checkpointNs,但实际上 StateGraph 中所有节点都有。导致 responder(直接回答)的正常文本也被错误暂存。debug 日志大法好。

3. 阿里云 DashScope 的 XML 工具调用

通过 OpenAI 兼容 API 调用 qwen 模型时,部分场景下工具调用不走标准的 tool_calls 字段,而是在文本中输出 <tool_call> XML 标签。需要正则检测并过滤,否则前端会显示一堆 XML。


后续计划

  • 支持更多工具(代码执行、文件上传、图片生成等)
  • 工作流可视化编辑器(拖拽式 DAG 编辑)
  • Agent 执行过程的可视化 Trace
  • 支持更多向量数据库(Pinecone、Qdrant)
  • 支持文件上传(PDF、Word 等文档直接入库)

写在最后

这个项目从架构设计到编码实现,全部由一个人完成。涵盖了 Agent 编排、流式通信、RAG 检索、多租户隔离等多个技术领域,希望能为 Node.js/TypeScript 生态的 AI Agent 开发提供一个可参考的实践案例。

代码完全开源,如果这个项目对你有帮助,或者你对 NestJS + AI Agent 感兴趣,欢迎:

  • Star 这个项目:GitHub - nest-agent
  • 🐛 提 Issue 或 PR,一起完善
  • 💬 留言交流,分享你的想法

感谢阅读!

langchain 1.0实现AI Agent 接入MCP实战

技术内容

前端:react TypeScript antd

后端:Nodejs express langchain

模型接口:硅基流动 阿里云百炼

functionCall: 天气查询(爬取数据) 搜索引擎(百度千帆) CSDN资讯获取

MCP: 12306票务查询 万相2.5-图像视频生成

oss: 阿里云oss

Node后端搭建

项目初始化

  1. 创建项目目录并初始化
pnpm init

生成 package.json 文件。

  1. 安装 TypeScript 及相关依赖
pnpm add -D typescript tsx @types/node

说明: typescript:TypeScript 编译器
tsx:直接运行 .ts 文件(开发时使用)
@types/node:Node.js 的类型定义

  1. 初始化 TypeScript 配置
npx tsc --init

这会生成 tsconfig.json。你可以根据需要调整配置,例如:

{
  "compilerOptions": {
    "target": "ES2020" /* 编译目标 JS 版本(匹配 Node.js 支持的版本,v16+ 支持 ES2020) */,
    "module": "nodenext" /* 模块系统(Node.js 默认使用 CommonJS,需与 Node 兼容) */,
    "outDir": "./dist" /* 编译后的 JS 文件输出目录(默认 dist,避免源码与编译产物混合) */,
    "rootDir": "./src" /* TS 源码目录(建议把所有 TS 代码放在 src 文件夹下) */,
    "strict": true /* 开启严格模式(强制类型检查,TS 核心优势,推荐必开) */,
    "esModuleInterop": true /* 兼容 ES 模块和 CommonJS 模块(避免导入第三方模块报错) */,
    "skipLibCheck": true /* 跳过第三方库的类型检查(加快编译速度) */,
    "forceConsistentCasingInFileNames": true /* 强制文件名大小写一致(避免跨系统问题) */,
    "moduleResolution": "nodenext",
    "lib": [
      "ES2022"
    ] /* 编译时包含的库文件(ES2020 包含 Promise、async/await 等) */
  },
  "include": ["./src/**/*"] /* 需要编译的 TS 文件(src 下所有文件及子目录) */,
  "exclude": ["node_modules", "dist"] /* 排除不需要编译的目录 */
}

注意:如果你使用的是较新版本的 Node.js(如 18+),推荐使用 "module": "NodeNext" 和 "moduleResolution": "NodeNext" 以支持 ESM。

  1. 通过 nodemon 实现代码修改后自动重启服务
    • 安装依赖
    pnpm add -D nodemon
    
    • 创建 nodemon.json 配置文件(可选但推荐) 在项目根目录创建 nodemon.json:
    {
        "ignore": [
            "chat-storage/**/*",
            "node_modules/**/*",
            "logs/**/*",
            "*.json",
            "*.csv",
            "*.txt"
        ],
        "watch": ["src/**/*.ts"],
        "delay": 1000
    }
    
    • 更新 package.json 脚本
    {
        "scripts": {
            "start": "nodemon --exec tsx ./src/main.ts"
        }
    }
    

依赖安装

  • express
pnpm add express
  • langchain
pnpm add langchain @langchain/langgraph @langchain/core @langchain/openai @langchain/mcp-adapters
  • 其他
pnpm add ali-oss uuid zod

ali-oss 用于处理oss
uuid是我这里用到了存储标识
zod类型限定

后端服务搭建

├── src/
│   ├── main.ts ★
│   ├── modelChat.ts ★
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

在src下main.ts为express服务,modelChat.ts为路由和业务代码

// main.ts代码
// 服务器端代码(Express)
import express from "express";
import chatRoutes from "./modelChat.js";
import { fileURLToPath } from "url";
import { dirname, join, resolve } from "path";

const app: express.Express = express();

// 👇 暴露 Images 目录为静态资源
// 获取当前文件的绝对路径
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// 使用 resolve(更健壮,自动处理路径分隔符和规范化)
export const IMAGES_DIR = resolve(__dirname, "..", "Images");

app.use("/images", express.static(IMAGES_DIR));
// 2. 配置 JSON 请求体解析中间件(关键!必须在路由前配置)
app.use(express.json());

// 3. 配置路由
chatRoutes(app);

app.listen(3000, () => {
  console.log("服务器运行在 http://localhost:3000");
});

modelChat.ts部分包含Agent主要逻辑。

Agent搭建

├── src/
│   ├── main.ts
│   ├── modelChat.ts ★
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

搭建的Agent中包含了模型,工具调用或者MCP,中间件,存储等部分。

模型导入
import { ChatOpenAI } from "@langchain/openai";

// 使用deepSeek模型
const modelName = "deepseek-ai/DeepSeek-V3";

// 定义模型
const model = new ChatOpenAI({
    // CHAT_API 为实际模型方的key
  apiKey: CHAT_API,
  modelName: modelName,
  temperature: 0.7,
  timeout: 60000,
  configuration: {
    // 我使用了硅基流动的 因此修改基本Url为硅基流动官方网址
    baseURL: "https://api.siliconflow.cn/v1/"
  },
  streaming: true,
  maxTokens: 4096,
  frequencyPenalty: 0.5,
  n: 1,
});

其他各配置参数可看官方数据

functionCall创建
├── src/
│   ├── main.ts
│   ├── modelChat.ts
│   ├── tools.ts ★
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

在src下新建tools.ts文件用来写functionCall。 文件中可以导入以下模块进行编写

import z from "zod";
// tool 工具创建
import { tool } from "@langchain/core/tools";
//tool中config类型
import { LangGraphRunnableConfig } from "@langchain/langgraph";

config是实现工具可观测、可控制的核心载体
方便后续:
调试;
前端展示(比如给用户显示「正在...」的加载状态);
审计 / 追溯。

函数调用是自定义的,可以按你自己的想法去创建。同时为了让ai更精准的找到要使用的工具,工具的描述一定要写详细明确。这里我使用了几个简单的功能。

获取CSDN资讯
// 获取csdn文章内容
const fetchData = tool(
  async (_, config: LangGraphRunnableConfig) => {
    config.writer?.("正在从CSDN论坛获取最新文章的相关数据内容...");
    const response = await fetch(
      "https://cms-api.csdn.net/v1/web_home/select_content?componentIds=www-info-list-new&channel=0"
    );
    const data = (await response.json()) as {
      data: { "www-info-list-new": { info: { list: any[] } } };
    };
    const allInfos = data.data["www-info-list-new"].info.list?.map((item) => {
      return {
        标题: item.title,
        摘要: item.summary,
        封面: item.cover,
        编辑时间: item.editTime,
        阅读量: item.viewCount,
        评论数: item.commentCount,
        点赞数: item.diggCount,
        收藏数: item.favoriteCount,
        发布时间: item.publish,
        链接: item.url,
        用户名: item.username,
        昵称: item.nickname,
        博客链接: item.blogUrl,
        来源: "CSDN",
      };
    });
    config.writer?.("CSDN论坛最新文章数据获取成功");
    return JSON.stringify(allInfos);
  },
  {
    name: "fetchData",
    description: "从CSDN论坛获取最新文章的相关数据内容",
  }
);
获取天气

类似功能

const getSubUrl = async (CityName: string) => {
  const res = await fetch("https://www.tianqi.com/chinacity.html");
  const html = await res.text();
  const reg = new RegExp(
    `<a\\s+href="(/[^"]+)"\\s*(title="[^"]+")?>${CityName}</a>`,
    "i"
  );
  const match = reg.exec(html);

  if (match) {
    return match[1];
  }
  return null;
};

// 获取天气情况
const getFutureWeather = tool(
  async ({ city }, config: LangGraphRunnableConfig) => {
    config.writer?.(`正在获取${city}的天气状况...`);
    const subUrl = await getSubUrl(city);
    const baseUrl = "https://www.tianqi.com";
    let url = "";
    if (subUrl) {
      url = baseUrl + subUrl + "7/";
    } else {
      return null;
    }
    console.log(url);
    // 2. 发送请求获取天气信息页面 HTML
    const res2 = await fetch(url);
    const html = await res2.text();

    const reg = /var prov = '([^']+)';/i;
    const match2 = html.match(reg);

    if (match2) {
      console.log(match2[1]);
      const prov = match2[1];
      const moreWeather = await fetch(
        `https://www.tianqi.com/tianqi/tianqidata/${prov}`,
        {
          headers: {
            "user-agent":
              "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
          },
        }
      );
      const data = (await moreWeather.json()) as { data: any[] };
      config.writer?.(`${city}的天气状况获取成功`);
      return JSON.stringify({
        msg: "天气信息获取成功",
        data: data.data.slice(0, 7),
      });
    } else {
      config.writer?.(`${city}的天气状况获取失败`);
      return JSON.stringify({
        msg: "未匹配到天气信息内容",
      });
    }
  },
  {
    name: "getFutureWeather",
    schema: z.object({
      city: z.string().describe("城市中文名称"),
    }),
    description: "获取指定城市的天气状况",
  }
);
搜索引擎

这里使用了api调用,相关配置参数可以看官网文档。

// 搜索引擎
const searchTool = tool(
  async ({ keyword }, config: LangGraphRunnableConfig) => {
    config.writer?.(`正在搜索${keyword}...`);
    try {
      const res = await fetch(
        `https://qianfan.baidubce.com/v2/ai_search/web_search`,
        {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
            // SEARCH_API 是你的个人api,这个接口每天可以免费使用一定次数
            Authorization: `Bearer ${SEARCH_API}`,
          },
          body: JSON.stringify({
            messages: [
              {
                role: "user",
                content: keyword,
              },
            ],
            edition: "standard",
            search_source: "baidu_search_v2",
            search_recency_filter: "week",
          }),
        }
      );
      const data = await res.json();
      config.writer?.(`${keyword}的搜索结果获取成功`);
      return JSON.stringify(data);
    } catch (e) {
      config.writer?.(`${keyword}的搜索结果获取失败: ${e}`);
      return JSON.stringify({
        msg: "搜索结果获取失败",
      });
    }
  },
  {
    name: "searchTool",
    schema: z.object({
      keyword: z.string().describe("搜索关键词"),
    }),
    description: `当需要调用搜索功能时使用。搜索结果需要在文中标注来源。
      通用搜索引擎工具,用于获取互联网实时信息、最新数据、新闻资讯、行业动态等,核心能力:
      - 支持模糊查询和场景化需求(如「今天金价」「最新新闻」「实时天气」「近期政策」);
      - 能解析时间限定词(今天/昨天/最近一周/2025年11月)、领域限定词(国内/国际/A股/科技);
      - 适用于以下场景:
        1. 查询实时数据(金价、油价、汇率、股票行情);
        2. 获取最新新闻(热点事件、行业资讯、政策公告);
        3. 查找时效性强的信息(天气、交通、赛事结果);
        4. 其他需要联网获取的动态信息;
      调用条件:当用户问题涉及「实时性」「最新动态」「需要联网确认」的内容时。
    `,
  }
);
MCP使用
├── src/
│   ├── main.ts
│   ├── modelChat.ts ★
│   ├── tools.ts 
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

MCP使用非常简单,直接远程使用URL可以,也可以下载源码本地调用,下面我将使用两种方式实现。

12306-MCP车票查询工具

使用到了魔塔社区的MCP
www.modelscope.cn/mcp/servers… 本地找个文件目录(需要记得位置,后续配置使用),下载源码 在这里插入图片描述 配置MCP

import { MultiServerMCPClient } from "@langchain/mcp-adapters";

// 配置MCP
const client = new MultiServerMCPClient({
    // mcp名字随便取我使用12306
  "12306": {
    transport: "stdio", // Local subprocess communication
    command: "node",
    // 这里便是你下载源码的路径位置,我是放在D:\\Learn\\MCP\\12306-mcp\\build下
    args: !!["D:\\Learn\\MCP\\12306-mcp\\build\\index.js"]!!,
  },
});
万相2.5-图像视频生成

需要注意langchain的参数名字需要调整,其他和官方的示例差不多。 往MCP配置中加入万相MCP远程Url

// 配置MCP
const client = new MultiServerMCPClient({
  "12306": {
    transport: "stdio",
    command: "node",
    args: ["D:\\Learn\\MCP\\12306-mcp\\build\\index.js"],
  },
  WanImage: {
    transport: "sse",
    url: "https://dashscope.aliyuncs.com/api/v1/mcps/Wan25Media/sse",
    headers: {
        // 这里DASHSCOPE_API是你自己的key,从官网获取
      Authorization: `Bearer ${DASHSCOPE_API}`,
    },
  },
});

const MCPTools = await client.getTools();
中间件 middleware
├── src/
│   ├── main.ts
│   ├── modelChat.ts ★
│   ├── tools.ts 
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

在每一步控制并自定义智能体的执行过程

中间件提供了一种更精细地控制智能体内部执行逻辑的方式。中间件适用于以下场景:

  • 通过日志、分析与调试来追踪智能体行为。
  • 对提示词、工具选择与输出格式进行转换处理。
  • 添加重试、降级方案与提前终止逻辑。
  • 应用限流、安全护栏与个人身份信息(PII)检测。

langchain官方有写好的中间件,我们也可以自定义中间件,详细可看文档 docs.langchain.com/oss/javascr…

下面我将使用几个简单的中间件。

重试

通过自定义实现

import {
  createMiddleware,
} from "langchain";

const createRetryMiddleware = (maxRetries = 3) => {
  return createMiddleware({
    name: "RetryMiddleware",
    wrapModelCall: (request: any, handler: any) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        try {
          return handler(request);
        } catch (e) {
          if (attempt === maxRetries - 1) {
            throw e;
          }
          console.log(`Retry ${attempt + 1}/${maxRetries} after error: ${e}`);
        }
      }
      throw new Error("Unreachable");
    },
  });
};
动态SystemPrompt

用于动态修改ai设定,直接从库里获取

import {
  dynamicSystemPromptMiddleware
} from "langchain";
Human-in-the-Loop (HITL)

直接从库里获取

用于为Agent工具调用时增加人工监督。

当模型提出可能需要审查的动作时——例如我这里用于图片提示词生成——中间件可以暂停执行并等待用户决定是否按当前提示词生成。

import {
  humanInTheLoopMiddleware
} from "langchain";
存储
├── src/
│   ├── main.ts
│   ├── modelChat.ts
│   ├── storage.ts ★
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

可分为短期长期

这里我简单使用了文件记录方式实现对话记录存储。

  • 新增 storage.ts 文件封装核心存储逻辑,采用「用户-会话-文件分层」结构管理聊天记录,工具会自动按以下结构组织文件,无需手动创建:
    chat-storage/          # 存储根目录
    ├── user_001/          # 用户目录(以userId命名)
    │   ├── thread_001/    # 会话目录(以threadId命名)
    │   │   ├── meta.json  # 会话元信息文件
    │   │   ├── chatLog-1.json  # 第1个聊天文件
    │   │   ├── chatLog-2.json  # 第2个聊天文件(达到阈值后自动创建)
    │   │   └── ...
    │   └── thread_002/    # 其他会话
    └── user_002/          # 其他用户
  • 自动按消息数(单文件最多100条)/文件体积(单文件最大5MB)切分文件,避免单文件过大

  • 会话元信息文件:

    字段 类型 说明
    threadId string 会话 ID
    userId string 用户 ID
    currentFileIndex number 当前最新聊天文件序号(从 1 开始)
    totalMessages number 该会话总消息数
    lastUpdated string 会话最后更新时间
    systemMsg string 该会话的系统提示词
  • 核心能力:消息持久化存储、历史消息读取(全量/最新N条)、会话元信息管理、会话数据删除

具体方案代码如下:

import fs from "fs/promises";
import path from "path";
import { v4 as uuidv4 } from "uuid"; // 生成唯一消息ID(需安装:pnpm add uuid)
import { fileURLToPath } from "url"; // ESM 内置模块,无需安装
import { formatDate } from "./utils/tools.js";

// 1. 计算当前文件路径(等效于 __filename)
const __filename = fileURLToPath(import.meta.url);

// 2. 计算当前文件目录(等效于 __dirname)
const __dirname = path.dirname(__filename);

// 配置项(可根据需求调整)
const CONFIG = {
  STORAGE_ROOT: path.resolve(__dirname, "../chat-storage"), // 存储根目录
  MAX_MESSAGES_PER_FILE: 100, // 每个文件最多消息数
  MAX_FILE_SIZE_MB: 5, // 每个文件最大体积(MB)
  MAX_FILE_SIZE_BYTES: 5 * 1024 * 1024, // 转换为字节
};

// 消息结构定义
export interface ChatMessage {
  id: string; // 消息唯一ID
  role: "user" | "assistant" | "system";
  content: string;
  timestamp: string;
  metadata?: Record<string, any>; // 附加信息(可选)
}

// Thread 元信息结构
interface ThreadMeta {
  threadId: string;
  userId: string;
  currentFileIndex: number; // 当前最新文件序号(如 1、2、3)
  totalMessages: number; // 该 thread 总消息数
  lastUpdated: string; // 最后更新时间
  systemMsg: string; // 系统消息
}

/**
 * 对话存储工具类:支持按用户/threadId 分文件夹、自动切分大文件
 */
export class ChatStorage {
  private rootDir: string;

  constructor() {
    this.rootDir = CONFIG.STORAGE_ROOT;
    this.initRootDir(); // 初始化根目录
  }

  // 初始化根目录(不存在则创建)
  private async initRootDir() {
    try {
      await fs.access(this.rootDir);
    } catch {
      await fs.mkdir(this.rootDir, { recursive: true });
      console.log(`创建存储根目录:${this.rootDir}`);
    }
  }

  // 获取用户目录路径
  private getUserDir(userId: string): string {
    return path.join(this.rootDir, userId);
  }

  // 获取 Thread 目录路径
  private getThreadDir(userId: string, threadId: string): string {
    return path.join(this.getUserDir(userId), threadId);
  }

  // 获取 Thread 元信息文件路径
  private getThreadMetaPath(userId: string, threadId: string): string {
    return path.join(this.getThreadDir(userId, threadId), "meta.json");
  }

  // 获取当前对话文件路径(根据元信息的 currentFileIndex)
  private getCurrentChatFilePath(
    userId: string,
    threadId: string,
    fileIndex: number
  ): string {
    return path.join(
      this.getThreadDir(userId, threadId),
      `chatLog-${fileIndex}.json`
    );
  }

  // 初始化 Thread(创建用户/thread 目录 + 元信息文件)
  private async initThread(
    userId: string,
    threadId: string
  ): Promise<ThreadMeta> {
    const threadDir = this.getThreadDir(userId, threadId);
    const metaPath = this.getThreadMetaPath(userId, threadId);

    // 创建用户和 thread 目录
    await fs.mkdir(threadDir, { recursive: true });

    // 初始化元信息(如果元信息文件不存在)
    try {
      await fs.access(metaPath);
      const metaContent = await fs.readFile(metaPath, "utf-8");
      return JSON.parse(metaContent) as ThreadMeta;
    } catch {
      const initialMeta: ThreadMeta = {
        threadId,
        userId,
        currentFileIndex: 1, // 从第1个文件开始
        totalMessages: 0,
        lastUpdated: formatDate(new Date()),
        systemMsg: "", // 系统消息
      };
      await fs.writeFile(
        metaPath,
        JSON.stringify(initialMeta, null, 2),
        "utf-8"
      );
      return initialMeta;
    }
  }

  // 更新 Thread 元信息
  public async updateThreadMeta(
    userId: string,
    threadId: string,
    meta: Partial<ThreadMeta>
  ) {
    const metaPath = this.getThreadMetaPath(userId, threadId);
    const currentMeta = await this.getThreadMeta(userId, threadId);
    const updatedMeta = {
      ...currentMeta,
      ...meta,
      lastUpdated: formatDate(new Date()),
    };
    await fs.writeFile(metaPath, JSON.stringify(updatedMeta, null, 2), "utf-8");
    return updatedMeta;
  }

  // 获取 Thread 元信息
  public async getThreadMeta(
    userId: string,
    threadId: string
  ): Promise<ThreadMeta> {
    const metaPath = this.getThreadMetaPath(userId, threadId);
    try {
      const metaContent = await fs.readFile(metaPath, "utf-8");
      return JSON.parse(metaContent) as ThreadMeta;
    } catch {
      return await this.initThread(userId, threadId);
    }
  }

  // 检查当前文件是否需要切分(达到消息数或体积阈值)
  private async needSplitFile(
    userId: string,
    threadId: string,
    currentFileIndex: number,
    newMessage: ChatMessage
  ): Promise<boolean> {
    const filePath = this.getCurrentChatFilePath(
      userId,
      threadId,
      currentFileIndex
    );

    try {
      // 1. 读取当前文件的消息数
      const fileContent = await fs.readFile(filePath, "utf-8");
      const messages: ChatMessage[] = fileContent
        ? JSON.parse(fileContent)
        : [];

      // 2. 检查消息数阈值:当前消息数 + 1 条新消息 > 最大限制
      if (messages[0].content.length > CONFIG.MAX_MESSAGES_PER_FILE) {
        return true;
      }

      // 3. 检查文件体积阈值:计算添加新消息后的体积
      const updatedMessages = [...messages, newMessage];
      const updatedContent = JSON.stringify(updatedMessages, null, 2);
      const updatedSize = Buffer.byteLength(updatedContent, "utf-8");

      return updatedSize > CONFIG.MAX_FILE_SIZE_BYTES;
    } catch {
      // 文件不存在(如刚创建 thread),无需切分
      return false;
    }
  }

  /**
   * 保存单条对话消息(自动切分文件)
   * @param userId 用户名
   * @param threadId 会话ID
   * @param message 消息内容(无需传 id 和 timestamp,自动生成)
   */
  public async saveMessage(
    userId: string,
    threadId: string,
    message: Omit<ChatMessage, "id" | "timestamp">
  ): Promise<ChatMessage> {
    // 补全消息的 id 和 timestamp
    const fullMessage: ChatMessage = {
      id: `msg_${Date.now()}_${uuidv4().slice(-8)}`, // 时间戳+短UUID,确保唯一
      timestamp: new Date().toISOString(),
      ...message,
    };

    // 初始化 thread(创建目录和元信息)
    let meta = await this.initThread(userId, threadId);
    let currentFileIndex = meta.currentFileIndex;

    // 检查是否需要切分文件:需要则递增文件序号
    const needSplit = await this.needSplitFile(
      userId,
      threadId,
      currentFileIndex,
      fullMessage
    );
    console.log(needSplit, "是否需要切分文件");

    if (needSplit) {
      currentFileIndex = meta.currentFileIndex + 1;
      // 更新元信息中的当前文件序号
      await this.updateThreadMeta(userId, threadId, { currentFileIndex });
    }

    // 写入当前文件(追加新消息)
    const targetFilePath = this.getCurrentChatFilePath(
      userId,
      threadId,
      currentFileIndex
    );
    try {
      // 读取现有消息(文件不存在则为空数组)
      let existingMessages: ChatMessage[] = [];
      try {
        const fileContent = await fs.readFile(targetFilePath, "utf-8");
        existingMessages = fileContent ? JSON.parse(fileContent) : [];
      } catch {}
      // 追加新消息并写入文件
      const updatedMessages = [...existingMessages, fullMessage];
      await fs.writeFile(
        targetFilePath,
        JSON.stringify(updatedMessages, null, 2),
        "utf-8"
      );

      // 更新元信息:总消息数+1
      await this.updateThreadMeta(userId, threadId, {
        totalMessages: meta.totalMessages + 1,
      });

      console.log(
        `消息保存成功:${targetFilePath} (消息ID: ${fullMessage.id})`
      );
      return fullMessage;
    } catch (error) {
      console.error(`消息保存失败:`, error);
      throw new Error(`保存消息失败:${(error as Error).message}`);
    }
  }

  /**
   * 读取某个 thread 的所有对话消息(按时间排序)
   * @param userId 用户名
   * @param threadId 会话ID
   * @returns 按时间戳升序排列的所有消息
   */
  public async readAllMessages(
    userId: string,
    threadId: string
  ): Promise<ChatMessage[]> {
    const meta = await this.getThreadMeta(userId, threadId);
    const threadDir = this.getThreadDir(userId, threadId);
    const allMessages: ChatMessage[] = [];

    // 遍历所有 chatLog 文件(从 1 到 currentFileIndex)
    for (let i = 1; i <= meta.currentFileIndex; i++) {
      const filePath = this.getCurrentChatFilePath(userId, threadId, i);
      try {
        const fileContent = await fs.readFile(filePath, "utf-8");
        const messages: ChatMessage[] = fileContent
          ? JSON.parse(fileContent)
          : [];
        allMessages.push(...messages);
      } catch {
        console.warn(`跳过不存在的文件:${filePath}`);
        continue;
      }
    }

    // 按时间戳升序排序(确保消息顺序正确)
    allMessages.sort(
      (a, b) =>
        new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
    );
    return allMessages;
  }

  /**
   * 读取某个 thread 的最新 N 条消息(用于智能体上下文回溯)
   * @param userId 用户名
   * @param threadId 会话ID
   * @param limit 最多读取条数
   * @returns 最新的 N 条消息(按时间降序)
   */
  public async readRecentMessages(
    userId: string,
    threadId: string,
    limit: number = 20
  ): Promise<ChatMessage[]> {
    const allMessages = await this.readAllMessages(userId, threadId);
    // 取最后 N 条,按时间降序排列
    return allMessages.slice(-limit).reverse();
  }

  /**
   * 删除某个 thread 的所有对话(含目录和文件)
   * @param userId 用户名
   * @param threadId 会话ID
   */
  public async deleteThread(
    userId: string,
    threadId: string
  ): Promise<boolean> {
    const threadDir = this.getThreadDir(userId, threadId);
    try {
      await fs.rm(threadDir, { recursive: true, force: true });
      console.log(`删除 thread 成功:${threadDir}`);
      return true;
    } catch (error) {
      console.error(`删除 thread 失败:`, error);
      return false;
    }
  }
}

interface IThreadIdInfo {
  threadId: string;
  systemMsg: string;
}

/**
 * 初始化加载已有文件到 threadId-用户名 映射
 * @returns Map<string, IThreadIdInfo[]>  key: threadId, value: 关联的用户信息数组(理论上一个 threadId 对应一个用户)
 */
export async function initThreadIdToUserNameMap(): Promise<
  Map<string, IThreadIdInfo[]>
> {
  const mapThreadIdToUserName = new Map<string, IThreadIdInfo[]>();
  try {
    // 1. 检查存储根目录是否存在,不存在则直接返回空映射
    try {
      await fs.access(CONFIG.STORAGE_ROOT);
    } catch {
      console.log(`存储根目录 ${CONFIG.STORAGE_ROOT} 不存在,初始化空映射`);
      return mapThreadIdToUserName;
    }

    // 2. 遍历所有用户目录(chat-storage/用户名)
    const userDirs = await fs.readdir(CONFIG.STORAGE_ROOT, {
      withFileTypes: true,
    });
    for (const userDir of userDirs) {
      // 只处理目录(排除文件)
      if (!userDir.isDirectory()) continue;

      const userName = userDir.name; // 用户名 = 目录名
      const userDirPath = path.join(CONFIG.STORAGE_ROOT, userName);

      // 3. 遍历当前用户目录下的所有 thread 目录(chat-storage/用户名/threadId)
      const threadDirs = await fs.readdir(userDirPath, { withFileTypes: true });
      for (const threadDir of threadDirs) {
        // 只处理目录(排除文件如 meta.json)
        if (!threadDir.isDirectory()) continue;

        const threadId = threadDir.name; // threadId = 目录名
        const threadDirPath = path.join(userDirPath, threadId);
        const metaPath = path.join(threadDirPath, "meta.json"); // thread 元信息文件
        // 4. 读取 meta.json(可选,提取更多信息)
        let threadMeta: Partial<IThreadIdInfo> = {};
        try {
          const metaContent = await fs.readFile(metaPath, "utf-8");
          const meta = JSON.parse(metaContent);
          threadMeta = {
            systemMsg: meta.systemMsg || "",
          };
        } catch (error) {
          console.warn(
            `thread ${threadId} 的 meta.json 不存在或损坏,跳过元信息读取`
          );
        }

        // 6. 构建关联信息
        const threadInfo: IThreadIdInfo = {
          threadId,
          systemMsg: threadMeta.systemMsg || "",
        };
        if (mapThreadIdToUserName.has(userName)) {
          mapThreadIdToUserName.get(userName)?.push(threadInfo);
        } else {
          mapThreadIdToUserName.set(userName, [threadInfo]);
        }
      }
    }
    console.log(
      `初始化完成:共加载 ${mapThreadIdToUserName.size} 个 threadId 映射`
    );
    return mapThreadIdToUserName;
  } catch (error) {
    console.error("初始化 threadId-用户名 映射失败:", error);
    return mapThreadIdToUserName; // 失败时返回空映射
  }
}

搭建Agent
├── src/
│   ├── main.ts
│   ├── modelChat.ts ★
│   ├── tools.ts 
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

将上述各部分进行整合,配置

import {
  createAgent,
} from "langchain";

const allTools = [
// CSDN资讯funCall
  fetchData,
// 天气funCall
  getFutureWeather,
//   搜索引擎funCall
  searchTool,
//   MCP
  ...MCPTools,
];

 // 定义Agent
  const Agent = createAgent({
    model: model,
    tools: allTools,
    middleware: [
      createRetryMiddleware(),
      dynamicSystemPromptMiddleware((state, runtime: { context: IContext }) => {
        const userName = runtime.context?.userName;
        const threadId = runtime.context?.thread_id;
        return (
            // 这里配置system
          getThreadId(userName, threadId)?.systemMsg ||
          `你是一个智能助手. 称呼用户为${userName}.`
        );
      }),
    //   人工监督决策功能
      humanInTheLoopMiddleware({
        interruptOn: {
          getFutureWeather: {
            allowedDecisions: ["approve", "reject"],
            description: "是否确认获取天气信息",
          },

          modelstudio_image_gen_wan25: {
            allowedDecisions: ["approve", "reject"],
            description: "是否确认生成图片",
          },

          modelstudio_image_edit_wan25: {
            allowedDecisions: ["approve", "reject"],
            description: "是否确认编辑图片",
          },
        },
        descriptionPrefix: "功能执行前需要用户确认",
      }),
    ]
  });

至此Agent搭建完成。后续便是路由。

路由配置

├── src/
│   ├── main.ts
│   ├── modelChat.ts ★
│   ├── tools.ts 
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

功能包含:用户提问对话(流式传输),设定系统消息,历史记录获取,移除会话等

用户提问对话(流式传输)

这部分需要处理不同的消息类型以及图片保存到oss。

消息有几种类型:messages,custom,updates

类型 核心含义 典型使用场景
messages 核心对话消息 AI 回复用户的核心文本 / 多媒体内容(如问答、闲聊、指令响应),是最基础的类型
custom 自定义消息 业务侧扩展的非标消息(如带按钮的卡片、专属业务字段的回复、个性化模板消息)
updates 状态更新消息 AI 回复的过程性 / 状态类通知(如 “正在生成回答”“内容已更新”“会话状态变更”)

根据不同类型需要进行不同处理,已得到更好的消息提示。

具体代码如下:

app.post("/chat", async (req, res) => {
    const userMessage = req.body.userMsg;
    const userName = req.body.userName;
    // 历史消息标识
    const thread_id = req.body.thread_id;

    // 中断交互情况,用于人工监督控制
    const interruptCallParams = req.body.interruptCallParams;

    console.log(userMessage, userName, thread_id);

    // 2. 设置 SSE 响应头(关键)
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache", // 禁用缓存,避免流被浏览器缓存中断
      Connection: "keep-alive", // 维持长连接
      "X-Accel-Buffering": "no", // 禁用 Nginx 缓冲(若用 Nginx 反向代理)
    });
    try {
      // 如果用户有消息,保存用户消息
      if (userMessage) {
        await chatStorage.saveMessage(userName, thread_id, {
          role: "user",
          content: userMessage,
          metadata: { view: "web" },
        });
      }

      let chatParams = null;

      // 中断交互情况,通过Command指令
      if (interruptCallParams) {
        chatParams = new Command({
          resume: { decisions: [interruptCallParams] },
        });
      } else {
        const history = await chatStorage.readAllMessages(userName, thread_id);
        chatParams = {
          messages: history as any,
        };
      }

      // 流式请求
      const aiResponse = await Agent.stream(chatParams, {
        configurable: { thread_id: thread_id },
        streamMode: ["updates", "messages", "custom"],
        context: { userName: userName, thread_id: thread_id },
      });
      let allMessages = "";
      for await (const [streamMode, chunk] of aiResponse) {
        if (streamMode === "messages" && !(chunk[0] instanceof ToolMessage)) {
          // 用 SSE 格式包装(data: 内容\n\n),前端可直接解析
          if (chunk[0].content) {
            res.write(
              `data: ${JSON.stringify({
                type: "messages",
                content: chunk[0].content,
              })}\n\n`
            );
          }
        } else if (streamMode === "custom") {
          res.write(
            `data: ${JSON.stringify({ type: "custom", content: chunk })}\n\n`
          );
        } else if (streamMode === "updates") {
          if (chunk["model_request"]) {
            // 完整消息
            const fullMsg = chunk["model_request"].messages[0].content;
            // 中断交互情况会返回空字符串情况
            if (fullMsg) allMessages = fullMsg as string;
          }
          // 处理中断,需要用户手动确认
          if (chunk["__interrupt__"]) {
            res.write(
              `data: ${JSON.stringify({
                type: "interrupt",
                content: (chunk["__interrupt__"] as any)[0].value.actionRequests,
              })}\n\n`
            );
          }
        }
      }

      // 图片处理
      // 🔥 流结束后:检测并处理图片
      const imageUrlRegex =
        /\[([^\]]*)\]\((https:\/\/dashscope-result[^)\s]+)\)/g;
      const imageUrls = [...allMessages.matchAll(imageUrlRegex)].map(
        (m) => m[2]
      );

      for (const originalUrl of imageUrls) {
        try {
          const filename = await saveWanxiangImageToOss(originalUrl);

          const escapedUrl = escapeRegExp(originalUrl);
          const reg = new RegExp(`!?\\[.*?\\]\\(${escapedUrl}\\)`, "g");

          // 4. 推送你自己的图片路径给前端
          const publicUrl = filename;
          allMessages = allMessages.replaceAll(
            reg,
            `![${originalUrl}](${publicUrl})`
          );
          res.write(
            `data: ${JSON.stringify({
              type: "image",
              url: publicUrl, // 前端可直接访问
              originalUrl: originalUrl, // 可选:用于调试
            })}\n\n`
          );
        } catch (err) {
          console.error(
            "❌ 图片下载失败:",
            originalUrl,
            err instanceof Error ? err.message : "未知错误"
          );
          res.write(
            `data: ${JSON.stringify({
              type: "image_error",
              message: "图片保存失败",
            })}\n\n`
          );
        }
      }

      // 流结束,有消息情况保存,推送完成标识
      if (allMessages) {
        // 保存ai消息
        await chatStorage.saveMessage(userName, thread_id, {
          role: "assistant",
          content: allMessages,
          metadata: { model: modelName },
        });
      }
      // 用户对应线程ID集合
      addThreadId(userName, thread_id);
      res.write(
        `data: ${JSON.stringify({ type: "complete", content: "" })}\n\n`
      );

      res.end(); // 关闭连接
    } catch (err) {
      // 错误处理
      console.error("发送消息失败:", err);
      res.status(500).json({
        error: err instanceof Error ? err.message : "发送消息时发生错误",
      });
    }
  });

这里需要对模型返回的图片链接进行保存和重新替换以保证对话的持久性,新增imageHandler.ts工具

├── src/
│   ├── main.ts
│   ├── modelChat.ts
│   ├── tools.ts 
├── utils/
│   ├── imageHandler.ts ★
├── nodemon.json
├── package.json
├── tsconfig.json
└── README.md

代码如下

OSS配置可看阿里云oss官方文档

// imageHandler.js
import OSS from "ali-oss";

// 你自己的配置参数
const ossClient = new OSS({
  region: #####, // 如 'oss-cn-hangzhou'
  accessKeyId: ######,
  accessKeySecret: ######,,
  bucket: ######,,
});


export async function saveWanxiangImageToOss(
  originalUrl: string,
  customFilename = null
) {
  try {
    console.log("################################");
    console.log("开始获取图片:", originalUrl);
    // 1. 下载图片
    // 加入token
    const response = await fetch(originalUrl, {
      method: "GET",
      headers: {
        "User-Agent":
          "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        Accept:
          "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
        //   DASHSCOPE_API是百炼MCP的api
        Authorization: `Bearer ${DASHSCOPE_API}`,
      },
    });

    if (!response.ok) {
      throw new Error(
        `Download failed: ${response.status} ${await response.text()}`
      );
    }

    const ImageBlob = await response.blob();
    // 转换为 Buffer
    const arrayBuffer = await ImageBlob.arrayBuffer();
    const buffer = Buffer.from(arrayBuffer);

    const contentType = response.headers.get("content-type") || "image/png";

    // 2: 生成 OSS 文件名
    const filename =
      customFilename ||
      `wanxiang/${Date.now()}_${Math.random().toString(36).slice(2, 10)}.${
        contentType.split("/")[1]
      }`;

    console.log("################################");
    console.log("开始上传图片:", filename);
    // 3: 上传到你的 OSS
    const result = await ossClient.put(filename, buffer, {
      headers: {
        "Content-Type": contentType,
      },
    });

    console.log("✅ 图片已保存到 OSS:", result.url);
    return result.url; // 这是你自己的 OSS 公开 URL
  } catch (err: any) {
    console.error("🔥 保存图片到 OSS 失败:", err?.message);
    throw err;
  }
}

设定系统消息

存储方案的实现,直接调用修改元数据即可

// 设定系统消息
  app.post("/setSystemMsg", async (req, res) => {
    const systemMsg = req.body.systemMsg;
    const userName = req.body.userName;
    const threadId = req.body.thread_id;
    // 添加线程ID和系统消息
    addThreadId(userName, threadId, systemMsg);
    // 保存线程ID和系统消息
    await chatStorage.updateThreadMeta(userName, threadId, { systemMsg });
    // 获取用户的所有线程ID
    const thisUserAlreadyThreadId = getThreadIdList(
      userName
    ) as IThreadIdInfo[];
    res.json({
      message: "系统消息设定成功",
      threadIdList: Array.from(thisUserAlreadyThreadId),
    });
  });
历史记录获取
 // 获取历史消息
  app.get("/history", async (req, res) => {
    const thread_id = req.query.thread_id as string;
    const userName = req.query.userName as string;
    console.log("获取历史消息:", thread_id);
    try {
      // 从存储中获取历史消息
      const history = await chatStorage.readAllMessages(userName, thread_id);
      res.json({
        msg: "历史消息获取成功",
        messages: history,
        threadInfo: getThreadId(userName, thread_id),
      });
    } catch (err) {
      console.error("获取历史消息失败:", err);
      res.status(500).json({
        error: err instanceof Error ? err.message : "获取历史消息时发生错误",
      });
    }
  });

移除会话
  // 移除会话
  app.delete("/history", async (req, res) => {
    const thread_id = req.query.thread_id as string;
    const userName = req.query.userName as string;
    console.log("移除会话:", thread_id);
    try {
      await chatStorage.deleteThread(userName, thread_id);
      // 从用户线程ID集合中移除
      removeThreadId(userName, thread_id);
      res.json({
        message: "会话移除成功",
      });
    } catch (err) {
      console.error("移除会话失败:", err);
      res.status(500).json({
        error: err instanceof Error ? err.message : "移除会话时发生错误",
      });
    }
  });
}

至此所有路由功能配置完成。

项目启动

pnpm run start

前端搭建

整体项目简单可按逻辑自行搭建,详细后续写

主要问答逻辑代码如下:

    const abortController = new AbortController();
    abortControllerRef.current = abortController;

    // 1. 发送 POST 请求(支持传递复杂 Body 数据)
    const res = await fetch(`/api/chat`, {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream", // 告知服务端需要事件流
    },
    body: JSON.stringify({
        userName,
        thread_id,
        userMsg,
        interruptCallParams,
    }),
    signal: abortController.signal, // 用于中断请求
    });

    // 2. 校验响应状态
    if (!res.ok) throw new Error(`请求失败:${res.statusText}`);
    if (!res.body) throw new Error("后端未返回流式响应");

    // 3. 解析 ReadableStream(核心:逐块读取流数据)
    const reader = res.body.getReader();
    const decoder = new TextDecoder(); // 解码二进制数据为字符串
    let buffer = ""; // 缓存不完整的 Chunk(避免 JSON 被拆分)
    let msg = "";
    // 循环读取流
    while (true) {
        const { done, value } = await reader.read();

        if (done) break; // 流结束,退出循环

        // 4. 解码并处理每条数据
        buffer += decoder.decode(value, { stream: true }); // 流式解码,保留不完整数据
        const chunks = buffer.split("\n\n"); // 按 SSE 格式分割(每块以 \n\n 结束)
        buffer = chunks.pop() || ""; // 保留最后不完整的 Chunk,下次合并处理

        // 5. 处理每个完整的 Chunk
        for (const chunk of chunks) {
            //   console.log(chunk, "chunk");

            if (!chunk.startsWith("data: ")) continue; // 过滤非 SSE 格式数据
            const dataStr = chunk.slice(6); // 去掉前缀 "data: "
            if (dataStr === "[DONE]") continue; // 忽略结束标记

            // 解析 JSON 数据
            const data = JSON.parse(dataStr);
            switch (data.type) {
            case "messages":
                msg += data.content;
                setHistory((prev) => {
                // 如果历史最后一条已经是 AI 消息(流式中),直接更新 content
                if (prev.length > 0 && prev.at(-1)?.role === "assistant") {
                    return [
                    ...prev.slice(0, -1),
                    { role: "assistant", content: msg },
                    ];
                }
                // 若还没有 AI 消息(首次接收 chunk),直接添加新的 AIMessage
                return [...prev, { role: "assistant", content: msg }];
                });
                break;
            case "custom":
                setToolTips(data.content);
                break;
            case "interrupt":
                setInterruptMsg(JSON.stringify(data.content, null, 2));
                break;
            // 👇 新增:处理图片
            case "image": {
                // 将 base64 图片插入到当前消息末尾(或替换原 URL)
                const imgUrl = data.url; // 或直接用 HTML
                const originalUrl = data.originalUrl;

                const escapedUrl = escapeRegExp(originalUrl);
                const reg = new RegExp(`!?\\[.*?\\]\\(${escapedUrl}\\)`, "g");
                setHistory((prev) => {
                if (prev.at(-1)?.role === "assistant") {
                    // 替换最后一条 AI 消息的Url
                    const lastMsg = prev.at(-1);
                    return [
                    ...prev.slice(0, -1),
                    {
                        role: "assistant",
                        content:
                        lastMsg?.content?.replace(reg, `![图片](${imgUrl})`) ||
                        "",
                    },
                    ];
                }
                return [...prev, { role: "assistant", content: msg }];
                });
                break;
            }
            case "image_error":
                msg += `\n❌ 图片加载失败`;
                setHistory((prev) => {
                if (prev.length > 0 && prev.at(-1)?.role === "assistant") {
                    return [
                    ...prev.slice(0, -1),
                    { role: "assistant", content: msg },
                    ];
                }
                return [...prev, { role: "assistant", content: msg }];
                });
                break;
            case "complete":
                setToolTips("");
                break;
            case "error":
                throw new Error(data.content);
            }
        }
    }

功能展示

对话界面

在这里插入图片描述

简单对话,功能展示

在这里插入图片描述

Human-in-the-Loop (HITL)

在这里插入图片描述在这里插入图片描述

搜索

在这里插入图片描述

进度提示

在这里插入图片描述

人物设定

在这里插入图片描述

文生图

在这里插入图片描述

图生视频

在这里插入图片描述在这里插入图片描述

存储结构

在这里插入图片描述

总结

Agent 功能可以实现,函数调用和MCP也能执行成功,但部分时候还不稳定,func的描述还需要写详细。同时针对视频这类需要时间的可以加入消息推送功能。整体一个功能丰富的Agent搭建完成。

❌