普通视图

发现新文章,点击刷新页面。
今天 — 2025年12月8日首页

手写 EventEmitter:深入理解发布订阅模式

作者 1024肥宅
2025年12月7日 23:31

引言

在 JavaScript 开发中,事件驱动编程是构建可维护、可扩展应用的核心范式之一。从浏览器 DOM 事件到 Node.js 的异步 I/O,从 Vue 的组件通信到 React 的状态管理,发布订阅模式无处不在。

通过手写一个符合 Node.js EventEmitter 标准的实现,我们不仅能深入理解事件驱动架构的设计原理,还能掌握 JavaScript 中闭包、内存管理、设计模式等核心概念。更重要的是,这是面试中常见的高级题目,能体现你对JavaScript设计模式的理解深度。

本文将带你从零实现一个功能完整的EventEmitter,并探讨其在实际项目中的应用和优化策略。

一、发布订阅模式的核心概念

1.1 什么是发布订阅模式

发布订阅模式(Pub/Sub)是一种消息传递范式,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者),而是通过一个中间件(事件中心)进行通信。

// 类比:报纸订阅系统
class NewspaperSystem {
    constructor() {
        this.subscribers = new Map(); // 事件中心
    }
    
    // 订阅(读者订阅报纸)
    subscribe(topic, reader) {
        if (!this.subscribers.has(topic)) {
            this.subscribers.set(topic, []);
        }
        this.subscribers.get(topic).push(reader);
    }
    
    // 发布(报社发布新闻)
    publish(topic, news) {
        if (this.subscribers.has(topic)) {
            this.subscribers.get(topic).forEach(reader => reader(news));
        }
    }
    
    // 取消订阅(读者退订)
    unsubscribe(topic, reader) {
        if (this.subscribers.has(topic)) {
            const readers = this.subscribers.get(topic);
            const index = readers.indexOf(reader);
            if (index > -1) {
                readers.splice(index, 1);
            }
        }
    }
}
1.2 核心组件
  1. 事件中心(EventEmitter): 存储事件和回调的对应关系
  2. 发布者(Publisher): 触发事件,传递数据
  3. 订阅者(Subscriber): 监听事件,处理数据
1.3 与观察者模式的对比
特性 观察者模式 发布订阅模式
耦合度 直接耦合 通过事件中心解耦
通信方式 直接调用 间接通信
灵活性 较低 更高
典型实现 Vue响应式 NodeJS EventEmitter

二、基础 EventEmitter 实现

2.1 最小可行实现
class EventEmitter {
  constructor() {
    // 存储事件和对应的回调函数
    this.events = new Map();
  }

  /**
   * 订阅事件
   * @param {string} eventName 事件名称
   * @param {Function} callback 回调函数
   * @returns {Function} 取消订阅的函数
   */
  on(eventName, callback) {
    if (!this.events.has(eventName)) {
      this.events.set(eventName, []);
    }
    this.events.get(eventName).push(callback);

    // 返回取消订阅的函数
    return () => this.off(eventName, callback);
  }

  /**
   * 触发事件
   * @param {string} eventName 事件名称
   * @param {...any} args 传递给回调函数的参数
   */
  emit(eventName, ...args) {
    if (!this.events.has(eventName)) {
      return;
    }

    const callbacks = this.events.get(eventName);
    callbacks.forEach((callback) => {
      try {
        callback.apply(null, args);
      } catch (error) {
        console.error(`Error in event listener for ${eventName}:`, error);
      }
    });
  }

  /**
   * 取消订阅
   * @param {string} eventName 事件名称
   * @param {Function} callback 要移除的回调函数
   */
  off(eventName, callback) {
    if (!this.events.has(eventName)) {
      return;
    }

    const callbacks = this.events.get(eventName);
    const index = callbacks.indexOf(callback);

    if (index > -1) {
      callbacks.splice(index, 1);

      // 如果没有回调函数了, 删除这个事件
      if (callbacks.length === 0) {
        this.events.delete(eventName);
      }
    }
  }

  /**
   * 一次性订阅
   * @param {string} eventName 事件名称
   * @param {Function} callback 回调函数
   */
  once(eventName, callback) {
    const onceCallback = (...args) => {
      callback.apply(null, args);
      this.off(eventName, callback);
    };

    this.on(eventName, onceCallback);
    return () => this.off(eventName, onceCallback);
  }

  /**
   * 移除所有事件监听器, 或指定事件的所有监听器
   * @param {string} eventName 可选, 事件名称
   */
  removeAllListeners(eventName) {
    if (eventName) {
      this.events.delete(eventName);
    } else {
      this.events.clear();
    }
  }
}

// 基础使用示例
const emitter = new EventEmitter();

// 订阅事件
const unsubscribe = emitter.on("message", (data) => {
  console.log("收到消息:", data);
});

// 触发事件
emitter.emit("message", "Hello World!"); // 输出: 收到消息: Hello World!

// 取消订阅
unsubscribe();

// 再次触发,不会有输出
emitter.emit("message", "Hello Again!");

三、完整的 EventEmitter 实现

3.1 支持更多特性的完整实现
class EventEmitter {
  constructor() {
    // 使用 Object.create(null) 避免原型污染
    this._events = Object.create(null);
    this._maxListeners = 10;
  }

  // ================ 核心方法 ================

  /**
   * 添加事件监听器
   * @param {string} eventName 事件名称
   * @param {Function} listener 监听器函数
   * @returns {EventEmitter} this
   */
  on(eventName, listener) {
    return this._addListener(eventName, listener, false);
  }

  /**
   * 添加事件监听器(别名)
   */
  addListener(eventName, listener) {
    return this.on(eventName, listener);
  }

  /**
   * 添加一次性事件监听器
   * @param {string} eventName 事件名称
   * @param {Function} listener 监听器函数
   * @returns {EventEmitter} this
   */
  once(eventName, listener) {
    return this._addListener(eventName, listener, true);
  }

  /**
   * 触发事件
   * @param {string} eventName 事件名称
   * @param {...any} args 传递给监听器的参数
   * @returns {boolean} 是否有监听器被调用
   */
  emit(eventName, ...args) {
    if (!this._events[eventName]) {
      // 如果没有 error 事件的监听器,抛出错误
      if (eventName === "error") {
        const error = args[0];
        if (error instanceof Error) {
          throw error;
        } else {
          throw new Error("Unhandled error event");
        }
      }
      return false;
    }

    const listeners = this._events[eventName];
    const listenersCopy = listeners.slice(); // 创建副本避免迭代时修改

    let called = false;

    for (const listener of listenersCopy) {
      try {
        // 检查是否为 once 包装函数
        if (listener._once) {
          // 移除原始监听器
          this._removeListener(eventName, listener);
        }

        listener.apply(this, args);
        called = true;
      } catch (error) {
        // 触发错误事件
        if (eventName !== "error") {
          this.emit("error", error);
        }
      }
    }

    return called;
  }

  /**
   * 移除事件监听器
   * @param {string} eventName 事件名称
   * @param {Function} listener 要移除的监听器
   * @returns {EventEmitter} this
   */
  off(eventName, listener) {
    return this.removeListener(eventName, listener);
  }

  /**
   * 移除事件监听器
   * @param {string} eventName 事件名称
   * @param {Function} listener 要移除的监听器
   * @returns {EventEmitter} this
   */
  removeListener(eventName, listener) {
    return this._removeListener(eventName, listener);
  }

  /**
   * 移除所有事件监听器
   * @param {string} [eventName] 可选,事件名称
   * @returns {EventEmitter} this
   */
  removeAllListeners(eventName) {
    if (eventName) {
      delete this._events[eventName];
    } else {
      this._events = Object.create(null);
    }
    return this;
  }

  // ================ 辅助方法 ================

  /**
   * 设置最大监听器数量
   * @param {number} n 最大监听器数量
   * @returns {EventEmitter} this
   */
  setMaxListeners(n) {
    if (typeof n !== "number" || n < 0) {
      throw new TypeError("n must be a non-negative number");
    }
    this._maxListeners = n;
    return this;
  }

  /**
   * 获取最大监听器数量
   * @returns {number} 最大监听器数量
   */
  getMaxListeners() {
    return this._maxListeners;
  }

  /**
   * 获取指定事件的监听器数量
   * @param {string} eventName 事件名称
   * @returns {number} 监听器数量
   */
  listenerCount(eventName) {
    if (!this._events[eventName]) {
      return 0;
    }
    return this._events[eventName].length;
  }

  /**
   * 获取所有事件名称
   * @returns {string[]} 事件名称数组
   */
  eventNames() {
    return Object.keys(this._events);
  }

  /**
   * 获取指定事件的所有监听器
   * @param {string} eventName 事件名称
   * @returns {Function[]} 监听器数组
   */
  listeners(eventName) {
    if (!this._events[eventName]) {
      return [];
    }
    // 返回副本,避免外部修改内部数组
    return this._events[eventName].slice();
  }

  /**
   * 添加监听器到数组开头
   * @param {string} eventName 事件名称
   * @param {Function} listener 监听器函数
   * @returns {EventEmitter} this
   */
  prependListener(eventName, listener) {
    return this._addListener(eventName, listener, false, true);
  }

  /**
   * 添加一次性监听器到数组开头
   * @param {string} eventName 事件名称
   * @param {Function} listener 监听器函数
   * @returns {EventEmitter} this
   */
  prependOnceListener(eventName, listener) {
    return this._addListener(eventName, listener, true, true);
  }

  /**
   * 内部方法:添加监听器
   * @private
   */
  _addListener(eventName, listener, once = false, prepend = false) {
    if (typeof listener !== "function") {
      throw new TypeError("listener must be a function");
    }

    // 初始化事件数组
    if (!this._events[eventName]) {
      this._events[eventName] = [];
    }

    const listeners = this._events[eventName];

    // 检查最大监听器限制
    if (listeners.length >= this._maxListeners && this._maxListeners !== 0) {
      console.warn(
        `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. ` +
          `${listeners.length} ${eventName} listeners added. ` +
          `Use emitter.setMaxListeners() to increase limit`
      );
    }

    // 如果是 once,创建包装函数
    let listenerToAdd = listener;
    if (once) {
      const onceWrapper = (...args) => {
        listener.apply(this, args);
        // 标记为 once 包装函数
        onceWrapper._once = true;
      };
      // 保存原始监听器引用,用于移除
      onceWrapper._originalListener = listener;
      listenerToAdd = onceWrapper;
    }

    // 添加到数组开头或结尾
    if (prepend) {
      listeners.unshift(listenerToAdd);
    } else {
      listeners.push(listenerToAdd);
    }

    return this;
  }

  /**
   * 内部方法:移除监听器
   * @private
   */
  _removeListener(eventName, listener) {
    if (!this._events[eventName]) {
      return this;
    }

    const listeners = this._events[eventName];

    // 查找要移除的监听器
    // 需要考虑两种情况:
    // 1. 直接传入监听器
    // 2. 传入 once 包装函数的原始监听器
    let index = -1;

    // 尝试直接查找
    index = listeners.indexOf(listener);

    // 如果没找到,尝试查找原始监听器
    if (index === -1) {
      for (let i = 0; i < listeners.length; i++) {
        const current = listeners[i];
        if (current._originalListener === listener) {
          index = i;
          break;
        }
      }
    }

    if (index > -1) {
      listeners.splice(index, 1);

      // 如果数组为空,删除事件
      if (listeners.length === 0) {
        delete this._events[eventName];
      }
    }

    return this;
  }
}
3.2 类型安全的TypeScript版本
type Listener = (...args: any[]) => void;
type OnceWrapper = Listener & { _originalListener?: Listener; _once?: boolean };

class EventEmitter {
    private _events: Record<string, Listener[]> = Object.create(null);
    private _maxListeners: number = 10;
    
    // 核心方法
    on(eventName: string, listener: Listener): this {
        return this._addListener(eventName, listener, false);
    }
    
    addListener(eventName: string, listener: Listener): this {
        return this.on(eventName, listener);
    }
    
    once(eventName: string, listener: Listener): this {
        return this._addListener(eventName, listener, true);
    }
    
    emit(eventName: string, ...args: any[]): boolean {
        const listeners = this._events[eventName];
        if (!listeners) {
            if (eventName === 'error') {
                const error = args[0];
                throw error instanceof Error ? error : new Error('Unhandled error event');
            }
            return false;
        }
        
        const listenersCopy = listeners.slice();
        let called = false;
        
        for (const listener of listenersCopy) {
            try {
                // 检查是否为 once 包装函数
                const onceWrapper = listener as OnceWrapper;
                if (onceWrapper._once) {
                    this._removeListener(eventName, listener);
                }
                
                listener.apply(this, args);
                called = true;
            } catch (error) {
                if (eventName !== 'error') {
                    this.emit('error', error);
                }
            }
        }
        
        return called;
    }
    
    off(eventName: string, listener: Listener): this {
        return this.removeListener(eventName, listener);
    }
    
    removeListener(eventName: string, listener: Listener): this {
        return this._removeListener(eventName, listener);
    }
    
    removeAllListeners(eventName?: string): this {
        if (eventName) {
            delete this._events[eventName];
        } else {
            this._events = Object.create(null);
        }
        return this;
    }
    
    // 辅助方法
    setMaxListeners(n: number): this {
        if (typeof n !== 'number' || n < 0) {
            throw new TypeError('n must be a non-negative number');
        }
        this._maxListeners = n;
        return this;
    }
    
    getMaxListeners(): number {
        return this._maxListeners;
    }
    
    listenerCount(eventName: string): number {
        const listeners = this._events[eventName];
        return listeners ? listeners.length : 0;
    }
    
    eventNames(): string[] {
        return Object.keys(this._events);
    }
    
    listeners(eventName: string): Listener[] {
        const listeners = this._events[eventName];
        return listeners ? listeners.slice() : [];
    }
    
    prependListener(eventName: string, listener: Listener): this {
        return this._addListener(eventName, listener, false, true);
    }
    
    prependOnceListener(eventName: string, listener: Listener): this {
        return this._addListener(eventName, listener, true, true);
    }
    
    // 私有方法
    private _addListener(eventName: string, listener: Listener, once: boolean, prepend: boolean = false): this {
        if (typeof listener !== 'function') {
            throw new TypeError('listener must be a function');
        }
        
        if (!this._events[eventName]) {
            this._events[eventName] = [];
        }
        
        const listeners = this._events[eventName];
        
        // 检查最大监听器限制
        if (listeners.length >= this._maxListeners && this._maxListeners !== 0) {
            console.warn(`MaxListenersExceededWarning for event ${eventName}`);
        }
        
        let listenerToAdd: Listener = listener;
        
        if (once) {
            const onceWrapper: OnceWrapper = (...args: any[]) => {
                listener.apply(this, args);
                onceWrapper._once = true;
            };
            onceWrapper._originalListener = listener;
            listenerToAdd = onceWrapper;
        }
        
        if (prepend) {
            listeners.unshift(listenerToAdd);
        } else {
            listeners.push(listenerToAdd);
        }
        
        return this;
    }
    
    private _removeListener(eventName: string, listener: Listener): this {
        const listeners = this._events[eventName];
        if (!listeners) return this;
        
        let index = listeners.indexOf(listener);
        
        // 如果没找到,尝试查找原始监听器
        if (index === -1) {
            for (let i = 0; i < listeners.length; i++) {
                const current = listeners[i] as OnceWrapper;
                if (current._originalListener === listener) {
                    index = i;
                    break;
                }
            }
        }
        
        if (index > -1) {
            listeners.splice(index, 1);
            if (listeners.length === 0) {
                delete this._events[eventName];
            }
        }
        
        return this;
    }
}

四、测试用例

4.1 基础功能测试
console.log("=== EventEmitter 基础功能测试 ===");

const emitter = new EventEmitter();

// 测试1: 基本订阅和触发
let test1Count = 0;
emitter.on("test1", (data) => {
  console.log("测试1 - 收到数据:", data);
  test1Count++;
});

emitter.emit("test1", "Hello"); // 测试1 - 收到数据: Hello
emitter.emit("test1", "World"); // 测试1 - 收到数据: World
console.log(`测试1 - 调用次数: ${test1Count}`); // 测试1 - 调用次数: 2

// 测试2: 多个监听器
let test2Result = [];
emitter.on("test2", (data) => {
  test2Result.push(`listener1: ${data}`);
});
emitter.on("test2", (data) => {
  test2Result.push(`listener2: ${data.toUpperCase()}`);
});

emitter.emit("test2", "hello");
console.log("测试2 - 多个监听器:", test2Result); // 测试2 - 多个监听器: [ 'listener1: hello', 'listener2: HELLO' ]

// 测试3: 取消订阅
let test3Count = 0;
const test3Listener = () => {
  test3Count++;
  console.log("测试3 - 监听器被调用");
};

emitter.on("test3", test3Listener);
emitter.emit("test3"); // 测试3 - 监听器被调用
emitter.off("test3", test3Listener);
emitter.emit("test3"); // 不调用
console.log(`测试3 - 最终调用次数: ${test3Count}`); // 测试3 - 最终调用次数: 1

// 测试4: once 方法
let test4Count = 0;
emitter.once("test4", () => {
  test4Count++;
  console.log("测试4 - once 监听器被调用");
});

emitter.emit("test4"); // 测试4 - once 监听器被调用
emitter.emit("test4"); // 测试4 - once 监听器被调用
emitter.emit("test4"); // 不调用
console.log(`测试4 - once 调用次数: ${test4Count}`); // 测试4 - once 调用次数: 2

// 测试5: 错误处理
let errorCaught = false;
emitter.on("error", (error) => {
  errorCaught = true;
  console.log("测试5 - 捕获到错误:", error.message);
});

emitter.on("test5", () => {
  throw new Error("测试错误"); // 测试5 - 捕获到错误: 测试错误
});

emitter.emit("test5");
console.log(`测试5 - 错误是否被捕获: ${errorCaught}`); // 测试5 - 错误是否被捕获: true
4.2 高级功能测试
console.log("\n=== EventEmitter 高级功能测试 ===");

const emitter2 = new EventEmitter();

// 测试6: prependListener 方法
let test6Order = [];
emitter2.on("test6", () => test6Order.push("normal1"));
emitter2.on("test6", () => test6Order.push("normal2"));
emitter2.prependListener("test6", () => test6Order.push("prepended"));

emitter2.emit("test6");
console.log("测试6 - 监听器顺序:", test6Order);
// 测试6 - 监听器顺序: [ 'prepended', 'normal1', 'normal2' ]

// 测试7: 最大监听器限制
emitter2.setMaxListeners(2);
console.log(`测试7 - 最大监听器数: ${emitter2.getMaxListeners()}`); // 测试7 - 最大监听器数: 2

emitter2.on("test7", () => {});
emitter2.on("test7", () => {});
// 第三个应该触发警告
emitter2.on("test7", () => {});

// 测试8: 获取监听器信息
emitter2.on("test8", () => {});
emitter2.on("test8", () => {});
emitter2.once("test8", () => {});

console.log(`测试8 - 监听器数量: ${emitter2.listenerCount("test8")}`); // 3
console.log(`测试8 - 监听器数组长度: ${emitter2.listeners("test8").length}`); // 3
console.log(`测试8 - 事件名称: ${emitter2.eventNames()}`); // ['test6', 'test7', 'test8']

// 测试9: removeAllListeners
emitter2.removeAllListeners("test8");
console.log(`测试9 - 移除后监听器数量: ${emitter2.listenerCount("test8")}`); // 0

// 测试10: 链式调用
emitter2
  .on("test10", () => console.log("测试10 - 链式调用1"))
  .on("test10", () => console.log("测试10 - 链式调用2"))
  .emit("test10");
4.3 边界情况测试
console.log('\n=== EventEmitter 边界情况测试 ===');

const emitter3 = new EventEmitter();

// 测试11: 重复添加相同监听器
let test11Count = 0;
const test11Listener = () => test11Count++;

emitter3.on('test11', test11Listener);
emitter3.on('test11', test11Listener); // 重复添加

emitter3.emit('test11');
console.log(`测试11 - 重复监听器调用次数: ${test11Count}`); // 2

// 测试12: 移除不存在的监听器
const fakeListener = () => {};
emitter3.off('nonexistent', fakeListener); // 应该不报错

// 测试13: 触发没有监听器的事件
const result = emitter3.emit('nonexistent');
console.log(`测试13 - 触发无监听器事件返回值: ${result}`); // false

// 测试14: 错误事件处理
try {
    emitter3.emit('error', new Error('未处理的错误'));
} catch (error) {
    console.log(`测试14 - 捕获未处理的错误: ${error.message}`);
}

// 测试15: once 监听器移除后再次触发
let test15Count = 0;
const test15Listener = () => {
    test15Count++;
    console.log(`测试15 - 第 ${test15Count} 次调用`);
};

const removeOnce = emitter3.once('test15', test15Listener);
emitter3.emit('test15'); // 调用
removeOnce(); // 手动移除
emitter3.emit('test15'); // 不调用
console.log(`测试15 - 最终调用次数: ${test15Count}`); // 1
4.4 性能测试
console.log('\n=== EventEmitter 性能测试 ===');

const performanceEmitter = new EventEmitter();
const iterations = 100000;

// 准备测试数据
const listeners = [];
for (let i = 0; i < 100; i++) {
    listeners.push(() => {});
}

// 测试添加监听器的性能
console.time('添加监听器');
for (let i = 0; i < iterations; i++) {
    performanceEmitter.on('performance', listeners[i % listeners.length]);
}
console.timeEnd('添加监听器');

console.log(`添加后监听器数量: ${performanceEmitter.listenerCount('performance')}`);

// 测试触发事件的性能
console.time('触发事件');
for (let i = 0; i < iterations; i++) {
    performanceEmitter.emit('performance', i);
}
console.timeEnd('触发事件');

// 测试移除监听器的性能
console.time('移除监听器');
for (let i = 0; i < iterations; i++) {
    performanceEmitter.off('performance', listeners[i % listeners.length]);
}
console.timeEnd('移除监听器');

console.log(`移除后监听器数量: ${performanceEmitter.listenerCount('performance')}`);

五、EventEmitter的核心原理分析

51 数据结构设计
// EventEmitter 的核心数据结构
class EventEmitter {
    constructor() {
        // 使用普通对象而不是 Map 的原因:
        // 1. 在 V8 中,普通对象性能更好
        // 2. 事件名通常是字符串,适合作为对象键
        // 3. Object.create(null) 创建没有原型的对象,避免原型污染
        this._events = Object.create(null);
        
        // 每个事件对应一个监听器数组
        // {
        //   'event1': [listener1, listener2, ...],
        //   'event2': [listener3, listener4, ...]
        // }
    }
}
5.2 once 方法的实现原理
// once 方法的实现细节
once(eventName, listener) {
    // 创建包装函数
    const onceWrapper = (...args) => {
        // 1. 执行原始监听器
        listener.apply(this, args);
        
        // 2. 标记为已执行
        onceWrapper._once = true;
        
        // 3. 在 emit 中检测到这个标记后会移除监听器
    };
    
    // 保存原始监听器引用,用于 off 方法
    onceWrapper._originalListener = listener;
    
    // 添加到监听器数组
    this._addListener(eventName, onceWrapper, false);
    
    return this;
}

// emit 方法中处理 once 监听器
emit(eventName, ...args) {
    // ...
    for (const listener of listenersCopy) {
        // 检查是否为 once 包装函数
        if (listener._once) {
            // 移除监听器
            this._removeListener(eventName, listener);
        }
        // ...
    }
    // ...
}
5.3 内存管理策略
// 避免内存泄漏的实现
class SafeEventEmitter extends EventEmitter {
    constructor() {
        super();
        // 跟踪所有订阅,便于清理
        this._subscriptions = new WeakMap();
    }
    
    safeOn(eventName, listener, context = null) {
        // 绑定上下文
        const boundListener = context ? listener.bind(context) : listener;
        
        // 存储元数据
        const meta = {
            eventName,
            originalListener: listener,
            boundListener,
            unsubscribe: () => this.off(eventName, boundListener)
        };
        
        // 使用 WeakMap 存储,不会阻止垃圾回收
        this._subscriptions.set(listener, meta);
        
        // 添加监听器
        this.on(eventName, boundListener);
        
        // 返回增强的取消订阅函数
        return () => {
            this.off(eventName, boundListener);
            this._subscriptions.delete(listener);
        };
    }
}

六、常见面试题实现

6.1 实现一个简单的 EventBus
// 全局事件总线(类似 Vue 中的 EventBus)
class EventBus {
    constructor() {
        this._events = Object.create(null);
    }
    
    // 单例模式
    static getInstance() {
        if (!EventBus._instance) {
            EventBus._instance = new EventBus();
        }
        return EventBus._instance;
    }
    
    $on(event, callback) {
        if (!this._events[event]) {
            this._events[event] = [];
        }
        this._events[event].push(callback);
    }
    
    $emit(event, ...args) {
        const callbacks = this._events[event];
        if (!callbacks) return;
        
        // 使用 slice 创建副本,避免迭代时修改数组
        callbacks.slice().forEach(callback => {
            try {
                callback(...args);
            } catch (error) {
                console.error(`EventBus error in ${event}:`, error);
            }
        });
    }
    
    $off(event, callback) {
        if (!this._events[event]) return;
        
        if (callback) {
            const index = this._events[event].indexOf(callback);
            if (index > -1) {
                this._events[event].splice(index, 1);
            }
        } else {
            delete this._events[event];
        }
    }
    
    $once(event, callback) {
        const onceWrapper = (...args) => {
            callback(...args);
            this.$off(event, onceWrapper);
        };
        this.$on(event, onceWrapper);
    }
}

// 使用示例
const bus = EventBus.getInstance();

// 组件 A
bus.$on('user-login', (user) => {
    console.log('组件A: 用户登录', user.name);
});

// 组件 B
bus.$on('user-login', (user) => {
    console.log('组件B: 更新用户信息', user.id);
});

// 登录成功后
bus.$emit('user-login', { id: 1, name: 'Alice' });
6.2 实现带命名空间的事件系统
class NamespacedEventEmitter {
    constructor() {
        this._events = Object.create(null);
        this._separator = ':';
    }
    
    // 解析事件名,支持命名空间
    _parseEvent(eventString) {
        const parts = eventString.split(this._separator);
        if (parts.length === 1) {
            return { namespace: null, event: parts[0] };
        }
        return { namespace: parts[0], event: parts.slice(1).join(this._separator) };
    }
    
    // 生成完整的事件键
    _getEventKey(namespace, event) {
        return namespace ? `${namespace}${this._separator}${event}` : event;
    }
    
    on(eventString, listener) {
        const { namespace, event } = this._parseEvent(eventString);
        const eventKey = this._getEventKey(namespace, event);
        
        if (!this._events[eventKey]) {
            this._events[eventKey] = [];
        }
        
        this._events[eventKey].push({
            listener,
            namespace,
            event
        });
        
        return () => this.off(eventString, listener);
    }
    
    emit(eventString, ...args) {
        const { namespace, event } = this._parseEvent(eventString);
        
        // 收集所有匹配的监听器
        const listenersToCall = [];
        
        // 如果指定了命名空间,只触发该命名空间的事件
        if (namespace) {
            const eventKey = this._getEventKey(namespace, event);
            if (this._events[eventKey]) {
                listenersToCall.push(...this._events[eventKey]);
            }
        } else {
            // 如果没有指定命名空间,触发所有匹配的事件
            for (const eventKey in this._events) {
                const listeners = this._events[eventKey];
                for (const listenerInfo of listeners) {
                    if (listenerInfo.event === event) {
                        listenersToCall.push(listenerInfo);
                    }
                }
            }
        }
        
        // 执行监听器
        listenersToCall.forEach(({ listener }) => {
            try {
                listener(...args);
            } catch (error) {
                console.error(`Error in ${eventString}:`, error);
            }
        });
    }
    
    off(eventString, listener) {
        const { namespace, event } = this._parseEvent(eventString);
        
        if (namespace) {
            const eventKey = this._getEventKey(namespace, event);
            if (this._events[eventKey]) {
                const listeners = this._events[eventKey];
                const index = listeners.findIndex(item => item.listener === listener);
                if (index > -1) {
                    listeners.splice(index, 1);
                    if (listeners.length === 0) {
                        delete this._events[eventKey];
                    }
                }
            }
        } else {
            // 移除所有命名空间下的事件
            for (const eventKey in this._events) {
                const listeners = this._events[eventKey];
                for (let i = listeners.length - 1; i >= 0; i--) {
                    if (listeners[i].listener === listener && listeners[i].event === event) {
                        listeners.splice(i, 1);
                    }
                }
                if (listeners.length === 0) {
                    delete this._events[eventKey];
                }
            }
        }
    }
}

// 使用示例
const nsEmitter = new NamespacedEventEmitter();

nsEmitter.on('user:login', () => console.log('用户模块: 登录'));
nsEmitter.on('admin:login', () => console.log('管理员模块: 登录'));
nsEmitter.on('login', () => console.log('全局: 登录'));

nsEmitter.emit('user:login');    // 只输出: 用户模块: 登录
nsEmitter.emit('admin:login');   // 只输出: 管理员模块: 登录
nsEmitter.emit('login');         // 输出所有
6.3 实现支持异步监听器的 EventEmitter
class AsyncEventEmitter extends EventEmitter {
    /**
     * 异步触发事件,等待所有监听器完成
     */
    async emitAsync(eventName, ...args) {
        const listeners = this.listeners(eventName);
        if (listeners.length === 0) {
            return;
        }
        
        // 并行执行所有监听器
        const promises = listeners.map(async (listener) => {
            try {
                const result = listener(...args);
                // 如果监听器返回 Promise,等待它完成
                if (result && typeof result.then === 'function') {
                    await result;
                }
            } catch (error) {
                if (eventName !== 'error') {
                    await this.emitAsync('error', error);
                } else {
                    throw error;
                }
            }
        });
        
        await Promise.all(promises);
    }
    
    /**
     * 顺序执行监听器(一个接一个)
     */
    async emitSeries(eventName, ...args) {
        const listeners = this.listeners(eventName);
        
        for (const listener of listeners) {
            try {
                const result = listener(...args);
                // 如果监听器返回 Promise,等待它完成
                if (result && typeof result.then === 'function') {
                    await result;
                }
            } catch (error) {
                if (eventName !== 'error') {
                    await this.emitSeries('error', error);
                } else {
                    throw error;
                }
            }
        }
    }
}

// 使用示例
const asyncEmitter = new AsyncEventEmitter();

asyncEmitter.on('process', async (data) => {
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('处理完成:', data);
});

asyncEmitter.on('process', async (data) => {
    console.log('第二个监听器:', data);
});

asyncEmitter.emitAsync('process', '测试数据');
6.5 实现支持优先级的 EventEmitter
class PriorityEventEmitter {
    constructor() {
        this._events = Object.create(null);
        this._defaultPriority = 0;
    }
    
    on(eventName, listener, priority = this._defaultPriority) {
        if (!this._events[eventName]) {
            this._events[eventName] = [];
        }
        
        const listeners = this._events[eventName];
        listeners.push({ listener, priority });
        
        // 按优先级排序(数字越小优先级越高)
        listeners.sort((a, b) => a.priority - b.priority);
        
        return () => this.off(eventName, listener);
    }
    
    emit(eventName, ...args) {
        const listeners = this._events[eventName];
        if (!listeners) return;
        
        // 遍历已排序的监听器
        for (const { listener } of listeners) {
            try {
                const result = listener(...args);
                // 如果监听器返回 false,停止后续监听器的执行
                if (result === false) {
                    break;
                }
            } catch (error) {
                console.error(`Error in ${eventName}:`, error);
            }
        }
    }
    
    off(eventName, listener) {
        const listeners = this._events[eventName];
        if (!listeners) return this;
        
        const index = listeners.findIndex(item => item.listener === listener);
        if (index > -1) {
            listeners.splice(index, 1);
            if (listeners.length === 0) {
                delete this._events[eventName];
            }
        }
        
        return this;
    }
}

// 使用示例
const priorityEmitter = new PriorityEventEmitter();

priorityEmitter.on('process', () => console.log('优先级 10'), 10);
priorityEmitter.on('process', () => console.log('优先级 0'), 0);
priorityEmitter.on('process', () => console.log('优先级 5'), 5);

priorityEmitter.emit('process');
// 输出顺序: 优先级 0, 优先级 5, 优先级 10

七、实际应用场景

7.1 在 Vue 中实现组件通信
// 全局事件总线
const EventBus = new EventEmitter();

// 在 Vue 组件中使用
// ComponentA.vue
export default {
    mounted() {
        EventBus.on('user-updated', this.handleUserUpdate);
    },
    methods: {
        handleUserUpdate(user) {
            console.log('用户更新:', user);
            this.user = user;
        }
    },
    beforeDestroy() {
        EventBus.off('user-updated', this.handleUserUpdate);
    }
};

// ComponentB.vue
export default {
    methods: {
        updateUser() {
            EventBus.emit('user-updated', { id: 1, name: 'John' });
        }
    }
};

// 或者在 Vue 原型上添加
Vue.prototype.$bus = new EventEmitter();

// 在组件中使用
this.$bus.on('event', handler);
this.$bus.emit('event', data);
this.$bus.off('event', handler);
7.2 在 Express 中实现事件驱动架构
const express = require('express');
const EventEmitter = require('./EventEmitter');

class AppEvents extends EventEmitter {
    constructor() {
        super();
        this.setupEvents();
    }
    
    setupEvents() {
        // 定义应用级别事件
        this.on('user:registered', (user) => {
            console.log('新用户注册:', user.email);
            // 发送欢迎邮件
            // 创建用户目录
            // 更新统计
        });
        
        this.on('order:created', (order) => {
            console.log('新订单:', order.id);
            // 发送确认邮件
            // 更新库存
            // 通知物流
        });
        
        this.on('error', (error, context) => {
            console.error('应用错误:', error.message, context);
            // 发送错误报告
            // 记录到监控系统
        });
    }
}

// 创建 Express 应用
const appEvents = new AppEvents();
const app = express();

// 中间件:将事件发射器添加到请求对象
app.use((req, res, next) => {
    req.appEvents = appEvents;
    next();
});

// 路由处理
app.post('/register', (req, res) => {
    const user = createUser(req.body);
    
    // 触发事件
    req.appEvents.emit('user:registered', user);
    
    res.json({ success: true, user });
});

app.post('/order', (req, res) => {
    const order = createOrder(req.body);
    
    // 触发事件
    req.appEvents.emit('order:created', order);
    
    res.json({ success: true, order });
});

// 错误处理中间件
app.use((error, req, res, next) => {
    // 触发错误事件
    req.appEvents.emit('error', error, {
        url: req.url,
        method: req.method,
        userId: req.user?.id
    });
    
    res.status(500).json({ error: 'Internal server error' });
});
7.3 实现简单的状态管理
class ObservableStore {
    constructor(initialState = {}) {
        this._state = initialState;
        this._prevState = null;
        this._emitter = new EventEmitter();
    }
    
    // 获取当前状态
    getState() {
        return this._state;
    }
    
    // 设置状态
    setState(updates) {
        this._prevState = { ...this._state };
        this._state = { ...this._state, ...updates };
        
        // 触发状态变化事件
        this._emitter.emit('state:changed', this._state, this._prevState);
        
        // 触发特定属性的变化事件
        Object.keys(updates).forEach(key => {
            this._emitter.emit(`state:${key}:changed`, updates[key], this._prevState[key]);
        });
    }
    
    // 订阅状态变化
    subscribe(callback) {
        return this._emitter.on('state:changed', callback);
    }
    
    // 订阅特定状态变化
    subscribeTo(key, callback) {
        return this._emitter.on(`state:${key}:changed`, callback);
    }
    
    // 批量更新
    batchUpdate(updater) {
        const updates = updater(this._state);
        this.setState(updates);
    }
}

// 使用示例
const store = new ObservableStore({
    user: null,
    theme: 'light',
    notifications: []
});

// 订阅状态变化
store.subscribe((newState, oldState) => {
    console.log('状态变化:', newState);
});

// 订阅特定状态变化
store.subscribeTo('theme', (newTheme, oldTheme) => {
    console.log('主题变化:', oldTheme, '->', newTheme);
    document.body.setAttribute('data-theme', newTheme);
});

// 更新状态
store.setState({ theme: 'dark' });
store.setState({ user: { id: 1, name: 'Alice' } });

// 批量更新
store.batchUpdate(state => ({
    notifications: [...state.notifications, '新消息'],
    theme: 'dark'
}));

八、性能优化和注意事项

8.1 内存泄漏预防
// 安全的 EventEmitter,自动清理订阅
class SafeEventEmitter extends EventEmitter {
    constructor() {
        super();
        // 使用 WeakRef 跟踪组件引用
        this._componentRefs = new WeakMap();
    }
    
    // 为组件绑定事件,自动清理
    bindToComponent(component, eventName, listener) {
        // 创建绑定函数
        const boundListener = listener.bind(component);
        
        // 添加监听器
        this.on(eventName, boundListener);
        
        // 存储引用
        if (!this._componentRefs.has(component)) {
            this._componentRefs.set(component, []);
        }
        this._componentRefs.get(component).push({ eventName, listener: boundListener });
        
        // 返回清理函数
        return () => {
            this.off(eventName, boundListener);
            const refs = this._componentRefs.get(component);
            if (refs) {
                const index = refs.findIndex(ref => 
                    ref.eventName === eventName && ref.listener === boundListener
                );
                if (index > -1) {
                    refs.splice(index, 1);
                }
            }
        };
    }
    
    // 清理组件的所有事件
    cleanupComponent(component) {
        const refs = this._componentRefs.get(component);
        if (refs) {
            refs.forEach(({ eventName, listener }) => {
                this.off(eventName, listener);
            });
            this._componentRefs.delete(component);
        }
    }
}

// 使用示例
class Component {
    constructor(emitter) {
        this.emitter = emitter;
        this._cleanupFns = [];
    }
    
    setupEvents() {
        // 绑定事件,自动管理生命周期
        const cleanup1 = this.emitter.bindToComponent(
            this,
            'data',
            this.handleData.bind(this)
        );
        this._cleanupFns.push(cleanup1);
        
        const cleanup2 = this.emitter.bindToComponent(
            this,
            'error',
            this.handleError.bind(this)
        );
        this._cleanupFns.push(cleanup2);
    }
    
    handleData(data) {
        console.log('处理数据:', data);
    }
    
    handleError(error) {
        console.error('处理错误:', error);
    }
    
    destroy() {
        // 清理所有事件
        this._cleanupFns.forEach(fn => fn());
        this._cleanupFns = [];
        
        // 或者使用自动清理
        this.emitter.cleanupComponent(this);
    }
}
8.2 性能优化技巧
// 高性能 EventEmitter
class HighPerformanceEventEmitter {
    constructor() {
        // 使用空对象作为原型,避免原型链查找
        this._events = Object.create(null);
        // 缓存空数组,避免频繁创建
        this._emptyArray = Object.freeze([]);
    }
    
    on(eventName, listener) {
        if (!this._events[eventName]) {
            // 预分配数组空间
            this._events[eventName] = [];
        }
        
        this._events[eventName].push(listener);
        return this;
    }
    
    emit(eventName, ...args) {
        // 快速路径:没有监听器
        const listeners = this._events[eventName];
        if (!listeners) return false;
        
        // 使用 for 循环而不是 forEach,性能更好
        for (let i = 0, len = listeners.length; i < len; i++) {
            try {
                listeners[i].apply(this, args);
            } catch (error) {
                // 错误处理
                if (eventName !== 'error') {
                    const errorListeners = this._events.error;
                    if (errorListeners) {
                        // 避免递归调用
                        for (let j = 0; j < errorListeners.length; j++) {
                            try {
                                errorListeners[j].call(this, error);
                            } catch (e) {
                                // 忽略错误处理函数中的错误
                            }
                        }
                    }
                }
            }
        }
        
        return true;
    }
    
    off(eventName, listener) {
        const listeners = this._events[eventName];
        if (!listeners) return this;
        
        // 从后向前遍历,避免数组移动
        for (let i = listeners.length - 1; i >= 0; i--) {
            if (listeners[i] === listener) {
                listeners.splice(i, 1);
                break;
            }
        }
        
        // 如果没有监听器了,删除属性(让 V8 优化)
        if (listeners.length === 0) {
            delete this._events[eventName];
        }
        
        return this;
    }
    
    // 批量操作优化
    emitMany(eventNames, ...args) {
        const results = [];
        for (const eventName of eventNames) {
            results.push(this.emit(eventName, ...args));
        }
        return results;
    }
}
8,3 调试和监控
// 可监控的 EventEmitter
class MonitoredEventEmitter extends EventEmitter {
    constructor(options = {}) {
        super();
        this._monitoring = {
            enabled: options.enabled !== false,
            emitCount: 0,
            listenerCount: 0,
            eventStats: new Map(),
            errorStats: new Map(),
            slowListeners: []
        };
        
        // 性能监控阈值(毫秒)
        this._slowThreshold = options.slowThreshold || 100;
    }
    
    // 重写 emit 方法以收集监控数据
    emit(eventName, ...args) {
        if (!this._monitoring.enabled) {
            return super.emit(eventName, ...args);
        }
        
        this._monitoring.emitCount++;
        
        // 更新事件统计
        const eventStat = this._monitoring.eventStats.get(eventName) || {
            count: 0,
            lastEmitted: null,
            avgDuration: 0,
            maxDuration: 0
        };
        
        eventStat.count++;
        eventStat.lastEmitted = new Date();
        
        const startTime = performance.now();
        const result = super.emit(eventName, ...args);
        const duration = performance.now() - startTime;
        
        // 更新性能统计
        eventStat.avgDuration = 
            (eventStat.avgDuration * (eventStat.count - 1) + duration) / eventStat.count;
        eventStat.maxDuration = Math.max(eventStat.maxDuration, duration);
        
        this._monitoring.eventStats.set(eventName, eventStat);
        
        // 记录慢监听器
        if (duration > this._slowThreshold) {
            this._monitoring.slowListeners.push({
                eventName,
                duration,
                timestamp: new Date(),
                args: args.slice(0, 3) // 只记录前三个参数
            });
            
            // 保持慢监听器记录的数量
            if (this._monitoring.slowListeners.length > 100) {
                this._monitoring.slowListeners.shift();
            }
        }
        
        return result;
    }
    
    // 获取监控数据
    getMonitoringData() {
        return {
            ...this._monitoring,
            currentListeners: this._getListenersCount(),
            eventNames: this.eventNames(),
            timestamp: new Date()
        };
    }
    
    // 重置监控数据
    resetMonitoring() {
        this._monitoring = {
            enabled: true,
            emitCount: 0,
            listenerCount: 0,
            eventStats: new Map(),
            errorStats: new Map(),
            slowListeners: []
        };
    }
    
    // 生成监控报告
    generateReport() {
        const data = this.getMonitoringData();
        
        console.log('=== EventEmitter 监控报告 ===');
        console.log(`运行时间: ${data.timestamp.toISOString()}`);
        console.log(`总触发次数: ${data.emitCount}`);
        console.log(`活跃事件数量: ${data.eventNames.length}`);
        
        console.log('\n事件统计:');
        for (const [eventName, stat] of data.eventStats) {
            console.log(`  ${eventName}:`);
            console.log(`    触发次数: ${stat.count}`);
            console.log(`    平均耗时: ${stat.avgDuration.toFixed(2)}ms`);
            console.log(`    最大耗时: ${stat.maxDuration.toFixed(2)}ms`);
            console.log(`    最后触发: ${stat.lastEmitted.toISOString()}`);
        }
        
        if (data.slowListeners.length > 0) {
            console.log('\n慢监听器警告:');
            data.slowListeners.forEach((item, index) => {
                console.log(`  ${index + 1}. ${item.eventName} - ${item.duration.toFixed(2)}ms`);
            });
        }
        
        return data;
    }
    
    // 私有方法:获取监听器计数
    _getListenersCount() {
        const result = {};
        for (const eventName in this._events) {
            result[eventName] = this._events[eventName].length;
        }
        return result;
    }
}

九、总结与最佳实践

9.1 核心要点总结
  1. 发布订阅模式的核心: 通过事件中心解耦发布者和订阅者
  2. EventEmitter的实现要点:
  • 使用合适的数据结构存储事件和监听器
  • 正确处理 once 监听器
  • 实现错误处理机制
  • 支持链式调用
  1. 内存管理: 及时清理监听器, 避免内存泄漏
  2. 性能考虑: 选择合适的数据结构和算法
9.2 最佳实现
  1. 命名规范:
// 好的命名
emitter.on('user:login', handler);
emitter.on('order:created', handler);

// 不好的命名
emitter.on('login', handler);
emitter.on('newOrder', handler);
  1. 错误处理:
// 总是监听 error 事件
emitter.on('error', (error) => {
    console.error('EventEmitter error:', error);
    // 发送到错误监控系统
});

// 或者在监听器内部处理错误
emitter.on('data', (data) => {
    try {
        processData(data);
    } catch (error) {
        console.error('处理数据时出错:', error);
        emitter.emit('error', error);
    }
});
  1. 资源清理:
class Component {
    constructor() {
        this._cleanupFns = [];
    }
    
    setupEvents() {
        const cleanup1 = emitter.on('event1', this.handler1.bind(this));
        this._cleanupFns.push(cleanup1);
        
        const cleanup2 = emitter.once('event2', this.handler2.bind(this));
        this._cleanupFns.push(cleanup2);
    }
    
    destroy() {
        // 清理所有事件监听器
        this._cleanupFns.forEach(fn => fn());
        this._cleanupFns = [];
    }
}
9.3 使用建议
场景 推荐方案 理由
简单组件通信 基础 EventEmitter 轻量、简单
大型应用状态管理 Observable Store 结构化、可预测
异步任务协调 AsyncEventEmitter 更好的异步支持
性能敏感场景 HighPerformanceEventEmitter 优化过的实现
需要监控调试 MonitoredEventEmitter 内置监控功能

结语

通过手写 EventEmitter,我们不仅掌握了发布订阅模式的实现原理,更重要的是理解了事件驱动编程的核心思想。EventEmitter 虽然简单,但其设计思想在现代前端框架、Node.js 后端系统以及各种复杂应用中都有广泛的应用。

记住,好的事件系统应该:

  • ✅ 职责清晰:事件中心只负责转发消息

  • ✅ 性能优秀:高频事件触发时表现良好

  • ✅ 易于调试:有良好的监控和错误处理

  • ✅ 内存安全:避免内存泄漏和资源浪费


延伸阅读:

希望这篇经过严格测试的博客能帮助你深入理解 EventEmitter!如果有任何问题或建议,欢迎讨论交流。

❌
❌