从零实现 MCP 客户端:blade-code 的 MCP 集成实战
本文是 blade-code 技术深度系列的第 1 篇,深入剖析如何从零实现一个生产级的 MCP(Model Context Protocol)客户端,包括连接管理、OAuth 认证、健康监控、工具注册等核心功能。
目录
什么是 MCP?
MCP(Model Context Protocol)是 Anthropic 推出的开放协议,用于 AI 应用与外部工具/数据源的标准化通信。它解决了以下问题:
- 工具碎片化:每个 AI 应用都要重新实现工具集成
- 协议不统一:没有标准的工具调用格式
- 扩展性差:添加新工具需要修改核心代码
MCP 提供了:
- 📡 标准传输层:stdio、SSE、HTTP
- 🔧 工具发现机制:动态获取可用工具列表
- 🔐 认证支持:OAuth 2.0 集成
- 📊 资源管理:统一的资源读取接口
架构设计
blade-code 的 MCP 集成采用三层架构:
graph TB
A[Agent Runtime 调用层] --> B[McpRegistry 管理层]
B --> C[McpClient 通信层]
A1[工具调用<br/>参数验证] -.-> A
B1[服务器注册/注销<br/>工具冲突处理<br/>状态监控] -.-> B
C1[连接管理<br/>协议通信<br/>错误重试<br/>OAuth 认证] -.-> C
style A fill:#e1f5ff
style B fill:#fff4e1
style C fill:#ffe1f5
设计原则:
- 单一职责:每层只负责自己的核心功能
- 事件驱动:通过 EventEmitter 解耦组件
- 容错优先:网络错误自动重试,永久错误快速失败
- 可观测性:完整的状态机和事件日志
核心实现
1. McpClient:连接与通信
McpClient 是 MCP 集成的核心,负责与单个 MCP 服务器的通信。
1.1 传输层抽象
MCP 支持三种传输方式,blade-code 通过工厂模式统一创建:
private async createTransport(): Promise<Transport> {
const { type, command, args, env, url, headers } = this.config;
if (type === 'stdio') {
// 子进程通信(本地工具)
return new StdioClientTransport({
command,
args: args || [],
env: { ...process.env, ...env },
stderr: 'ignore', // 忽略子进程的 stderr 输出
});
} else if (type === 'sse') {
// Server-Sent Events(远程服务)
return new SSEClientTransport(new URL(url), {
requestInit: { headers },
});
} else if (type === 'http') {
// HTTP 长轮询
const { StreamableHTTPClientTransport } = await import(
'@modelcontextprotocol/sdk/client/streamableHttp.js'
);
return new StreamableHTTPClientTransport(new URL(url), {
requestInit: { headers },
});
}
throw new Error(`不支持的传输类型: ${type}`);
}
关键点:
-
stdio适合本地工具(如文件系统、数据库) -
sse适合远程服务(实时推送) -
http适合 RESTful API
1.2 连接管理与重试
生产环境中,网络不稳定是常态。blade-code 实现了智能重试机制:
async connectWithRetry(maxRetries = 3, initialDelay = 1000): Promise<void> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.doConnect();
this.reconnectAttempts = 0; // 重置重连计数
return; // 成功连接
} catch (error) {
lastError = error as Error;
const classified = classifyError(error);
// 如果是永久性错误,不重试
if (!classified.isRetryable) {
console.error('[McpClient] 检测到永久性错误,放弃重试:', classified.type);
throw error;
}
// 指数退避
if (attempt < maxRetries) {
const delay = initialDelay * Math.pow(2, attempt - 1);
console.warn(`[McpClient] 连接失败(${attempt}/${maxRetries}),${delay}ms 后重试...`);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
}
throw lastError || new Error('连接失败');
}
错误分类:
enum ErrorType {
NETWORK_TEMPORARY = 'network_temporary', // 临时网络错误(可重试)
NETWORK_PERMANENT = 'network_permanent', // 永久网络错误
CONFIG_ERROR = 'config_error', // 配置错误
AUTH_ERROR = 'auth_error', // 认证错误
PROTOCOL_ERROR = 'protocol_error', // 协议错误
UNKNOWN = 'unknown', // 未知错误
}
function classifyError(error: unknown): ClassifiedError {
const msg = error.message.toLowerCase();
// 永久性配置错误(不应重试)
const permanentErrors = [
'command not found',
'no such file',
'permission denied',
'invalid configuration',
];
if (permanentErrors.some((permanent) => msg.includes(permanent))) {
return { type: ErrorType.CONFIG_ERROR, isRetryable: false, originalError: error };
}
// 临时网络错误(可重试)
const temporaryErrors = [
'timeout',
'connection refused',
'econnreset',
'etimedout',
'503',
'429',
];
if (temporaryErrors.some((temporary) => msg.includes(temporary))) {
return { type: ErrorType.NETWORK_TEMPORARY, isRetryable: true, originalError: error };
}
// 默认视为临时错误(保守策略:允许重试)
return { type: ErrorType.UNKNOWN, isRetryable: true, originalError: error };
}
为什么这样设计?
- 快速失败:配置错误立即抛出,避免无意义的重试
- 指数退避:避免雪崩效应,给服务器恢复时间
- 保守策略:未知错误默认可重试,提高容错性
1.3 意外断连处理
MCP 服务器可能随时断开(进程崩溃、网络中断),blade-code 通过监听 onclose 事件自动重连:
this.sdkClient.onclose = () => {
this.handleUnexpectedClose();
};
private handleUnexpectedClose(): void {
if (this.isManualDisconnect) {
return; // 手动断开,不重连
}
if (this.status === McpConnectionStatus.CONNECTED) {
console.warn('[McpClient] 检测到意外断连,准备重连...');
this.setStatus(McpConnectionStatus.ERROR);
this.emit('error', new Error('MCP服务器连接意外关闭'));
this.scheduleReconnect();
}
}
private scheduleReconnect(): void {
if (this.reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) {
console.error('[McpClient] 达到最大重连次数,放弃重连');
this.emit('reconnectFailed');
return;
}
// 指数退避:1s, 2s, 4s, 8s, 16s(最大30s)
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
this.reconnectAttempts++;
this.reconnectTimer = setTimeout(async () => {
try {
await this.doConnect();
console.log('[McpClient] 重连成功');
this.reconnectAttempts = 0;
this.emit('reconnected');
} catch (error) {
const classified = classifyError(error);
if (classified.isRetryable) {
this.scheduleReconnect(); // 继续重连
} else {
this.emit('reconnectFailed'); // 永久失败
}
}
}, delay);
}
关键点:
- 区分手动断开和意外断连
- 最多重连 5 次,避免无限循环
- 重连成功后重置计数器
2. McpRegistry:服务器管理
McpRegistry 是单例模式的注册中心,管理多个 MCP 服务器。
2.1 服务器注册
async registerServer(name: string, config: McpServerConfig): Promise<void> {
if (this.servers.has(name)) {
throw new Error(`MCP服务器 "${name}" 已经注册`);
}
const client = new McpClient(config, name, config.healthCheck);
const serverInfo: McpServerInfo = {
config,
client,
status: McpConnectionStatus.DISCONNECTED,
tools: [],
};
// 设置客户端事件处理器
this.setupClientEventHandlers(client, serverInfo, name);
this.servers.set(name, serverInfo);
this.emit('serverRegistered', name, serverInfo);
try {
await this.connectServer(name);
} catch (error) {
console.warn(`MCP服务器 "${name}" 连接失败:`, error);
}
}
2.2 工具冲突处理
多个 MCP 服务器可能提供同名工具,blade-code 通过前缀解决冲突:
async getAvailableTools(): Promise<Tool[]> {
const tools: Tool[] = [];
const nameConflicts = new Map<string, number>();
// 第一遍:检测冲突
for (const [_serverName, serverInfo] of this.servers) {
if (serverInfo.status === McpConnectionStatus.CONNECTED) {
for (const mcpTool of serverInfo.tools) {
const count = nameConflicts.get(mcpTool.name) || 0;
nameConflicts.set(mcpTool.name, count + 1);
}
}
}
// 第二遍:创建工具(冲突时添加前缀)
for (const [serverName, serverInfo] of this.servers) {
if (serverInfo.status === McpConnectionStatus.CONNECTED) {
for (const mcpTool of serverInfo.tools) {
const hasConflict = (nameConflicts.get(mcpTool.name) || 0) > 1;
const toolName = hasConflict
? `${serverName}__${mcpTool.name}`
: mcpTool.name;
const tool = createMcpTool(serverInfo.client, serverName, mcpTool, toolName);
tools.push(tool);
}
}
}
return tools;
}
命名策略:
- 无冲突:
toolName - 有冲突:
serverName__toolName
示例:
服务器 A: read_file
服务器 B: read_file
→ 最终工具: A__read_file, B__read_file
3. OAuth 认证
MCP 支持 OAuth 2.0 认证,blade-code 实现了完整的 OAuth 流程。
3.1 令牌存储
export class OAuthTokenStorage {
private readonly tokenFilePath: string;
constructor() {
const homeDir = os.homedir();
const configDir = path.join(homeDir, '.blade');
this.tokenFilePath = path.join(configDir, 'mcp-oauth-tokens.json');
}
async saveToken(
serverName: string,
token: OAuthToken,
clientId?: string,
tokenUrl?: string
): Promise<void> {
const credentials = await this.loadAllCredentials();
const credential: OAuthCredentials = {
serverName,
token,
clientId,
tokenUrl,
updatedAt: Date.now(),
};
credentials.set(serverName, credential);
await this.saveAllCredentials(credentials);
}
isTokenExpired(token: OAuthToken): boolean {
if (!token.expiresAt) {
return false; // 没有过期时间,认为不过期
}
// 提前 5 分钟视为过期,留出刷新时间
const buffer = 5 * 60 * 1000;
return Date.now() >= token.expiresAt - buffer;
}
}
安全措施:
- 令牌文件权限设置为
0o600(仅所有者可读写) - 提前 5 分钟刷新令牌,避免过期
- 支持 refresh_token 自动续期
3.2 OAuth 流程
export class OAuthProvider {
private tokenStorage = new OAuthTokenStorage();
async getValidToken(
serverName: string,
oauthConfig: OAuthConfig
): Promise<string | null> {
const credentials = await this.tokenStorage.getCredentials(serverName);
if (!credentials) {
return null; // 没有令牌,需要认证
}
// 检查是否过期
if (this.tokenStorage.isTokenExpired(credentials.token)) {
// 尝试刷新
if (credentials.token.refreshToken) {
try {
const newToken = await this.refreshToken(credentials, oauthConfig);
await this.tokenStorage.saveToken(
serverName,
newToken,
credentials.clientId,
credentials.tokenUrl
);
return newToken.accessToken;
} catch (error) {
console.error('[OAuthProvider] 刷新令牌失败:', error);
return null; // 刷新失败,需要重新认证
}
}
return null; // 没有 refresh_token,需要重新认证
}
return credentials.token.accessToken;
}
async authenticate(
serverName: string,
oauthConfig: OAuthConfig
): Promise<OAuthToken> {
// 1. 生成授权 URL
const authUrl = this.buildAuthUrl(oauthConfig);
console.log(`请访问以下 URL 进行授权:\n${authUrl}`);
// 2. 启动本地回调服务器
const code = await this.startCallbackServer(oauthConfig.redirectUri);
// 3. 用授权码换取令牌
const token = await this.exchangeCodeForToken(code, oauthConfig);
// 4. 保存令牌
await this.tokenStorage.saveToken(
serverName,
token,
oauthConfig.clientId,
oauthConfig.tokenUrl
);
return token;
}
}
流程图:
graph TD
A[用户请求] --> B{检查令牌}
B -->|有效| C[返回令牌]
B -->|无效/过期| D{有 refresh_token?}
D -->|是| E[刷新令牌]
E --> F[返回新令牌]
D -->|否| G[启动 OAuth 流程]
G --> H[返回新令牌]
style C fill:#90EE90
style F fill:#90EE90
style H fill:#90EE90
4. 健康监控
生产环境中,MCP 服务器可能"僵死"(连接正常但不响应)。blade-code 实现了主动健康检查:
export class HealthMonitor extends EventEmitter {
private intervalTimer: NodeJS.Timeout | null = null;
private consecutiveFailures = 0;
constructor(
private client: McpClient,
private config: HealthCheckConfig
) {
super();
}
start(): void {
if (this.intervalTimer) {
return; // 已经启动
}
this.intervalTimer = setInterval(async () => {
try {
await this.performHealthCheck();
this.consecutiveFailures = 0; // 重置失败计数
} catch (error) {
this.consecutiveFailures++;
console.warn(
`[HealthMonitor] 健康检查失败 (${this.consecutiveFailures}/${this.config.maxFailures}):`,
error
);
if (this.consecutiveFailures >= this.config.maxFailures) {
this.emit('unhealthy', this.consecutiveFailures, error);
await this.attemptReconnect();
}
}
}, this.config.intervalMs);
}
private async performHealthCheck(): Promise<void> {
const timeout = this.config.timeoutMs || 5000;
await Promise.race([
this.client.listTools(), // 调用一个轻量级方法
new Promise((_, reject) =>
setTimeout(() => reject(new Error('健康检查超时')), timeout)
),
]);
}
private async attemptReconnect(): Promise<void> {
console.log('[HealthMonitor] 尝试重连...');
try {
await this.client.disconnect();
await this.client.connect();
this.consecutiveFailures = 0;
this.emit('reconnected');
} catch (error) {
console.error('[HealthMonitor] 重连失败:', error);
}
}
}
配置示例:
{
enabled: true,
intervalMs: 30000, // 每 30 秒检查一次
timeoutMs: 5000, // 超时时间 5 秒
maxFailures: 3 // 连续失败 3 次触发重连
}
工具动态注册
MCP 工具需要转换为 blade-code 的 Tool 接口:
export function createMcpTool(
client: McpClient,
serverName: string,
mcpTool: McpToolDefinition,
toolName?: string
): Tool {
return {
name: toolName || mcpTool.name,
description: mcpTool.description || `MCP工具: ${mcpTool.name}`,
parameters: mcpTool.inputSchema || { type: 'object', properties: {} },
metadata: {
source: 'mcp',
serverName,
originalName: mcpTool.name,
},
async execute(args: Record<string, unknown>): Promise<ToolResult> {
try {
const response = await client.callTool(mcpTool.name, args);
return {
success: true,
output: formatMcpResponse(response),
metadata: {
serverName,
toolName: mcpTool.name,
},
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : String(error),
metadata: {
serverName,
toolName: mcpTool.name,
},
};
}
},
};
}
关键点:
- 保留原始工具名(
originalName)用于调试 - 统一错误处理格式
- 支持元数据传递
错误处理与重连
blade-code 的错误处理遵循以下原则:
1. 错误分类
临时错误(可重试):
- 网络超时
- 连接被拒绝
- 速率限制(429)
- 服务不可用(503)
永久错误(不重试):
- 配置错误(命令不存在)
- 认证失败(401)
- 权限不足(403)
- 协议错误(格式错误)
2. 重试策略
指数退避:
- 第 1 次:1 秒后重试
- 第 2 次:2 秒后重试
- 第 3 次:4 秒后重试
- 第 4 次:8 秒后重试
- 第 5 次:16 秒后重试
- 最大延迟:30 秒
3. 状态机
stateDiagram-v2
[*] --> DISCONNECTED
DISCONNECTED --> CONNECTING: connect()
CONNECTING --> CONNECTED: 成功
CONNECTING --> ERROR: 失败
ERROR --> CONNECTING: 重试
CONNECTED --> ERROR: 意外断连
CONNECTED --> DISCONNECTED: disconnect()
实战案例
案例 1:集成文件系统 MCP 服务器
// 配置文件
{
"mcpServers": {
"filesystem": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/workspace"],
"env": {
"NODE_ENV": "production"
}
}
}
}
// 使用
const registry = McpRegistry.getInstance();
await registry.registerServer('filesystem', config.mcpServers.filesystem);
const tools = await registry.getAvailableTools();
// 输出: [{ name: 'read_file', ... }, { name: 'write_file', ... }]
案例 2:集成远程 API(带 OAuth)
{
"mcpServers": {
"github": {
"type": "sse",
"url": "https://api.github.com/mcp",
"oauth": {
"enabled": true,
"authUrl": "https://github.com/login/oauth/authorize",
"tokenUrl": "https://github.com/login/oauth/access_token",
"clientId": "your-client-id",
"clientSecret": "your-client-secret",
"scopes": ["repo", "user"],
"redirectUri": "http://localhost:3000/callback"
}
}
}
}
// 首次使用会自动触发 OAuth 流程
await registry.registerServer('github', config.mcpServers.github);
// 输出: 请访问以下 URL 进行授权: https://github.com/login/oauth/authorize?...
案例 3:健康监控与自动恢复
{
"mcpServers": {
"database": {
"type": "stdio",
"command": "mcp-database-server",
"healthCheck": {
"enabled": true,
"intervalMs": 30000,
"timeoutMs": 5000,
"maxFailures": 3
}
}
}
}
// 监听健康事件
registry.on('serverError', (name, error) => {
console.error(`服务器 ${name} 出错:`, error);
});
registry.on('healthMonitorReconnected', () => {
console.log('健康监控触发重连成功');
});
总结
blade-code 的 MCP 集成实现了以下核心功能:
已实现
- 多传输层支持:stdio、SSE、HTTP
- 智能重试:错误分类 + 指数退避
- 自动重连:意外断连自动恢复
- OAuth 认证:完整的 OAuth 2.0 流程
- 健康监控:主动检测僵死连接
- 工具冲突处理:自动添加前缀
- 事件驱动:完整的状态机和事件系统
设计亮点
- 容错优先:网络错误不会导致整个系统崩溃
- 可观测性:丰富的日志和事件,便于调试
- 扩展性:新增 MCP 服务器只需修改配置文件
- 安全性:令牌加密存储,权限最小化
相关资源:
讨论:欢迎在 GitHub Issues 或我的博客评论区交流!