阅读视图

发现新文章,点击刷新页面。

20个例子掌握RxJS——第十一章实现 WebSocket 消息节流

RxJS 实战:WebSocket 连接管理与消息节流

概述

WebSocket 是一种全双工通信协议,允许服务器和客户端之间进行实时双向通信。在实际应用中,我们需要:

  1. 管理连接状态:处理连接、断开、重连等
  2. 控制消息频率:避免发送过于频繁的消息
  3. 错误处理:处理连接错误和消息错误
  4. 自动重连:连接断开后自动重连

本章将介绍如何使用 RxJS 管理 WebSocket 连接,并使用 throttleTime 实现消息节流。

WebSocket 基础

WebSocket 的特点

  1. 全双工通信:客户端和服务器可以同时发送和接收消息
  2. 低延迟:比 HTTP 轮询更高效
  3. 持久连接:建立连接后保持打开状态
  4. 实时性:适合实时通信场景

WebSocket 连接状态

  • CONNECTING (0):正在连接
  • OPEN (1):连接已打开
  • CLOSING (2):正在关闭
  • CLOSED (3):连接已关闭

实现思路

1. WebSocket 连接管理

// WebSocket 服务器地址
private readonly wsUrl = 'ws://localhost:8080/ws';

// WebSocket 连接
private ws: WebSocket | null = null;

// 连接状态
connectionStatus: 'disconnected' | 'connecting' | 'connected' = 'disconnected';

// 连接 WebSocket
connect(): void {
  if (this.ws && this.ws.readyState === WebSocket.OPEN) {
    return;
  }
  
  this.connectionStatus = 'connecting';
  this.cdr.detectChanges();
  
  try {
    this.ws = new WebSocket(this.wsUrl);
    
    // 连接打开
    this.ws.onopen = () => {
      console.log('WebSocket 连接已建立');
      this.connectionStatus = 'connected';
      this.cdr.detectChanges();
    };
    
    // 接收消息
    this.ws.onmessage = (event) => {
      try {
        const message: WebSocketMessage = JSON.parse(event.data);
        console.log('收到 WebSocket 消息:', message);
        this.handleReceivedMessage(message);
      } catch (error) {
        console.error('解析 WebSocket 消息错误:', error);
      }
    };
    
    // 连接关闭
    this.ws.onclose = (event) => {
      console.log('WebSocket 连接已关闭', event);
      this.connectionStatus = 'disconnected';
      this.cdr.detectChanges();
      
      // 如果非正常关闭,尝试重连(可选)
      if (event.code !== 1000) {
        console.log('连接异常关闭,5秒后尝试重连...');
        setTimeout(() => {
          if (this.connectionStatus === 'disconnected') {
            this.connect();
          }
        }, 5000);
      }
    };
    
    // 连接错误
    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error);
      this.connectionStatus = 'disconnected';
      this.cdr.detectChanges();
    };
  } catch (error) {
    console.error('创建 WebSocket 连接失败:', error);
    this.connectionStatus = 'disconnected';
    this.cdr.detectChanges();
  }
}

// 断开 WebSocket 连接
disconnect(): void {
  if (this.ws) {
    this.ws.close(1000, '正常关闭');
    this.ws = null;
  }
  this.connectionStatus = 'disconnected';
  this.cdr.detectChanges();
}

2. 消息发送节流

使用 throttleTime 限制消息发送频率:

// 消息发送 Subject(用于节流)
private messageSendSubject$ = new Subject<string>();

// 是否启用节流
throttleEnabled = true;

ngOnInit(): void {
  // 设置消息发送节流(500ms 内最多发送一次)
  this.messageSendSubject$
    .pipe(
      throttleTime(500), // 节流:每 500ms 最多发送一次
      takeUntil(this.destroySubject$)
    )
    .subscribe((message) => {
      // 只有在连接状态下才发送
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.sendMessageToServer(message);
      }
    });
}

// 发送消息(点击按钮)
sendMessage(): void {
  const message = this.messageInput.value?.trim() || '';
  if (!message) {
    return;
  }
  
  if (this.throttleEnabled) {
    // 使用节流发送
    this.messageSendSubject$.next(message);
  } else {
    // 直接发送
    this.sendMessageToServer(message);
  }
  
  // 清空输入框
  this.messageInput.setValue('');
}

// 发送消息到服务器
private sendMessageToServer(content: string): void {
  if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
    console.warn('WebSocket 未连接,无法发送消息');
    return;
  }
  
  if (!content || content.trim() === '') {
    return;
  }
  
  // 发送 echo 类型的消息(服务器会回显)
  const message = {
    type: 'echo',
    content: content.trim()
  };
  
  try {
    this.ws.send(JSON.stringify(message));
    
    // 添加发送记录
    const record: MessageRecord = {
      id: ++this.messageCounter,
      type: 'sent',
      content: content.trim(),
      timestamp: Date.now(),
      messageType: 'echo'
    };
    
    this.messages.unshift(record);
    this.cdr.detectChanges();
  } catch (error) {
    console.error('发送消息失败:', error);
  }
}

3. 消息接收处理

// 处理接收到的消息
handleReceivedMessage(message: WebSocketMessage): void {
  let displayContent = '';
  let messageType = message.type;
  
  switch (message.type) {
    case 'welcome':
      displayContent = message.message || '连接成功';
      if (message.clientId) {
        this.clientId = message.clientId;
      }
      break;
    case 'echo':
      displayContent = message.original || '';
      break;
    case 'pong':
      displayContent = '收到心跳响应';
      break;
    case 'broadcast':
      displayContent = `${message.from ? `来自 ${message.from}: ` : ''}${message.content || ''}`;
      break;
    case 'message':
      displayContent = `${message.from ? `来自 ${message.from}: ` : ''}${JSON.stringify(message.content)}`;
      break;
    default:
      displayContent = JSON.stringify(message);
  }
  
  // 添加消息记录
  const record: MessageRecord = {
    id: ++this.messageCounter,
    type: 'received',
    content: displayContent,
    timestamp: Date.now(),
    messageType: messageType
  };
  
  this.messages.unshift(record);
  this.cdr.detectChanges();
}

关键点解析

1. 连接状态管理

通过维护 connectionStatus 状态,可以:

  • 在 UI 中显示连接状态
  • 根据状态决定是否允许发送消息
  • 处理重连逻辑

2. 消息节流

使用 throttleTime 可以:

  • 限制消息发送频率,避免服务器压力过大
  • 提升用户体验,避免消息过于频繁
  • 可以通过开关控制是否启用节流

3. 自动重连

onclose 事件中,如果非正常关闭,可以自动重连:

this.ws.onclose = (event) => {
  if (event.code !== 1000) { // 1000 表示正常关闭
    setTimeout(() => {
      if (this.connectionStatus === 'disconnected') {
        this.connect(); // 自动重连
      }
    }, 5000);
  }
};

4. 错误处理

确保所有可能的错误都有适当的处理:

  • 连接错误
  • 消息解析错误
  • 发送消息错误

实际应用场景

1. 实时聊天

// 聊天消息发送
sendChatMessage(message: string): void {
  this.messageSendSubject$.next(message);
}

// 接收聊天消息
handleChatMessage(message: WebSocketMessage): void {
  this.chatMessages.push(message);
  this.scrollToBottom();
}

2. 实时通知

// 接收服务器推送的通知
handleNotification(message: WebSocketMessage): void {
  if (message.type === 'notification') {
    this.showNotification(message.content);
  }
}

3. 实时数据更新

// 接收实时数据更新
handleDataUpdate(message: WebSocketMessage): void {
  if (message.type === 'data-update') {
    this.updateData(message.data);
  }
}

性能优化建议

1. 心跳机制

定期发送心跳消息,保持连接活跃:

// 心跳间隔
private readonly HEARTBEAT_INTERVAL = 30000; // 30 秒

// 启动心跳
startHeartbeat(): void {
  setInterval(() => {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type: 'ping' }));
    }
  }, this.HEARTBEAT_INTERVAL);
}

2. 消息队列

对于重要消息,可以实现消息队列,确保消息不丢失:

private messageQueue: string[] = [];

// 发送消息(带队列)
sendMessageWithQueue(message: string): void {
  if (this.ws && this.ws.readyState === WebSocket.OPEN) {
    // 发送队列中的消息
    while (this.messageQueue.length > 0) {
      this.ws.send(this.messageQueue.shift()!);
    }
    // 发送当前消息
    this.ws.send(message);
  } else {
    // 连接未建立,加入队列
    this.messageQueue.push(message);
  }
}

3. 限制消息历史

限制保存的消息数量,避免内存占用过大:

// 限制消息数量
if (this.messages.length > 100) {
  this.messages = this.messages.slice(0, 100);
}

注意事项

  1. 内存泄漏:确保在组件销毁时关闭连接和取消订阅
  2. 重连策略:合理设置重连间隔,避免频繁重连
  3. 消息格式:统一消息格式,便于解析和处理
  4. 安全性:使用 WSS(WebSocket Secure)保护数据传输

总结

使用 RxJS 管理 WebSocket 连接是一个完整的解决方案,它提供了:

  • 连接管理:处理连接、断开、重连等状态
  • 消息节流:使用 throttleTime 限制消息发送频率
  • 错误处理:处理各种错误情况
  • 自动重连:连接断开后自动重连
  • 消息处理:统一处理不同类型的消息

通过合理使用 RxJS 操作符(throttleTimetakeUntil 等),我们可以构建一个稳定、高效的 WebSocket 通信系统。

记住:WebSocket 适合实时通信场景,但对于不需要实时性的场景,HTTP 轮询可能更简单

码云地址:gitee.com/leeyamaster…

15个例子熟练异步框架 Zone.js

15个例子熟练异步框架 Zone.js

一、理解 Zone.js

可以把 Zone.js 理解成异步的监听器,通过钩子感知异步的各个阶段:

钩子 对应阶段
onScheduleTask 订阅/注册(调用 setTimeout、.then 等)
onInvokeTask 执行(回调真正运行)
onCancelTask 取消(clearTimeout 等)
onHasTask 是否有未完成任务(全部完成时 hasTask 变为 false)

核心价值:

  1. 提取冗余代码:错误处理、耗时统计、日志等可集中到 Zone 钩子,业务回调只保留核心逻辑
  2. 共享变量:用 properties 在 Zone 上挂数据,Zone.current.get('key') 即可访问,无需闭包或层层传参
  3. 统一错误捕获onHandleError 可捕获 Zone 内同步和异步抛出的错误
  4. 等所有异步完成onHasTask 在 hasTask 变为 false 时,表示全部完成,可触发回调(类似自动版 Promise.all)

官方总结

  • Zone.js 通过包装异步 API,在订阅、执行、取消、完成等阶段提供钩子,让你可以集中处理错误、上下文、监控和“全部完成”等逻辑,从而减少重复代码并提高可读性。

白话文总结:

  • Zone.js 就是一个异步“监听器”,可以追踪异步任务的执行取消注册/订阅等各个阶段,把原本分散在异步代码里的冗余处理(比如日志、错误捕获、耗时统计)提取到统一的位置,让业务代码更清晰。
  • 当“异步套异步”时,各层 Zone 可以共享变量,无需再用闭包或层层传参。
  • 支持在异步任务中统一捕获错误,不用再每处手动 try/catch。
  • 可以检测多个异步任务何时全部完成,比如多个 loading 结束后再统一触发某些操作。

二、示例精华(01-15)

01 最基本用法

// Zone.current 获取当前 Zone
console.log('当前 Zone:', Zone.current.name);

// zone.run() 在 Zone 内执行代码
Zone.current.run(function() {
  console.log('在 Zone 内执行,当前 Zone:', Zone.current.name);
});

讲解:Zone.js 加载后自动创建 root Zone。zone.run(fn) 在指定 Zone 内执行函数。


02 Zone 嵌套

var childZone = Zone.current.fork({ name: 'child-zone' });
var grandchildZone = childZone.fork({ name: 'grandchild-zone' });

childZone.run(function() {
  console.log('在 child-zone 内:', Zone.current.name);
  grandchildZone.run(function() {
    console.log('在 grandchild-zone 内:', Zone.current.name);
  });
});

讲解zone.fork(config) 基于当前 Zone 创建子 Zone,形成父子层级关系。


03 Zone 存储数据

var myZone = Zone.current.fork({
  name: 'my-zone',
  properties: {
    userId: 'user-123',
    requestId: 'req-456'
  }
});

myZone.run(function() {
  console.log('同步:', Zone.current.get('userId'));
  setTimeout(function() {
    // 异步回调里也能拿到!
    console.log('异步:', Zone.current.get('requestId'));
  }, 500);
});

讲解properties 让 Zone 携带数据,同步和异步代码都能用 Zone.current.get('key') 访问。


04 对比:上下文数据

无 Zone:多层 setTimeout 需闭包或层层传参才能拿到 requestId。

有 Zone:在 Zone 上设置一次,所有异步回调都能直接拿到。

var myZone = Zone.current.fork({
  name: 'request-zone',
  properties: { requestId: 'req-002' }
});

myZone.run(function() {
  setTimeout(function() {
    setTimeout(function() {
      // 照样能拿到,不用传参!
      console.log(Zone.current.get('requestId'));
    }, 200);
  }, 200);
});

05 对比:任务追踪

无 Zone:需手动 pendingCount++/--,每次 setTimeout 前后自己维护。

有 ZoneonHasTask 自动感知「有任务」或「全部完成」。

var trackingZone = Zone.current.fork({
  name: 'tracking-zone',
  onHasTask: function(delegate, current, target, hasTaskState) {
    var hasTask = hasTaskState.macroTask || hasTaskState.microTask || hasTaskState.eventTask;
    // hasTask 为 true:有异步任务
    // hasTask 为 false:全部完成
    console.log(hasTask ? '有任务执行中' : '空闲');
  }
});

trackingZone.run(function() {
  setTimeout(function() {
    setTimeout(function() { /* 什么都不用做 */ }, 300);
  }, 500);
});

06 对比:错误捕获

无 Zone:try-catch 抓不到 setTimeout 里的错误。

有 ZoneonHandleError 统一捕获 Zone 内所有异步错误。

var errorZone = Zone.current.fork({
  name: 'error-zone',
  onHandleError: function(delegate, current, target, error) {
    console.log('捕获到:', error.message);
    return false; // 不继续向外抛
  }
});

errorZone.run(function() {
  setTimeout(function() {
    throw new Error('setTimeout 里的错误!');
  }, 300);
});

07 对比:任务拦截

无 Zone:无法知道 setTimeout、Promise.then 何时执行。

有 ZoneonInvokeTask 在每次异步回调执行前都会触发。

var interceptZone = Zone.current.fork({
  name: 'intercept-zone',
  onInvokeTask: function(delegate, current, target, task, applyThis, applyArgs) {
    console.log('▶ 执行任务:', task.source);
    return delegate.invokeTask(target, task, applyThis, applyArgs);
  }
});

interceptZone.run(function() {
  setTimeout(function() { /* ... */ }, 200);
  Promise.resolve().then(function() { /* ... */ });
});

注意:onInvokeTask 在回调执行前触发,不是执行后。delegate.invokeTask() 会同步执行回调,执行完才返回。


08 onScheduleTask vs onInvokeTask

var z = Zone.current.fork({
  name: 'demo',
  onScheduleTask: function(delegate, curr, target, task) {
    console.log('📋 任务被注册:', task.source);  // 调用 setTimeout 的瞬间
    return delegate.scheduleTask(target, task);
  },
  onInvokeTask: function(delegate, curr, target, task, applyThis, applyArgs) {
    console.log('▶ 任务即将执行:', task.source);  // 回调真正运行的瞬间
    return delegate.invokeTask(target, task, applyThis, applyArgs);
  }
});

z.run(function() {
  setTimeout(function() { console.log('回调执行了'); }, 500);
});
// 顺序:onScheduleTask → (500ms) → onInvokeTask → 回调

讲解:onScheduleTask = 注册时;onInvokeTask = 执行时。


09 zone.wrap

var myZone = Zone.current.fork({
  name: 'my-zone',
  properties: { requestId: 'req-999' }
});

// 包装后,无论何时何处被调用,都会在 my-zone 内执行
var wrappedCallback = myZone.wrap(function() {
  console.log(Zone.current.get('requestId'));
}, 'button-callback');

document.getElementById('btn').addEventListener('click', wrappedCallback);

讲解:适合 addEventListener、第三方库回调等,你无法控制调用时机,但希望它在你的 Zone 内执行。


10 异步耗时统计

var timingZone = Zone.current.fork({
  name: 'timing-zone',
  onInvokeTask: function(delegate, curr, target, task, applyThis, applyArgs) {
    var start = performance.now();
    var result = delegate.invokeTask(target, task, applyThis, applyArgs);
    var cost = (performance.now() - start).toFixed(2);
    console.log(task.source + ' 耗时: ' + cost + ' ms');
    return result;
  }
});

讲解delegate.invokeTask() 是同步的,执行完才返回,所以前后 performance.now() 的差值就是回调耗时。


11 onInvoke 同步钩子

var z = Zone.current.fork({
  name: 'invoke-zone',
  onInvoke: function(delegate, curr, target, callback, applyThis, applyArgs, source) {
    console.log('onInvoke: 即将执行', source);
    return delegate.invoke(target, callback, applyThis, applyArgs, source);
  }
});

z.run(function() {
  console.log('zone.run 里的回调体执行了');
}, null, null, 'main');

讲解onInvoke 针对 zone.run(fn)同步执行;onInvokeTask 针对异步任务。


12 onCancelTask

var z = Zone.current.fork({
  onCancelTask: function(delegate, curr, target, task) {
    console.log('❌ 任务被取消:', task.source);
    return delegate.cancelTask(target, task);
  }
});

z.run(function() {
  var id = setTimeout(function() {}, 3000);
  // 点击按钮时 clearTimeout(id) → onCancelTask 触发
});

讲解clearTimeoutclearInterval 取消任务时,onCancelTask 会触发。


13 模拟 Angular 变更检测

var ngZone = Zone.current.fork({
  name: 'ng-zone',
  onHasTask: function(delegate, curr, target, hasTaskState) {
    delegate.hasTask(target, hasTaskState);
    var hasTask = hasTaskState.macroTask || hasTaskState.microTask || hasTaskState.eventTask;
    if (!hasTask) {
      console.log('🔔 所有异步完成 → 执行变更检测');
    }
  }
});

ngZone.run(function() {
  setTimeout(function() {
    // 更新数据...
    setTimeout(function() { /* 后续处理 */ }, 200);
  }, 500);
});

讲解:Angular 的 NgZone 就是利用 onHasTask,在「全部完成」时触发变更检测。


14 Zone 边界

// Zone 内发起 → 会被追踪
trackingZone.run(function() {
  setTimeout(function() { /* 会被 onInvokeTask 捕获 */ }, 200);
});

// Zone 外发起 → 不会被追踪
setTimeout(function() { /* 不会被 Zone 追踪! */ }, 400);

讲解:只有在 Zone 内发起的异步才会被追踪。Zone 外调用的 setTimeout 不会被感知。


15 zone.runGuarded

var safeZone = Zone.current.fork({
  onHandleError: function(delegate, curr, target, error) {
    console.log('捕获:', error.message);
    return false; // 不继续向外抛
  }
});

safeZone.runGuarded(function() {
  throw new Error('故意的错误!');
});
console.log('程序继续运行');

讲解zone.run(fn) 抛错会向外冒泡;zone.runGuarded(fn) 会捕获错误交给 onHandleError,不向外抛。


三、核心概念速查

概念 说明
Zone.current 当前所在的 Zone
zone.run(fn) 在指定 Zone 内执行函数
zone.runGuarded(fn) 安全执行,错误交给 onHandleError
zone.fork(config) 基于当前 Zone 创建子 Zone
zone.wrap(callback) 包装回调,使其在 Zone 内执行
Zone.current.get('key') 获取 Zone 的 properties
properties Zone 携带的数据

四、码云地址

码云地址gitee.com/leeyamaster…

❌