阅读视图

发现新文章,点击刷新页面。

基于 NestJS + LangChain 的 AI 流式对话实战

前言

在 AI 应用开发中,流式输出能极大提升用户体验——让 AI 的回答像打字机一样逐字呈现,而不是等待漫长的完整生成。本文将带从零搭建一个完整的 AI 对话项目,涵盖同步/流式接口、前端 SSE 对接、限流防护等核心能力。

技术栈

  • 后端: NestJS + LangChain
  • 前端: React + Ant Design + EventSource
  • AI 模型: 通义千问 (qwen-plus),兼容 OpenAI API 格式

项目初始化

搭建项目

创建项目

pnpm install -g @nestjs/cli
nest new hello-nest-langchain

安装依赖

pnpm install @nestjs/config
pnpm install @langchain/core @langchain/openai

生成ai模块

nest g res ai --no-spec

配置环境变变量

MODEL_NAME=qwen-plus
OPENAI_API_KEY=sk-xxx
OPENAI_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1

全局配置 ConfigModel

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { AiModule } from './ai/ai.module';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    AiModule,
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: '.env',
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

isGlobal 的意思是将 ConfigModel 注册为全局模块,不需要在每个模块的 imports 中重复导入

main.ts 配置跨域

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableCors();
  await app.listen(process.env.PORT ?? 3000);
}
bootstrap();

同步接口

在 AiService 里面创建 LangChain 调用链

import { StringOutputParser } from '@langchain/core/output_parsers';
import { PromptTemplate } from '@langchain/core/prompts';
import { Runnable } from '@langchain/core/runnables';
import { ChatOpenAI } from '@langchain/openai';
import { Injectable } from '@nestjs/common';

@Injectable()
export class AiService {
  private readonly chain: Runnable<{ query: string }, string>;

  constructor() {
    const prompt = PromptTemplate.fromTemplate('请回答以下问题: \n\n{query}');

    const model = new ChatOpenAI({
      temperature: 0.7,
      modelName: 'qwen-plus',
      apiKey: 'xxx',
      configuration: {
        baseURL: 'https://dashscope.aliyuncs.com/compatible-mode/v1',
      },
    });

    this.chain = prompt.pipe(model).pipe(new StringOutputParser());
  }

  async runChain(query: string): Promise<string> {
    return await this.chain.invoke({ query });
  }
}

在 AiController 里暴露接口

import { Controller, Get, Query } from '@nestjs/common';
import { AiService } from './ai.service';

@Controller('ai')
export class AiController {
  constructor(private readonly aiService: AiService) {}

  @Get('chat')
  async chat(@Query('query') query: string) {
    const answer = await this.aiService.runChain(query);
    return { answer };
  }
}

流式接口

在 AiService 里面添加流式方法

async *streamChain(query: string): AsyncGenerator<string> {
  const stream = await this.chain.stream({ query });
  for await (const chunk of stream) {
    yield chunk;
  }
}

这时一个流式返回的异步生成器方法,可以让 Ai 的回答像打字机一样一个字一个字的展示,而不是等全部生成完才一次性返回

这里用到了 js 的生成器语法,也就是方法名那里标注 *,然后通过 yield 不断异步返回内容。

前端页面

pnpm create vite
pnpm i @tanstack/react-query
pnpm i antd

组件核心代码

import { useState, useRef, useEffect } from "react";
import "./App.css";
import "antd/dist/reset.css";
import { Card, Input, Button, Typography, Space, Form } from "antd";

const { Title } = Typography;
const { TextArea } = Input;

function App() {
  const [apiUrl, setApiUrl] = useState("http://localhost:3000");
  const [question, setQuestion] = useState("你是谁?");
  const [responseText, setResponseText] = useState("回复将显示在这里...");
  const esRef = useRef<EventSource | null>(null);
  const [isStreaming, setIsStreaming] = useState(false);
  const responseRef = useRef<HTMLDivElement | null>(null);

  const handleStart = () => {
    setResponseText("");
    const base = apiUrl.replace(//+$/, "");
    const url = `${base}/ai/chat/stream?query=${encodeURIComponent(question)}`;

    if (esRef.current) {
      esRef.current.close();
      esRef.current = null;
    }

    try {
      const es = new EventSource(url);
      esRef.current = es;
      setIsStreaming(true);
      es.onmessage = (ev) => {
        const chunk = ev.data;
        setResponseText((prev) => {
          if (
            !prev ||
            prev === "回复将显示在这里..." ||
            prev.startsWith("(演示)")
          )
            return chunk;
          return prev + chunk;
        });
      };
      es.onerror = () => {
        const ready = es.readyState;
        if (ready === 2) {
          setResponseText((prev) => (prev ? prev + "\n【已结束】" : "已结束"));
        }
        try {
          es.close();
        } catch {}
        esRef.current = null;
        setIsStreaming(false);
      };

      // 可选的自定义事件(后端可能发送 event: done)
      es.addEventListener("done", () => {
        try {
          es.close();
        } catch {}
        esRef.current = null;
        setIsStreaming(false);
        setResponseText((prev) => (prev ? prev + "\n【已完成】" : "已完成"));
      });
    } catch (err) {
      setResponseText(`错误:${String(err)}`);
      setIsStreaming(false);
    }
  };

  const handleStop = () => {
    if (esRef.current) {
      esRef.current.close();
      esRef.current = null;
    }
    setIsStreaming(false);
    setResponseText((prev) => (prev ? prev + "\n【已停止】" : "已停止"));
  };

  // 自动滚动到最底部
  useEffect(() => {
    const el = responseRef.current;
    if (!el) return;
    el.scrollTop = el.scrollHeight;
  }, [responseText]);

  return (
    <div className="sse-page">
      <Card className="sse-card" bordered={false}>
        <Title level={2} className="sse-title">
          SSE 流式接口测试
        </Title>

        <Form layout="vertical">
          <Form.Item label="API 地址">
            <Input value={apiUrl} onChange={(e) => setApiUrl(e.target.value)} />
          </Form.Item>

          <Form.Item label="问题">
            <TextArea
              value={question}
              onChange={(e) => setQuestion(e.target.value)}
              rows={3}
            />
          </Form.Item>

          <Form.Item>
            <Space>
              <Button
                type="primary"
                onClick={handleStart}
                disabled={isStreaming}
              >
                开始流式请求
              </Button>
              <Button danger onClick={handleStop} disabled={!isStreaming}>
                停止
              </Button>
            </Space>
          </Form.Item>

          <Form.Item label="">
            <Card className="response-box" bordered={false}>
              <div className="response-content" ref={responseRef}>
                <div className="response-text">{responseText}</div>
              </div>
            </Card>
          </Form.Item>
        </Form>
      </Card>
    </div>
  );
}

export default App;

实现效果

录屏2026-04-10 15.00.13.gif

一些优化

动态注入

将 ChatOpenAI 实例通过 NestJS 的 DI 容器管理,解耦配置与业务逻辑。

nest 动态注入就是不用 new 依赖对象,只要声明下,运行的时候会自动注入依赖的实例对象

在 AiModel 中使用 useFactory 创建 CHAT_MODEL

通过 @Injectable 声明的 Service,和通过 useFactory 创建的对象,都可以作为 provider 来注入

import { Module } from '@nestjs/common';
import { AiService } from './ai.service';
import { AiController } from './ai.controller';
import { ConfigService } from '@nestjs/config';
import { ChatOpenAI } from '@langchain/openai';

@Module({
  controllers: [AiController],
  providers: [
    AiService,
    {
      provide: 'CHAT_MODEL',
      useFactory: (configService: ConfigService) => {
        return new ChatOpenAI({
          modelName: configService.get<string>('MODEL_NAME'),
          apiKey: configService.get<string>('OPENAI_API_KEY'),
          configuration: {
            baseURL: configService.get<string>('OPENAI_BASE_URL'),
          },
        });
      },
      inject: [ConfigService],
    },
  ],
})
export class AiModule {}

在 AiService 中直接使用

import { StringOutputParser } from '@langchain/core/output_parsers';
import { PromptTemplate } from '@langchain/core/prompts';
import { Runnable } from '@langchain/core/runnables';
import { ChatOpenAI } from '@langchain/openai';
import { Inject, Injectable } from '@nestjs/common';

@Injectable()
export class AiService {
  private readonly chain: Runnable<{ query: string }, string>;

  constructor(@Inject('CHAT_MODEL') private model: ChatOpenAI) {
    const prompt = PromptTemplate.fromTemplate('请回答以下问题: \n\n{query}');

    this.chain = prompt.pipe(model).pipe(new StringOutputParser());
  }

  async runChain(query: string): Promise<string> {
    return await this.chain.invoke({ query });
  }

  async *streamChain(query: string): AsyncGenerator<string> {
    const stream = await this.chain.stream({ query });
    for await (const chunk of stream) {
      yield chunk;
    }
  }
}

ip 限流保护

安装限流模块

pnpm i @nestjs/throttler

配置 trust proxy 来获取客户端真实的 IP

trust proxy 是 Express 的一个开关,作用是:当请求经过 Nginx / CDN / 负载均衡时,读取 X-Forwarded-For 请求头里的原始客户端 ip

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import type { Express } from 'express';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const expressApp = app.getHttpAdapter().getInstance() as Express;
  expressApp.set('trust proxy', 1);
  app.enableCors();
  await app.listen(process.env.PORT ?? 3000);
}
bootstrap();

配置全局限流

AppModel 里面添加每个 ip 每秒内最多请求 30 次,并且对所有的请求生效

import { Module } from '@nestjs/common';
import { APP_GUARD } from '@nestjs/core';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { AiModule } from './ai/ai.module';
import { ConfigModule } from '@nestjs/config';
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';

@Module({
  imports: [
    AiModule,
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: '.env',
    }),
    ThrottlerModule.forRoot([
      {
        ttl: 60000,
        limit: 30,
      },
    ]),
  ],
  controllers: [AppController],
  providers: [
    AppService,
    {
      provide: APP_GUARD,
      useClass: ThrottlerGuard,
    },
  ],
})
export class AppModule {}

在 AiController 里面对 sse 接口限流为每秒钟 5 次

import { Controller, Get, Query, Sse } from '@nestjs/common';
import { AiService } from './ai.service';
import { from, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { Throttle } from '@nestjs/throttler';

@Controller('ai')
export class AiController {
  constructor(private readonly aiService: AiService) {}

  @Get('chat')
  @Throttle({ default: { ttl: 60000, limit: 20 } })
  async chat(@Query('query') query: string) {
    const answer = await this.aiService.runChain(query);
    return { answer };
  }

  @Sse('/chat/stream')
  @Throttle({ default: { ttl: 60000, limit: 5 } })
  chatStream(@Query('query') query: string): Observable<{ data: string }> {
    return from(this.aiService.streamChain(query)).pipe(
      map((chunk) => ({ data: chunk })),
    );
  }
}

封装 useSseChat hook

将 SSE 逻辑抽离为可复用的自定义 Hook

import { useState, useRef, useEffect, useCallback } from "react";

type Status = "idle" | "connecting" | "streaming" | "done" | "error";

interface UseSseChatOptions {
  /** SSE 接口基地址,末尾不需要带斜杠 */
  baseUrl: string;
}

interface UseSseChatReturn {
  /** 当前累积的响应文本 */
  responseText: string;
  /** 当前连接状态 */
  status: Status;
  /** 是否正在流式接收(streaming / connecting) */
  isStreaming: boolean;
  /** 滚动锚点 ref,绑定到响应内容容器上可实现自动滚动 */
  responseRef: React.RefObject<HTMLDivElement | null>;
  /** 发起一次流式请求 */
  start: (query: string) => void;
  /** 手动停止当前流 */
  stop: () => void;
}

export function useSseChat({ baseUrl }: UseSseChatOptions): UseSseChatReturn {
  const [responseText, setResponseText] = useState("回复将显示在这里...");
  const [status, setStatus] = useState<Status>("idle");
  const esRef = useRef<EventSource | null>(null);
  const responseRef = useRef<HTMLDivElement | null>(null);

  const isStreaming = status === "connecting" || status === "streaming";

  // 响应内容变化时自动滚动到底部
  useEffect(() => {
    const el = responseRef.current;
    if (!el) return;
    el.scrollTop = el.scrollHeight;
  }, [responseText]);

  // 组件卸载时关闭连接
  useEffect(() => {
    return () => {
      esRef.current?.close();
    };
  }, []);

  const stop = useCallback(() => {
    esRef.current?.close();
    esRef.current = null;
    setStatus("idle");
    setResponseText((prev) => (prev ? prev + "\n【已停止】" : "已停止"));
  }, []);

  const start = useCallback(
    (query: string) => {
      // 关闭上一次未结束的连接
      esRef.current?.close();
      esRef.current = null;

      setResponseText("");
      setStatus("connecting");

      const base = baseUrl.replace(//+$/, "");
      const url = `${base}/ai/chat/stream?query=${encodeURIComponent(query)}`;

      try {
        const es = new EventSource(url);
        esRef.current = es;

        es.onmessage = (ev) => {
          setStatus("streaming");
          setResponseText((prev) => prev + ev.data);
        };

        es.onerror = () => {
          // readyState === 2 表示连接已关闭(正常结束或异常断开)
          if (es.readyState === EventSource.CLOSED) {
            setResponseText((prev) =>
              prev ? prev + "\n【已结束】" : "已结束",
            );
            setStatus("done");
          } else {
            setStatus("error");
          }
          es.close();
          esRef.current = null;
        };

        // 后端可发送 event: done 来明确标记结束
        es.addEventListener("done", () => {
          es.close();
          esRef.current = null;
          setStatus("done");
          setResponseText((prev) => (prev ? prev + "\n【已完成】" : "已完成"));
        });
      } catch (err) {
        setResponseText(`错误:${String(err)}`);
        setStatus("error");
      }
    },
    [baseUrl],
  );

  return { responseText, status, isStreaming, responseRef, start, stop };
}

使用示例

const { responseText, isStreaming, responseRef, start, stop } = useSseChat({
  baseUrl: apiUrl,
});

小结

使用 invoke 和 stream 实现了同步和流式的接口。

在 service层生成流式内容,在 controller 层创建了一个 sse 接口,返回流式数据。

前端使用 EventSource 来监听流式接口的 message 事件。

最后对 sse 请求限流,对依赖进行解耦,对 sse 请求进行封装解耦。

从单体到协同:用 LangGraph 构建 Web3 投研的多智能体工作流

前言

在上一篇文章中,我们通过一个单体 Agent 实现了 Polymarket 与新闻数据的初步整合。然而,在面对波诡云谲的实战环境时,单体架构暴露出逻辑深度不足工具调用混乱以及无法自我纠错等硬伤。

为了解决这些痛点,本文将对原有的单智能体系统进行“手术级”重构,引入 LangGraph 实现多角色协同,让你的投研工具从一个“全栈练习生”进化为一支“专业特种部队”。

一、 单体 Agent 的“天花板”

在尝试构建 Web3 投研助手时,我们通常会给一个 Agent 塞进所有工具(搜索、行情、链上监测)。但在实战中,单体 Agent 常遇到三大痛点:

  1. 注意力涣散:Prompt 过长导致模型忽略了关键的风险提示。
  2. 逻辑闭环难:模型容易“自嗨”,拿到错误数据后直接开始推演,没有纠错机制。
  3. 工具冲突:当工具超过 5 个时,模型选择工具的准确率大幅下降。

二、 多智能体的降维打击:分工与制衡

多智能体架构的核心在于 “角色拆解” 。通过将任务分给不同的 Agent,我们模拟了一个专业投研机构的运作流水线:

1. 角色纯粹化(Specialization)

  • 研究员 (Researcher) :只负责“找”。它精通各种搜索语法,不带主观偏见地搬运事实。
  • 分析师 (Analyst) :只负责“想”。它不直接查数据,而是对研究员提供的情报进行逻辑建模、胜率计算和风险评估。

2. 动态博弈与纠错(Feedback Loop)

这是多智能体最强的地方:分析师可以“打回重做” 。如果研究员提供的情报不足以支撑结论,分析师会提出具体的补查要求,迫使系统进入循环直到逻辑闭环。

三、 实战:基于 LangGraph 的投研工作流

1.工具

import * as dotenv from "dotenv";
dotenv.config();
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { TavilySearch } from "@langchain/tavily";
import axios from "axios";
import { HttpsProxyAgent } from "https-proxy-agent";

const agent = new HttpsProxyAgent("http://127.0.0.1:3067");
const axiosConfig = { timeout: 15000, httpsAgent: agent, proxy: false };
const searchInstance = new TavilySearch({ maxResults: 5 }); // 增加搜索结果以捕捉更多套利新闻

// 1. 搜索工具:强化了对套利/异动新闻的搜索描述
export const financialSearchTool = tool(
    async (input) => {
        const query = typeof input === 'string' ? input : (input.query || JSON.stringify(input));
        console.log(`\n[🔍 正在执行深度搜索]: ${query}`);
        try {
            const res = await searchInstance.invoke(query);
            return JSON.stringify(res);
        } catch (e: any) { return `搜索暂时不可用`; }
    },
    {
        name: "financial_market_search",
        description: "搜索最新新闻背景、套利机会或市场异动。请输入关键词对象,例如:{\"query\": \"Polymarket arbitrage opportunities\"}",
        schema: z.object({ query: z.string() }), 
    }
);

// 2. 深度优化的行情工具 (Arbitrage & Trend Ready)
export const marketDataTool = tool(
    async (input) => {
        const userInput = typeof input === 'string' ? input : (input.marketName || JSON.stringify(input));
        
        // 1. 行业语义映射表 (包含最新套利关键词)
        const mapping: Record<string, string[]> = {
            "原油": ["oil", "crude", "energy", "brent", "wti", "gasoline"],
            "油价": ["oil", "crude", "energy"],
            "中东": ["israel", "gaza", "iran", "lebanon", "middle east", "hezbollah", "conflict"],
            "战争": ["war", "military", "strike", "attack", "invasion"],
            "选举": ["election", "trump", "vance", "harris", "walz"],
            "停火": ["ceasefire", "truce", "peace"],
            "核": ["nuclear", "facility", "isfahan"],
            "海峡": ["hormuz", "strait", "shipping"],
            "宏观": ["fed", "rate cut", "inflation", "recession", "gdp"],
            "套利": ["arbitrage", "mispricing", "spread", "basis", "hedging"],
            "时间差": ["deadline", "expiry", "until", "before", "sooner"],
            "美联储": ["powell", "fomc", "interest rate", "basis points"],
            "加密货币": ["bitcoin", "etf", "ethereum", "solana", "ath"]
        };

        // 获取基础关注词
        let baseTerms = [userInput.toLowerCase()];
        for (const [zh, ens] of Object.entries(mapping)) {
            if (userInput.includes(zh)) {
                baseTerms = ens;
                break;
            }
        }

        // --- 优化点:自动生成组合搜索词(捕获最新最热) ---
        const hotSuffixes = ["ceasefire", "rate cut", "ath", "trump", "deadline"];
        const focusTerms = [
            ...baseTerms,
            ...baseTerms.flatMap(term => hotSuffixes.map(suffix => `${term} ${suffix}`))
        ];

        console.log(`\n[📊 正在扫描套利机会]: 领域 -> ${userInput} | 衍生词数 -> ${focusTerms.length}`);

        try {
            // 获取全平台最火的 50 个市场
            const url = `https://gamma-api.polymarket.com/markets?active=true&closed=false&limit=50`;
            const res = await axios.get(url, axiosConfig);
            
            if (!res.data || res.data.length === 0) return "Polymarket 暂无活跃市场。";

            // 2. 精准过滤逻辑
            const relevantMarkets = res.data.filter((m: any) => {
                const title = m.question.toLowerCase();
                // 必须包含正向词,剔除体育等干扰噪音
                const hasFocus = focusTerms.some(term => title.includes(term));
                const isNoise = ["nhl", "nba", "cup", "game", "soccer", "football"].some(noise => title.includes(noise));
                return hasFocus && !isNoise;
            });

            if (relevantMarkets.length > 0) {
                // 按相关度或题目排序,方便 Agent 对比相似市场寻找套利空间
                const marketList = relevantMarkets.map((m: any) => ({
                    question: m.question,
                    price: m.lastTradePrice,
                    endDate: m.endDate
                }));

                return JSON.stringify({
                    status: "success",
                    count: marketList.length,
                    data: marketList,
                    hint: "请分析以上市场之间是否存在隐含概率冲突或定价偏差。"
                });
            }

            return `[提示]:热门榜单中暂无直接对标 "${userInput}" 的套利交易对。`;
        } catch (e: any) { return `行情接口异常: ${e.message}`; }
    },
    {
        name: "get_realtime_market_data",
        description: "获取 Polymarket 实时赔率与套利机会。支持组合搜索。",
        schema: z.object({ marketName: z.string() }),
    }
);

export const tools = [financialSearchTool, marketDataTool];

2.主程

以下是使用 LangChain 最新 LangGraph 框架实现的协作代码片段:

import { ChatOpenAI } from "@langchain/openai";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import { StateGraph, MessagesAnnotation, END } from "@langchain/langgraph";
import { financialSearchTool } from "./tools"; // 复用你之前的工具

// 1. 定义两个不同的 LLM 实例(可以给分析师更高的 Temperature 来激发灵感)
const llm = new ChatOpenAI({
  // 核心修复:显式指定 API Key 和 Base URL
  apiKey: process.env.DEEPSEEK_API_KEY, 
  modelName: "deepseek-chat",
  configuration: {
    baseURL: process.env.DEEPSEEK_API_BASE_URL,
  },
  temperature: 0,
});
// 2. 定义角色逻辑
// 研究员节点:强迫它必须使用工具
async function researcherNode(state: typeof MessagesAnnotation.State) {
  const systemMessage = {
    role: "system",
    content: "你是一名 Web3 研究员。你的任务是利用搜索工具获取关于特定事件的最新事实。获取事实后,直接将其传递给分析师,不要进行深度评论。",
  };
  const response = await llm.bindTools([financialSearchTool]).invoke([systemMessage, ...state.messages]);
  return { messages: [response] };
}

// 分析师节点:负责逻辑推演
async function analystNode(state: typeof MessagesAnnotation.State) {
  const systemMessage = {
    role: "system",
    content: "你是一名顶级高级分析师。你需要审查研究员提供的事实。如果事实太模糊,请要求研究员重新搜索;如果事实充足,请给出最终的套利分析报告。你的回复必须以 '【最终报告】' 开头。",
  };
  const response = await llm.invoke([systemMessage, ...state.messages]);
  return { messages: [response] };
}

// 3. 定义路由逻辑:判断是该继续搜,还是该结束了
function shouldContinue(state: typeof MessagesAnnotation.State) {
  const lastMessage = state.messages[state.messages.length - 1];
  // 如果分析师说了“最终报告”,就结束
  if (typeof lastMessage.content === "string" && lastMessage.content.includes("【最终报告】")) {
    return END;
  }
  // 如果有工具调用,去执行工具
  if (lastMessage.additional_kwargs.tool_calls) {
    return "tools";
  }
  // 否则,让分析师看研究员的结果
  return "analyst";
}

// 4. 构建工作流图 (Graph)
const workflow = new StateGraph(MessagesAnnotation)
  .addNode("researcher", researcherNode)
  .addNode("analyst", analystNode)
  .addNode("tools", new ToolNode([financialSearchTool])) // 专门执行工具的节点
  
  // 连线逻辑
  .addEdge("__start__", "researcher") // 从研究员开始
  .addEdge("tools", "researcher")      // 工具执行完后回到研究员
  .addConditionalEdges("researcher", shouldContinue) // 研究员做完后判断:调工具还是给分析师
  .addConditionalEdges("analyst", shouldContinue);    // 分析师做完后判断:结束还是打回重搜

// 5. 编译并运行
const app = workflow.compile();

async function runMultiAgent() {
  const inputs = { messages: [{ role: "user", content: "分析以伊冲突对 Polymarket 原油预测价格的影响" }] };
  const result = await app.invoke(inputs);
  
  console.log("\n--- 协作过程结束 ---");
  console.log(result.messages[result.messages.length - 1].content);
}

runMultiAgent();

为什么这套代码能跑赢单体模型?

  • 状态可控:你可以清晰看到请求是在“搜索”阶段还是“审计”阶段。
  • 递归深度:通过 recursionLimit 限制,你可以防止 Agent 陷入死循环,同时保证了深度。
  • 模型异构:你可以让研究员用便宜快速的 GPT-4o-mini,而让分析师用逻辑更强的 DeepSeek-V3 或 Claude 3.5。

结语

单体 AI 是工具,多智能体才是“数字员工”。在信息密度极高的 Web3 领域,学会如何编排一群 Agent 协作,将是开发者真正的竞争壁垒。

双源验证与强制推演:生产级 AI 投研 Agent 的幻觉抑制方案

前言

本文系前篇技术方案的进阶优化版本,核心聚焦于解决生成式 AI 在实际应用中的三大典型问题:幻觉生成(Hallucination)、语义模糊(Ambiguity)及输出精度不足(Low Precision)

前篇内容主要围绕基础环境配置与端到端流程验证展开,侧重于确保系统可正常运行,尚未对业务逻辑层进行深度优化与精细化处理。

整体架构概览

这是一个 LangChain/LangGraph 驱动的 AI 投研 Agent,专门用于分析地缘政治事件对预测市场(Polymarket)和能源市场的影响,寻找套利机会。

🔧 第一部分:tools.ts —— 工具层实现

1. 环境配置与依赖

import * as dotenv from "dotenv";      // 环境变量管理
import { tool } from "@langchain/core/tools";  // LangChain 工具装饰器
import { z } from "zod";                // 参数校验
import { TavilySearch } from "@langchain/tavily";  // Tavily AI 搜索引擎
import axios from "axios";              // HTTP 请求
import { HttpsProxyAgent } from "https-proxy-agent";  // 代理支持

2. 网络代理配置

const agent = new HttpsProxyAgent("http://127.0.0.1:3067");  // 本地代理
const axiosConfig = { timeout: 15000, httpsAgent: agent, proxy: false };
  • 通过本地 3067 端口代理访问外部 API
  • 15秒超时设置

3. 工具一:financialSearchTool —— Tavily 新闻搜索

属性 说明
功能 搜索最新新闻背景、套利机会或市场异动
底层引擎 TavilySearch (maxResults: 5)
输入格式 {"query": "搜索关键词"}
输出 JSON 格式化的搜索结果

核心用途:获取基本面信息(政策、突发事件、市场情绪)


4. 工具二:marketDataTool —— Polymarket 套利扫描器 ⭐核心

这是一个高度优化的智能过滤系统

4.1 语义映射表(中英关键词转换)
"原油" → ["oil", "crude", "energy", "brent", "wti"]
"中东" → ["israel", "gaza", "iran", "lebanon", "middle east"]
"战争" → ["war", "military", "strike", "attack"]
"选举" → ["election", "trump", "harris"]
"套利" → ["arbitrage", "mispricing", "spread", "basis"]
4.2 智能组合搜索策略
const hotSuffixes = ["ceasefire", "rate cut", "ath", "trump", "deadline"];
// 生成组合词:baseTerm + hotSuffix
// 例如:输入"以色列" → 生成 "israel ceasefire", "israel rate cut" 等
4.3 数据获取与过滤流程
1. 调用 Polymarket API → 获取最活跃的 50 个市场
2. 语义匹配 → 检查标题是否包含关注词或其组合
3. 噪音过滤 → 剔除体育类市场 (NBA, NHL, soccer等)
4. 结果排序 → 按相关度返回
4.4 输出格式
{
  "status": "success",
  "count": 3,
  "data": [
    {"question": "市场问题", "price": 0.65, "endDate": "2024-12-31"},
    ...
  ],
  "hint": "请分析以上市场之间是否存在隐含概率冲突或定价偏差"
}

🤖 第二部分:主程序 —— ReAct Agent 实现

1. 核心架构组件

┌─────────────────────────────────────────┐
│           ReAct Agent 架构             │
├─────────────────────────────────────────┤
│  LLM: DeepSeek Chat (via OpenAI SDK)   │
│  Memory: MemorySaver (对话状态持久化)   │
│  Tools: [搜索工具, 行情工具]             │
│  System Prompt: 投研专家角色定义         │
└─────────────────────────────────────────┘

2. LLM 配置详解

const llm = new ChatOpenAI({
    apiKey: process.env.DEEPSEEK_API_KEY,
    modelName: "deepseek-chat",
    configuration: { baseURL: process.env.DEEPSEEK_API_BASE_URL },
    temperature: 0,  // 确定性输出,适合分析任务
});

3. 系统提示词(System Message)核心逻辑

【双源验证策略】
┌─────────────┐      ┌─────────────┐
│  盘面赔率    │  ←→  │  新闻基本面  │
│ (Polymarket)│      │ (Tavily搜索) │
└─────────────┘      └─────────────┘
      ↓                    ↓
   寻找定价偏差 ←──── 验证/补充信息
【 fallback 机制】

如果没搜到预测市场数据 → 禁止只说"没搜到" → 必须基于新闻进行逻辑推演

【输出模板强制结构】
章节 内容要求
一、实时盘面观测 赔率数据或"未定价原因分析"
二、核心驱动因子 3条最关键新闻摘要
三、投研推演 深度见解 + 概率判断 + 套利建议

4. Agent 执行流程

const result = await agent.invoke({
    messages: [{ role: "user", content: "分析 Israel(以色列)局势..." }]
}, config);

ReAct 循环

  1. Thought: LLM 分析需要调用什么工具
  2. Action: 调用 get_realtime_market_datafinancial_market_search
  3. Observation: 接收工具返回数据
  4. Repeat: 如有需要,继续调用工具
  5. Final Answer: 生成结构化投研报告

🎯 完整工作流程图解

用户输入主题
    ↓
[Agent 启动 ReAct 循环]
    ↓
┌─────────────────┐
│ 并行调用双工具  │
├─────────────────┤
│ 1. 搜索新闻基本面 │ → Tavily API2. 扫描Polymarket│ → 智能语义匹配 → 过滤体育噪音
└─────────────────┘
    ↓
[数据融合分析]
    ↓
┌─────────────────────────┐
│  定价偏差检测算法        │
│  - 赔率 vs 新闻热度对比  │
│  - 相似市场概率冲突检测   │
│  - 时间差套利机会识别     │
└─────────────────────────┘
    ↓
[生成结构化报告]
    ↓
📊 深度投研报告(三章节固定格式)

💡 核心创新点

特性 说明
语义智能映射 中文输入自动扩展为英文关键词组合
组合搜索策略 基础词 + 热点后缀,捕获最新市场
噪音过滤 自动剔除体育等无关市场
强制推演机制 无数据时必须逻辑推演,禁止空返回
套利导向设计 输出明确提示寻找定价偏差

这是一个生产级的地缘政治-能源套利分析 Agent,结合了实时预测市场数据与 AI 搜索能力,输出具备实战价值的投研报告。

核心代码

工具(Tools.ts)

import * as dotenv from "dotenv";
dotenv.config();
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { TavilySearch } from "@langchain/tavily";
import axios from "axios";
import { HttpsProxyAgent } from "https-proxy-agent";

const agent = new HttpsProxyAgent("http://127.0.0.1:3067");
const axiosConfig = { timeout: 15000, httpsAgent: agent, proxy: false };
const searchInstance = new TavilySearch({ maxResults: 5 }); // 增加搜索结果以捕捉更多套利新闻

// 1. 搜索工具:强化了对套利/异动新闻的搜索描述
export const financialSearchTool = tool(
    async (input) => {
        const query = typeof input === 'string' ? input : (input.query || JSON.stringify(input));
        console.log(`\n[🔍 正在执行深度搜索]: ${query}`);
        try {
            const res = await searchInstance.invoke(query);
            return JSON.stringify(res);
        } catch (e: any) { return `搜索暂时不可用`; }
    },
    {
        name: "financial_market_search",
        description: "搜索最新新闻背景、套利机会或市场异动。请输入关键词对象,例如:{\"query\": \"Polymarket arbitrage opportunities\"}",
        schema: z.object({ query: z.string() }), 
    }
);

// 2. 深度优化的行情工具 (Arbitrage & Trend Ready)
export const marketDataTool = tool(
    async (input) => {
        const userInput = typeof input === 'string' ? input : (input.marketName || JSON.stringify(input));
        
        // 1. 行业语义映射表 (包含最新套利关键词)
        const mapping: Record<string, string[]> = {
            "原油": ["oil", "crude", "energy", "brent", "wti", "gasoline"],
            "油价": ["oil", "crude", "energy"],
            "中东": ["israel", "gaza", "iran", "lebanon", "middle east", "hezbollah", "conflict"],
            "战争": ["war", "military", "strike", "attack", "invasion"],
            "选举": ["election", "trump", "vance", "harris", "walz"],
            "停火": ["ceasefire", "truce", "peace"],
            "核": ["nuclear", "facility", "isfahan"],
            "海峡": ["hormuz", "strait", "shipping"],
            "宏观": ["fed", "rate cut", "inflation", "recession", "gdp"],
            "套利": ["arbitrage", "mispricing", "spread", "basis", "hedging"],
            "时间差": ["deadline", "expiry", "until", "before", "sooner"],
            "美联储": ["powell", "fomc", "interest rate", "basis points"],
            "加密货币": ["bitcoin", "etf", "ethereum", "solana", "ath"]
        };

        // 获取基础关注词
        let baseTerms = [userInput.toLowerCase()];
        for (const [zh, ens] of Object.entries(mapping)) {
            if (userInput.includes(zh)) {
                baseTerms = ens;
                break;
            }
        }

        // --- 优化点:自动生成组合搜索词(捕获最新最热) ---
        const hotSuffixes = ["ceasefire", "rate cut", "ath", "trump", "deadline"];
        const focusTerms = [
            ...baseTerms,
            ...baseTerms.flatMap(term => hotSuffixes.map(suffix => `${term} ${suffix}`))
        ];

        console.log(`\n[📊 正在扫描套利机会]: 领域 -> ${userInput} | 衍生词数 -> ${focusTerms.length}`);

        try {
            // 获取全平台最火的 50 个市场
            const url = `https://gamma-api.polymarket.com/markets?active=true&closed=false&limit=50`;
            const res = await axios.get(url, axiosConfig);
            
            if (!res.data || res.data.length === 0) return "Polymarket 暂无活跃市场。";

            // 2. 精准过滤逻辑
            const relevantMarkets = res.data.filter((m: any) => {
                const title = m.question.toLowerCase();
                // 必须包含正向词,剔除体育等干扰噪音
                const hasFocus = focusTerms.some(term => title.includes(term));
                const isNoise = ["nhl", "nba", "cup", "game", "soccer", "football"].some(noise => title.includes(noise));
                return hasFocus && !isNoise;
            });

            if (relevantMarkets.length > 0) {
                // 按相关度或题目排序,方便 Agent 对比相似市场寻找套利空间
                const marketList = relevantMarkets.map((m: any) => ({
                    question: m.question,
                    price: m.lastTradePrice,
                    endDate: m.endDate
                }));

                return JSON.stringify({
                    status: "success",
                    count: marketList.length,
                    data: marketList,
                    hint: "请分析以上市场之间是否存在隐含概率冲突或定价偏差。"
                });
            }

            return `[提示]:热门榜单中暂无直接对标 "${userInput}" 的套利交易对。`;
        } catch (e: any) { return `行情接口异常: ${e.message}`; }
    },
    {
        name: "get_realtime_market_data",
        description: "获取 Polymarket 实时赔率与套利机会。支持组合搜索。",
        schema: z.object({ marketName: z.string() }),
    }
);

export const tools = [financialSearchTool, marketDataTool];

主程(index.ts)

import * as dotenv from "dotenv";
dotenv.config();
import { ChatOpenAI } from "@langchain/openai";
import { createReactAgent } from "@langchain/langgraph/prebuilt"; 
import { MemorySaver } from "@langchain/langgraph";
import { tools } from "./tools1";

// ... 其他 import 保持不变

const systemMessage = `你是一位 Web3 顶级投研专家。你的目标是产出具备实战价值的地缘政治与能源市场报告。

【分析逻辑链条】:
1. 优先调用 get_realtime_market_data 寻找“盘面赔率”(最真实的真金白银预测)。
2. 同时调用 financial_market_search 搜索“基本面动态”(新闻、政策、突发事件)。

【核心指令】:
- 如果搜到了预测市场数据:请对比“盘面价格”与“新闻热度”,寻找定价偏差。
- 如果【没搜到】预测市场数据:你绝对不能只说“没搜到”。你必须利用搜索到的新闻事实,基于你的专业知识进行“逻辑推演”,分析当前局势对未来油价的潜在影响路径。

【报告输出模板】:
## 📊 [主题] 深度投研报告
---
### 一、 实时盘面观测 (On-chain Data)
[列出获取到的赔率,若无则分析为何市场尚未定价]

### 二、 核心驱动因子 (News Context)
[列出 Tavily 搜索到的 3 条最关键新闻摘要]

### 三、 投研推演与套利建议 (Expert Thesis)
[基于以上两点给出深度见解,必须包含对未来走势的概率判断]`;

// 创建 Agent 的配置
const llm = new ChatOpenAI({
        apiKey: process.env.DEEPSEEK_API_KEY,
        modelName: "deepseek-chat",
        configuration: { baseURL: process.env.DEEPSEEK_API_BASE_URL },
        temperature: 0,
    });
const memory = new MemorySaver();
const agent = createReactAgent({
  llm,
  tools,
  checkpointSaver: memory,
  messageModifier: systemMessage,
});

// 在运行前,我们可以通过一段逻辑拦截并修正 (这步是关键)
async function run() {
  console.log("🚀 正在启动一劳永逸版 Agent...");
  const config = { configurable: { thread_id: "final_test" }, recursionLimit: 50 };
  
  const result = await agent.invoke({
    messages: [{ role: "user", content: "分析 Israel(以色列)局势对预测市场的影响。" }]
  }, config);

  // 打印最后结果
  const lastMsg = result.messages[result.messages.length - 1];
  console.log("\n--- 最终报告 ---");
  console.log(lastMsg.content);
}

run();

结语

至此,关于 LangChain/LangGraph 驱动的 AI 投研 Agent 从理论架构到工程实现的完整链路已全部落地。

该方案通过双源验证策略(盘面赔率 + 新闻基本面)与强制推演机制,有效抑制了生成式 AI 的幻觉与模糊输出问题,为地缘政治与能源市场的量化分析提供了可复用的技术框架。后续可围绕多 Agent 协作、动态策略回测等方向持续迭代。

从零构建 DeepSeek + LangChain 智能 Agent:实现联网搜索与投资决策分析

前言

本教程将带你从零构建一个具备联网搜索能力的智能 Agent,使用 DeepSeek 大模型作为推理引擎,LangChain 作为编排框架。

前置要求

依赖 版本/说明
Node.js 20.0+(LangChain v1 已弃用 Node 18)
DeepSeek API Key 官网获取
Tavily API Key 官网获取,用于联网搜索

项目初始化

准备:已经安装了node,以及准备好有deepseekapi

# 创建并进入项目目录
mkdir my-first-agent && cd my-first-agent

# 初始化项目
npm init -y

# 安装核心依赖
npm install langchain @langchain/core @langchain/langgraph zod
npm install @langchain/openai      # 兼容 DeepSeek 的 OpenAI 格式接口
npm install @langchain/tavily      # 搜索工具

# 安装开发依赖
npm install -D typescript ts-node @types/node

# 初始化 TypeScript 配置
npx tsc --init
# 项目启动指令
npx tsx ./src/index.ts

项目目录

my-first-agent/
├── src/
│   ├── index.ts          # Agent 主入口
│   ├── tools.ts          # 工具定义
│   └── config.ts         # 配置管理
├── .env                  # 环境变量
├── package.json
└── tsconfig.json

环境变量配置

创建 .env 文件:

# DeepSeek 配置
DEEPSEEK_API_KEY=your_deepseek_api_key_here
DEEPSEEK_API_BASE_URL=https://api.deepseek.com/v1

# Tavily 搜索配置
TAVILY_API_KEY=your_tavily_api_key_here

核心代码实现

1.工具定义(src/tools.ts)

import * as dotenv from "dotenv";
dotenv.config();
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { TavilySearch } from "@langchain/tavily";

// 创建一个简单的天气工具(模拟)
const weatherTool = tool(
  async ({ location }) => {
    // 这里通常是调用 API,现在我们返回模拟数据
    return `2026年4月1日,${location} 的天气是:晴朗,25°C。`;
  },
  {
    name: "get_weather",
    description: "获取指定城市的实时天气",
    schema: z.object({
      location: z.string().describe("城市名称,例如:北京"),
    }),
  }
);


// 创建一个搜索工具实例
const searchTool = new TavilySearch({
  // 注意:在 v1.2.0 中,有些版本要求 apiKey 放在外层,有些要求放在 fields 内
  // 最标准的写法如下:
//   apiKey: process.env.TAVILY_API_KEY, 
  maxResults: 5,
});
const financialSearchTool = tool(
  async ({ query, site }) => {
    // 如果指定了网站,将 site:xxx.com 加入查询语句
    const fullQuery = site ? `${query} site:${site}` : query;
    const result = await searchTool.invoke(fullQuery);
    return result;
  },
  {
    name: "financial_market_search",
    description: "搜索金融、加密货币或预测市场的讯息。可以指定网站以获取更专业的数据。",
    schema: z.object({
      query: z.string().describe("具体的搜索关键词,例如:'以太坊 坎昆升级'"),
      site: z.string().optional().describe("指定搜索的域名,例如:'polymarket.com' 或 'coindesk.com'"),
    }),
  }
);
// 导出工具数组给 Agent 使用
// export const tools = [searchTool];

export const tools = [ searchTool, financialSearchTool];

2. Agent 主入口 (src/index.ts)

import * as dotenv from "dotenv";
dotenv.config();

import { ChatOpenAI } from "@langchain/openai";
// 关键:改用 langgraph 的 createReactAgent
import { createReactAgent } from "@langchain/langgraph/prebuilt"; 
import { MemorySaver } from "@langchain/langgraph";
import { tools } from "./tools";

async function runAgent() {
  const llm = new ChatOpenAI({
    apiKey: process.env.DEEPSEEK_API_KEY, 
    modelName: "deepseek-chat",
    configuration: {
      baseURL: process.env.DEEPSEEK_API_BASE_URL,
    },
    temperature: 0,
  });

  const memory = new MemorySaver();
const systemMessage = `你是一个专业的投资分析助手。
你的任务是:
1. 搜索指定网站(如 Polymarket, Binance, Twitter)的最新讯息。
2. 对比不同平台的信息差,寻找套利方向(Arbitrage)。
3. 分析行业趋势,给出具体的投资建议。
4. 必须输出逻辑链:现状描述 -> 数据对比 -> 风险评估 -> 结论建议。`;
  // --- 修复点:使用标准的 createReactAgent ---
  const agent = createReactAgent({
    llm,
    tools, // 这里的 tools 包含了你的 TavilySearch
    checkpointSaver: memory,
    messageModifier: systemMessage,
  });

  console.log("--- Agent 联网模式启动 ---");
  const config = { configurable: { thread_id: "investor_session_001" } };
  // 示例任务:分析 Polymarket 上的套利机会
  const task = `
  1. 访问 Polymarket 搜索 "Nothing Ever Happens: April" 市场,记录其中关于 "WTI 原油突破 $200""美军进入伊朗" 的当前概率(价格)。
  2. 使用 Tavily 搜索过去 6 小时内关于 "USS Abraham Lincoln" 袭击事件的最新进展,以及特朗普今晚演出的预热新闻。
  3. 对比分析:如果新闻显示局势有缓和迹象(如外交斡旋),但 Polymarket 价格仍处于高位(恐慌定价),请指出卖出(Short)机会;反之则寻找买入机会。
`;

  const result = await agent.invoke({
    messages: [{ role: "user", content: task }],
  }, config);

  console.log("\n[分析报告]:\n");
  console.log(result.messages[result.messages.length - 1].content);
}

runAgent().catch(console.error);

启动项目

# 开发模式(使用 tsx 支持 TypeScript 直接运行)
npx tsx src/index.ts

# 或先编译再运行
npx tsc
node dist/index.js

生成的分析报告(示例)

[分析报告]:

基于我收集到的信息,现在让我为您提供完整的分析报告:

## 投资分析报告:Polymarket与地缘政治套利机会

### 1. 现状描述

根据搜索结果,目前存在以下关键情况:

**Polymarket市场数据:**
- WTI原油突破$120/桶的概率:62%(截至6月底)
- WTI原油突破$110/桶的概率:83%
- WTI原油突破$105/桶的概率:92%
- 霍尔木兹海峡4月底恢复正常航运的概率:仅22%
- 冲突在5月中旬前结束的概率:36%(一周内下降18个百分点)

**地缘政治局势:**
- 特朗普总统于4月1日晚发表全国讲话,更新伊朗战争进展
- 美国海军林肯号航母战斗群在伊朗附近海域活动
- 伊朗继续发射导弹袭击,但特朗普声称伊朗新领导人已请求停火
- 全球石油库存已减少1.3亿桶,霍尔木兹海峡航运量降至正常水平的5%

### 2. 数据对比分析

**信息差识别:**

1. **Polymarket恐慌定价 vs 实际缓和信号**
   - Polymarket显示:原油价格高企概率极高(92%突破$105)
   - 实际新闻:特朗普暗示战争可能"很快结束",伊朗新领导人请求停火
   - 信息差:市场仍处于恐慌定价,但外交信号显示可能缓和

2. **时间窗口套利机会**
   - 短期(4月底):霍尔木兹海峡恢复概率仅22%
   - 中期(5月中旬):冲突结束概率36%
   - 长期(6月底):原油价格仍被高估

### 3. 风险评估

**风险因素:**
1. **地缘政治风险**:伊朗仍可能升级冲突,导弹袭击持续
2. **市场流动性风险**:Polymarket市场规模相对较小
3. **时间风险**:停火谈判可能破裂
4. **信息滞后风险**:新闻传播与市场反应的时间差

**风险等级:中等偏高**
- 地缘政治不确定性仍然存在
- 但特朗普政府的明确缓和信号值得关注

### 4. 结论与投资建议

**套利机会识别:**

**建议操作:卖出(Short)机会**

**逻辑链:**
1. **基本面**:特朗普明确表示伊朗新领导人请求停火,这是强烈的外交缓和信号
2. **技术面**:Polymarket原油价格概率仍处于恐慌高位(92%突破$105)
3. **时间差**:市场尚未充分消化缓和信号,存在定价错误
4. **风险回报比**:如果停火实现,原油价格可能迅速回落至$90-95区间

**具体建议:**
1. **短期套利**:在Polymarket上卖出"WTI原油突破$105"的合约
   - 当前价格:约0.92(92%概率)
   - 目标价格:如果停火进展顺利,可能降至0.60-0.70
   - 潜在回报:约25-35%

2. **中期对冲**:同时买入"冲突在5月中旬前结束"的合约
   - 当前价格:0.36(36%概率)
   - 如果停火进展顺利,可能升至0.60-0.70
   - 提供对冲保护

3. **风险控制**   - 仓位控制:建议不超过总资金的20%
   - 止损设置:如果原油价格突破$110,考虑止损
   - 时间框架:重点关注未来2-3周的外交进展

**监控指标:**
1. 特朗普政府与伊朗的进一步外交接触
2. 霍尔木兹海峡航运恢复迹象
3. 国际油价实时走势
4. Polymarket相关合约价格变化

**最终结论:**
当前存在明显的卖出套利机会。Polymarket的恐慌定价尚未充分反映特朗普政府发出的缓和信号。建议采取谨慎但积极的卖出策略,同时通过相关合约进行对冲,以控制地 缘政治风险。

结语

至此,我们已完成基于 LangChain 的 Agent 最小可行产品(MVP)。当前实现验证了核心链路通畅,但距离生产级应用仍需完善:异常重试机制、调用链路追踪、输入输出校验、成本配额控制等。本文提供基础架构参考,实际落地时请结合具体场景调整工具配置与提示词策略。

❌