MCP Server 的三种实现
MCP 规范定义了三种标准的传输机制,也就对应了三种 MCP Server 的实现方式
本文对三种数据传输方式及 Server 的实现进行了实践,代码已上传 GitHub 猛击访问
MCP 三种标准的传输机制
标准输入和标准输出的通信 STDIO
专为本地 MCP 连接设计,比如通过 node index.js
执行 MCP Server 进行交互
服务器发送事件 SSE
目前大多数远程 MCP 客户端都支持,但预计随着时间的推移将被流式 HTTP 取代。
它需要两个端点:一个用于发送请求,另一个用于接收流式响应。
可流式传输 HTTP
2025 年 3 月引入的新传输方法。它通过使用单个 HTTP 端点简化了双向通信。
目前,它正在被远程 MCP 客户端采用,预计未来将成为标准的传输方式。
为什么要替换可以看 Replace HTTP+SSE with new "Streamable HTTP" transport GitHub 上的这个 RFC
MCP 三种 Server 的实现
创建一个 server
三种实现方式只是数据传输的方式不同, server 实现是一样的
创建一个简单的 MCP server 导出
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { z } from 'zod'
import dayjs from 'dayjs'
import utc from 'dayjs/plugin/utc.js'
import timezone from 'dayjs/plugin/timezone.js'
dayjs.extend(utc)
dayjs.extend(timezone)
const server = new McpServer({
name: 'mcp-server-time',
version: '1.0.0'
})
// 获取当前时间的工具
server.tool(
'get_current_time',
'获取当前时间',
{
timezone: z.string().optional(),
},
async ({ timezone }) => {
const tz = timezone || process.env.LOCAL_TIMEZONE || 'Asia/Shanghai';
const currentTime = dayjs().tz(tz).format('YYYY-MM-DD HH:mm:ss');
return {
content: [{ type: "text", text: JSON.stringify({ currentTime }, null, 2) }],
};
}
)
// 日期时间转换工具
server.tool(
'convert_time',
'在时区之间转换日期时间',
{
source_timezone: z.string(),
datetime: z.string().regex(/^\d{4}-\d{1,2}-\d{1,2} ([01]\d|2[0-3]):([0-5]\d):([0-5]\d)$/, '日期时间格式无效,应为 YYYY-MM-DD HH:mm:ss'),
target_timezone: z.string(),
},
async ({ source_timezone, datetime, target_timezone }) => {
const sourceTime = dayjs.tz(datetime, source_timezone);
const convertedTime = sourceTime.clone().tz(target_timezone).format('YYYY-MM-DD HH:mm:ss');
return {
content: [{ type: "text", text: JSON.stringify({ convertedTime }, null, 2) }],
};
}
)
server.tool(
'get_text',
'返回测试文本',
{},
async () => {
const text = '在这个充满变化的时代,每一天都带来了新的机遇与挑战。科技的发展不仅改变了我们的生活方式,也让我们的思维方式不断更新。面对未知,我们或许会感到迷茫,但正是这种探索精神推动着社会不断进步。从传统的理念到现代的创新,每一次转变都蕴含着无限可能。测试文本的存在,正是为了验证系统的生成和处理能力。相信在不断的尝试中,我们能够找到更好的解决方案,为未来的发展铺平道路。'
return {
content: [{ type: "text", text: text }],
};
}
)
export default server
标准输入和标准输出的通信 STDIO
#!/usr/bin/env node
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import server from './mcpServerTest.js'
// 启动服务器
async function runServer() {
// 使用 StdioServerTransport 标准输入输出
const transport = new StdioServerTransport()
await server.connect(transport)
console.error('获取当前时间和时区转换的 MCP 服务器已在 stdio 上启动')
}
runServer().catch((error) => {
console.error('启动服务器时出错:', error)
process.exit(1)
})
测试
执行 npx @modelcontextprotocol/inspector node transports/dist/stdio.js
启动测试页面
访问 http://127.0.0.1:6274
注意这里的配置,确认没有问题点击连接即可
服务器发送事件 SSE
这是一个简单的 mcp sse 服务的实现,在生产中可能需要继续完善代码比如对跨域、鉴权的处理
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import server from './mcpServerTest.js'
import {addInfoLog, addWarnLog, addSuccessLog} from './utils.js'
const app = express();
app.use(express.json());
// 存储连接
const connections = new Map<string, SSEServerTransport>();
app.get('/sse', async (req, res) => {
addSuccessLog('客户端连接参数:', req.query)
// 创建 sse 传输
const transport = new SSEServerTransport('/messages', res);
const sessionId = transport.sessionId
addInfoLog(`新的 SSE 连接建立: ${sessionId}`)
// 注册连接
connections.set(sessionId, transport);
res.on("close", () => {
connections.delete(sessionId);
addInfoLog(`SSE 连接关闭: ${sessionId}`)
});
// 将传输对象与MCP服务器连接
await server.connect(transport);
addSuccessLog(`MCP 服务器连接成功: ${sessionId}`)
});
// 旧消息端点
app.post('/messages', async (req, res) => {
const sessionId = req.query.sessionId as string;
addInfoLog(`收到客户端消息: ${sessionId}`)
console.log('query',req.query, '\r\n')
console.log('body',req.body, '\r\n')
console.log('params',req.params, '\r\n')
// 获取连接
const transport = connections.get(sessionId)
if (transport) {
await transport.handlePostMessage(req, res, req.body);
} else {
addWarnLog(`未找到活跃的 ${sessionId} 连接`)
res.status(400).send(`未找到活跃的 ${sessionId} 连接`);
}
});
// 启动服务器
const port = process.env.PORT || 9001;
app.listen(port, () => {
addSuccessLog(`MCP SSE 服务器已启动:`, `http://localhost:${port}`)
addInfoLog('SSE 连接端点:', `http://localhost:${port}/sse`)
addInfoLog('SSE 消息处理端点:', ` http://localhost:${port}/messages`)
console.log('=========================== success ===========================\r\n')
});
测试
执行 node dist/sse.js
启动服务
执行 npx @modelcontextprotocol/inspector http://127.0.0.1
启动测试页面
访问 http://127.0.0.1:6274
这里需要选择 sse 和确认 sse 服务的地址及端口,确认没有问题后点击连接即可
可流式传输 HTTP
这是一个简单的 Streamable HTTP
mcp 服务实现,其实是官方仓库的示例代码 (看起来有点绕🤪) 加了一些注释便于理解
import express from "express";
import { randomUUID } from "node:crypto";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"
import server from './mcpServerTest.js'
import {addInfoLog, addSuccessLog} from './utils.js'
import pc from 'picocolors'
const app = express();
app.use(express.json());
/**
* POST 请求:创建新的传输实例并保存到 transports 映射(以 sessionId 为键)。
* GET 请求(SSE):通过已保存的传输实例推送数据流到客户端。
* DELETE 请求:通过已保存的传输实例终止会话,断开连接。
*/
// 保存 会话 id 到 传输实例的映射
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
// Handle POST requests for client-to-server communication
app.post('/mcp', async (req, res) => {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// 如果传输实例已经存在则直接使用
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// isInitializeRequest 判断 是否是一个合法的 mcp 的请求
// 创建一个新的传输实例
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// 通过会话 id 保存传输实例
transports[sessionId] = transport;
addInfoLog(`创建传输实例成功 ${sessionId}`)
}
});
// 接收到 DELETE 请求关闭连接
transport.onclose = () => {
if (transport.sessionId) {
delete transports[transport.sessionId];
}
};
await server.connect(transport);
addSuccessLog(`MCP 服务器连接成功: ${sessionId}`)
} else {
// 无效的请求
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: '错误请求:未提供有效的会话 ID',
},
id: null,
});
return;
}
// 处理请求 这里 根据 get、delete 做对应的处理
// 如果请求是 GET,通常会被解释为服务器推送通知(SSE)。
// 如果请求是 DELETE,通常会被解释为终止会话(断开连接)。
await transport.handleRequest(req, res, req.body);
});
// 处理 get 和 delete 请求
async function handleSessionRequest(req: express.Request, res: express.Response) {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).send('会话 ID 无效或缺失');
return;
}
// 获取传输实例执行对应的的操作
const transport = transports[sessionId];
await transport.handleRequest(req, res);
}
// 通过 SSE 处理服务器到客户端通知的 GET 请求
app.get('/mcp', handleSessionRequest);
// 处理会话终止的 DELETE 请求
app.delete('/mcp', handleSessionRequest);
// 启动服务器
const port = process.env.PORT || 9002;
app.listen(port, () => {
addSuccessLog(`MCP Streamable 服务器已启动: ${pc.green(`http://localhost:${port}`)}`)
console.log('=========================== success ===========================\r\n')
});
测试
执行 node dist/streamable.js
启动服务
执行 npx @modelcontextprotocol/inspector http://127.0.0.1
启动测试页面
访问 http://127.0.0.1:6274
这里需要选择 Streamable HTTP 和确认服务的地址及端口,确认没有问题后点击连接即可
server 同时兼容 SSE 与 Streamable 的写法
把 sse 和 Streamable HTTP 的 server 实现组合起来,没有什么魔法!
MCP 三种 Client 的实现
三种客户端的实现区别不大, MCP SDK 导出了三个文件对应三种 server 的实现
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
STDIO
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { addInfoLog, addErrLog, addSuccessLog } from './utils.js'
async function createStdioClient() {
// 创建 client
const client = new Client({
name: 'stdio-client',
version: '1.0.0',
});
try {
const sseTransport = new StdioClientTransport({
command: 'node',
args: ['../transports/dist/stdio.js']
});
// 连接到 MCP 服务
await client.connect(sseTransport);
addSuccessLog(' MCP 服务 连接成功!')
// 获取 工具列表
const toolInfo = await client.listTools()
addInfoLog('MCP 工具信息', toolInfo)
// 调用生成文本工具
const callToolInfo = await client.callTool({
"name": "get_text",
"arguments": {}
})
addInfoLog('get_text 返回信息', callToolInfo)
} catch (error) {
addErrLog('MCP 客户端错误', error);
}
}
createStdioClient()
SSE
这里需要先启动 sse serevr node ./dist/sse.js
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { addInfoLog, addErrLog, addSuccessLog } from './utils.js'
let client: Client | undefined = undefined
const baseUrl = new URL('http://localhost:9001/sse');
async function createSseClient() {
client = new Client({
name: 'sse-client',
version: '1.0.0',
});
try {
const sseTransport = new SSEClientTransport(baseUrl, {
requestInit: {
headers: {
// 这里的参数可以在 messages req.headers 中获取
'X-Custom-Param': 'custom_value'
},
}
});
// 连接到 MCP 服务
await client.connect(sseTransport);
addSuccessLog(' MCP 服务 连接成功!')
// 获取 工具列表
const toolInfo = await client.listTools()
addInfoLog('MCP 工具信息', toolInfo)
// 调用生成文本工具
const callToolInfo = await client.callTool({
"name": "get_text",
"arguments": {}
})
addInfoLog('get_text 返回信息', callToolInfo)
} catch (error) {
addErrLog('MCP 客户端错误', error);
}
// 关闭连接
// client.close()
}
createSseClient()
Streamable HTTP
实现与 SSE 几乎相同将 SSEClientTransport
替换为 StreamableHTTPClientTransport
即可
如果失败并出现 4xx 错误时尝试使用 SSE 客户端
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
let client: Client | undefined = undefined
const baseUrl = new URL('http://localhost:9001/sse');
async function createStreamableHttpClient() {
try {
client = new Client({
name: 'streamable-http-client',
version: '1.0.0'
});
const transport = new StreamableHTTPClientTransport(baseUrl);
await client.connect(transport);
console.log("Connected using Streamable HTTP transport");
} catch (error) {
// 如果失败并出现 4xx 错误,请尝试较旧的 SSE 传输
console.log("Streamable HTTP connection failed, falling back to SSE transport");
// todo 使用 sse client 与 server 交互
}
}
createStreamableHttpClient()
参考文章
modelcontextprotocol typescript-sdk