
1. 概述
1.1 什么是SSE
Server-Sent Events(SSE)是一种基于HTTP协议的服务器推送技术,允许服务器主动向客户端发送实时更新。与WebSocket不同,SSE是单向通信通道,只能从服务器向客户端推送数据。然而,通过结合HTTP POST请求,客户端同样可以向服务器发送消息,实现双向通信。SSE具有以下显著优势:它基于标准HTTP协议,无需特殊的协议升级,因此可以轻松穿透大多数防火墙和代理服务器;SSE内置了自动重连机制,当连接中断时客户端会自动尝试重新建立连接;它还支持自定义事件类型,使得服务器可以向客户端发送不同类型的消息。
1.2 消息通知SDK的设计目标
本SDK的设计目标是构建一个功能完备、易于使用的双向消息通知系统,能够支持多种消息场景和类型。主要功能需求包括:实时消息推送能力,使服务器能够即时向客户端发送各类通知;双向通信支持,允许客户端主动向服务器发送消息请求;多种消息类型支持,包括文本消息、指令消息、系统通知等;并发场景处理,确保在高并发环境下消息的正确传递和指令的有序执行;断线重连机制,保证网络不稳定时的消息可靠性;消息确认和回调机制,确保重要消息的可靠送达。
2. 技术架构
2.1 系统架构图
┌─────────────────────────────────────────────────────────────────┐
│ 前端应用层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 业务组件 │ │ UI通知层 │ │ 指令处理模块 │ │
│ └──────┬──────┘ └──────┬──────┘ └───────────┬─────────────┘ │
│ │ │ │ │
│ ┌──────▼─────────────────▼─────────────────────▼─────────────┐ │
│ │ SSE SDK 核心层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │连接管理器│ │消息队列 │ │事件分发器│ │重连管理器│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
┌─────────▼─────────┐
│ HTTP/POST │ ← 客户端发送消息
│ HTTP/SSE │ ← 服务器推送消息
└─────────┬─────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ 后端服务层 │
│ ┌────────────────┐ ┌────────────────┐ ┌─────────────────┐ │
│ │ SSE连接管理器 │ │ 消息路由器 │ │ 指令调度器 │ │
│ └────────────────┘ └────────────────┘ └─────────────────┘ │
│ ┌────────────────┐ ┌────────────────┐ ┌─────────────────┐ │
│ │ 消息队列 │ │ 并发控制器 │ │ 存储层 │ │
│ └────────────────┘ └────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
2.2 消息类型定义
/**
* 消息类型枚举
* 定义了SDK支持的所有消息类型
*/
export enum MessageType {
/** 普通文本消息 */
TEXT = 'text',
/** JSON结构化消息 */
JSON = 'json',
/** 系统通知消息 */
SYSTEM = 'system',
/** 指令消息(用于并发控制) */
COMMAND = 'command',
/** 心跳消息 */
HEARTBEAT = 'heartbeat',
/** 确认消息 */
ACKNOWLEDGE = 'ack',
/** 错误消息 */
ERROR = 'error',
/** 进度通知 */
PROGRESS = 'progress'
}
/**
* 消息优先级枚举
* 用于消息处理的优先级调度
*/
export enum MessagePriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
CRITICAL = 3
}
/**
* 指令操作类型枚举
* 定义了支持的指令操作类型
*/
export enum CommandAction {
/** 执行操作 */
EXECUTE = 'execute',
/** 取消操作 */
CANCEL = 'cancel',
/** 暂停操作 */
PAUSE = 'pause',
/** 恢复操作 */
RESUME = 'resume',
/** 查询状态 */
QUERY = 'query',
/** 同步状态 */
SYNC = 'sync'
}
/**
* 消息接口定义
* 所有消息的基类接口
*/
export interface Message {
/** 消息唯一标识符 */
id: string;
/** 消息类型 */
type: MessageType;
/** 消息优先级 */
priority: MessagePriority;
/** 消息创建时间戳 */
timestamp: number;
/** 消息来源标识 */
source: string;
/** 消息目标标识(可选,用于定向消息) */
target?: string;
/** 消息载荷数据 */
payload: any;
/** 消息元数据 */
metadata?: Record<string, any>;
}
/**
* 指令消息接口
* 用于并发控制和任务调度
*/
export interface CommandMessage extends Message {
type: MessageType.COMMAND;
payload: {
/** 指令ID,用于追踪和响应 */
commandId: string;
/** 指令操作类型 */
action: CommandAction;
/** 指令关联的任务ID */
taskId?: string;
/** 指令参数 */
params?: Record<string, any>;
/** 执行超时时间(毫秒) */
timeout?: number;
/** 是否需要确认 */
requireAck?: boolean;
};
}
/**
* 进度消息接口
* 用于任务进度通知
*/
export interface ProgressMessage extends Message {
type: MessageType.PROGRESS;
payload: {
/** 任务ID */
taskId: string;
/** 当前进度(0-100) */
progress: number;
/** 进度描述 */
description?: string;
/** 预计剩余时间(毫秒) */
estimatedTimeRemaining?: number;
};
}
/**
* 系统消息接口
* 用于系统级通知
*/
export interface SystemMessage extends Message {
type: MessageType.SYSTEM;
payload: {
/** 系统事件类型 */
event: 'connect' | 'disconnect' | 'reconnect' | 'error' | 'maintenance';
/** 事件详情 */
detail?: any;
/** 建议的恢复动作 */
recommendedAction?: string;
};
}
2.3 SDK核心类设计
/**
* SSE连接状态枚举
*/
export enum ConnectionState {
/** 断开状态 */
DISCONNECTED = 'disconnected',
/** 连接中状态 */
CONNECTING = 'connecting',
/** 已连接状态 */
CONNECTED = 'connected',
/** 重连中状态 */
RECONNECTING = 'reconnecting',
/** 错误状态 */
ERROR = 'error'
}
/**
* SSE SDK配置选项
*/
export interface SSKSConfig {
/** SSE服务端点URL */
endpoint: string;
/** 客户端标识符 */
clientId: string;
/** 认证令牌(可选) */
authToken?: string;
/** 心跳间隔(毫秒),默认30000 */
heartbeatInterval?: number;
/** 最大重连次数,默认10 */
maxReconnectAttempts?: number;
/** 重连延迟基数(毫秒),默认1000 */
reconnectBaseDelay?: number;
/** 最大重连延迟(毫秒),默认30000 */
reconnectMaxDelay?: number;
/** 连接超时时间(毫秒),默认10000 */
connectionTimeout?: number;
/** 是否启用调试模式 */
debug?: boolean;
/** 自定义请求头 */
headers?: Record<string, string>;
/** 消息队列容量,默认100 */
messageQueueSize?: number;
/** 是否自动连接,默认true */
autoConnect?: boolean;
}
/**
* SDK默认配置
*/
export const DEFAULT_CONFIG: Partial<SSKSConfig> = {
heartbeatInterval: 30000,
maxReconnectAttempts: 10,
reconnectBaseDelay: 1000,
reconnectMaxDelay: 30000,
connectionTimeout: 10000,
debug: false,
messageQueueSize: 100,
autoConnect: true
};
/**
* 消息处理器接口
*/
export interface MessageHandler {
(message: Message): void | Promise<void>;
}
/**
* 事件处理器映射类型
*/
export type EventHandlerMap = {
[K in MessageType]?: MessageHandler;
} & {
/** 连接成功处理器 */
onConnect?: () => void;
/** 连接断开处理器 */
onDisconnect?: (reason: string) => void;
/** 连接错误处理器 */
onError?: (error: Error) => void;
/** 重连尝试处理器 */
onReconnecting?: (attempt: number, maxAttempts: number) => void;
/** 连接状态变化处理器 */
onStateChange?: (state: ConnectionState, prevState: ConnectionState) => void;
};
3. 前端SDK实现
3.1 连接管理器实现
/**
* SSE连接管理器
* 负责管理SSE连接的创建、维护和销毁
*/
export class SSEConnectionManager {
private eventSource: EventSource | null = null;
private config: SSKSConfig;
private state: ConnectionState = ConnectionState.DISCONNECTED;
private reconnectAttempt = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private stateListeners: Array<(state: ConnectionState, prev: ConnectionState) => void> = [];
private messageListeners: Array<(event: MessageEvent) => void> = [];
private errorListeners: Array<(error: Error) => void> = [];
constructor(config: SSKSConfig) {
this.config = { ...DEFAULT_CONFIG, ...config };
}
/**
* 获取当前连接状态
*/
public getState(): ConnectionState {
return this.state;
}
/**
* 检查是否已连接
*/
public isConnected(): boolean {
return this.state === ConnectionState.CONNECTED && this.eventSource !== null;
}
/**
* 添加状态变化监听器
*/
public addStateListener(listener: (state: ConnectionState, prev: ConnectionState) => void): void {
this.stateListeners.push(listener);
}
/**
* 移除状态变化监听器
*/
public removeStateListener(listener: (state: ConnectionState, prev: ConnectionState) => void): void {
this.stateListeners = this.stateListeners.filter(l => l !== listener);
}
/**
* 添加消息监听器
*/
public addMessageListener(listener: (event: MessageEvent) => void): void {
this.messageListeners.push(listener);
}
/**
* 移除消息监听器
*/
public removeMessageListener(listener: (event: MessageEvent) => void): void {
this.messageListeners = this.messageListeners.filter(l => l !== listener);
}
/**
* 添加错误监听器
*/
public addErrorListener(listener: (error: Error) => void): void {
this.errorListeners.push(listener);
}
/**
* 移除错误监听器
*/
public removeErrorListener(listener: (error: Error) => void): void {
this.errorListeners = this.errorListeners.filter(l => l !== listener);
}
/**
* 建立SSE连接
*/
public connect(): Promise<void> {
return new Promise((resolve, reject) => {
if (this.isConnected()) {
resolve();
return;
}
const prevState = this.state;
this.setState(ConnectionState.CONNECTING);
// 构建连接URL
const url = this.buildConnectionUrl();
this.debug(`建立SSE连接: ${url}`);
try {
this.eventSource = new EventSource(url);
// 连接成功超时处理
const timeout = setTimeout(() => {
if (this.state === ConnectionState.CONNECTING) {
this.eventSource?.close();
this.eventSource = null;
const error = new Error('连接超时');
this.notifyError(error);
reject(error);
}
}, this.config.connectionTimeout!);
this.eventSource.onopen = (event: Event) => {
clearTimeout(timeout);
this.debug('SSE连接已建立');
this.reconnectAttempt = 0;
this.setState(ConnectionState.CONNECTED);
this.startHeartbeat();
resolve();
};
this.eventSource.onmessage = (event: MessageEvent) => {
this.debug(`收到消息: ${event.data}`);
this.messageListeners.forEach(listener => listener(event));
};
this.eventSource.onerror = (event: Event) => {
clearTimeout(timeout);
this.debug(`SSE错误事件: ${JSON.stringify(event)}`);
const error = this.parseError(event);
this.notifyError(error);
if (this.state === ConnectionState.CONNECTED) {
this.handleDisconnect('服务器错误');
} else {
reject(error);
}
};
// 为不同事件类型添加监听器
this.setupEventListeners();
} catch (error) {
this.debug(`SSE连接失败: ${error}`);
this.setState(ConnectionState.ERROR);
reject(error);
}
});
}
/**
* 断开SSE连接
*/
public disconnect(reason: string = '手动断开'): void {
this.debug(`断开SSE连接: ${reason}`);
this.stopHeartbeat();
this.clearReconnectTimer();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
const prevState = this.state;
this.state = ConnectionState.DISCONNECTED;
this.notifyStateChange(prevState);
}
/**
* 强制重连
*/
public async reconnect(): Promise<void> {
this.debug('执行强制重连');
this.disconnect('准备重连');
await this.connect();
}
/**
* 构建连接URL
*/
private buildConnectionUrl(): string {
const url = new URL(this.config.endpoint);
url.searchParams.set('clientId', this.config.clientId);
if (this.config.authToken) {
url.searchParams.set('token', this.config.authToken);
}
return url.toString();
}
/**
* 设置事件监听器
*/
private setupEventListeners(): void {
if (!this.eventSource) return;
// 监听自定义事件类型
Object.values(MessageType).forEach(type => {
this.eventSource!.addEventListener(type, (event: MessageEvent) => {
this.debug(`收到${type}事件: ${event.data}`);
this.messageListeners.forEach(listener => listener(event));
});
});
// 监听心跳事件
this.eventSource.addEventListener('heartbeat', (event: MessageEvent) => {
this.debug(`收到心跳: ${event.data}`);
});
}
/**
* 处理断开连接
*/
private handleDisconnect(reason: string): void {
const prevState = this.state;
this.stopHeartbeat();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.reconnectAttempt < this.config.maxReconnectAttempts!) {
this.setState(ConnectionState.RECONNECTING);
this.scheduleReconnect();
} else {
this.setState(ConnectionState.DISCONNECTED);
this.debug(`已达到最大重连次数(${this.config.maxReconnectAttempts}),停止重连`);
}
}
/**
* 调度重连
*/
private scheduleReconnect(): void {
const delay = this.calculateReconnectDelay();
this.reconnectAttempt++;
this.debug(`计划${delay}ms后进行第${this.reconnectAttempt}次重连`);
this.reconnectTimer = setTimeout(async () => {
try {
await this.connect();
} catch (error) {
this.debug(`重连失败: ${error}`);
this.handleDisconnect('重连失败');
}
}, delay);
}
/**
* 计算重连延迟(指数退避)
*/
private calculateReconnectDelay(): number {
const delay = Math.min(
this.config.reconnectBaseDelay! * Math.pow(2, this.reconnectAttempt - 1),
this.config.reconnectMaxDelay!
);
// 添加随机抖动
return delay + Math.random() * delay * 0.1;
}
/**
* 清除重连定时器
*/
private clearReconnectTimer(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
/**
* 启动心跳
*/
private startHeartbeat(): void {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
if (this.isConnected()) {
this.debug('发送心跳');
}
}, this.config.heartbeatInterval!);
}
/**
* 停止心跳
*/
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* 设置连接状态
*/
private setState(state: ConnectionState): void {
if (this.state !== state) {
const prevState = this.state;
this.state = state;
this.notifyStateChange(prevState);
}
}
/**
* 通知状态变化
*/
private notifyStateChange(prevState: ConnectionState): void {
this.stateListeners.forEach(listener => listener(this.state, prevState));
}
/**
* 通知错误
*/
private notifyError(error: Error): void {
this.errorListeners.forEach(listener => listener(error));
}
/**
* 解析错误
*/
private parseError(event: Event): Error {
if (event instanceof CloseEvent) {
return new Error(`连接关闭: ${event.reason || '未知原因'}, 代码: ${event.code}`);
}
return new Error('SSE连接错误');
}
/**
* 调试日志
*/
private debug(message: string): void {
if (this.config.debug) {
console.log(`[SSE Connection] ${message}`);
}
}
}
3.2 消息队列管理器实现
/**
* 消息队列项接口
*/
interface QueueItem {
/** 消息对象 */
message: Message;
/** 入队时间戳 */
enqueuedAt: number;
/** 优先级 */
priority: MessagePriority;
/** 重试次数 */
retryCount: number;
/** 回调函数 */
callback?: MessageCallback;
}
/**
* 消息回调类型
*/
type MessageCallback = {
onSuccess?: (message: Message) => void;
onError?: (error: Error, message: Message) => void;
onTimeout?: (message: Message) => void;
};
/**
* 消息队列管理器
* 负责消息的缓存、优先级排序和可靠投递
*/
export class MessageQueueManager {
private queue: QueueItem[] = [];
private processing = false;
private maxSize: number;
private processInterval: number = 100;
private retryDelays: number[] = [1000, 2000, 5000, 10000, 30000];
private stats = {
totalEnqueued: 0,
totalProcessed: 0,
totalFailed: 0,
totalRetried: 0
};
constructor(maxSize: number = 100) {
this.maxSize = maxSize;
}
/**
* 入队消息
*/
public enqueue(
message: Message,
options: {
priority?: MessagePriority;
callback?: MessageCallback;
} = {}
): boolean {
if (this.queue.length >= this.maxSize) {
console.warn(`消息队列已满(${this.maxSize}),丢弃低优先级消息`);
this.evictLowPriority();
if (this.queue.length >= this.maxSize) {
return false;
}
}
const item: QueueItem = {
message,
enqueuedAt: Date.now(),
priority: options.priority ?? message.priority,
retryCount: 0,
callback: options.callback
};
this.queue.push(item);
this.sortByPriority();
this.stats.totalEnqueued++;
this.debug(`消息入队: ${message.id}, 优先级: ${item.priority}, 队列长度: ${this.queue.length}`);
return true;
}
/**
* 出队消息
*/
public dequeue(): QueueItem | null {
return this.queue.shift() || null;
}
/**
* 查看队首消息
*/
public peek(): QueueItem | null {
return this.queue[0] || null;
}
/**
* 获取队列长度
*/
public size(): number {
return this.queue.length;
}
/**
* 清空队列
*/
public clear(): void {
this.queue = [];
}
/**
* 检查队列是否为空
*/
public isEmpty(): boolean {
return this.queue.length === 0;
}
/**
* 获取队列统计信息
*/
public getStats() {
return { ...this.stats, queueSize: this.queue.length };
}
/**
* 按优先级排序(降序)
*/
private sortByPriority(): void {
this.queue.sort((a, b) => {
if (a.priority !== b.priority) {
return b.priority - a.priority;
}
return a.enqueuedAt - b.enqueuedAt;
});
}
/**
* 驱逐低优先级消息
*/
private evictLowPriority(): void {
let minPriority = MessagePriority.LOW;
let minIndex = -1;
for (let i = 0; i < this.queue.length; i++) {
if (this.queue[i].priority < minPriority) {
minPriority = this.queue[i].priority;
minIndex = i;
}
}
if (minIndex !== -1) {
const removed = this.queue.splice(minIndex, 1);
this.debug(`驱逐低优先级消息: ${removed[0].message.id}`);
}
}
/**
* 获取消息的重试延迟
*/
public getRetryDelay(retryCount: number): number {
return this.retryDelays[Math.min(retryCount, this.retryDelays.length - 1)];
}
/**
* 增加重试次数
*/
public incrementRetry(item: QueueItem): boolean {
item.retryCount++;
this.stats.totalRetried++;
if (item.retryCount >= this.retryDelays.length) {
this.debug(`消息 ${item.message.id} 已达到最大重试次数`);
return false;
}
return true;
}
/**
* 处理失败消息
*/
public handleFailure(item: QueueItem, error: Error): void {
this.stats.totalFailed++;
if (item.callback?.onError) {
item.callback.onError(error, item.message);
}
}
/**
* 处理成功消息
*/
public handleSuccess(item: QueueItem): void {
this.stats.totalProcessed++;
if (item.callback?.onSuccess) {
item.callback.onSuccess(item.message);
}
}
/**
* 获取可处理的消息列表
*/
public getProcessableItems(): QueueItem[] {
const now = Date.now();
return this.queue.filter(item => {
const cmdMsg = item.message as CommandMessage;
if (cmdMsg.type === MessageType.COMMAND && cmdMsg.payload.requireAck) {
const timeout = cmdMsg.payload.timeout || 30000;
if (now - item.enqueuedAt > timeout) {
if (item.callback?.onTimeout) {
item.callback.onTimeout(item.message);
}
return false;
}
}
return true;
});
}
/**
* 移除指定消息
*/
public remove(messageId: string): boolean {
const index = this.queue.findIndex(item => item.message.id === messageId);
if (index !== -1) {
this.queue.splice(index, 1);
return true;
}
return false;
}
/**
* 获取指定消息
*/
public get(messageId: string): QueueItem | undefined {
return this.queue.find(item => item.message.id === messageId);
}
/**
* 调试日志
*/
private debug(message: string): void {
console.log(`[MessageQueue] ${message}`);
}
}
3.3 事件分发器实现
/**
* 订阅者接口
*/
interface Subscriber {
/** 订阅者ID */
id: string;
/** 消息类型过滤 */
messageTypes: MessageType[];
/** 消息处理器 */
handler: MessageHandler;
/** 优先级 */
priority: number;
/** 是否一次性订阅 */
once?: boolean;
}
/**
* 事件分发器
* 负责将接收到的消息分发给相应的订阅者
*/
export class EventDispatcher {
private subscribers: Map<string, Subscriber> = new Map();
private defaultHandlers: EventHandlerMap = {};
private messageHistory: Message[] = [];
private maxHistorySize = 50;
private stats = {
totalDispatched: 0,
totalHandled: 0,
totalErrors: 0
};
constructor() {
// 初始化默认处理器
}
/**
* 订阅消息
*/
public subscribe(
messageTypes: MessageType[],
handler: MessageHandler,
options: {
id?: string;
priority?: number;
once?: boolean;
} = {}
): string {
const id = options.id || this.generateId();
const subscriber: Subscriber = {
id,
messageTypes,
handler,
priority: options.priority ?? 0,
once: options.once
};
this.subscribers.set(id, subscriber);
this.debug(`订阅者 ${id} 订阅了: ${messageTypes.join(', ')}`);
return id;
}
/**
* 取消订阅
*/
public unsubscribe(subscriptionId: string): boolean {
const deleted = this.subscribers.delete(subscriptionId);
if (deleted) {
this.debug(`取消订阅: ${subscriptionId}`);
}
return deleted;
}
/**
* 订阅一次性消息
*/
public subscribeOnce(
messageType: MessageType,
handler: MessageHandler
): string {
return this.subscribe([messageType], handler, { once: true });
}
/**
* 设置默认处理器
*/
public setDefaultHandlers(handlers: EventHandlerMap): void {
this.defaultHandlers = { ...this.defaultHandlers, ...handlers };
}
/**
* 分发消息
*/
public async dispatch(message: Message): Promise<void> {
this.stats.totalDispatched++;
this.addToHistory(message);
this.debug(`分发消息: ${message.id}, 类型: ${message.type}`);
const sortedSubscribers = this.getSortedSubscribers(message.type);
for (const subscriber of sortedSubscribers) {
try {
await this.executeHandler(subscriber, message);
this.stats.totalHandled++;
if (subscriber.once) {
this.unsubscribe(subscriber.id);
}
} catch (error) {
this.stats.totalErrors++;
console.error(`消息处理错误 (${subscriber.id}):`, error);
}
}
await this.callDefaultHandler(message);
}
/**
* 获取指定类型的已排序订阅者
*/
private getSortedSubscribers(messageType: MessageType): Subscriber[] {
return Array.from(this.subscribers.values())
.filter(sub => sub.messageTypes.includes(messageType))
.sort((a, b) => b.priority - a.priority);
}
/**
* 执行处理器
*/
private async executeHandler(subscriber: Subscriber, message: Message): Promise<void> {
const result = subscriber.handler(message);
if (result instanceof Promise) {
await result;
}
}
/**
* 调用默认处理器
*/
private async callDefaultHandler(message: Message): Promise<void> {
const handler = this.defaultHandlers[message.type];
if (handler) {
try {
await this.executeHandler(
{ id: 'default', messageTypes: [message.type], handler, priority: -1 },
message
);
} catch (error) {
console.error('默认处理器错误:', error);
}
}
}
/**
* 添加到历史记录
*/
private addToHistory(message: Message): void {
this.messageHistory.push(message);
if (this.messageHistory.length > this.maxHistorySize) {
this.messageHistory.shift();
}
}
/**
* 获取消息历史
*/
public getHistory(type?: MessageType): Message[] {
if (type) {
return this.messageHistory.filter(m => m.type === type);
}
return [...this.messageHistory];
}
/**
* 获取统计信息
*/
public getStats() {
return {
...this.stats,
subscriberCount: this.subscribers.size
};
}
/**
* 生成唯一ID
*/
private generateId(): string {
return `sub_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 清空所有订阅
*/
public clear(): void {
this.subscribers.clear();
}
/**
* 获取订阅者数量
*/
public getSubscriberCount(): number {
return this.subscribers.size;
}
/**
* 调试日志
*/
private debug(message: string): void {
console.log(`[EventDispatcher] ${message}`);
}
}
3.4 HTTP请求客户端实现
/**
* 请求配置接口
*/
interface RequestConfig {
/** 请求路径 */
path: string;
/** 请求方法 */
method?: 'GET' | 'POST' | 'PUT' | 'DELETE';
/** 请求头 */
headers?: Record<string, string>;
/** 请求体 */
body?: any;
/** 超时时间(毫秒) */
timeout?: number;
/** 是否需要认证 */
withAuth?: boolean;
}
/**
* HTTP响应接口
*/
interface HttpResponse<T = any> {
/** 状态码 */
status: number;
/** 状态文本 */
statusText: string;
/** 响应数据 */
data: T;
/** 响应头 */
headers: Record<string, string>;
}
/**
* HTTP请求客户端
* 负责客户端到服务器的消息发送
*/
export class HttpClient {
private baseUrl: string;
private defaultHeaders: Record<string, string>;
private defaultTimeout: number;
private authToken?: string;
constructor(
baseUrl: string,
options: {
headers?: Record<string, string>;
timeout?: number;
authToken?: string;
} = {}
) {
this.baseUrl = baseUrl.replace(/\/$/, '');
this.defaultHeaders = {
'Content-Type': 'application/json',
'Accept': 'application/json',
...options.headers
};
this.defaultTimeout = options.timeout || 10000;
this.authToken = options.authToken;
}
/**
* 设置认证令牌
*/
public setAuthToken(token: string): void {
this.authToken = token;
}
/**
* 清除认证令牌
*/
public clearAuthToken(): void {
this.authToken = undefined;
}
/**
* GET请求
*/
public async get<T = any>(
path: string,
params?: Record<string, string>,
options?: Partial<Omit<RequestConfig, 'path' | 'method' | 'body'>>
): Promise<HttpResponse<T>> {
let url = `${this.baseUrl}${path}`;
if (params) {
const searchParams = new URLSearchParams(params);
url += `?${searchParams.toString()}`;
}
return this.request<T>({
path,
method: 'GET',
...options
}, url);
}
/**
* POST请求
*/
public async post<T = any>(
path: string,
body?: any,
options?: Partial<Omit<RequestConfig, 'path' | 'method'>>
): Promise<HttpResponse<T>> {
return this.request<T>({
path,
method: 'POST',
body,
...options
});
}
/**
* PUT请求
*/
public async put<T = any>(
path: string,
body?: any,
options?: Partial<Omit<RequestConfig, 'path' | 'method'>>
): Promise<HttpResponse<T>> {
return this.request<T>({
path,
method: 'PUT',
body,
...options
});
}
/**
* DELETE请求
*/
public async delete<T = any>(
path: string,
options?: Partial<Omit<RequestConfig, 'path' | 'method'>>
): Promise<HttpResponse<T>> {
return this.request<T>({
path,
method: 'DELETE',
...options
});
}
/**
* 发送消息
*/
public async sendMessage<T = any>(
message: Message,
options?: {
path?: string;
timeout?: number;
}
): Promise<HttpResponse<T>> {
return this.post<T>(
options?.path || '/api/messages',
message,
{ timeout: options?.timeout }
);
}
/**
* 发送指令
*/
public async sendCommand<T = any>(
command: CommandMessage,
options?: {
timeout?: number;
retry?: boolean;
}
): Promise<HttpResponse<T>> {
const response = await this.post<T>(
'/api/commands',
command,
{ timeout: options?.timeout }
);
return response;
}
/**
* 核心请求方法
*/
private async request<T>(
config: RequestConfig,
url?: string
): Promise<HttpResponse<T>> {
const requestUrl = url || `${this.baseUrl}${config.path}`;
const headers = { ...this.defaultHeaders, ...config.headers };
if (config.withAuth !== false && this.authToken) {
headers['Authorization'] = `Bearer ${this.authToken}`;
}
const controller = new AbortController();
const timeout = config.timeout || this.defaultTimeout;
const timeoutId = setTimeout(() => controller.abort(), timeout);
try {
const response = await fetch(requestUrl, {
method: config.method || 'GET',
headers,
body: config.body ? JSON.stringify(config.body) : undefined,
signal: controller.signal
});
clearTimeout(timeoutId);
const responseText = await response.text();
let data: T;
try {
data = JSON.parse(responseText) as T;
} catch {
data = responseText as unknown as T;
}
const httpResponse: HttpResponse<T> = {
status: response.status,
statusText: response.statusText,
data,
headers: this.parseHeaders(response.headers)
};
if (!response.ok) {
throw new HttpError(
httpResponse.status,
httpResponse.statusText,
httpResponse.data
);
}
return httpResponse;
} catch (error) {
clearTimeout(timeoutId);
if (error instanceof Error && error.name === 'AbortError') {
throw new HttpError(408, 'Request Timeout', null);
}
throw error;
}
}
/**
* 解析响应头
*/
private parseHeaders(headers: Headers): Record<string, string> {
const result: Record<string, string> = {};
headers.forEach((value, key) => {
result[key] = value;
});
return result;
}
}
/**
* HTTP错误类
*/
export class HttpError extends Error {
constructor(
public status: number,
public statusText: string,
public data: any
) {
super(`HTTP ${status}: ${statusText}`);
this.name = 'HttpError';
}
}
3.5 SDK主类实现
/**
* SSE SDK主类
* 提供统一的消息通知接口
*/
export class SSENotificationSDK {
private connectionManager: SSEConnectionManager;
private messageQueue: MessageQueueManager;
private eventDispatcher: EventDispatcher;
private httpClient: HttpClient;
private config: SSKSConfig;
private messageIdGenerator: Iterator<string>;
private disposed = false;
/**
* 构造函数
*/
constructor(config: SSKSConfig) {
this.config = { ...DEFAULT_CONFIG, ...config };
this.connectionManager = new SSEConnectionManager(this.config);
this.messageQueue = new MessageQueueManager(this.config.messageQueueSize);
this.eventDispatcher = new EventDispatcher();
this.httpClient = new HttpClient(this.config.endpoint, {
authToken: this.config.authToken,
headers: this.config.headers
});
this.messageIdGenerator = this.createMessageIdGenerator();
this.setupInternalHandlers();
if (this.config.autoConnect) {
this.connect();
}
}
/**
* 连接到服务器
*/
public async connect(): Promise<void> {
if (this.disposed) {
throw new Error('SDK已释放,无法连接');
}
await this.connectionManager.connect();
}
/**
* 断开连接
*/
public disconnect(reason?: string): void {
this.connectionManager.disconnect(reason);
}
/**
* 检查是否已连接
*/
public isConnected(): boolean {
return this.connectionManager.isConnected();
}
/**
* 获取连接状态
*/
public getConnectionState(): ConnectionState {
return this.connectionManager.getState();
}
/**
* 订阅消息
*/
public subscribe(
messageTypes: MessageType[],
handler: MessageHandler,
options?: {
id?: string;
priority?: number;
once?: boolean;
}
): string {
return this.eventDispatcher.subscribe(messageTypes, handler, options);
}
/**
* 取消订阅
*/
public unsubscribe(subscriptionId: string): boolean {
return this.eventDispatcher.unsubscribe(subscriptionId);
}
/**
* 订阅一次性消息
*/
public subscribeOnce(messageType: MessageType, handler: MessageHandler): string {
return this.eventDispatcher.subscribeOnce(messageType, handler);
}
/**
* 发送消息到服务器
*/
public async send(message: Message): Promise<void> {
const result = this.messageQueue.enqueue(message);
if (!result) {
throw new Error('消息队列已满');
}
try {
await this.httpClient.sendMessage(message);
const item = this.messageQueue.get(message.id);
if (item) {
this.messageQueue.handleSuccess(item);
}
} catch (error) {
const item = this.messageQueue.get(message.id);
if (item) {
this.messageQueue.handleFailure(item, error as Error);
}
throw error;
}
}
/**
* 发送文本消息
*/
public async sendText(
content: string,
options?: {
target?: string;
metadata?: Record<string, any>;
}
): Promise<void> {
const message: Message = {
id: this.generateMessageId(),
type: MessageType.TEXT,
priority: MessagePriority.NORMAL,
timestamp: Date.now(),
source: this.config.clientId,
target: options?.target,
payload: { content },
metadata: options?.metadata
};
await this.send(message);
}
/**
* 发送JSON消息
*/
public async sendJSON(
data: any,
options?: {
target?: string;
metadata?: Record<string, any>;
}
): Promise<void> {
const message: Message = {
id: this.generateMessageId(),
type: MessageType.JSON,
priority: MessagePriority.NORMAL,
timestamp: Date.now(),
source: this.config.clientId,
target: options?.target,
payload: data,
metadata: options?.metadata
};
await this.send(message);
}
/**
* 发送指令消息
*/
public async sendCommand(
action: CommandAction,
params?: Record<string, any>,
options?: {
taskId?: string;
timeout?: number;
requireAck?: boolean;
priority?: MessagePriority;
}
): Promise<string> {
const commandId = this.generateMessageId();
const command: CommandMessage = {
id: this.generateMessageId(),
type: MessageType.COMMAND,
priority: options?.priority || MessagePriority.HIGH,
timestamp: Date.now(),
source: this.config.clientId,
payload: {
commandId,
action,
taskId: options?.taskId,
params,
timeout: options?.timeout,
requireAck: options?.requireAck !== false
}
};
await this.send(command);
return commandId;
}
/**
* 发送进度查询请求
*/
public async queryProgress(taskId: string): Promise<void> {
await this.sendCommand(CommandAction.QUERY, { taskId }, {
priority: MessagePriority.LOW
});
}
/**
* 取消任务
*/
public async cancelTask(taskId: string): Promise<void> {
await this.sendCommand(CommandAction.CANCEL, { taskId });
}
/**
* 设置连接状态变化处理器
*/
public onConnectionChange(
callback: (state: ConnectionState, prevState: ConnectionState) => void
): void {
this.connectionManager.addStateListener(callback);
}
/**
* 设置错误处理器
*/
public onError(callback: (error: Error) => void): void {
this.connectionManager.addErrorListener(callback);
}
/**
* 设置默认消息处理器
*/
public setDefaultHandlers(handlers: EventHandlerMap): void {
this.eventDispatcher.setDefaultHandlers(handlers);
}
/**
* 获取消息历史
*/
public getMessageHistory(type?: MessageType): Message[] {
return this.eventDispatcher.getHistory(type);
}
/**
* 获取统计信息
*/
public getStats() {
return {
connection: {
state: this.getConnectionState(),
isConnected: this.isConnected()
},
queue: this.messageQueue.getStats(),
dispatcher: this.eventDispatcher.getStats()
};
}
/**
* 设置认证令牌
*/
public setAuthToken(token: string): void {
this.httpClient.setAuthToken(token);
}
/**
* 释放SDK资源
*/
public dispose(): void {
if (this.disposed) return;
this.disposed = true;
this.disconnect('SDK释放');
this.messageQueue.clear();
this.eventDispatcher.clear();
}
/**
* 设置内部事件处理
*/
private setupInternalHandlers(): void {
this.connectionManager.addMessageListener(async (event: MessageEvent) => {
try {
const message = JSON.parse(event.data) as Message;
await this.eventDispatcher.dispatch(message);
} catch (error) {
console.error('解析消息失败:', error);
}
});
this.connectionManager.addStateListener((state, prevState) => {
this.debug(`连接状态变化: ${prevState} -> ${state}`);
if (state === ConnectionState.CONNECTED) {
const systemMessage: SystemMessage = {
id: this.generateMessageId(),
type: MessageType.SYSTEM,
priority: MessagePriority.HIGH,
timestamp: Date.now(),
source: 'sdk',
payload: { event: 'connect' }
};
this.eventDispatcher.dispatch(systemMessage);
} else if (state === ConnectionState.DISCONNECTED && prevState === ConnectionState.CONNECTED) {
const systemMessage: SystemMessage = {
id: this.generateMessageId(),
type: MessageType.SYSTEM,
priority: MessagePriority.HIGH,
timestamp: Date.now(),
source: 'sdk',
payload: { event: 'disconnect' }
};
this.eventDispatcher.dispatch(systemMessage);
}
});
this.connectionManager.addErrorListener((error) => {
const errorMessage: SystemMessage = {
id: this.generateMessageId(),
type: MessageType.SYSTEM,
priority: MessagePriority.CRITICAL,
timestamp: Date.now(),
source: 'sdk',
payload: {
event: 'error',
detail: error.message
}
};
this.eventDispatcher.dispatch(errorMessage);
});
}
/**
* 创建消息ID生成器
*/
private createMessageIdGenerator(): Iterator<string> {
let counter = 0;
return {
next(): IteratorResult<string> {
const timestamp = Date.now().toString(36);
const random = Math.random().toString(36).substr(2, 5);
counter++;
return {
value: `${timestamp}_${random}_${counter}`,
done: false
};
}
};
}
/**
* 生成消息ID
*/
private generateMessageId(): string {
return this.messageIdGenerator.next().value;
}
/**
* 调试日志
*/
private debug(message: string): void {
if (this.config.debug) {
console.log(`[SSENotificationSDK] ${message}`);
}
}
}
// 创建SDK的工厂函数
export function createSSENotificationSDK(config: SSKSConfig): SSENotificationSDK {
return new SSENotificationSDK(config);
}
4. 后端服务实现
4.1 Node.js/Express后端实现
import express, { Request, Response, Router } from 'express';
import cors from 'cors';
/**
* SSE连接信息接口
*/
interface SSEConnection {
id: string;
clientId: string;
createdAt: number;
lastActiveAt: number;
response: Response;
closed: boolean;
}
/**
* 消息接口
*/
interface Message {
id: string;
type: string;
priority: number;
timestamp: number;
source: string;
target?: string;
payload: any;
metadata?: Record<string, any>;
}
/**
* SSE消息服务
* 管理所有SSE连接和消息路由
*/
class SSEMessageService {
private connections: Map<string, SSEConnection> = new Map();
private messageQueue: Map<string, Message[]> = new Map();
private heartbeatInterval: number = 30000;
private heartbeatTimer: NodeJS.Timeout | null = null;
private cleanupInterval: number = 60000;
private cleanupTimer: NodeJS.Timeout | null = null;
private app: express.Application;
private router: Router;
constructor() {
this.app = express();
this.router = Router();
this.setupMiddleware();
this.setupRoutes();
this.startHeartbeat();
this.startCleanup();
}
/**
* 设置中间件
*/
private setupMiddleware(): void {
this.app.use(cors());
this.app.use(express.json());
this.app.use((req, res, next) => {
console.log(`[${new Date().toISOString()}] ${req.method} ${req.path}`);
next();
});
}
/**
* 设置路由
*/
private setupRoutes(): void {
// SSE连接端点
this.router.get('/sse', (req, res) => {
this.handleSSEConnection(req, res);
});
// 发送消息端点
this.router.post('/api/messages', (req, res) => {
this.handleSendMessage(req, res);
});
// 发送指令端点
this.router.post('/api/commands', (req, res) => {
this.handleSendCommand(req, res);
});
// 广播消息端点
this.router.post('/api/broadcast', (req, res) => {
this.handleBroadcast(req, res);
});
// 获取连接列表
this.router.get('/api/connections', (req, res) => {
res.json(this.getConnectionList());
});
// 发送定向消息
this.router.post('/api/direct/:clientId', (req, res) => {
this.handleDirectMessage(req, res);
});
this.app.use(this.router);
}
/**
* 处理SSE连接
*/
private handleSSEConnection(req: Request, res: Response): void {
const clientId = req.query.clientId as string;
const token = req.query.token as string;
if (!clientId) {
res.status(400).json({ error: '缺少clientId参数' });
return;
}
if (token && !this.validateToken(token)) {
res.status(401).json({ error: '无效的认证令牌' });
return;
}
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
const connection: SSEConnection = {
id: this.generateConnectionId(),
clientId,
createdAt: Date.now(),
lastActiveAt: Date.now(),
response: res,
closed: false
};
this.connections.set(clientId, connection);
console.log(`[SSE] 新连接: ${clientId}, 总连接数: ${this.connections.size}`);
this.messageQueue.set(clientId, []);
this.sendToClient(clientId, {
id: this.generateMessageId(),
type: 'system',
priority: 3,
timestamp: Date.now(),
source: 'server',
payload: {
event: 'connect',
detail: { clientId, connectionId: connection.id }
}
});
req.on('close', () => {
this.handleConnectionClose(clientId);
});
req.on('error', (error) => {
console.error(`[SSE] 连接错误 (${clientId}):`, error);
this.handleConnectionClose(clientId);
});
const keepAliveTimer = setInterval(() => {
if (!connection.closed && connection.response.writable) {
res.write(': keepalive\n\n');
} else {
clearInterval(keepAliveTimer);
}
}, 15000);
connection.response.on('close', () => {
clearInterval(keepAliveTimer);
});
}
/**
* 处理发送消息
*/
private async handleSendMessage(req: Request, res: Response): Promise<void> {
try {
const message: Message = req.body;
if (!message.id || !message.type) {
res.status(400).json({ error: '无效的消息格式' });
return;
}
console.log(`[Message] 收到消息: ${message.id}, 类型: ${message.type}`);
if (message.target) {
const sent = await this.sendToClient(message.target, message);
if (sent) {
res.json({ success: true, messageId: message.id });
} else {
res.status(404).json({ error: '目标客户端未连接' });
}
return;
}
this.broadcastMessage(message);
res.json({ success: true, messageId: message.id });
} catch (error) {
console.error('[Message] 处理消息失败:', error);
res.status(500).json({ error: '处理消息失败' });
}
}
/**
* 处理发送指令
*/
private async handleSendCommand(req: Request, res: Response): Promise<void> {
try {
const command: Message = req.body;
if (command.payload?.requireAck) {
const target = command.target;
if (target) {
this.messageQueue.get(target)?.push(command);
}
}
if (command.target) {
const sent = await this.sendToClient(command.target, command);
res.json({ success: sent, messageId: command.id });
} else {
this.broadcastMessage(command);
res.json({ success: true, messageId: command.id });
}
} catch (error) {
console.error('[Command] 处理指令失败:', error);
res.status(500).json({ error: '处理指令失败' });
}
}
/**
* 处理广播消息
*/
private handleBroadcast(req: Request, res: Response): void {
const { message, excludeClientId } = req.body;
if (!message) {
res.status(400).json({ error: '缺少消息内容' });
return;
}
const fullMessage: Message = {
id: this.generateMessageId(),
type: 'broadcast',
priority: message.priority || 1,
timestamp: Date.now(),
source: 'server',
...message
};
let sentCount = 0;
this.connections.forEach((conn, clientId) => {
if (clientId !== excludeClientId && !conn.closed) {
if (this.sendToClient(clientId, fullMessage)) {
sentCount++;
}
}
});
res.json({ success: true, sentCount, messageId: fullMessage.id });
}
/**
* 处理定向消息
*/
private handleDirectMessage(req: Request, res: Response): void {
const { clientId } = req.params;
const message: Message = req.body;
if (!clientId) {
res.status(400).json({ error: '缺少目标客户端ID' });
return;
}
const sent = this.sendToClient(clientId, {
id: this.generateMessageId(),
timestamp: Date.now(),
source: 'server',
...message
});
if (sent) {
res.json({ success: true });
} else {
res.status(404).json({ error: '目标客户端未连接' });
}
}
/**
* 发送消息到指定客户端
*/
private sendToClient(clientId: string, message: Message): boolean {
const connection = this.connections.get(clientId);
if (!connection || connection.closed) {
console.warn(`[SSE] 客户端未连接: ${clientId}`);
return false;
}
try {
const eventType = message.type || 'message';
const data = JSON.stringify(message);
connection.response.write(`event: ${eventType}\n`);
connection.response.write(`data: ${data}\n\n`);
connection.lastActiveAt = Date.now();
console.log(`[SSE] 发送消息到 ${clientId}: ${message.id}`);
return true;
} catch (error) {
console.error(`[SSE] 发送消息失败 (${clientId}):`, error);
this.handleConnectionClose(clientId);
return false;
}
}
/**
* 广播消息到所有客户端
*/
private broadcastMessage(message: Message): void {
let sentCount = 0;
this.connections.forEach((conn, clientId) => {
if (!conn.closed) {
if (this.sendToClient(clientId, message)) {
sentCount++;
}
}
});
console.log(`[Broadcast] 广播消息 ${message.id} 到 ${sentCount} 个客户端`);
}
/**
* 处理连接关闭
*/
private handleConnectionClose(clientId: string): void {
const connection = this.connections.get(clientId);
if (connection) {
connection.closed = true;
this.connections.delete(clientId);
this.messageQueue.delete(clientId);
console.log(`[SSE] 连接关闭: ${clientId}, 剩余连接: ${this.connections.size}`);
}
}
/**
* 获取连接列表
*/
private getConnectionList(): any[] {
return Array.from(this.connections.entries()).map(([clientId, conn]) => ({
clientId,
connectionId: conn.id,
createdAt: conn.createdAt,
lastActiveAt: conn.lastActiveAt,
uptime: Date.now() - conn.createdAt
}));
}
/**
* 验证认证令牌
*/
private validateToken(token: string): boolean {
return token && token.length > 0;
}
/**
* 生成连接ID
*/
private generateConnectionId(): string {
return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 生成消息ID
*/
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 启动心跳
*/
private startHeartbeat(): void {
this.heartbeatTimer = setInterval(() => {
this.connections.forEach((conn, clientId) => {
if (!conn.closed && conn.response.writable) {
const heartbeat = {
id: this.generateMessageId(),
type: 'heartbeat',
priority: 0,
timestamp: Date.now(),
source: 'server',
payload: { serverTime: Date.now() }
};
conn.response.write(`event: heartbeat\n`);
conn.response.write(`data: ${JSON.stringify(heartbeat)}\n\n`);
conn.lastActiveAt = Date.now();
}
});
}, this.heartbeatInterval);
}
/**
* 启动清理任务
*/
private startCleanup(): void {
this.cleanupTimer = setInterval(() => {
const now = Date.now();
const staleThreshold = 300000;
this.connections.forEach((conn, clientId) => {
if (now - conn.lastActiveAt > staleThreshold) {
console.log(`[Cleanup] 清理超时连接: ${clientId}`);
conn.response.end();
this.handleConnectionClose(clientId);
}
});
}, this.cleanupInterval);
}
/**
* 获取Express应用实例
*/
public getApp(): express.Application {
return this.app;
}
/**
* 停止服务
*/
public stop(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
}
this.connections.forEach((conn) => {
conn.response.end();
});
this.connections.clear();
this.messageQueue.clear();
}
}
// 创建并启动服务
const sseService = new SSEMessageService();
const PORT = process.env.PORT || 3000;
const server = sseService.getApp().listen(PORT, () => {
console.log(`[Server] SSE消息服务已启动,监听端口: ${PORT}`);
});
process.on('SIGTERM', () => {
console.log('[Server] 收到SIGTERM信号,正在关闭...');
sseService.stop();
server.close(() => {
console.log('[Server] 服务器已关闭');
process.exit(0);
});
});
export { SSEMessageService, SSEConnection, Message };
4.2 并发指令调度器实现
/**
* 指令任务接口
*/
interface CommandTask {
taskId: string;
commandId: string;
clientId: string;
action: string;
params: Record<string, any>;
createdAt: number;
startedAt?: number;
completedAt?: number;
status: 'pending' | 'running' | 'completed' | 'cancelled' | 'failed';
result?: any;
error?: string;
}
/**
* 指令调度器接口
*/
interface SchedulerConfig {
maxConcurrent: number;
defaultTimeout: number;
enablePriority: boolean;
maxRetries: number;
}
/**
* 并发指令调度器
* 负责管理和调度并发执行的指令任务
*/
class CommandScheduler {
private tasks: Map<string, CommandTask> = new Map();
private pendingQueue: CommandTask[] = [];
private runningTasks: Map<string, CommandTask> = new Map();
private completedTasks: Map<string, CommandTask> = new Map();
private config: SchedulerConfig;
private messageService: SSEMessageService;
private taskTimer: NodeJS.Timeout | null = null;
private taskHandlers: Map<string, (task: CommandTask) => Promise<any>> = new Map();
constructor(messageService: SSEMessageService, config?: Partial<SchedulerConfig>) {
this.messageService = messageService;
this.config = {
maxConcurrent: config?.maxConcurrent || 5,
defaultTimeout: config?.defaultTimeout || 60000,
enablePriority: config?.enablePriority ?? true,
maxRetries: config?.maxRetries || 3
};
this.startTaskProcessor();
}
/**
* 注册任务处理器
*/
public registerHandler(action: string, handler: (task: CommandTask) => Promise<any>): void {
this.taskHandlers.set(action, handler);
console.log(`[Scheduler] 注册处理器: ${action}`);
}
/**
* 提交任务
*/
public async submitTask(
command: {
commandId: string;
action: string;
params?: Record<string, any>;
clientId: string;
timeout?: number;
}
): Promise<string> {
const taskId = `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const task: CommandTask = {
taskId,
commandId: command.commandId,
clientId: command.clientId,
action: command.action,
params: command.params || {},
createdAt: Date.now(),
status: 'pending'
};
this.tasks.set(taskId, task);
console.log(`[Scheduler] 提交任务: ${taskId}, 动作: ${command.action}`);
this.sendToClient(command.clientId, {
id: this.generateMessageId(),
type: 'command',
priority: 2,
timestamp: Date.now(),
source: 'scheduler',
payload: {
event: 'task_submitted',
taskId,
commandId: command.commandId
}
});
this.enqueueTask(task);
return taskId;
}
/**
* 取消任务
*/
public async cancelTask(taskId: string): Promise<boolean> {
const task = this.tasks.get(taskId);
if (!task) {
console.warn(`[Scheduler] 任务不存在: ${taskId}`);
return false;
}
if (task.status === 'completed') {
console.warn(`[Scheduler] 任务已完成,无法取消: ${taskId}`);
return false;
}
if (task.status === 'cancelled') {
return true;
}
task.status = 'cancelled';
task.completedAt = Date.now();
if (this.runningTasks.has(taskId)) {
this.runningTasks.delete(taskId);
}
this.pendingQueue = this.pendingQueue.filter(t => t.taskId !== taskId);
console.log(`[Scheduler] 任务已取消: ${taskId}`);
this.sendToClient(task.clientId, {
id: this.generateMessageId(),
type: 'command',
priority: 2,
timestamp: Date.now(),
source: 'scheduler',
payload: {
event: 'task_cancelled',
taskId,
commandId: task.commandId
}
});
return true;
}
/**
* 查询任务状态
*/
public getTaskStatus(taskId: string): CommandTask | null {
return this.tasks.get(taskId) || null;
}
/**
* 获取所有任务
*/
public getAllTasks(): CommandTask[] {
return Array.from(this.tasks.values());
}
/**
* 获取正在运行的任务
*/
public getRunningTasks(): CommandTask[] {
return Array.from(this.runningTasks.values());
}
/**
* 获取待处理任务数量
*/
public getPendingCount(): number {
return this.pendingQueue.length;
}
/**
* 入队任务
*/
private enqueueTask(task: CommandTask): void {
this.pendingQueue.push(task);
if (this.config.enablePriority) {
this.pendingQueue.sort((a, b) => {
const taskA = this.tasks.get(a.taskId)!;
const taskB = this.tasks.get(b.taskId)!;
return (taskB.params?.priority || 1) - (taskA.params?.priority || 1);
});
}
this.scheduleNext();
}
/**
* 调度下一个任务
*/
private scheduleNext(): void {
if (this.runningTasks.size >= this.config.maxConcurrent) {
return;
}
const task = this.pendingQueue.shift();
if (!task) {
return;
}
const currentTask = this.tasks.get(task.taskId);
if (currentTask?.status === 'cancelled') {
this.scheduleNext();
return;
}
this.executeTask(task);
}
/**
* 执行任务
*/
private async executeTask(task: CommandTask): Promise<void> {
task.status = 'running';
task.startedAt = Date.now();
this.runningTasks.set(task.taskId, task);
console.log(`[Scheduler] 开始执行任务: ${task.taskId}`);
const handler = this.taskHandlers.get(task.action);
if (!handler) {
task.status = 'failed';
task.error = `未找到动作处理器: ${task.action}`;
this.completeTask(task);
return;
}
const timeout = task.params?.timeout || this.config.defaultTimeout;
const timeoutTimer = setTimeout(() => {
if (task.status === 'running') {
this.cancelTask(task.taskId);
task.status = 'failed';
task.error = '任务执行超时';
this.completeTask(task);
}
}, timeout);
try {
task.result = await handler(task);
clearTimeout(timeoutTimer);
if (task.status === 'cancelled') {
return;
}
task.status = 'completed';
task.completedAt = Date.now();
console.log(`[Scheduler] 任务完成: ${task.taskId}`);
this.sendToClient(task.clientId, {
id: this.generateMessageId(),
type: 'command',
priority: 2,
timestamp: Date.now(),
source: 'scheduler',
payload: {
event: 'task_completed',
taskId: task.taskId,
commandId: task.commandId,
result: task.result
}
});
if (task.params?.requireAck) {
this.sendAck(task.commandId, task.clientId, true);
}
} catch (error) {
clearTimeout(timeoutTimer);
if (task.status === 'cancelled') {
return;
}
task.status = 'failed';
task.error = (error as Error).message;
task.completedAt = Date.now();
console.error(`[Scheduler] 任务失败: ${task.taskId}`, error);
this.sendToClient(task.clientId, {
id: this.generateMessageId(),
type: 'command',
priority: 2,
timestamp: Date.now(),
source: 'scheduler',
payload: {
event: 'task_failed',
taskId: task.taskId,
commandId: task.commandId,
error: task.error
}
});
if (this.shouldRetry(task)) {
console.log(`[Scheduler] 任务将重试: ${task.taskId}`);
task.status = 'pending';
task.createdAt = Date.now();
this.enqueueTask(task);
}
}
this.completeTask(task);
}
/**
* 判断是否应该重试
*/
private shouldRetry(task: CommandTask): boolean {
const retryCount = task.params?.retryCount || 0;
return retryCount < this.config.maxRetries && task.status === 'failed';
}
/**
* 完成任务处理
*/
private completeTask(task: CommandTask): void {
this.runningTasks.delete(task.taskId);
this.completedTasks.set(task.taskId, task);
this.scheduleNext();
}
/**
* 发送确认消息
*/
private sendAck(commandId: string, clientId: string, success: boolean): void {
this.sendToClient(clientId, {
id: this.generateMessageId(),
type: 'ack',
priority: 3,
timestamp: Date.now(),
source: 'scheduler',
payload: {
commandId,
success
}
});
}
/**
* 发送消息到客户端
*/
private sendToClient(clientId: string, message: any): void {
console.log(`[Scheduler] 发送到 ${clientId}:`, JSON.stringify(message));
}
/**
* 生成消息ID
*/
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 启动任务处理器
*/
private startTaskProcessor(): void {
this.taskTimer = setInterval(() => {
this.scheduleNext();
}, 100);
}
/**
* 停止任务处理器
*/
public stop(): void {
if (this.taskTimer) {
clearInterval(this.taskTimer);
this.taskTimer = null;
}
this.pendingQueue.forEach(task => {
task.status = 'cancelled';
});
this.pendingQueue = [];
this.runningTasks.clear();
}
}
export { CommandScheduler, CommandTask, SchedulerConfig };
5. 完整项目结构
5.1 前端SDK项目结构
sse-sdk/
├── src/
│ ├── core/
│ │ ├── connection.ts # 连接管理器
│ │ ├── dispatcher.ts # 事件分发器
│ │ ├── queue.ts # 消息队列
│ │ └── http.ts # HTTP客户端
│ ├── types/
│ │ ├── message.ts # 消息类型定义
│ │ ├── config.ts # 配置类型定义
│ │ └── events.ts # 事件类型定义
│ ├── sdk.ts # SDK主类
│ └── index.ts # 导出入口
├── tests/
│ ├── connection.test.ts # 连接测试
│ ├── queue.test.ts # 队列测试
│ └── dispatcher.test.ts # 分发器测试
├── package.json
├── tsconfig.json
└── README.md
5.2 后端服务项目结构
sse-server/
├── src/
│ ├── services/
│ │ ├── sse.service.ts # SSE服务
│ │ ├── command.scheduler.ts # 指令调度器
│ │ └── message.router.ts # 消息路由
│ ├── models/
│ │ ├── connection.ts # 连接模型
│ │ ├── message.ts # 消息模型
│ │ └── task.ts # 任务模型
│ ├── middleware/
│ │ ├── auth.ts # 认证中间件
│ │ └── logger.ts # 日志中间件
│ ├── routes/
│ │ └── index.ts # 路由配置
│ └── app.ts # 应用入口
├── package.json
├── tsconfig.json
└── README.md
6. 总结与最佳实践
6.1 核心设计要点
本SDK的核心设计遵循了几个重要原则:模块化架构将连接管理、消息队列、事件分发等功能分离为独立模块,便于维护和测试;消息类型系统通过TypeScript的强类型定义,确保了消息传递的类型安全;优先级队列机制保证了重要消息能够优先处理;自动重连机制提高了连接的可靠性;并发控制通过指令调度器实现对并发任务的合理调度。
6.2 使用建议
在实际项目中应用本SDK时,建议遵循以下最佳实践:连接管理方面,应在应用启动时初始化SDK并建立连接,同时监听连接状态变化以便及时处理异常;消息处理方面,为不同类型的消息注册专门的处理函数,避免在单个处理器中处理过多逻辑;资源清理方面,在组件卸载或应用退出时务必调用dispose方法释放资源;错误处理方面,应实现完善的错误处理和重试机制,确保关键消息的可靠传递。
6.3 扩展方向
本SDK还可以在以下方向进行扩展:加密传输支持添加消息加密功能以提高安全性;消息持久化支持将消息存储到数据库以支持消息历史查询;负载均衡支持添加多服务器支持以提高系统可用性;监控指标支持添加详细的性能监控指标便于运维分析。
本SDK提供了一个功能完备、易于使用的基础架构,开发者可以在此基础上根据具体业务需求进行定制和扩展。