引言
在 JavaScript 开发中,事件驱动编程是构建可维护、可扩展应用的核心范式之一。从浏览器 DOM 事件到 Node.js 的异步 I/O,从 Vue 的组件通信到 React 的状态管理,发布订阅模式无处不在。
通过手写一个符合 Node.js EventEmitter 标准的实现,我们不仅能深入理解事件驱动架构的设计原理,还能掌握 JavaScript 中闭包、内存管理、设计模式等核心概念。更重要的是,这是面试中常见的高级题目,能体现你对JavaScript设计模式的理解深度。
本文将带你从零实现一个功能完整的EventEmitter,并探讨其在实际项目中的应用和优化策略。
一、发布订阅模式的核心概念
1.1 什么是发布订阅模式
发布订阅模式(Pub/Sub)是一种消息传递范式,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者),而是通过一个中间件(事件中心)进行通信。
// 类比:报纸订阅系统
class NewspaperSystem {
constructor() {
this.subscribers = new Map(); // 事件中心
}
// 订阅(读者订阅报纸)
subscribe(topic, reader) {
if (!this.subscribers.has(topic)) {
this.subscribers.set(topic, []);
}
this.subscribers.get(topic).push(reader);
}
// 发布(报社发布新闻)
publish(topic, news) {
if (this.subscribers.has(topic)) {
this.subscribers.get(topic).forEach(reader => reader(news));
}
}
// 取消订阅(读者退订)
unsubscribe(topic, reader) {
if (this.subscribers.has(topic)) {
const readers = this.subscribers.get(topic);
const index = readers.indexOf(reader);
if (index > -1) {
readers.splice(index, 1);
}
}
}
}
1.2 核心组件
-
事件中心(EventEmitter): 存储事件和回调的对应关系
-
发布者(Publisher): 触发事件,传递数据
-
订阅者(Subscriber): 监听事件,处理数据
1.3 与观察者模式的对比
| 特性 |
观察者模式 |
发布订阅模式 |
| 耦合度 |
直接耦合 |
通过事件中心解耦 |
| 通信方式 |
直接调用 |
间接通信 |
| 灵活性 |
较低 |
更高 |
| 典型实现 |
Vue响应式 |
NodeJS EventEmitter |
二、基础 EventEmitter 实现
2.1 最小可行实现
class EventEmitter {
constructor() {
// 存储事件和对应的回调函数
this.events = new Map();
}
/**
* 订阅事件
* @param {string} eventName 事件名称
* @param {Function} callback 回调函数
* @returns {Function} 取消订阅的函数
*/
on(eventName, callback) {
if (!this.events.has(eventName)) {
this.events.set(eventName, []);
}
this.events.get(eventName).push(callback);
// 返回取消订阅的函数
return () => this.off(eventName, callback);
}
/**
* 触发事件
* @param {string} eventName 事件名称
* @param {...any} args 传递给回调函数的参数
*/
emit(eventName, ...args) {
if (!this.events.has(eventName)) {
return;
}
const callbacks = this.events.get(eventName);
callbacks.forEach((callback) => {
try {
callback.apply(null, args);
} catch (error) {
console.error(`Error in event listener for ${eventName}:`, error);
}
});
}
/**
* 取消订阅
* @param {string} eventName 事件名称
* @param {Function} callback 要移除的回调函数
*/
off(eventName, callback) {
if (!this.events.has(eventName)) {
return;
}
const callbacks = this.events.get(eventName);
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
// 如果没有回调函数了, 删除这个事件
if (callbacks.length === 0) {
this.events.delete(eventName);
}
}
}
/**
* 一次性订阅
* @param {string} eventName 事件名称
* @param {Function} callback 回调函数
*/
once(eventName, callback) {
const onceCallback = (...args) => {
callback.apply(null, args);
this.off(eventName, callback);
};
this.on(eventName, onceCallback);
return () => this.off(eventName, onceCallback);
}
/**
* 移除所有事件监听器, 或指定事件的所有监听器
* @param {string} eventName 可选, 事件名称
*/
removeAllListeners(eventName) {
if (eventName) {
this.events.delete(eventName);
} else {
this.events.clear();
}
}
}
// 基础使用示例
const emitter = new EventEmitter();
// 订阅事件
const unsubscribe = emitter.on("message", (data) => {
console.log("收到消息:", data);
});
// 触发事件
emitter.emit("message", "Hello World!"); // 输出: 收到消息: Hello World!
// 取消订阅
unsubscribe();
// 再次触发,不会有输出
emitter.emit("message", "Hello Again!");
三、完整的 EventEmitter 实现
3.1 支持更多特性的完整实现
class EventEmitter {
constructor() {
// 使用 Object.create(null) 避免原型污染
this._events = Object.create(null);
this._maxListeners = 10;
}
// ================ 核心方法 ================
/**
* 添加事件监听器
* @param {string} eventName 事件名称
* @param {Function} listener 监听器函数
* @returns {EventEmitter} this
*/
on(eventName, listener) {
return this._addListener(eventName, listener, false);
}
/**
* 添加事件监听器(别名)
*/
addListener(eventName, listener) {
return this.on(eventName, listener);
}
/**
* 添加一次性事件监听器
* @param {string} eventName 事件名称
* @param {Function} listener 监听器函数
* @returns {EventEmitter} this
*/
once(eventName, listener) {
return this._addListener(eventName, listener, true);
}
/**
* 触发事件
* @param {string} eventName 事件名称
* @param {...any} args 传递给监听器的参数
* @returns {boolean} 是否有监听器被调用
*/
emit(eventName, ...args) {
if (!this._events[eventName]) {
// 如果没有 error 事件的监听器,抛出错误
if (eventName === "error") {
const error = args[0];
if (error instanceof Error) {
throw error;
} else {
throw new Error("Unhandled error event");
}
}
return false;
}
const listeners = this._events[eventName];
const listenersCopy = listeners.slice(); // 创建副本避免迭代时修改
let called = false;
for (const listener of listenersCopy) {
try {
// 检查是否为 once 包装函数
if (listener._once) {
// 移除原始监听器
this._removeListener(eventName, listener);
}
listener.apply(this, args);
called = true;
} catch (error) {
// 触发错误事件
if (eventName !== "error") {
this.emit("error", error);
}
}
}
return called;
}
/**
* 移除事件监听器
* @param {string} eventName 事件名称
* @param {Function} listener 要移除的监听器
* @returns {EventEmitter} this
*/
off(eventName, listener) {
return this.removeListener(eventName, listener);
}
/**
* 移除事件监听器
* @param {string} eventName 事件名称
* @param {Function} listener 要移除的监听器
* @returns {EventEmitter} this
*/
removeListener(eventName, listener) {
return this._removeListener(eventName, listener);
}
/**
* 移除所有事件监听器
* @param {string} [eventName] 可选,事件名称
* @returns {EventEmitter} this
*/
removeAllListeners(eventName) {
if (eventName) {
delete this._events[eventName];
} else {
this._events = Object.create(null);
}
return this;
}
// ================ 辅助方法 ================
/**
* 设置最大监听器数量
* @param {number} n 最大监听器数量
* @returns {EventEmitter} this
*/
setMaxListeners(n) {
if (typeof n !== "number" || n < 0) {
throw new TypeError("n must be a non-negative number");
}
this._maxListeners = n;
return this;
}
/**
* 获取最大监听器数量
* @returns {number} 最大监听器数量
*/
getMaxListeners() {
return this._maxListeners;
}
/**
* 获取指定事件的监听器数量
* @param {string} eventName 事件名称
* @returns {number} 监听器数量
*/
listenerCount(eventName) {
if (!this._events[eventName]) {
return 0;
}
return this._events[eventName].length;
}
/**
* 获取所有事件名称
* @returns {string[]} 事件名称数组
*/
eventNames() {
return Object.keys(this._events);
}
/**
* 获取指定事件的所有监听器
* @param {string} eventName 事件名称
* @returns {Function[]} 监听器数组
*/
listeners(eventName) {
if (!this._events[eventName]) {
return [];
}
// 返回副本,避免外部修改内部数组
return this._events[eventName].slice();
}
/**
* 添加监听器到数组开头
* @param {string} eventName 事件名称
* @param {Function} listener 监听器函数
* @returns {EventEmitter} this
*/
prependListener(eventName, listener) {
return this._addListener(eventName, listener, false, true);
}
/**
* 添加一次性监听器到数组开头
* @param {string} eventName 事件名称
* @param {Function} listener 监听器函数
* @returns {EventEmitter} this
*/
prependOnceListener(eventName, listener) {
return this._addListener(eventName, listener, true, true);
}
/**
* 内部方法:添加监听器
* @private
*/
_addListener(eventName, listener, once = false, prepend = false) {
if (typeof listener !== "function") {
throw new TypeError("listener must be a function");
}
// 初始化事件数组
if (!this._events[eventName]) {
this._events[eventName] = [];
}
const listeners = this._events[eventName];
// 检查最大监听器限制
if (listeners.length >= this._maxListeners && this._maxListeners !== 0) {
console.warn(
`MaxListenersExceededWarning: Possible EventEmitter memory leak detected. ` +
`${listeners.length} ${eventName} listeners added. ` +
`Use emitter.setMaxListeners() to increase limit`
);
}
// 如果是 once,创建包装函数
let listenerToAdd = listener;
if (once) {
const onceWrapper = (...args) => {
listener.apply(this, args);
// 标记为 once 包装函数
onceWrapper._once = true;
};
// 保存原始监听器引用,用于移除
onceWrapper._originalListener = listener;
listenerToAdd = onceWrapper;
}
// 添加到数组开头或结尾
if (prepend) {
listeners.unshift(listenerToAdd);
} else {
listeners.push(listenerToAdd);
}
return this;
}
/**
* 内部方法:移除监听器
* @private
*/
_removeListener(eventName, listener) {
if (!this._events[eventName]) {
return this;
}
const listeners = this._events[eventName];
// 查找要移除的监听器
// 需要考虑两种情况:
// 1. 直接传入监听器
// 2. 传入 once 包装函数的原始监听器
let index = -1;
// 尝试直接查找
index = listeners.indexOf(listener);
// 如果没找到,尝试查找原始监听器
if (index === -1) {
for (let i = 0; i < listeners.length; i++) {
const current = listeners[i];
if (current._originalListener === listener) {
index = i;
break;
}
}
}
if (index > -1) {
listeners.splice(index, 1);
// 如果数组为空,删除事件
if (listeners.length === 0) {
delete this._events[eventName];
}
}
return this;
}
}
3.2 类型安全的TypeScript版本
type Listener = (...args: any[]) => void;
type OnceWrapper = Listener & { _originalListener?: Listener; _once?: boolean };
class EventEmitter {
private _events: Record<string, Listener[]> = Object.create(null);
private _maxListeners: number = 10;
// 核心方法
on(eventName: string, listener: Listener): this {
return this._addListener(eventName, listener, false);
}
addListener(eventName: string, listener: Listener): this {
return this.on(eventName, listener);
}
once(eventName: string, listener: Listener): this {
return this._addListener(eventName, listener, true);
}
emit(eventName: string, ...args: any[]): boolean {
const listeners = this._events[eventName];
if (!listeners) {
if (eventName === 'error') {
const error = args[0];
throw error instanceof Error ? error : new Error('Unhandled error event');
}
return false;
}
const listenersCopy = listeners.slice();
let called = false;
for (const listener of listenersCopy) {
try {
// 检查是否为 once 包装函数
const onceWrapper = listener as OnceWrapper;
if (onceWrapper._once) {
this._removeListener(eventName, listener);
}
listener.apply(this, args);
called = true;
} catch (error) {
if (eventName !== 'error') {
this.emit('error', error);
}
}
}
return called;
}
off(eventName: string, listener: Listener): this {
return this.removeListener(eventName, listener);
}
removeListener(eventName: string, listener: Listener): this {
return this._removeListener(eventName, listener);
}
removeAllListeners(eventName?: string): this {
if (eventName) {
delete this._events[eventName];
} else {
this._events = Object.create(null);
}
return this;
}
// 辅助方法
setMaxListeners(n: number): this {
if (typeof n !== 'number' || n < 0) {
throw new TypeError('n must be a non-negative number');
}
this._maxListeners = n;
return this;
}
getMaxListeners(): number {
return this._maxListeners;
}
listenerCount(eventName: string): number {
const listeners = this._events[eventName];
return listeners ? listeners.length : 0;
}
eventNames(): string[] {
return Object.keys(this._events);
}
listeners(eventName: string): Listener[] {
const listeners = this._events[eventName];
return listeners ? listeners.slice() : [];
}
prependListener(eventName: string, listener: Listener): this {
return this._addListener(eventName, listener, false, true);
}
prependOnceListener(eventName: string, listener: Listener): this {
return this._addListener(eventName, listener, true, true);
}
// 私有方法
private _addListener(eventName: string, listener: Listener, once: boolean, prepend: boolean = false): this {
if (typeof listener !== 'function') {
throw new TypeError('listener must be a function');
}
if (!this._events[eventName]) {
this._events[eventName] = [];
}
const listeners = this._events[eventName];
// 检查最大监听器限制
if (listeners.length >= this._maxListeners && this._maxListeners !== 0) {
console.warn(`MaxListenersExceededWarning for event ${eventName}`);
}
let listenerToAdd: Listener = listener;
if (once) {
const onceWrapper: OnceWrapper = (...args: any[]) => {
listener.apply(this, args);
onceWrapper._once = true;
};
onceWrapper._originalListener = listener;
listenerToAdd = onceWrapper;
}
if (prepend) {
listeners.unshift(listenerToAdd);
} else {
listeners.push(listenerToAdd);
}
return this;
}
private _removeListener(eventName: string, listener: Listener): this {
const listeners = this._events[eventName];
if (!listeners) return this;
let index = listeners.indexOf(listener);
// 如果没找到,尝试查找原始监听器
if (index === -1) {
for (let i = 0; i < listeners.length; i++) {
const current = listeners[i] as OnceWrapper;
if (current._originalListener === listener) {
index = i;
break;
}
}
}
if (index > -1) {
listeners.splice(index, 1);
if (listeners.length === 0) {
delete this._events[eventName];
}
}
return this;
}
}
四、测试用例
4.1 基础功能测试
console.log("=== EventEmitter 基础功能测试 ===");
const emitter = new EventEmitter();
// 测试1: 基本订阅和触发
let test1Count = 0;
emitter.on("test1", (data) => {
console.log("测试1 - 收到数据:", data);
test1Count++;
});
emitter.emit("test1", "Hello"); // 测试1 - 收到数据: Hello
emitter.emit("test1", "World"); // 测试1 - 收到数据: World
console.log(`测试1 - 调用次数: ${test1Count}`); // 测试1 - 调用次数: 2
// 测试2: 多个监听器
let test2Result = [];
emitter.on("test2", (data) => {
test2Result.push(`listener1: ${data}`);
});
emitter.on("test2", (data) => {
test2Result.push(`listener2: ${data.toUpperCase()}`);
});
emitter.emit("test2", "hello");
console.log("测试2 - 多个监听器:", test2Result); // 测试2 - 多个监听器: [ 'listener1: hello', 'listener2: HELLO' ]
// 测试3: 取消订阅
let test3Count = 0;
const test3Listener = () => {
test3Count++;
console.log("测试3 - 监听器被调用");
};
emitter.on("test3", test3Listener);
emitter.emit("test3"); // 测试3 - 监听器被调用
emitter.off("test3", test3Listener);
emitter.emit("test3"); // 不调用
console.log(`测试3 - 最终调用次数: ${test3Count}`); // 测试3 - 最终调用次数: 1
// 测试4: once 方法
let test4Count = 0;
emitter.once("test4", () => {
test4Count++;
console.log("测试4 - once 监听器被调用");
});
emitter.emit("test4"); // 测试4 - once 监听器被调用
emitter.emit("test4"); // 测试4 - once 监听器被调用
emitter.emit("test4"); // 不调用
console.log(`测试4 - once 调用次数: ${test4Count}`); // 测试4 - once 调用次数: 2
// 测试5: 错误处理
let errorCaught = false;
emitter.on("error", (error) => {
errorCaught = true;
console.log("测试5 - 捕获到错误:", error.message);
});
emitter.on("test5", () => {
throw new Error("测试错误"); // 测试5 - 捕获到错误: 测试错误
});
emitter.emit("test5");
console.log(`测试5 - 错误是否被捕获: ${errorCaught}`); // 测试5 - 错误是否被捕获: true
4.2 高级功能测试
console.log("\n=== EventEmitter 高级功能测试 ===");
const emitter2 = new EventEmitter();
// 测试6: prependListener 方法
let test6Order = [];
emitter2.on("test6", () => test6Order.push("normal1"));
emitter2.on("test6", () => test6Order.push("normal2"));
emitter2.prependListener("test6", () => test6Order.push("prepended"));
emitter2.emit("test6");
console.log("测试6 - 监听器顺序:", test6Order);
// 测试6 - 监听器顺序: [ 'prepended', 'normal1', 'normal2' ]
// 测试7: 最大监听器限制
emitter2.setMaxListeners(2);
console.log(`测试7 - 最大监听器数: ${emitter2.getMaxListeners()}`); // 测试7 - 最大监听器数: 2
emitter2.on("test7", () => {});
emitter2.on("test7", () => {});
// 第三个应该触发警告
emitter2.on("test7", () => {});
// 测试8: 获取监听器信息
emitter2.on("test8", () => {});
emitter2.on("test8", () => {});
emitter2.once("test8", () => {});
console.log(`测试8 - 监听器数量: ${emitter2.listenerCount("test8")}`); // 3
console.log(`测试8 - 监听器数组长度: ${emitter2.listeners("test8").length}`); // 3
console.log(`测试8 - 事件名称: ${emitter2.eventNames()}`); // ['test6', 'test7', 'test8']
// 测试9: removeAllListeners
emitter2.removeAllListeners("test8");
console.log(`测试9 - 移除后监听器数量: ${emitter2.listenerCount("test8")}`); // 0
// 测试10: 链式调用
emitter2
.on("test10", () => console.log("测试10 - 链式调用1"))
.on("test10", () => console.log("测试10 - 链式调用2"))
.emit("test10");
4.3 边界情况测试
console.log('\n=== EventEmitter 边界情况测试 ===');
const emitter3 = new EventEmitter();
// 测试11: 重复添加相同监听器
let test11Count = 0;
const test11Listener = () => test11Count++;
emitter3.on('test11', test11Listener);
emitter3.on('test11', test11Listener); // 重复添加
emitter3.emit('test11');
console.log(`测试11 - 重复监听器调用次数: ${test11Count}`); // 2
// 测试12: 移除不存在的监听器
const fakeListener = () => {};
emitter3.off('nonexistent', fakeListener); // 应该不报错
// 测试13: 触发没有监听器的事件
const result = emitter3.emit('nonexistent');
console.log(`测试13 - 触发无监听器事件返回值: ${result}`); // false
// 测试14: 错误事件处理
try {
emitter3.emit('error', new Error('未处理的错误'));
} catch (error) {
console.log(`测试14 - 捕获未处理的错误: ${error.message}`);
}
// 测试15: once 监听器移除后再次触发
let test15Count = 0;
const test15Listener = () => {
test15Count++;
console.log(`测试15 - 第 ${test15Count} 次调用`);
};
const removeOnce = emitter3.once('test15', test15Listener);
emitter3.emit('test15'); // 调用
removeOnce(); // 手动移除
emitter3.emit('test15'); // 不调用
console.log(`测试15 - 最终调用次数: ${test15Count}`); // 1
4.4 性能测试
console.log('\n=== EventEmitter 性能测试 ===');
const performanceEmitter = new EventEmitter();
const iterations = 100000;
// 准备测试数据
const listeners = [];
for (let i = 0; i < 100; i++) {
listeners.push(() => {});
}
// 测试添加监听器的性能
console.time('添加监听器');
for (let i = 0; i < iterations; i++) {
performanceEmitter.on('performance', listeners[i % listeners.length]);
}
console.timeEnd('添加监听器');
console.log(`添加后监听器数量: ${performanceEmitter.listenerCount('performance')}`);
// 测试触发事件的性能
console.time('触发事件');
for (let i = 0; i < iterations; i++) {
performanceEmitter.emit('performance', i);
}
console.timeEnd('触发事件');
// 测试移除监听器的性能
console.time('移除监听器');
for (let i = 0; i < iterations; i++) {
performanceEmitter.off('performance', listeners[i % listeners.length]);
}
console.timeEnd('移除监听器');
console.log(`移除后监听器数量: ${performanceEmitter.listenerCount('performance')}`);
五、EventEmitter的核心原理分析
51 数据结构设计
// EventEmitter 的核心数据结构
class EventEmitter {
constructor() {
// 使用普通对象而不是 Map 的原因:
// 1. 在 V8 中,普通对象性能更好
// 2. 事件名通常是字符串,适合作为对象键
// 3. Object.create(null) 创建没有原型的对象,避免原型污染
this._events = Object.create(null);
// 每个事件对应一个监听器数组
// {
// 'event1': [listener1, listener2, ...],
// 'event2': [listener3, listener4, ...]
// }
}
}
5.2 once 方法的实现原理
// once 方法的实现细节
once(eventName, listener) {
// 创建包装函数
const onceWrapper = (...args) => {
// 1. 执行原始监听器
listener.apply(this, args);
// 2. 标记为已执行
onceWrapper._once = true;
// 3. 在 emit 中检测到这个标记后会移除监听器
};
// 保存原始监听器引用,用于 off 方法
onceWrapper._originalListener = listener;
// 添加到监听器数组
this._addListener(eventName, onceWrapper, false);
return this;
}
// emit 方法中处理 once 监听器
emit(eventName, ...args) {
// ...
for (const listener of listenersCopy) {
// 检查是否为 once 包装函数
if (listener._once) {
// 移除监听器
this._removeListener(eventName, listener);
}
// ...
}
// ...
}
5.3 内存管理策略
// 避免内存泄漏的实现
class SafeEventEmitter extends EventEmitter {
constructor() {
super();
// 跟踪所有订阅,便于清理
this._subscriptions = new WeakMap();
}
safeOn(eventName, listener, context = null) {
// 绑定上下文
const boundListener = context ? listener.bind(context) : listener;
// 存储元数据
const meta = {
eventName,
originalListener: listener,
boundListener,
unsubscribe: () => this.off(eventName, boundListener)
};
// 使用 WeakMap 存储,不会阻止垃圾回收
this._subscriptions.set(listener, meta);
// 添加监听器
this.on(eventName, boundListener);
// 返回增强的取消订阅函数
return () => {
this.off(eventName, boundListener);
this._subscriptions.delete(listener);
};
}
}
六、常见面试题实现
6.1 实现一个简单的 EventBus
// 全局事件总线(类似 Vue 中的 EventBus)
class EventBus {
constructor() {
this._events = Object.create(null);
}
// 单例模式
static getInstance() {
if (!EventBus._instance) {
EventBus._instance = new EventBus();
}
return EventBus._instance;
}
$on(event, callback) {
if (!this._events[event]) {
this._events[event] = [];
}
this._events[event].push(callback);
}
$emit(event, ...args) {
const callbacks = this._events[event];
if (!callbacks) return;
// 使用 slice 创建副本,避免迭代时修改数组
callbacks.slice().forEach(callback => {
try {
callback(...args);
} catch (error) {
console.error(`EventBus error in ${event}:`, error);
}
});
}
$off(event, callback) {
if (!this._events[event]) return;
if (callback) {
const index = this._events[event].indexOf(callback);
if (index > -1) {
this._events[event].splice(index, 1);
}
} else {
delete this._events[event];
}
}
$once(event, callback) {
const onceWrapper = (...args) => {
callback(...args);
this.$off(event, onceWrapper);
};
this.$on(event, onceWrapper);
}
}
// 使用示例
const bus = EventBus.getInstance();
// 组件 A
bus.$on('user-login', (user) => {
console.log('组件A: 用户登录', user.name);
});
// 组件 B
bus.$on('user-login', (user) => {
console.log('组件B: 更新用户信息', user.id);
});
// 登录成功后
bus.$emit('user-login', { id: 1, name: 'Alice' });
6.2 实现带命名空间的事件系统
class NamespacedEventEmitter {
constructor() {
this._events = Object.create(null);
this._separator = ':';
}
// 解析事件名,支持命名空间
_parseEvent(eventString) {
const parts = eventString.split(this._separator);
if (parts.length === 1) {
return { namespace: null, event: parts[0] };
}
return { namespace: parts[0], event: parts.slice(1).join(this._separator) };
}
// 生成完整的事件键
_getEventKey(namespace, event) {
return namespace ? `${namespace}${this._separator}${event}` : event;
}
on(eventString, listener) {
const { namespace, event } = this._parseEvent(eventString);
const eventKey = this._getEventKey(namespace, event);
if (!this._events[eventKey]) {
this._events[eventKey] = [];
}
this._events[eventKey].push({
listener,
namespace,
event
});
return () => this.off(eventString, listener);
}
emit(eventString, ...args) {
const { namespace, event } = this._parseEvent(eventString);
// 收集所有匹配的监听器
const listenersToCall = [];
// 如果指定了命名空间,只触发该命名空间的事件
if (namespace) {
const eventKey = this._getEventKey(namespace, event);
if (this._events[eventKey]) {
listenersToCall.push(...this._events[eventKey]);
}
} else {
// 如果没有指定命名空间,触发所有匹配的事件
for (const eventKey in this._events) {
const listeners = this._events[eventKey];
for (const listenerInfo of listeners) {
if (listenerInfo.event === event) {
listenersToCall.push(listenerInfo);
}
}
}
}
// 执行监听器
listenersToCall.forEach(({ listener }) => {
try {
listener(...args);
} catch (error) {
console.error(`Error in ${eventString}:`, error);
}
});
}
off(eventString, listener) {
const { namespace, event } = this._parseEvent(eventString);
if (namespace) {
const eventKey = this._getEventKey(namespace, event);
if (this._events[eventKey]) {
const listeners = this._events[eventKey];
const index = listeners.findIndex(item => item.listener === listener);
if (index > -1) {
listeners.splice(index, 1);
if (listeners.length === 0) {
delete this._events[eventKey];
}
}
}
} else {
// 移除所有命名空间下的事件
for (const eventKey in this._events) {
const listeners = this._events[eventKey];
for (let i = listeners.length - 1; i >= 0; i--) {
if (listeners[i].listener === listener && listeners[i].event === event) {
listeners.splice(i, 1);
}
}
if (listeners.length === 0) {
delete this._events[eventKey];
}
}
}
}
}
// 使用示例
const nsEmitter = new NamespacedEventEmitter();
nsEmitter.on('user:login', () => console.log('用户模块: 登录'));
nsEmitter.on('admin:login', () => console.log('管理员模块: 登录'));
nsEmitter.on('login', () => console.log('全局: 登录'));
nsEmitter.emit('user:login'); // 只输出: 用户模块: 登录
nsEmitter.emit('admin:login'); // 只输出: 管理员模块: 登录
nsEmitter.emit('login'); // 输出所有
6.3 实现支持异步监听器的 EventEmitter
class AsyncEventEmitter extends EventEmitter {
/**
* 异步触发事件,等待所有监听器完成
*/
async emitAsync(eventName, ...args) {
const listeners = this.listeners(eventName);
if (listeners.length === 0) {
return;
}
// 并行执行所有监听器
const promises = listeners.map(async (listener) => {
try {
const result = listener(...args);
// 如果监听器返回 Promise,等待它完成
if (result && typeof result.then === 'function') {
await result;
}
} catch (error) {
if (eventName !== 'error') {
await this.emitAsync('error', error);
} else {
throw error;
}
}
});
await Promise.all(promises);
}
/**
* 顺序执行监听器(一个接一个)
*/
async emitSeries(eventName, ...args) {
const listeners = this.listeners(eventName);
for (const listener of listeners) {
try {
const result = listener(...args);
// 如果监听器返回 Promise,等待它完成
if (result && typeof result.then === 'function') {
await result;
}
} catch (error) {
if (eventName !== 'error') {
await this.emitSeries('error', error);
} else {
throw error;
}
}
}
}
}
// 使用示例
const asyncEmitter = new AsyncEventEmitter();
asyncEmitter.on('process', async (data) => {
await new Promise(resolve => setTimeout(resolve, 100));
console.log('处理完成:', data);
});
asyncEmitter.on('process', async (data) => {
console.log('第二个监听器:', data);
});
asyncEmitter.emitAsync('process', '测试数据');
6.5 实现支持优先级的 EventEmitter
class PriorityEventEmitter {
constructor() {
this._events = Object.create(null);
this._defaultPriority = 0;
}
on(eventName, listener, priority = this._defaultPriority) {
if (!this._events[eventName]) {
this._events[eventName] = [];
}
const listeners = this._events[eventName];
listeners.push({ listener, priority });
// 按优先级排序(数字越小优先级越高)
listeners.sort((a, b) => a.priority - b.priority);
return () => this.off(eventName, listener);
}
emit(eventName, ...args) {
const listeners = this._events[eventName];
if (!listeners) return;
// 遍历已排序的监听器
for (const { listener } of listeners) {
try {
const result = listener(...args);
// 如果监听器返回 false,停止后续监听器的执行
if (result === false) {
break;
}
} catch (error) {
console.error(`Error in ${eventName}:`, error);
}
}
}
off(eventName, listener) {
const listeners = this._events[eventName];
if (!listeners) return this;
const index = listeners.findIndex(item => item.listener === listener);
if (index > -1) {
listeners.splice(index, 1);
if (listeners.length === 0) {
delete this._events[eventName];
}
}
return this;
}
}
// 使用示例
const priorityEmitter = new PriorityEventEmitter();
priorityEmitter.on('process', () => console.log('优先级 10'), 10);
priorityEmitter.on('process', () => console.log('优先级 0'), 0);
priorityEmitter.on('process', () => console.log('优先级 5'), 5);
priorityEmitter.emit('process');
// 输出顺序: 优先级 0, 优先级 5, 优先级 10
七、实际应用场景
7.1 在 Vue 中实现组件通信
// 全局事件总线
const EventBus = new EventEmitter();
// 在 Vue 组件中使用
// ComponentA.vue
export default {
mounted() {
EventBus.on('user-updated', this.handleUserUpdate);
},
methods: {
handleUserUpdate(user) {
console.log('用户更新:', user);
this.user = user;
}
},
beforeDestroy() {
EventBus.off('user-updated', this.handleUserUpdate);
}
};
// ComponentB.vue
export default {
methods: {
updateUser() {
EventBus.emit('user-updated', { id: 1, name: 'John' });
}
}
};
// 或者在 Vue 原型上添加
Vue.prototype.$bus = new EventEmitter();
// 在组件中使用
this.$bus.on('event', handler);
this.$bus.emit('event', data);
this.$bus.off('event', handler);
7.2 在 Express 中实现事件驱动架构
const express = require('express');
const EventEmitter = require('./EventEmitter');
class AppEvents extends EventEmitter {
constructor() {
super();
this.setupEvents();
}
setupEvents() {
// 定义应用级别事件
this.on('user:registered', (user) => {
console.log('新用户注册:', user.email);
// 发送欢迎邮件
// 创建用户目录
// 更新统计
});
this.on('order:created', (order) => {
console.log('新订单:', order.id);
// 发送确认邮件
// 更新库存
// 通知物流
});
this.on('error', (error, context) => {
console.error('应用错误:', error.message, context);
// 发送错误报告
// 记录到监控系统
});
}
}
// 创建 Express 应用
const appEvents = new AppEvents();
const app = express();
// 中间件:将事件发射器添加到请求对象
app.use((req, res, next) => {
req.appEvents = appEvents;
next();
});
// 路由处理
app.post('/register', (req, res) => {
const user = createUser(req.body);
// 触发事件
req.appEvents.emit('user:registered', user);
res.json({ success: true, user });
});
app.post('/order', (req, res) => {
const order = createOrder(req.body);
// 触发事件
req.appEvents.emit('order:created', order);
res.json({ success: true, order });
});
// 错误处理中间件
app.use((error, req, res, next) => {
// 触发错误事件
req.appEvents.emit('error', error, {
url: req.url,
method: req.method,
userId: req.user?.id
});
res.status(500).json({ error: 'Internal server error' });
});
7.3 实现简单的状态管理
class ObservableStore {
constructor(initialState = {}) {
this._state = initialState;
this._prevState = null;
this._emitter = new EventEmitter();
}
// 获取当前状态
getState() {
return this._state;
}
// 设置状态
setState(updates) {
this._prevState = { ...this._state };
this._state = { ...this._state, ...updates };
// 触发状态变化事件
this._emitter.emit('state:changed', this._state, this._prevState);
// 触发特定属性的变化事件
Object.keys(updates).forEach(key => {
this._emitter.emit(`state:${key}:changed`, updates[key], this._prevState[key]);
});
}
// 订阅状态变化
subscribe(callback) {
return this._emitter.on('state:changed', callback);
}
// 订阅特定状态变化
subscribeTo(key, callback) {
return this._emitter.on(`state:${key}:changed`, callback);
}
// 批量更新
batchUpdate(updater) {
const updates = updater(this._state);
this.setState(updates);
}
}
// 使用示例
const store = new ObservableStore({
user: null,
theme: 'light',
notifications: []
});
// 订阅状态变化
store.subscribe((newState, oldState) => {
console.log('状态变化:', newState);
});
// 订阅特定状态变化
store.subscribeTo('theme', (newTheme, oldTheme) => {
console.log('主题变化:', oldTheme, '->', newTheme);
document.body.setAttribute('data-theme', newTheme);
});
// 更新状态
store.setState({ theme: 'dark' });
store.setState({ user: { id: 1, name: 'Alice' } });
// 批量更新
store.batchUpdate(state => ({
notifications: [...state.notifications, '新消息'],
theme: 'dark'
}));
八、性能优化和注意事项
8.1 内存泄漏预防
// 安全的 EventEmitter,自动清理订阅
class SafeEventEmitter extends EventEmitter {
constructor() {
super();
// 使用 WeakRef 跟踪组件引用
this._componentRefs = new WeakMap();
}
// 为组件绑定事件,自动清理
bindToComponent(component, eventName, listener) {
// 创建绑定函数
const boundListener = listener.bind(component);
// 添加监听器
this.on(eventName, boundListener);
// 存储引用
if (!this._componentRefs.has(component)) {
this._componentRefs.set(component, []);
}
this._componentRefs.get(component).push({ eventName, listener: boundListener });
// 返回清理函数
return () => {
this.off(eventName, boundListener);
const refs = this._componentRefs.get(component);
if (refs) {
const index = refs.findIndex(ref =>
ref.eventName === eventName && ref.listener === boundListener
);
if (index > -1) {
refs.splice(index, 1);
}
}
};
}
// 清理组件的所有事件
cleanupComponent(component) {
const refs = this._componentRefs.get(component);
if (refs) {
refs.forEach(({ eventName, listener }) => {
this.off(eventName, listener);
});
this._componentRefs.delete(component);
}
}
}
// 使用示例
class Component {
constructor(emitter) {
this.emitter = emitter;
this._cleanupFns = [];
}
setupEvents() {
// 绑定事件,自动管理生命周期
const cleanup1 = this.emitter.bindToComponent(
this,
'data',
this.handleData.bind(this)
);
this._cleanupFns.push(cleanup1);
const cleanup2 = this.emitter.bindToComponent(
this,
'error',
this.handleError.bind(this)
);
this._cleanupFns.push(cleanup2);
}
handleData(data) {
console.log('处理数据:', data);
}
handleError(error) {
console.error('处理错误:', error);
}
destroy() {
// 清理所有事件
this._cleanupFns.forEach(fn => fn());
this._cleanupFns = [];
// 或者使用自动清理
this.emitter.cleanupComponent(this);
}
}
8.2 性能优化技巧
// 高性能 EventEmitter
class HighPerformanceEventEmitter {
constructor() {
// 使用空对象作为原型,避免原型链查找
this._events = Object.create(null);
// 缓存空数组,避免频繁创建
this._emptyArray = Object.freeze([]);
}
on(eventName, listener) {
if (!this._events[eventName]) {
// 预分配数组空间
this._events[eventName] = [];
}
this._events[eventName].push(listener);
return this;
}
emit(eventName, ...args) {
// 快速路径:没有监听器
const listeners = this._events[eventName];
if (!listeners) return false;
// 使用 for 循环而不是 forEach,性能更好
for (let i = 0, len = listeners.length; i < len; i++) {
try {
listeners[i].apply(this, args);
} catch (error) {
// 错误处理
if (eventName !== 'error') {
const errorListeners = this._events.error;
if (errorListeners) {
// 避免递归调用
for (let j = 0; j < errorListeners.length; j++) {
try {
errorListeners[j].call(this, error);
} catch (e) {
// 忽略错误处理函数中的错误
}
}
}
}
}
}
return true;
}
off(eventName, listener) {
const listeners = this._events[eventName];
if (!listeners) return this;
// 从后向前遍历,避免数组移动
for (let i = listeners.length - 1; i >= 0; i--) {
if (listeners[i] === listener) {
listeners.splice(i, 1);
break;
}
}
// 如果没有监听器了,删除属性(让 V8 优化)
if (listeners.length === 0) {
delete this._events[eventName];
}
return this;
}
// 批量操作优化
emitMany(eventNames, ...args) {
const results = [];
for (const eventName of eventNames) {
results.push(this.emit(eventName, ...args));
}
return results;
}
}
8,3 调试和监控
// 可监控的 EventEmitter
class MonitoredEventEmitter extends EventEmitter {
constructor(options = {}) {
super();
this._monitoring = {
enabled: options.enabled !== false,
emitCount: 0,
listenerCount: 0,
eventStats: new Map(),
errorStats: new Map(),
slowListeners: []
};
// 性能监控阈值(毫秒)
this._slowThreshold = options.slowThreshold || 100;
}
// 重写 emit 方法以收集监控数据
emit(eventName, ...args) {
if (!this._monitoring.enabled) {
return super.emit(eventName, ...args);
}
this._monitoring.emitCount++;
// 更新事件统计
const eventStat = this._monitoring.eventStats.get(eventName) || {
count: 0,
lastEmitted: null,
avgDuration: 0,
maxDuration: 0
};
eventStat.count++;
eventStat.lastEmitted = new Date();
const startTime = performance.now();
const result = super.emit(eventName, ...args);
const duration = performance.now() - startTime;
// 更新性能统计
eventStat.avgDuration =
(eventStat.avgDuration * (eventStat.count - 1) + duration) / eventStat.count;
eventStat.maxDuration = Math.max(eventStat.maxDuration, duration);
this._monitoring.eventStats.set(eventName, eventStat);
// 记录慢监听器
if (duration > this._slowThreshold) {
this._monitoring.slowListeners.push({
eventName,
duration,
timestamp: new Date(),
args: args.slice(0, 3) // 只记录前三个参数
});
// 保持慢监听器记录的数量
if (this._monitoring.slowListeners.length > 100) {
this._monitoring.slowListeners.shift();
}
}
return result;
}
// 获取监控数据
getMonitoringData() {
return {
...this._monitoring,
currentListeners: this._getListenersCount(),
eventNames: this.eventNames(),
timestamp: new Date()
};
}
// 重置监控数据
resetMonitoring() {
this._monitoring = {
enabled: true,
emitCount: 0,
listenerCount: 0,
eventStats: new Map(),
errorStats: new Map(),
slowListeners: []
};
}
// 生成监控报告
generateReport() {
const data = this.getMonitoringData();
console.log('=== EventEmitter 监控报告 ===');
console.log(`运行时间: ${data.timestamp.toISOString()}`);
console.log(`总触发次数: ${data.emitCount}`);
console.log(`活跃事件数量: ${data.eventNames.length}`);
console.log('\n事件统计:');
for (const [eventName, stat] of data.eventStats) {
console.log(` ${eventName}:`);
console.log(` 触发次数: ${stat.count}`);
console.log(` 平均耗时: ${stat.avgDuration.toFixed(2)}ms`);
console.log(` 最大耗时: ${stat.maxDuration.toFixed(2)}ms`);
console.log(` 最后触发: ${stat.lastEmitted.toISOString()}`);
}
if (data.slowListeners.length > 0) {
console.log('\n慢监听器警告:');
data.slowListeners.forEach((item, index) => {
console.log(` ${index + 1}. ${item.eventName} - ${item.duration.toFixed(2)}ms`);
});
}
return data;
}
// 私有方法:获取监听器计数
_getListenersCount() {
const result = {};
for (const eventName in this._events) {
result[eventName] = this._events[eventName].length;
}
return result;
}
}
九、总结与最佳实践
9.1 核心要点总结
-
发布订阅模式的核心: 通过事件中心解耦发布者和订阅者
- EventEmitter的实现要点:
- 使用合适的数据结构存储事件和监听器
- 正确处理 once 监听器
- 实现错误处理机制
- 支持链式调用
-
内存管理: 及时清理监听器, 避免内存泄漏
-
性能考虑: 选择合适的数据结构和算法
9.2 最佳实现
- 命名规范:
// 好的命名
emitter.on('user:login', handler);
emitter.on('order:created', handler);
// 不好的命名
emitter.on('login', handler);
emitter.on('newOrder', handler);
- 错误处理:
// 总是监听 error 事件
emitter.on('error', (error) => {
console.error('EventEmitter error:', error);
// 发送到错误监控系统
});
// 或者在监听器内部处理错误
emitter.on('data', (data) => {
try {
processData(data);
} catch (error) {
console.error('处理数据时出错:', error);
emitter.emit('error', error);
}
});
- 资源清理:
class Component {
constructor() {
this._cleanupFns = [];
}
setupEvents() {
const cleanup1 = emitter.on('event1', this.handler1.bind(this));
this._cleanupFns.push(cleanup1);
const cleanup2 = emitter.once('event2', this.handler2.bind(this));
this._cleanupFns.push(cleanup2);
}
destroy() {
// 清理所有事件监听器
this._cleanupFns.forEach(fn => fn());
this._cleanupFns = [];
}
}
9.3 使用建议
| 场景 |
推荐方案 |
理由 |
| 简单组件通信 |
基础 EventEmitter |
轻量、简单 |
| 大型应用状态管理 |
Observable Store |
结构化、可预测 |
| 异步任务协调 |
AsyncEventEmitter |
更好的异步支持 |
| 性能敏感场景 |
HighPerformanceEventEmitter |
优化过的实现 |
| 需要监控调试 |
MonitoredEventEmitter |
内置监控功能 |
结语
通过手写 EventEmitter,我们不仅掌握了发布订阅模式的实现原理,更重要的是理解了事件驱动编程的核心思想。EventEmitter 虽然简单,但其设计思想在现代前端框架、Node.js 后端系统以及各种复杂应用中都有广泛的应用。
记住,好的事件系统应该:
-
✅ 职责清晰:事件中心只负责转发消息
-
✅ 性能优秀:高频事件触发时表现良好
-
✅ 易于调试:有良好的监控和错误处理
-
✅ 内存安全:避免内存泄漏和资源浪费
延伸阅读:
希望这篇经过严格测试的博客能帮助你深入理解 EventEmitter!如果有任何问题或建议,欢迎讨论交流。