普通视图

发现新文章,点击刷新页面。
今天 — 2025年5月19日首页

MCP Server 的三种实现

作者 唐诗
2025年5月19日 09:11

MCP 规范定义了三种标准的传输机制,也就对应了三种 MCP Server 的实现方式

本文对三种数据传输方式及 Server 的实现进行了实践,代码已上传 GitHub 猛击访问

image.png

MCP 三种标准的传输机制

标准输入和标准输出的通信 STDIO

专为本地 MCP 连接设计,比如通过 node index.js 执行 MCP Server 进行交互

服务器发送事件 SSE

目前大多数远程 MCP 客户端都支持,但预计随着时间的推移将被流式 HTTP 取代。

它需要两个端点:一个用于发送请求,另一个用于接收流式响应。

可流式传输 HTTP

2025 年 3 月引入的新传输方法。它通过使用单个 HTTP 端点简化了双向通信。

目前,它正在被远程 MCP 客户端采用,预计未来将成为标准的传输方式。

为什么要替换可以看 Replace HTTP+SSE with new "Streamable HTTP" transport GitHub 上的这个 RFC

MCP 三种 Server 的实现

创建一个 server

三种实现方式只是数据传输的方式不同, server 实现是一样的

创建一个简单的 MCP server 导出

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { z } from 'zod'
import dayjs from 'dayjs'
import utc from 'dayjs/plugin/utc.js'
import timezone from 'dayjs/plugin/timezone.js'

dayjs.extend(utc)
dayjs.extend(timezone)

const server = new McpServer({
  name: 'mcp-server-time',
  version: '1.0.0'
})

// 获取当前时间的工具
server.tool(
  'get_current_time',
  '获取当前时间',
  {
    timezone: z.string().optional(),
  },
  async ({ timezone }) => {
    const tz = timezone || process.env.LOCAL_TIMEZONE || 'Asia/Shanghai';
    const currentTime = dayjs().tz(tz).format('YYYY-MM-DD HH:mm:ss');
    return {
      content: [{ type: "text", text: JSON.stringify({ currentTime }, null, 2) }],
    };
  }
)

// 日期时间转换工具
server.tool(
  'convert_time',
  '在时区之间转换日期时间',
  {
    source_timezone: z.string(),
    datetime: z.string().regex(/^\d{4}-\d{1,2}-\d{1,2} ([01]\d|2[0-3]):([0-5]\d):([0-5]\d)$/, '日期时间格式无效,应为 YYYY-MM-DD HH:mm:ss'),
    target_timezone: z.string(),
  },
  async ({ source_timezone, datetime, target_timezone }) => {
    const sourceTime = dayjs.tz(datetime, source_timezone);
    const convertedTime = sourceTime.clone().tz(target_timezone).format('YYYY-MM-DD HH:mm:ss');
    return {
      content: [{ type: "text", text: JSON.stringify({ convertedTime }, null, 2) }],
    };
  }
)

server.tool(
  'get_text',
  '返回测试文本',
  {},
  async () => {
    const text = '在这个充满变化的时代,每一天都带来了新的机遇与挑战。科技的发展不仅改变了我们的生活方式,也让我们的思维方式不断更新。面对未知,我们或许会感到迷茫,但正是这种探索精神推动着社会不断进步。从传统的理念到现代的创新,每一次转变都蕴含着无限可能。测试文本的存在,正是为了验证系统的生成和处理能力。相信在不断的尝试中,我们能够找到更好的解决方案,为未来的发展铺平道路。'
    return {
      content: [{ type: "text", text: text }],
    };
  }
)

export default server

标准输入和标准输出的通信 STDIO

#!/usr/bin/env node
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import server from './mcpServerTest.js'

// 启动服务器
async function runServer() {
  // 使用 StdioServerTransport 标准输入输出
  const transport = new StdioServerTransport()
  await server.connect(transport)
  console.error('获取当前时间和时区转换的 MCP 服务器已在 stdio 上启动')
}

runServer().catch((error) => {
  console.error('启动服务器时出错:', error)
  process.exit(1)
})

测试

执行 npx @modelcontextprotocol/inspector node transports/dist/stdio.js 启动测试页面

image.png

访问 http://127.0.0.1:6274

image.png

注意这里的配置,确认没有问题点击连接即可

iShot_2025-05-14_11.40.50.gif

服务器发送事件 SSE

这是一个简单的 mcp sse 服务的实现,在生产中可能需要继续完善代码比如对跨域、鉴权的处理

import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import server from './mcpServerTest.js'
import {addInfoLog, addWarnLog, addSuccessLog} from './utils.js'

const app = express();
app.use(express.json());

// 存储连接
const connections = new Map<string, SSEServerTransport>();

app.get('/sse', async (req, res) => {

  addSuccessLog('客户端连接参数:', req.query)

  // 创建 sse 传输
  const transport = new SSEServerTransport('/messages', res);
  const sessionId = transport.sessionId

  addInfoLog(`新的 SSE 连接建立: ${sessionId}`)  

  // 注册连接
  connections.set(sessionId, transport);
  
  res.on("close", () => {
    connections.delete(sessionId);
    addInfoLog(`SSE 连接关闭: ${sessionId}`)  
  });
  
  // 将传输对象与MCP服务器连接
  await server.connect(transport);
  addSuccessLog(`MCP 服务器连接成功: ${sessionId}`)  
});

// 旧消息端点
app.post('/messages', async (req, res) => {
  const sessionId = req.query.sessionId as string;

  addInfoLog(`收到客户端消息: ${sessionId}`)  
  
  console.log('query',req.query, '\r\n')
  console.log('body',req.body, '\r\n')
  console.log('params',req.params, '\r\n')

  // 获取连接
  const transport = connections.get(sessionId)
  if (transport) {
    await transport.handlePostMessage(req, res, req.body);
  } else {
    addWarnLog(`未找到活跃的 ${sessionId} 连接`) 
    res.status(400).send(`未找到活跃的 ${sessionId} 连接`);
  }
});


// 启动服务器
const port = process.env.PORT || 9001;
app.listen(port, () => {
  addSuccessLog(`MCP SSE 服务器已启动:`, `http://localhost:${port}`)
  addInfoLog('SSE 连接端点:', `http://localhost:${port}/sse`)
  addInfoLog('SSE 消息处理端点:', ` http://localhost:${port}/messages`)

  console.log('=========================== success ===========================\r\n')
});

测试

执行 node dist/sse.js 启动服务

image.png

执行 npx @modelcontextprotocol/inspector http://127.0.0.1 启动测试页面

image.png

访问 http://127.0.0.1:6274

image.png

这里需要选择 sse 和确认 sse 服务的地址及端口,确认没有问题后点击连接即可

iShot_2025-05-14_13.44.16.gif

可流式传输 HTTP

这是一个简单的 Streamable HTTP mcp 服务实现,其实是官方仓库的示例代码 (看起来有点绕🤪) 加了一些注释便于理解

import express from "express";
import { randomUUID } from "node:crypto";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"
import server from './mcpServerTest.js'
import {addInfoLog, addSuccessLog} from './utils.js'
import pc from 'picocolors'

const app = express();
app.use(express.json());

/**
 * POST 请求:创建新的传输实例并保存到 transports 映射(以 sessionId 为键)。
 * GET 请求(SSE):通过已保存的传输实例推送数据流到客户端。
 * DELETE 请求:通过已保存的传输实例终止会话,断开连接。
 */

// 保存 会话 id 到 传输实例的映射 
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

// Handle POST requests for client-to-server communication
app.post('/mcp', async (req, res) => {
  // Check for existing session ID
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  let transport: StreamableHTTPServerTransport;

  if (sessionId && transports[sessionId]) {
    // 如果传输实例已经存在则直接使用
    transport = transports[sessionId];
  } else if (!sessionId && isInitializeRequest(req.body)) {
    // isInitializeRequest 判断 是否是一个合法的 mcp 的请求
    // 创建一个新的传输实例
    transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (sessionId) => {
        // 通过会话 id 保存传输实例
        transports[sessionId] = transport;
        addInfoLog(`创建传输实例成功 ${sessionId}`)
      }
    });

    // 接收到 DELETE 请求关闭连接
    transport.onclose = () => {
      if (transport.sessionId) {
        delete transports[transport.sessionId];
      }
    };

    await server.connect(transport);
    addSuccessLog(`MCP 服务器连接成功: ${sessionId}`)  
  } else {
    // 无效的请求
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: '错误请求:未提供有效的会话 ID',
      },
      id: null,
    });
    return;
  }

  // 处理请求 这里 根据 get、delete 做对应的处理
  // 如果请求是 GET,通常会被解释为服务器推送通知(SSE)。
  // 如果请求是 DELETE,通常会被解释为终止会话(断开连接)。
  await transport.handleRequest(req, res, req.body);
});

// 处理 get 和 delete 请求
async function handleSessionRequest(req: express.Request, res: express.Response) {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports[sessionId]) {
    res.status(400).send('会话 ID 无效或缺失');
    return;
  }
  
  // 获取传输实例执行对应的的操作
  const transport = transports[sessionId];
  await transport.handleRequest(req, res);
}

// 通过 SSE 处理服务器到客户端通知的 GET 请求
app.get('/mcp', handleSessionRequest);

// 处理会话终止的 DELETE 请求
app.delete('/mcp', handleSessionRequest);

// 启动服务器
const port = process.env.PORT || 9002;
app.listen(port, () => {
  addSuccessLog(`MCP Streamable 服务器已启动: ${pc.green(`http://localhost:${port}`)}`)

  console.log('=========================== success ===========================\r\n')
});

测试

执行 node dist/streamable.js 启动服务

image.png

执行 npx @modelcontextprotocol/inspector http://127.0.0.1 启动测试页面

image.png

访问 http://127.0.0.1:6274

这里需要选择 Streamable HTTP 和确认服务的地址及端口,确认没有问题后点击连接即可

image.png

iShot_2025-05-14_17.24.21.gif

server 同时兼容 SSE 与 Streamable 的写法

把 sse 和 Streamable HTTP 的 server 实现组合起来,没有什么魔法!

MCP 三种 Client 的实现

三种客户端的实现区别不大, MCP SDK 导出了三个文件对应三种 server 的实现

import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

STDIO

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { addInfoLog, addErrLog, addSuccessLog } from './utils.js'

async function createStdioClient() {
  // 创建 client
  const client = new Client({
    name: 'stdio-client',
    version: '1.0.0',
  });

  try {
    const sseTransport = new StdioClientTransport({
      command: 'node',
      args: ['../transports/dist/stdio.js']
    });

    // 连接到 MCP 服务
    await client.connect(sseTransport);

    addSuccessLog(' MCP 服务 连接成功!')

    // 获取 工具列表
    const toolInfo = await client.listTools()

    addInfoLog('MCP 工具信息', toolInfo)

    // 调用生成文本工具
    const callToolInfo = await client.callTool({
      "name": "get_text",
      "arguments": {}
    })

    addInfoLog('get_text 返回信息', callToolInfo)
  } catch (error) {
    addErrLog('MCP 客户端错误', error);
  }
}

createStdioClient()

image.png

SSE

这里需要先启动 sse serevr node ./dist/sse.js

image.png

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { addInfoLog, addErrLog, addSuccessLog } from './utils.js'

let client: Client | undefined = undefined
const baseUrl = new URL('http://localhost:9001/sse');

async function createSseClient() {
  client = new Client({
    name: 'sse-client',
    version: '1.0.0',
  });

  try {
    const sseTransport = new SSEClientTransport(baseUrl, {
      requestInit: {
        headers: {
          // 这里的参数可以在 messages req.headers 中获取
          'X-Custom-Param': 'custom_value'
        },
      }
    });

    // 连接到 MCP 服务
    await client.connect(sseTransport);

    addSuccessLog(' MCP 服务 连接成功!')

    // 获取 工具列表
    const toolInfo = await client.listTools()

    addInfoLog('MCP 工具信息', toolInfo)

    // 调用生成文本工具
    const callToolInfo = await client.callTool({
      "name": "get_text",
      "arguments": {}
    })

    addInfoLog('get_text 返回信息', callToolInfo)
  } catch (error) {
    addErrLog('MCP 客户端错误', error);
  }

  // 关闭连接
  // client.close()
}

createSseClient()

image.png

Streamable HTTP

实现与 SSE 几乎相同将 SSEClientTransport 替换为 StreamableHTTPClientTransport 即可

如果失败并出现 4xx 错误时尝试使用 SSE 客户端

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

let client: Client | undefined = undefined
const baseUrl = new URL('http://localhost:9001/sse');

async function createStreamableHttpClient() {
  try {
    client = new Client({
      name: 'streamable-http-client',
      version: '1.0.0'
    });

    const transport = new StreamableHTTPClientTransport(baseUrl);
    await client.connect(transport);

    console.log("Connected using Streamable HTTP transport");
  } catch (error) {
    // 如果失败并出现 4xx 错误,请尝试较旧的 SSE 传输 
    
    console.log("Streamable HTTP connection failed, falling back to SSE transport");

    // todo 使用 sse client 与 server 交互
  }
}

createStreamableHttpClient()

参考文章

modelcontextprotocol typescript-sdk

cloudflare / Agents 代理 / Transport 传输

开发 SSE 类型的 MCP 服务

昨天以前首页

Agentic Loop与MCP:大模型能力扩展技术解析

作者 Loadings
2025年5月17日 16:22

一、什么是MCP

MCP(Model Context Protocol)是一种用于大语言模型与外部工具交互的协议框架。它允许大语言模型能够调用各种外部工具来扩展其能力边界,如访问文件系统、搜索引擎、数据库等。

MCP的核心价值

  1. 能力扩展:使大语言模型突破知识边界和计算能力的限制
  2. 标准化接口:提供统一的工具调用标准,降低集成成本
  3. 复杂任务处理:通过递进式工具调用,解决复杂问题
  4. 灵活扩展:开发者可方便地添加自定义工具

MCP通过宿主应用、客户端、服务器和大语言模型的协作,实现了AI与工具的无缝对接,为AI应用提供了强大的扩展性。

二、各组件职责

宿主应用(Host)

  • 提供用户界面,接收用户输入
  • 展示AI响应和工具执行结果
  • 管理用户会话和界面状态
  • 维护对话历史和上下文管理

宿主应用最简实现:

// host.ts - 宿主应用简化实现
import { MCPClient } from './client';

class MCPHost {
  private client: MCPClient;
  private conversationHistory: any[] = [];

  constructor() {
    // 初始化MCP客户端
    this.client = new MCPClient();
  }

  // 应用启动
  async start() {
    // 初始化客户端
    await this.client.initialize();
    console.log("MCP宿主应用启动成功");
  }

  // 处理用户消息
  async handleUserMessage(userInput: string) {
    // 添加用户消息到历史
    this.conversationHistory.push({ role: "user", content: userInput });
    
    // 发送消息给客户端处理
    const response = await this.client.processMessage(userInput, this.conversationHistory);
    
    // 添加响应到历史
    this.conversationHistory.push({ role: "assistant", content: response });
    
    // 返回响应给UI
    return response;
  }
}

MCP客户端(Client)

  • 管理注册连接mcp server
  • 解析模型输出中的工具调用指令
  • 使用stdio标准IO以及RPC规范与mcp server通信让其执行工具方法

注意:实际开发中,推荐使用Model Context Protocol官方SDK,官方提供了Python、TypeScript、Java、Kotlin、C#等多种语言的SDK实现。本文为了更清晰地展示底层实现细节,自行实现了一些核心方法,帮助读者理解MCP客户端的工作原理。

2.1 客户端核心结构

MCP客户端最简实现:

// client.ts - MCP客户端简化实现
import { LLMProvider } from './llm-provider';
import { spawn } from 'child_process';
import * as fs from 'fs';
import * as path from 'path';

export class MCPClient {
  private servers: Map<string, any> = new Map();
  private availableTools: any[] = [];
  private maxAgentLoops = 20;
  private llmProvider: LLMProvider;
  
  constructor() {
    this.llmProvider = new LLMProvider();
  }
  
  // 初始化客户端
  async initialize() {
    // 读取MCP服务器配置
    const configPath = path.join(process.env.HOME || process.env.USERPROFILE || '', 'mcp_config.json');
    const config = JSON.parse(fs.readFileSync(configPath, 'utf-8'));
    
    // 连接到每个配置的MCP服务器
    for (const [serverName, serverConfig] of Object.entries(config.mcpServers)) {
      console.log(`正在连接到MCP服务器: ${serverName}`);
      
      try {
        // 创建与服务器的连接
        const serverConnection = await this.connectToServer(serverName, serverConfig);
        this.servers.set(serverName, serverConnection);
        
        // 获取服务器提供的工具列表
        const tools = await this.fetchToolsFromServer(serverConnection);
        this.availableTools.push(...tools.map(tool => ({
          ...tool,
          serverName
        })));
        
        console.log(`已连接MCP服务器 ${serverName} 并发现 ${tools.length} 个工具`);
      } catch (error) {
        console.error(`连接MCP服务器 ${serverName} 失败:`, error);
      }
    }
    
    console.log(`已连接 ${this.servers.size} 个MCP服务器,共发现 ${this.availableTools.length} 个工具`);
  }
  
  // 连接到MCP服务器
  private async connectToServer(serverName: string, config: any) {
    // 根据配置启动服务器进程
    const childProcess = spawn(
      config.command,
      config.args || [],
      {
        cwd: config.cwd || process.cwd(),
        stdio: ['pipe', 'pipe', 'pipe']
      }
    );
    
    // 简单封装连接对象
    const serverConnection = {
      name: serverName,
      process: childProcess,
      async sendRequest(request: any) {
        return new Promise((resolve, reject) => {
          // 将请求写入标准输入
          childProcess.stdin.write(JSON.stringify(request) + '\n');
          
          // 从标准输出读取响应
          childProcess.stdout.once('data', (data) => {
            try {
              const response = JSON.parse(data.toString());
              resolve(response);
            } catch (error) {
              reject(new Error(`无法解析服务器响应: ${error.message}`));
            }
          });
          
          // 处理错误
          childProcess.on('error', reject);
        });
      }
    };
    
    return serverConnection;
  }
  
  // 从服务器获取工具列表
  private async fetchToolsFromServer(serverConnection: any) {
    const request = {
      jsonrpc: "2.0",
      id: this.generateId(),
      method: "listTools",
      params: {}
    };
    
    const response = await serverConnection.sendRequest(request);
    
    if (response.error) {
      throw new Error(`获取工具列表失败: ${response.error.message}`);
    }
    
    return response.result.tools || [];
  }
  
  // 处理消息并执行Agentic Loop
  async processMessage(userInput: string, history: any[]) {
    // 准备LLM请求
    const llmRequest = this.prepareLLMRequest(userInput, history);
    
    // 执行Agentic Loop
    let loopCount = 0;
    let finalResponse = "";
    
    while (loopCount < this.maxAgentLoops) {
      // 调用LLM获取响应
      const llmResponse = await this.llmProvider.generateResponse(llmRequest);
      
      // 检查是否包含工具调用
      const toolCalls = this.extractToolCalls(llmResponse);
      
      if (toolCalls.length === 0) {
        // 无工具调用,循环结束
        finalResponse = llmResponse.content;
        break;
      }
      
      // 首先添加包含工具调用的assistant消息到历史
      llmRequest.messages.push({
        role: "assistant",
        content: null,
        tool_calls: toolCalls.map((toolCall, index) => ({
          id: `call_${Date.now()}_${index}`,
          type: 'function',
          function: {
            name: toolCall.name,
            arguments: JSON.stringify(toolCall.arguments)
          }
        }))
      });
      
      // 执行工具调用
      for (const toolCall of toolCalls) {
        const result = await this.callTool(toolCall);
        
        // 将工具结果添加到对话历史
        llmRequest.messages.push({
          role: "tool",
          name: toolCall.name,
          content: JSON.stringify(result)
        });
      }
      
      loopCount++;
    }
    
    return finalResponse;
  }
  
  // 调用工具
  async callTool(toolCall: { name: string, arguments: Record<string, any> }) {
    console.log(`客户端调用工具: ${toolCall.name},参数:`, toolCall.arguments);
    
    // 查找支持此工具的服务器
    const tool = this.availableTools.find(t => t.name === toolCall.name);
    if (!tool) {
      throw new Error(`未知工具: ${toolCall.name}`);
    }
    
    const serverConnection = this.servers.get(tool.serverName);
    if (!serverConnection) {
      throw new Error(`未找到支持工具 ${toolCall.name} 的服务器`);
    }
    
    // 创建标准JSON-RPC请求
    const request = {
      jsonrpc: "2.0",
      id: this.generateId(),
      method: "callTool",
      params: {
        name: toolCall.name,
        arguments: toolCall.arguments || {}
      }
    };
    
    // 发送请求并获取响应
    const response = await serverConnection.sendRequest(request);
    
    // 处理错误
    if (response.error) {
      throw new Error(`工具调用失败: ${response.error.message}`);
    }
    
    // 返回结果
    return response.result;
  }
  
  // 准备LLM请求
  private prepareLLMRequest(userInput: string, history: any[]) {
    return {
      messages: [...history],
      tools: this.availableTools.map(tool => ({
        type: "function",
        function: {
          name: tool.name,
          description: tool.description,
          parameters: {
            type: tool.inputSchema.type,
            properties: tool.inputSchema.properties,
            required: tool.inputSchema.required
          }
        }
      }))
    };
  }
  
  // 从LLM响应中提取工具调用
  private extractToolCalls(response: any) {
    // 简化实现,假设response包含tool_calls属性
    return response.tool_calls || [];
  }
  
  // 生成唯一ID
  private generateId() {
    return `req-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
  }
}

@modelcontextprotocol/sdk 的callTool方法实现

官方SDK也是基于JSON-RPC 2.0协议实现,是客户端与服务器交互的核心接口:

sequenceDiagram
    participant Client as SDK Client
    participant Transport as Transport层
    participant MCPServer as MCP Server
    participant Tool as 工具实现
    
    Client->>Client: callTool({name, arguments})
    Client->>Client: 验证工具能力支持
    Client->>Client: 构建JSON-RPC请求
    Client->>Client: 调用sendRequest方法
    
    Client->>Transport: 序列化并发送JSON-RPC请求
    Transport->>MCPServer: 通过HTTP/WebSocket/Stdio传输
    
    MCPServer->>MCPServer: 解析请求
    MCPServer->>Tool: 调用对应工具
    Tool-->>MCPServer: 返回执行结果
    
    MCPServer->>MCPServer: 构建JSON-RPC响应
    MCPServer-->>Transport: 返回响应
    Transport-->>Client: 接收并解析响应
    
    Client->>Client: 处理响应或错误
    Client-->>Client: 返回标准化结果

JSON-RPC 2.0格式

MCP客户端和服务器之间的通信基于JSON-RPC 2.0协议:

请求格式

{
  "jsonrpc": "2.0",
  "id": "request-123",
  "method": "callTool",
  "params": {
    "name": "calculator",
    "arguments": {
      "a": 5,
      "b": 3,
      "operation": "multiply"
    }
  }
}

成功响应格式

{
  "jsonrpc": "2.0",
  "id": "request-123",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "15"
      }
    ],
    "isError": false
  }
}

错误响应格式

{
  "jsonrpc": "2.0",
  "id": "request-123",
  "error": {
    "code": -32000,
    "message": "Unknown tool 'calculator'"
  }
}

大语言模型(LLM)

  • 处理用户输入并生成回复
  • 根据需要输出标准格式的工具调用指令
  • 根据工具执行结果给出自然语言结果或暂时由于信息不足需要调用更多的工具而继续对话

LLM提供者最简实现:

// llm-provider.ts - LLM提供者简化实现
export class LLMProvider {
  // 向大语言模型发送请求并获取响应
  async generateResponse(request: any) {
    console.log("向LLM发送请求:", request);
    
    // 这里应该调用实际的LLM API
    // 为简化示例,返回模拟响应
    
    // 模拟LLM可能返回工具调用或直接回答
    const useTool = Math.random() > 0.5;
    
    if (useTool) {
      return {
        content: "我需要先获取一些信息...",
        tool_calls: [
          {
            name: "calculator",
            arguments: {
              a: 5,
              b: 3,
              operation: "multiply"
            }
          }
        ]
      };
    } else {
      return {
        content: "这是我直接生成的回答,不需要使用工具。",
        tool_calls: []
      };
    }
  }
}

MCP服务器(Server)

  • 接收来自Client的RPC工具调用请求
  • 管理和执行注册的工具
  • 将工具执行结果返回给Client
  • 可能包含多种工具的实现或与外部工具服务的连接

MCP服务器最简实现(使用官方SDK):

// mcp-server.js - 基于SDK的MCP服务器简单实现
const { createServer } = require('@modelcontextprotocol/server');

// 创建一个MCP服务器实例
const server = createServer({
  name: 'simple-tools'
});

// 注册一个简单的计算器工具
server.registerTool({
  name: 'calculator',
  description: '执行简单的数学计算',
  parameters: {
    type: 'object',
    properties: {
      a: { type: 'number', description: '第一个数字' },
      b: { type: 'number', description: '第二个数字' },
      operation: { 
        type: 'string', 
        description: '操作类型',
        enum: ['add', 'subtract', 'multiply', 'divide']
      }
    },
    required: ['a', 'b', 'operation']
  },
  execute: async ({ a, b, operation }) => {
    let result;
    
    switch (operation) {
      case 'add':
        result = a + b;
        break;
      case 'subtract':
        result = a - b;
        break;
      case 'multiply':
        result = a * b;
        break;
      case 'divide':
        if (b === 0) {
          throw new Error('除数不能为零');
        }
        result = a / b;
        break;
      default:
        throw new Error(`不支持的操作: ${operation}`);
    }
    
    return { result: result.toString() };
  }
});

// 启动服务器,使用标准输入/输出作为传输层
server.start({ transport: 'stdio' });
console.error('计算器MCP服务器已启动');

三、MCP系统工作流程

3.1 系统初始化过程

系统启动时,会进行以下初始化步骤:

sequenceDiagram
    participant Host as 宿主应用(Host)
    participant Client as MCP Client
    participant MCPServer as MCP Server
    
    Host->>Host: 应用启动
    Host->>Client: 创建Client实例
    Client->>Client: 加载配置
    
    loop 对每个MCP服务器配置
        Client->>MCPServer: 建立连接
        MCPServer-->>Client: 返回连接状态
        Client->>MCPServer: 请求工具列表
        MCPServer-->>Client: 返回可用工具定义
        Client->>Client: 注册工具信息
    end
    
    Client-->>Host: 初始化完成,返回可用工具
    Host->>Host: 更新UI显示可用工具

初始化关键步骤

  1. Host启动

    • 宿主应用启动,加载mcp server配置文件
    • 创建MCP Client实例
  2. Client初始化

    • 加载MCPServer配置列表
    • 为每个配置创建连接
  3. 工具发现与注册

    • Client连接到各MCPServer
    • 请求并获取每个Server提供的工具列表
    • 注册工具信息(名称、描述、输入模式、所属服务器)
    • 返回完整工具列表给Host
  4. UI初始化

    • Host更新UI展示可用工具
    • 准备就绪,等待用户输入

MCP服务器配置示例:

{
  "mcpServers": {
    "calculator": {
      "command": "node",
      "args": [
        "/absolute/path/to/mcp-server.js"  // 请替换为实际的服务器脚本路径
      ]
    }
  }
}

注意:请根据实际环境替换服务器脚本路径

  • Windows示例: "C:\path\to\mcp-server.js" 或 "C:/path/to/mcp-server.js"
  • MacOS/Linux示例: "/Users/username/projects/mcp-server.js" 或 "/home/username/projects/mcp-server.js"

3.2 完整交互流程

MCP的各个组件之间有清晰的调用关系,下图展示了从用户输入到最终输出的完整流程:

sequenceDiagram
    actor User
    participant Host as 宿主应用(Host)
    participant Client as MCP Client
    participant LLM as 大语言模型(LLM)
    participant MCPServer as MCP Server
    participant Tools as 工具实现
    
    User->>Host: 输入请求
    Host->>Client: 发送消息
    
    Client->>Client: 构建包含对话历史和工具定义的请求
    Client->>Client: 将用户问题及MCP工具添加到对话历史
    
    loop Agentic Loop
        Client->>LLM: 发送完整请求
        LLM-->>Client: 返回响应
        
        alt 响应中包含工具调用
            Client->>Client: 解析工具调用指令
            Client->>MCPServer: 调用SDK的callTool方法
            MCPServer->>Tools: 执行对应工具
            Tools-->>MCPServer: 返回执行结果
            MCPServer-->>Client: 返回工具执行结果
            Client->>Client: 将结果添加到历史
            note right of Client: 循环继续,再次调用LLM
        else 无工具调用或达到最大次数
            Client->>Client: 循环终止
        end
    end
    
    Client-->>Host: 返回最终响应
    Host-->>User: 展示结果

四、Agentic Loop循环实现

Agentic Loop是MCP系统实现工具调用的核心机制,它允许模型进行多次连续的工具调用,实现复杂任务分解和递进式解决问题。

4.1 Agentic Loop工作原理

Agentic Loop的核心思想是让大语言模型根据情况决定是直接回答还是通过工具获取更多信息。循环机制使模型能够分步骤解决复杂问题。

4.2 循环控制与终止条件

MCP系统通常设置最大循环次数(如20次)以防止无限循环。循环在以下情况终止:

  1. 模型返回不包含工具调用的完整回答
  2. 达到最大循环次数
  3. 遇到无法处理的错误

4.3 关键步骤详解

  1. 循环初始化

    • 用户发送请求
    • Client构建所有可用工具的大模型请求参数
    • Client将用户问题以及MCP工具添加到对话历史
    • 设置循环计数器和最大循环次数
  2. LLM推理

    • Client向LLM发送完整请求
    • LLM返回响应,可能会包含工具调用指令,也可能会直接做出回答,由大模型自主推理是否需要借助工具
  3. 工具调用处理

    • Client解析响应,检查是否包含工具调用
    • 如有工具调用,Client调用@modelcontextprotocol/sdk里client的callTool方法
    • 获取mcp server工具执行结果并添加到历史
  4. 循环继续或终止

    • 若大模型继续返回工具调用且未达到最大循环次数,回到步骤2继续
    • 若大模型未返回工具调用或达到最大次数,终止循环,输出大模型最终回答

4.4 对话历史管理

在Agentic Loop中,每轮工具调用的结果都会被添加到对话历史中,这样LLM可以利用之前工具调用的结果进行推理。这种方式使LLM能够参考历史信息,实现多步骤的复杂问题解决。

完整系统协作流程示例:

// example.ts - 展示MCP系统协作
import { MCPHost } from './host';

async function runMCPExample() {
  // 创建并启动宿主应用
  const host = new MCPHost();
  await host.start();
  
  // 模拟用户提问
  const userQuestion = "计算5乘以3等于多少?";
  console.log(`用户: ${userQuestion}`);
  
  // 宿主应用处理用户消息
  const response = await host.handleUserMessage(userQuestion);
  console.log(`助手: ${response}`);
}

// 运行示例
runMCPExample().catch(console.error);

五、最终实现效果

f4ab7814ca108d04ad9b822c832e313.png

六、总结

MCP的技术实现是一个多层次、多组件协作的过程,它通过标准化的协议实现了大语言模型与外部工具的无缝交互。通过Agentic Loop循环机制,系统能够支持复杂任务的递进式解决,而基于JSON-RPC 2.0协议的通信方式则确保了通信的可靠性和跨平台兼容性。这种设计使得开发者可以轻松地扩展和定制工具功能,从而大幅提升AI应用的能力边界。

❌
❌