20个例子掌握RxJS——第十一章实现 WebSocket 消息节流
RxJS 实战:WebSocket 连接管理与消息节流
概述
WebSocket 是一种全双工通信协议,允许服务器和客户端之间进行实时双向通信。在实际应用中,我们需要:
- 管理连接状态:处理连接、断开、重连等
- 控制消息频率:避免发送过于频繁的消息
- 错误处理:处理连接错误和消息错误
- 自动重连:连接断开后自动重连
本章将介绍如何使用 RxJS 管理 WebSocket 连接,并使用 throttleTime 实现消息节流。
WebSocket 基础
WebSocket 的特点
- 全双工通信:客户端和服务器可以同时发送和接收消息
- 低延迟:比 HTTP 轮询更高效
- 持久连接:建立连接后保持打开状态
- 实时性:适合实时通信场景
WebSocket 连接状态
-
CONNECTING (0):正在连接 -
OPEN (1):连接已打开 -
CLOSING (2):正在关闭 -
CLOSED (3):连接已关闭
实现思路
1. WebSocket 连接管理
// WebSocket 服务器地址
private readonly wsUrl = 'ws://localhost:8080/ws';
// WebSocket 连接
private ws: WebSocket | null = null;
// 连接状态
connectionStatus: 'disconnected' | 'connecting' | 'connected' = 'disconnected';
// 连接 WebSocket
connect(): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
return;
}
this.connectionStatus = 'connecting';
this.cdr.detectChanges();
try {
this.ws = new WebSocket(this.wsUrl);
// 连接打开
this.ws.onopen = () => {
console.log('WebSocket 连接已建立');
this.connectionStatus = 'connected';
this.cdr.detectChanges();
};
// 接收消息
this.ws.onmessage = (event) => {
try {
const message: WebSocketMessage = JSON.parse(event.data);
console.log('收到 WebSocket 消息:', message);
this.handleReceivedMessage(message);
} catch (error) {
console.error('解析 WebSocket 消息错误:', error);
}
};
// 连接关闭
this.ws.onclose = (event) => {
console.log('WebSocket 连接已关闭', event);
this.connectionStatus = 'disconnected';
this.cdr.detectChanges();
// 如果非正常关闭,尝试重连(可选)
if (event.code !== 1000) {
console.log('连接异常关闭,5秒后尝试重连...');
setTimeout(() => {
if (this.connectionStatus === 'disconnected') {
this.connect();
}
}, 5000);
}
};
// 连接错误
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
this.connectionStatus = 'disconnected';
this.cdr.detectChanges();
};
} catch (error) {
console.error('创建 WebSocket 连接失败:', error);
this.connectionStatus = 'disconnected';
this.cdr.detectChanges();
}
}
// 断开 WebSocket 连接
disconnect(): void {
if (this.ws) {
this.ws.close(1000, '正常关闭');
this.ws = null;
}
this.connectionStatus = 'disconnected';
this.cdr.detectChanges();
}
2. 消息发送节流
使用 throttleTime 限制消息发送频率:
// 消息发送 Subject(用于节流)
private messageSendSubject$ = new Subject<string>();
// 是否启用节流
throttleEnabled = true;
ngOnInit(): void {
// 设置消息发送节流(500ms 内最多发送一次)
this.messageSendSubject$
.pipe(
throttleTime(500), // 节流:每 500ms 最多发送一次
takeUntil(this.destroySubject$)
)
.subscribe((message) => {
// 只有在连接状态下才发送
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.sendMessageToServer(message);
}
});
}
// 发送消息(点击按钮)
sendMessage(): void {
const message = this.messageInput.value?.trim() || '';
if (!message) {
return;
}
if (this.throttleEnabled) {
// 使用节流发送
this.messageSendSubject$.next(message);
} else {
// 直接发送
this.sendMessageToServer(message);
}
// 清空输入框
this.messageInput.setValue('');
}
// 发送消息到服务器
private sendMessageToServer(content: string): void {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
console.warn('WebSocket 未连接,无法发送消息');
return;
}
if (!content || content.trim() === '') {
return;
}
// 发送 echo 类型的消息(服务器会回显)
const message = {
type: 'echo',
content: content.trim()
};
try {
this.ws.send(JSON.stringify(message));
// 添加发送记录
const record: MessageRecord = {
id: ++this.messageCounter,
type: 'sent',
content: content.trim(),
timestamp: Date.now(),
messageType: 'echo'
};
this.messages.unshift(record);
this.cdr.detectChanges();
} catch (error) {
console.error('发送消息失败:', error);
}
}
3. 消息接收处理
// 处理接收到的消息
handleReceivedMessage(message: WebSocketMessage): void {
let displayContent = '';
let messageType = message.type;
switch (message.type) {
case 'welcome':
displayContent = message.message || '连接成功';
if (message.clientId) {
this.clientId = message.clientId;
}
break;
case 'echo':
displayContent = message.original || '';
break;
case 'pong':
displayContent = '收到心跳响应';
break;
case 'broadcast':
displayContent = `${message.from ? `来自 ${message.from}: ` : ''}${message.content || ''}`;
break;
case 'message':
displayContent = `${message.from ? `来自 ${message.from}: ` : ''}${JSON.stringify(message.content)}`;
break;
default:
displayContent = JSON.stringify(message);
}
// 添加消息记录
const record: MessageRecord = {
id: ++this.messageCounter,
type: 'received',
content: displayContent,
timestamp: Date.now(),
messageType: messageType
};
this.messages.unshift(record);
this.cdr.detectChanges();
}
关键点解析
1. 连接状态管理
通过维护 connectionStatus 状态,可以:
- 在 UI 中显示连接状态
- 根据状态决定是否允许发送消息
- 处理重连逻辑
2. 消息节流
使用 throttleTime 可以:
- 限制消息发送频率,避免服务器压力过大
- 提升用户体验,避免消息过于频繁
- 可以通过开关控制是否启用节流
3. 自动重连
在 onclose 事件中,如果非正常关闭,可以自动重连:
this.ws.onclose = (event) => {
if (event.code !== 1000) { // 1000 表示正常关闭
setTimeout(() => {
if (this.connectionStatus === 'disconnected') {
this.connect(); // 自动重连
}
}, 5000);
}
};
4. 错误处理
确保所有可能的错误都有适当的处理:
- 连接错误
- 消息解析错误
- 发送消息错误
实际应用场景
1. 实时聊天
// 聊天消息发送
sendChatMessage(message: string): void {
this.messageSendSubject$.next(message);
}
// 接收聊天消息
handleChatMessage(message: WebSocketMessage): void {
this.chatMessages.push(message);
this.scrollToBottom();
}
2. 实时通知
// 接收服务器推送的通知
handleNotification(message: WebSocketMessage): void {
if (message.type === 'notification') {
this.showNotification(message.content);
}
}
3. 实时数据更新
// 接收实时数据更新
handleDataUpdate(message: WebSocketMessage): void {
if (message.type === 'data-update') {
this.updateData(message.data);
}
}
性能优化建议
1. 心跳机制
定期发送心跳消息,保持连接活跃:
// 心跳间隔
private readonly HEARTBEAT_INTERVAL = 30000; // 30 秒
// 启动心跳
startHeartbeat(): void {
setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping' }));
}
}, this.HEARTBEAT_INTERVAL);
}
2. 消息队列
对于重要消息,可以实现消息队列,确保消息不丢失:
private messageQueue: string[] = [];
// 发送消息(带队列)
sendMessageWithQueue(message: string): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
// 发送队列中的消息
while (this.messageQueue.length > 0) {
this.ws.send(this.messageQueue.shift()!);
}
// 发送当前消息
this.ws.send(message);
} else {
// 连接未建立,加入队列
this.messageQueue.push(message);
}
}
3. 限制消息历史
限制保存的消息数量,避免内存占用过大:
// 限制消息数量
if (this.messages.length > 100) {
this.messages = this.messages.slice(0, 100);
}
注意事项
- 内存泄漏:确保在组件销毁时关闭连接和取消订阅
- 重连策略:合理设置重连间隔,避免频繁重连
- 消息格式:统一消息格式,便于解析和处理
- 安全性:使用 WSS(WebSocket Secure)保护数据传输
总结
使用 RxJS 管理 WebSocket 连接是一个完整的解决方案,它提供了:
- 连接管理:处理连接、断开、重连等状态
-
消息节流:使用
throttleTime限制消息发送频率 - 错误处理:处理各种错误情况
- 自动重连:连接断开后自动重连
- 消息处理:统一处理不同类型的消息
通过合理使用 RxJS 操作符(throttleTime、takeUntil 等),我们可以构建一个稳定、高效的 WebSocket 通信系统。
记住:WebSocket 适合实时通信场景,但对于不需要实时性的场景,HTTP 轮询可能更简单。