参考来源:
目录
Buffer(缓冲区)
Buffer 的实际作用
在深入理解 Buffer 的语法之前,我们先来看看 Buffer 在实际开发中解决了什么问题。
场景一:处理图片文件
问题:没有 Buffer 的情况
假设你需要读取一个图片文件并上传到服务器。JavaScript 的字符串只能处理文本数据,无法直接处理二进制数据(如图片、视频、音频等)。
// ❌ 错误示例:尝试用字符串读取图片
const fs = require('fs');
// 如果使用文本模式读取图片,会导致数据损坏
const imageData = fs.readFileSync('photo.jpg', 'utf8'); // 错误!图片会被损坏
// 图片文件包含二进制数据,不能按文本处理
解决方案:使用 Buffer
// ✅ 正确示例:使用 Buffer 处理图片
const fs = require('fs');
// Buffer 可以安全地处理二进制数据
const imageBuffer = fs.readFileSync('photo.jpg'); // 返回 Buffer 对象
console.log(imageBuffer); // <Buffer ff d8 ff e0 00 10 4a 46 49 46 ...>
// 可以将 Buffer 转换为 Base64 用于传输
const base64Image = imageBuffer.toString('base64');
console.log('Base64:', base64Image);
// 或者直接写入文件
fs.writeFileSync('copy.jpg', imageBuffer);
场景二:网络数据传输
问题:没有 Buffer 的瓶颈
在网络通信中,数据通常以字节流的形式传输。如果只能使用字符串:
// ❌ 问题示例:字符串处理二进制数据的问题
const net = require('net');
const server = net.createServer((socket) => {
socket.setEncoding('utf8'); // 只能处理文本
socket.on('data', (data) => {
// 问题1:如果接收到二进制数据(如图片),会被错误解析
// 问题2:字符串操作(如拼接)会创建新对象,内存占用大
// 问题3:无法精确控制字节级别的操作
console.log(data); // 可能显示乱码或数据损坏
});
});
解决方案:使用 Buffer
// ✅ 正确示例:使用 Buffer 处理网络数据
const net = require('net');
const server = net.createServer((socket) => {
// 不设置编码,默认接收 Buffer
socket.on('data', (buffer) => {
// Buffer 可以精确处理每个字节
console.log('接收到', buffer.length, '字节');
// 可以检查数据头(如检查文件类型)
if (buffer[0] === 0xFF && buffer[1] === 0xD8) {
console.log('这是一个 JPEG 图片');
}
// 可以精确提取特定字节
const header = buffer.slice(0, 4); // 提取前4个字节
// 可以高效地拼接数据(不会创建大量中间字符串)
// Buffer.concat() 比字符串拼接效率高得多
});
});
场景三:文件操作性能问题
问题:大文件处理的内存瓶颈
// ❌ 问题示例:处理大文件时的内存问题
const fs = require('fs');
// 读取整个大文件到内存(字符串)
const largeFile = fs.readFileSync('large-file.txt', 'utf8');
// 问题:
// 1. 整个文件被加载到内存,占用大量内存
// 2. 字符串操作(如 replace)会创建新字符串,内存翻倍
// 3. 对于 1GB 的文件,可能需要 2GB+ 的内存
// 字符串替换会创建新字符串
const modified = largeFile.replace(/old/g, 'new'); // 又占用一份内存
解决方案:使用 Buffer + Stream
// ✅ 正确示例:使用 Buffer 和 Stream 高效处理大文件
const fs = require('fs');
const { Transform } = require('stream');
// 使用流式处理,不需要将整个文件加载到内存
const transformStream = new Transform({
transform(chunk, encoding, callback) {
// chunk 是 Buffer,可以高效处理
// 只处理当前这一块数据,内存占用小
const modified = chunk.toString().replace(/old/g, 'new');
this.push(Buffer.from(modified));
callback();
}
});
// 流式处理,内存占用恒定(只占用缓冲区大小)
fs.createReadStream('large-file.txt')
.pipe(transformStream)
.pipe(fs.createWriteStream('output.txt'));
// 即使处理 10GB 的文件,内存占用也只有几 MB
场景四:数据编码转换
问题:不同编码格式的处理
在实际开发中,经常需要在不同编码格式之间转换(如 UTF-8、Base64、Hex 等)。
// ❌ 问题示例:字符串无法直接处理编码转换
const data = 'Hello 世界';
// JavaScript 字符串内部使用 UTF-16,无法直接转换为其他编码
// 无法直接获取字节级别的数据
解决方案:使用 Buffer 进行编码转换
// ✅ 正确示例:使用 Buffer 进行编码转换
const data = 'Hello 世界';
// 1. 字符串转 Buffer(UTF-8 编码)
const buffer = Buffer.from(data, 'utf8');
console.log(buffer); // <Buffer 48 65 6c 6c 6f 20 e4 b8 96 e7 95 8c>
// 2. Buffer 转 Base64(常用于数据传输)
const base64 = buffer.toString('base64');
console.log(base64); // 'SGVsbG8g5LiW5L2T'
// 3. Buffer 转 Hex(常用于调试)
const hex = buffer.toString('hex');
console.log(hex); // '48656c6c6f20e4b896e7958c'
// 4. 从 Base64 解码
const decoded = Buffer.from(base64, 'base64').toString('utf8');
console.log(decoded); // 'Hello 世界'
总结:Buffer 的核心价值
-
处理二进制数据:图片、视频、音频等非文本数据
-
精确控制字节:网络协议、文件格式解析等需要字节级操作
-
内存效率:避免字符串操作带来的内存浪费
-
编码转换:在不同编码格式之间高效转换
-
性能优化:配合 Stream 实现高效的大数据处理
Buffer 概念
Buffer 是 Node.js 中用于处理二进制数据的类,类似于整数数组,但对应于 V8 堆外部的固定大小的原始内存分配。Buffer 的大小在创建时确定,且无法调整。
Buffer 的特点:
- Buffer 是固定大小的内存分配
- Buffer 中的数据是二进制格式
- Buffer 实例是 JavaScript 的 Uint8Array 实例
- Buffer 的大小在创建时确定,无法改变
提示:关于 Buffer 的实际应用场景和解决的问题,请参考 Buffer 的实际作用 章节。
创建 Buffer
在 Node.js 中,有几种方式可以创建 Buffer:
1. Buffer.from()
Buffer.from() 是最推荐的方式,可以从字符串、数组或其他 Buffer 创建新的 Buffer。
语法: Buffer.from(source, encoding) 或 Buffer.from(array) 或 Buffer.from(buffer)
参数:
-
source: 源数据(字符串、数组或 Buffer)
-
encoding(可选): 字符编码,当 source 是字符串时使用,默认为 'utf8'
返回值: 返回一个新的 Buffer。
// 从字符串创建 Buffer(默认使用 utf8 编码)
const buf1 = Buffer.from('Hello World');
console.log(buf1); // <Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
// 从字符串创建 Buffer(指定编码)
const buf2 = Buffer.from('Hello World', 'utf8');
console.log(buf2);
// 从数组创建 Buffer
const buf3 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);
console.log(buf3); // <Buffer 48 65 6c 6c 6f>
// 从另一个 Buffer 创建
const buf4 = Buffer.from(buf1);
console.log(buf4);
2. Buffer.alloc()
Buffer.alloc() 创建一个指定大小的 Buffer,并用零填充。这是最安全的方式,因为内存会被初始化为零。
语法: Buffer.alloc(size, fill, encoding)
参数:
-
size: Buffer 的大小(字节数)
-
fill(可选): 填充值,默认为 0
-
encoding(可选): 当 fill 是字符串时的编码,默认为 'utf8'
返回值: 返回一个新的 Buffer。
// 创建一个大小为 10 的 Buffer,用零填充
const buf = Buffer.alloc(10);
console.log(buf); // <Buffer 00 00 00 00 00 00 00 00 00 00>
// 创建一个大小为 10 的 Buffer,用指定值填充
const buf2 = Buffer.alloc(10, 'a');
console.log(buf2); // <Buffer 61 61 61 61 61 61 61 61 61 61>
3. Buffer.allocUnsafe()
Buffer.allocUnsafe() 创建一个指定大小的 Buffer,但不会初始化内存。这意味着内存可能包含敏感数据。虽然性能更好,但需要谨慎使用。
语法: Buffer.allocUnsafe(size)
参数:
返回值: 返回一个新的 Buffer(内存未初始化)。
// 创建一个大小为 10 的 Buffer,内存未初始化
const buf = Buffer.allocUnsafe(10);
console.log(buf); // 内容可能是随机的旧数据
// 如果需要安全,创建后应该填充
const buf2 = Buffer.allocUnsafe(10);
buf2.fill(0); // 手动填充为零
console.log(buf2);
性能对比:
-
Buffer.alloc(): 最安全,性能稍慢(需要初始化内存)
-
Buffer.allocUnsafe(): 性能最好,但不安全(内存未初始化)
-
Buffer.allocUnsafe() + fill(0): 性能与 Buffer.alloc() 相近,但代码更复杂
-
Buffer.from(): 根据源数据创建,性能取决于源数据大小
注意:Buffer.allocUnsafe() + fill(0) 理论上可能比 Buffer.alloc() 稍快,但性能差异很小。大多数情况下推荐使用 Buffer.alloc(),因为它更安全、更简洁。
Buffer 与字符串转换及编码
Buffer 和字符串之间的转换是 Buffer 的核心功能之一。Node.js 的 Buffer 支持多种字符编码格式。
字符串转 Buffer
// 方法 1: Buffer.from()(推荐)
const buf1 = Buffer.from('Hello World', 'utf8');
// 方法 2: Buffer.alloc() + write()
const buf2 = Buffer.alloc(11);
buf2.write('Hello World', 0, 'utf8');
// 方法 3: 使用 Buffer.allocUnsafe()(不推荐,除非性能要求高)
const buf3 = Buffer.allocUnsafe(11);
buf3.write('Hello World', 0, 'utf8');
Buffer 转字符串
语法: buf.toString(encoding, start, end)
参数:
-
encoding(可选): 字符编码,默认为 'utf8'
-
start(可选): 开始位置,默认为 0
-
end(可选): 结束位置(不包含),默认为 buf.length
返回值: 返回转换后的字符串。
const buf = Buffer.from('Hello World', 'utf8');
// 方法 1: toString()(推荐)
const str1 = buf.toString('utf8');
// 方法 2: toString() 默认使用 utf8
const str2 = buf.toString();
// 方法 3: 指定范围
const str3 = buf.toString('utf8', 0, 5); // 'Hello'
支持的编码格式
Node.js Buffer 支持多种字符编码,常用编码如下:
常用编码:
-
utf8(默认):支持所有 Unicode 字符
-
base64:常用于编码二进制数据以便在文本协议中传输
-
hex:十六进制编码,常用于调试
-
ascii:仅支持 ASCII 字符(0-127)
其他编码: latin1 / binary、ucs2 / utf16le、utf16be
编码转换示例
const str = 'Hello World';
// UTF-8(默认)
const buf = Buffer.from(str, 'utf8');
console.log(buf.toString('utf8')); // 'Hello World'
// Base64 编码/解码
const base64 = buf.toString('base64');
console.log(base64); // 'SGVsbG8gV29ybGQ='
console.log(Buffer.from(base64, 'base64').toString('utf8')); // 'Hello World'
// Hex 编码/解码
const hex = buf.toString('hex');
console.log(hex); // '48656c6c6f20576f726c64'
console.log(Buffer.from(hex, 'hex').toString('utf8')); // 'Hello World'
// ASCII(仅支持 ASCII 字符)
const buf2 = Buffer.from('Hello', 'ascii');
console.log(buf2.toString('ascii')); // 'Hello'
Buffer 操作
Buffer 提供了多种操作方法,用于处理二进制数据。字符串转换相关的方法在上面已经详细说明。
slice()
创建一个新的 Buffer,引用相同的内存,但偏移和裁剪到指定的索引范围。
语法: buf.slice(start, end)
参数:
-
start(可选): 开始位置,默认为 0
-
end(可选): 结束位置(不包含),默认为 buf.length
返回值: 返回一个新的 Buffer,与原 Buffer 共享内存。
const buf = Buffer.from('Hello World');
// 创建切片(从索引 0 到 5)
const slice1 = buf.slice(0, 5);
console.log(slice1.toString()); // 'Hello'
// 创建切片(从索引 6 到结束)
const slice2 = buf.slice(6);
console.log(slice2.toString()); // 'World'
// 注意:slice 是浅拷贝,修改会影响原 Buffer
const slice3 = buf.slice(0, 5);
slice3[0] = 0x4a; // 修改第一个字节
console.log(buf.toString()); // 'Jello World'(原 Buffer 也被修改)
concat()
将多个 Buffer 实例连接成一个新的 Buffer。
语法: Buffer.concat(list, totalLength)
参数:
-
list: Buffer 数组,要连接的 Buffer 列表
-
totalLength(可选): 连接后 Buffer 的总长度
返回值: 返回一个新的 Buffer,包含所有连接的 Buffer。
const buf1 = Buffer.from('Hello');
const buf2 = Buffer.from(' ');
const buf3 = Buffer.from('World');
// 连接多个 Buffer
const buf = Buffer.concat([buf1, buf2, buf3]);
console.log(buf.toString()); // 'Hello World'
// 可以指定总长度(可选)
const buf4 = Buffer.concat([buf1, buf2, buf3], 11);
console.log(buf4.toString()); // 'Hello World'
copy()
将 Buffer 的数据复制到另一个 Buffer 中。
语法: buf.copy(target, targetStart, sourceStart, sourceEnd)
参数:
-
target: 目标 Buffer,要复制到的 Buffer
-
targetStart(可选): 目标 Buffer 的起始位置,默认为 0
-
sourceStart(可选): 源 Buffer 的起始位置,默认为 0
-
sourceEnd(可选): 源 Buffer 的结束位置(不包含),默认为 buf.length
返回值: 返回复制的字节数。
const buf1 = Buffer.from('Hello World');
const buf2 = Buffer.alloc(5);
// 将 buf1 的前 5 个字节复制到 buf2
buf1.copy(buf2, 0, 0, 5);
console.log(buf2.toString()); // 'Hello'
// 从 buf1 的索引 6 复制到 11,到 buf3 的索引 0
const buf3 = Buffer.alloc(5);
buf1.copy(buf3, 0, 6, 11);
console.log(buf3.toString()); // 'World'
write()
将字符串写入 Buffer,返回写入的字节数。
语法: buf.write(string, offset, length, encoding)
参数:
-
string: 要写入的字符串
-
offset(可选): 开始写入的位置,默认为 0
-
length(可选): 要写入的最大字节数,默认为 buf.length - offset
-
encoding(可选): 字符编码,默认为 'utf8'
const buf = Buffer.alloc(11);
// 从索引 0 开始写入
const bytesWritten = buf.write('Hello World', 0, 'utf8');
console.log(bytesWritten); // 11(写入的字节数)
console.log(buf.toString()); // 'Hello World'
// 从指定位置开始写入
const buf2 = Buffer.alloc(20);
buf2.write('Hello', 0, 'utf8');
buf2.write('World', 6, 'utf8'); // 从索引 6 开始写入
console.log(buf2.toString()); // 'Hello World'
// 限制写入长度
const buf3 = Buffer.alloc(5);
buf3.write('Hello World', 0, 5, 'utf8'); // 只写入前 5 个字节
console.log(buf3.toString()); // 'Hello'
返回值: 返回实际写入的字节数。如果 Buffer 空间不足,可能小于字符串的字节数。
const buf = Buffer.alloc(5);
const written = buf.write('Hello World', 0, 'utf8');
console.log(written); // 5(只写入了 5 个字节)
console.log(buf.toString()); // 'Hello'
Buffer 性能优化
1. 使用 Buffer.allocUnsafe() 时要小心
// 不好的做法:直接使用 allocUnsafe 而不填充。可能包含敏感数据
const buf = Buffer.allocUnsafe(1024);
// 好的做法:使用 alloc(安全且性能足够好)
const buf1 = Buffer.alloc(1024);
2. 复用 Buffer 实例
// 不好的做法:频繁创建新 Buffer
function processData(data) {
const buf = Buffer.from(data);
// 处理...
}
// 好的做法:复用 Buffer
const reusableBuf = Buffer.alloc(1024);
function processData(data) {
reusableBuf.write(data, 0, 'utf8');
// 处理...
}
3. 使用 Buffer.concat() 而不是字符串拼接
字符串拼接会创建多个中间字符串对象,导致内存浪费和性能下降。Buffer.concat() 直接操作 Buffer,一次性合并,更高效。
// 不好的做法:字符串拼接后转 Buffer
// 问题:每次 += 操作可能创建新字符串,内存占用大
let result = '';
for (let i = 0; i < 1000; i++) {
result += 'data';
}
const buf = Buffer.from(result);
// 好的做法:使用 Buffer.concat()
// 优势:直接操作 Buffer,避免字符串中间对象,性能更好
const buffers = [];
for (let i = 0; i < 1000; i++) {
buffers.push(Buffer.from('data'));
}
const buf = Buffer.concat(buffers);
4. 避免不必要的 Buffer 复制
// 不好的做法:不必要的复制
const buf1 = Buffer.from('Hello');
const buf2 = Buffer.from(buf1); // 创建了副本
// 好的做法:直接使用或使用 slice(如果需要共享内存)
const buf1 = Buffer.from('Hello');
const buf2 = buf1.slice(); // 共享内存,性能更好
5. 预分配 Buffer 大小
// 不好的做法:动态增长
let buf = Buffer.alloc(0);
for (let i = 0; i < 100; i++) {
buf = Buffer.concat([buf, Buffer.from('data')]);
}
// 好的做法:预分配大小
const buf = Buffer.alloc(400); // 预分配足够的大小
let offset = 0;
for (let i = 0; i < 100; i++) {
offset += buf.write('data', offset);
}
流(Stream)基础
流的实际作用
在了解流的语法之前,我们先来看看 Stream 在实际开发中解决了什么问题。
场景一:处理大文件的内存问题
问题:没有 Stream 的情况
当需要处理大文件时,如果一次性将整个文件加载到内存,会导致严重的内存问题。
// ❌ 错误示例:一次性读取大文件
const fs = require('fs');
const http = require('http');
// 问题1:内存占用巨大
// 假设有一个 2GB 的视频文件
const videoData = fs.readFileSync('large-video.mp4'); // 将整个 2GB 文件加载到内存
console.log('内存占用:', videoData.length / 1024 / 1024, 'MB'); // 约 2000MB
// 问题2:响应时间慢
// 用户需要等待整个文件读取完成后才能开始下载
http.createServer((req, res) => {
const data = fs.readFileSync('large-video.mp4'); // 阻塞,等待读取完成
res.writeHead(200, { 'Content-Type': 'video/mp4' });
res.end(data); // 用户等待很久才能看到响应
}).listen(3000);
// 问题3:服务器可能崩溃
// 如果有多个用户同时请求,内存会迅速耗尽
// 10 个用户 = 20GB 内存占用!
解决方案:使用 Stream
// ✅ 正确示例:使用 Stream 流式处理
const fs = require('fs');
const http = require('http');
// 内存占用恒定(只占用缓冲区大小,通常几 MB)
http.createServer((req, res) => {
const readStream = fs.createReadStream('large-video.mp4');
res.writeHead(200, { 'Content-Type': 'video/mp4' });
// 流式传输:边读边写,立即开始响应
readStream.pipe(res);
// 内存占用:无论文件多大,都只有缓冲区大小(如 64KB)
// 10 个用户 = 640KB 内存占用(而不是 20GB!)
}).listen(3000);
// 优势:
// 1. 内存占用恒定,不受文件大小影响
// 2. 响应速度快,用户可以立即开始下载
// 3. 可以处理任意大小的文件
场景二:实时数据处理
问题:没有 Stream 的延迟问题
在某些场景下,需要实时处理数据,而不是等待所有数据就绪。
// ❌ 问题示例:等待所有数据就绪
const fs = require('fs');
// 假设需要处理一个很大的日志文件
const logData = fs.readFileSync('access.log'); // 等待整个文件读取完成
const lines = logData.toString().split('\n');
// 问题:
// 1. 必须等待整个文件读取完成才能开始处理
// 2. 如果文件很大,用户需要等待很长时间
// 3. 无法实时看到处理进度
// 处理每一行
lines.forEach((line, index) => {
if (line.includes('ERROR')) {
console.log(`第 ${index} 行发现错误:`, line);
}
});
console.log('处理完成'); // 用户需要等待很久才能看到这个
解决方案:使用 Stream 实时处理
// ✅ 正确示例:使用 Stream 实时处理
const fs = require('fs');
const readline = require('readline');
// 创建可读流
const readStream = fs.createReadStream('access.log');
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity
});
let lineNumber = 0;
// 边读边处理,立即看到结果
rl.on('line', (line) => {
lineNumber++;
// 实时处理,不需要等待整个文件
if (line.includes('ERROR')) {
console.log(`第 ${lineNumber} 行发现错误:`, line);
// 可以立即采取行动,如发送告警
}
// 可以显示进度
if (lineNumber % 1000 === 0) {
console.log(`已处理 ${lineNumber} 行...`);
}
});
rl.on('close', () => {
console.log('处理完成,共处理', lineNumber, '行');
});
// 优势:
// 1. 立即开始处理,不需要等待
// 2. 可以实时看到处理进度
// 3. 内存占用小,只缓存当前行
场景三:数据转换管道
问题:没有 Stream 的复杂处理
当需要对数据进行多个步骤的处理时,传统方式需要多次读取和写入。
// ❌ 问题示例:多次读取和写入
const fs = require('fs');
const zlib = require('zlib');
// 步骤1:读取文件
const data = fs.readFileSync('input.txt'); // 占用内存
// 步骤2:压缩
const compressed = zlib.gzipSync(data); // 又占用一份内存
// 步骤3:加密(假设有加密函数)
const encrypted = encrypt(compressed); // 再占用一份内存
// 步骤4:写入文件
fs.writeFileSync('output.txt.gz.enc', encrypted);
// 问题:
// 1. 内存占用 = 原始数据 + 压缩数据 + 加密数据(可能是原始数据的 3 倍)
// 2. 每个步骤都需要等待前一步完成
// 3. 代码复杂,难以维护
// 4. 对于大文件,内存可能不足
解决方案:使用 Stream 管道
// ✅ 正确示例:使用 Stream 管道
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');
const crypto = require('crypto');
// 创建加密转换流
const encryptStream = new Transform({
transform(chunk, encoding, callback) {
const cipher = crypto.createCipher('aes192', 'password');
const encrypted = Buffer.concat([
cipher.update(chunk),
cipher.final()
]);
this.push(encrypted);
callback();
}
});
// 使用管道连接:读取 -> 压缩 -> 加密 -> 写入
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // 压缩流
.pipe(encryptStream) // 加密流
.pipe(fs.createWriteStream('output.txt.gz.enc'));
// 优势:
// 1. 内存占用恒定(只占用缓冲区大小)
// 2. 数据流式处理,不需要等待
// 3. 代码简洁,易于理解和维护
// 4. 可以处理任意大小的文件
// 5. 自动处理背压(backpressure),防止内存溢出
场景四:HTTP 文件上传
问题:没有 Stream 的上传限制
处理文件上传时,如果一次性加载整个文件到内存,会有严重限制。
// ❌ 问题示例:一次性处理上传文件
const http = require('http');
const formidable = require('formidable');
http.createServer((req, res) => {
if (req.method === 'POST') {
const form = formidable({
// 问题:整个文件会被加载到内存
// 如果用户上传 1GB 文件,服务器需要 1GB+ 内存
});
form.parse(req, (err, fields, files) => {
// 文件已经在内存中了
const uploadedFile = files.file;
// 如果内存不足,服务器可能崩溃
// 多个用户同时上传大文件时,问题更严重
});
}
}).listen(3000);
解决方案:使用 Stream 流式上传
// ✅ 正确示例:使用 Stream 流式上传
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
http.createServer(async (req, res) => {
if (req.method === 'POST' && req.url === '/upload') {
// 创建写入流,直接写入磁盘
const writeStream = fs.createWriteStream(`uploads/${Date.now()}.file`);
try {
// 流式传输:请求体 -> 文件
// 内存占用恒定,不受文件大小影响
await pipelineAsync(req, writeStream);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: true, message: '上传成功' }));
} catch (err) {
res.writeHead(500);
res.end(JSON.stringify({ success: false, error: err.message }));
}
}
}).listen(3000);
// 优势:
// 1. 内存占用恒定,可以处理任意大小的文件
// 2. 数据直接写入磁盘,不需要在内存中缓存
// 3. 可以同时处理多个上传请求
// 4. 自动处理背压,防止内存溢出
场景五:数据库批量导入
问题:没有 Stream 的批量操作瓶颈
从文件批量导入数据到数据库时,传统方式效率低下。
// ❌ 问题示例:一次性加载所有数据
const fs = require('fs');
const mysql = require('mysql2/promise');
async function importData() {
// 读取整个 CSV 文件到内存
const csvData = fs.readFileSync('large-data.csv', 'utf8');
const lines = csvData.split('\n');
const connection = await mysql.createConnection({ /* ... */ });
// 问题:
// 1. 整个文件在内存中,占用大量内存
// 2. 如果文件很大(如 10GB),可能无法加载
// 3. 需要等待所有数据解析完成才能开始插入
// 4. 如果中途出错,所有工作都白费
for (const line of lines) {
const [name, email] = line.split(',');
await connection.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
[name, email]
);
}
await connection.end();
}
解决方案:使用 Stream 批量导入
// ✅ 正确示例:使用 Stream 批量导入
const fs = require('fs');
const readline = require('readline');
const mysql = require('mysql2/promise');
async function importData() {
const connection = await mysql.createConnection({ /* ... */ });
const readStream = fs.createReadStream('large-data.csv');
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity
});
let batch = [];
const BATCH_SIZE = 1000; // 批量插入大小
for await (const line of rl) {
const [name, email] = line.split(',');
batch.push([name, email]);
// 达到批量大小时,执行插入
if (batch.length >= BATCH_SIZE) {
const values = batch.map(() => '(?, ?)').join(', ');
const sql = `INSERT INTO users (name, email) VALUES ${values}`;
const flatBatch = batch.flat();
await connection.execute(sql, flatBatch);
batch = []; // 清空批次
console.log(`已导入 ${BATCH_SIZE} 条记录...`);
}
}
// 处理剩余数据
if (batch.length > 0) {
const values = batch.map(() => '(?, ?)').join(', ');
const sql = `INSERT INTO users (name, email) VALUES ${values}`;
await connection.execute(sql, batch.flat());
}
await connection.end();
console.log('导入完成');
}
// 优势:
// 1. 内存占用小,只缓存当前批次
// 2. 可以处理任意大小的文件
// 3. 实时处理,可以看到进度
// 4. 批量插入,数据库操作效率高
总结:Stream 的核心价值
-
内存效率:处理大文件时内存占用恒定,不受文件大小影响
-
时间效率:可以边读边处理,不需要等待所有数据就绪
-
可组合性:通过管道(pipe)将多个流连接,代码简洁优雅
-
实时处理:可以实时看到处理进度和结果
-
自动背压控制:自动处理数据生产速度超过消费速度的情况
-
可扩展性:可以处理任意大小的数据,不受内存限制
流概念
流(Stream)是 Node.js 中处理流式数据的抽象接口。流是数据的集合,就像数组或字符串一样,但流可能不会一次性全部可用,也不需要全部放入内存。
流的类型:
-
Readable(可读流):可以读取数据的流(如
fs.createReadStream())
-
Writable(可写流):可以写入数据的流(如
fs.createWriteStream())
-
Duplex(双工流):既可读又可写的流(如 TCP socket)
-
Transform(转换流):在读写过程中可以修改或转换数据的双工流(如
zlib.createGzip())
流的工作模式:
-
对象模式:流可以处理 JavaScript 对象(除了 null)
-
非对象模式:流处理字符串、Buffer 或 Uint8Array
提示:关于 Stream 的实际应用场景和解决的问题,请参考 流的实际作用 章节。
可读流(Readable)
可读流是数据的来源,可以从文件、网络、内存等读取数据。
创建可读流
可读流有两种工作模式:
1. 流动模式(Flowing Mode) - 数据自动从底层系统读取,并通过事件提供给应用程序。
const fs = require('fs');
// 从文件创建可读流
const readableStream = fs.createReadStream('input.txt');
// 流动模式:监听 data 事件,数据自动流动
readableStream.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节的数据`);
console.log(chunk.toString());
});
readableStream.on('end', () => {
console.log('数据读取完成');
});
readableStream.on('error', (err) => {
console.error('读取错误:', err);
});
2. 暂停模式(Paused Mode) - 必须显式调用 stream.read() 来读取数据块。
const fs = require('fs');
const readableStream = fs.createReadStream('input.txt');
// 暂停模式:监听 readable 事件,手动读取
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log('读取数据:', chunk.toString());
}
});
readableStream.on('end', () => {
console.log('读取完成');
});
手动创建可读流
const { Readable } = require('stream');
// 创建自定义可读流
const readableStream = new Readable({
read(size) {
// 模拟数据生成
this.push('Hello ');
this.push('World');
this.push(null); // 表示数据结束
}
});
readableStream.on('data', (chunk) => {
console.log(chunk.toString()); // 'Hello World'
});
可写流(Writable)
可写流是数据的目标,可以向文件、网络、内存等写入数据。
创建可写流
const fs = require('fs');
// 创建可写流
const writableStream = fs.createWriteStream('output.txt');
// write(): 写入数据
writableStream.write('Hello ');
writableStream.write('World');
// end(): 结束写入(可选传入最后的数据)
writableStream.end();
// 监听完成事件
writableStream.on('finish', () => {
console.log('数据写入完成');
});
// 监听错误事件
writableStream.on('error', (err) => {
console.error('写入错误:', err);
});
// 监听 drain 事件(当缓冲区可以继续写入时)
writableStream.on('drain', () => {
console.log('缓冲区已清空,可以继续写入');
});
手动创建可写流
const { Writable } = require('stream');
// 创建自定义可写流
const writableStream = new Writable({
write(chunk, encoding, callback) {
console.log('写入数据:', chunk.toString());
// 模拟异步操作
setTimeout(() => {
callback(); // 调用回调表示写入完成
}, 100);
}
});
writableStream.write('Hello');
writableStream.write(' World');
writableStream.end();
流管道(pipe)
pipe() 方法将可读流连接到可写流,自动管理数据流和背压(backpressure)。
基本用法
const fs = require('fs');
// 创建可读流和可写流
const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');
// 使用 pipe 连接流
readableStream.pipe(writableStream);
// 监听完成事件
writableStream.on('finish', () => {
console.log('文件复制完成');
});
链式管道
可以将多个流通过管道连接起来。pipe() 返回目标流,所以可以链式调用:
const fs = require('fs');
const zlib = require('zlib');
// 链式管道:读取文件 -> 压缩 -> 写入文件
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// 也可以分开写,pipe() 返回目标流
const readableStream = fs.createReadStream('input.txt');
const gzipStream = zlib.createGzip();
const writableStream = fs.createWriteStream('output.txt.gz');
readableStream
.pipe(gzipStream)
.pipe(writableStream);
writableStream.on('finish', () => {
console.log('压缩完成');
});
流事件
流是 EventEmitter 的实例,可以监听各种事件。前面章节已经展示了基本的事件使用(data、end、error、finish、drain 等),这里补充一些重要的事件和最佳实践。
可读流常用事件:
-
data: 当流将数据块传送给消费者时触发(流动模式)
-
readable: 当有数据可从流中读取时触发(暂停模式)
-
end: 当流中没有更多数据可供消费时触发
-
error: 当流发生错误时触发
-
close: 当流及其底层资源被关闭时触发
可写流常用事件:
-
drain: 当可以继续写入数据到流时触发
-
finish: 当所有数据已被刷新到底层系统时触发
-
error: 当写入或管道操作发生错误时触发
-
close: 当流及其底层资源被关闭时触发
-
pipe: 当在可读流上调用 stream.pipe() 方法时触发
-
unpipe: 当在可读流上调用 stream.unpipe() 方法时触发
事件处理最佳实践
const fs = require('fs');
function copyFile(source, destination) {
return new Promise((resolve, reject) => {
const readableStream = fs.createReadStream(source);
const writableStream = fs.createWriteStream(destination);
// 使用 once 监听一次性事件
readableStream.once('error', reject);
writableStream.once('error', reject);
writableStream.once('finish', resolve);
// 使用 pipe 连接流
readableStream.pipe(writableStream);
});
}
// 使用示例
copyFile('input.txt', 'output.txt')
.then(() => {
console.log('文件复制成功');
})
.catch((err) => {
console.error('文件复制失败:', err);
});
流错误处理
正确处理流错误非常重要,可以防止内存泄漏和未处理的异常。虽然可以使用 on('error') 手动处理错误,但 Node.js 提供了更好的方式。
使用 pipeline() 自动处理错误
pipeline() 是 Node.js 提供的更好的方式,可以自动处理错误和清理资源。
const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
async function copyFile(source, destination) {
try {
await pipelineAsync(
fs.createReadStream(source),
fs.createWriteStream(destination)
);
console.log('文件复制成功');
} catch (err) {
console.error('文件复制失败:', err);
}
}
copyFile('input.txt', 'output.txt');
使用 finished() 监听流结束
finished() 可以监听流的结束(成功或失败)。
const fs = require('fs');
const { finished } = require('stream');
const { promisify } = require('util');
const finishedAsync = promisify(finished);
async function processStream() {
const readableStream = fs.createReadStream('input.txt');
readableStream.on('data', (chunk) => {
console.log('处理数据:', chunk.toString());
});
try {
await finishedAsync(readableStream);
console.log('流处理完成');
} catch (err) {
console.error('流处理错误:', err);
}
}
processStream();
自定义流的错误处理
const { Transform } = require('stream');
class SafeTransform extends Transform {
_transform(chunk, encoding, callback) {
try {
// 可能抛出错误的操作
const result = this.processChunk(chunk);
this.push(result);
callback();
} catch (err) {
// 将错误传递给回调
callback(err);
}
}
processChunk(chunk) {
// 处理逻辑
return chunk.toString().toUpperCase();
}
}
// 使用
const transform = new SafeTransform();
transform.on('error', (err) => {
console.error('转换错误:', err);
});
transform.write('hello');
transform.end();
流(Stream)高级
双工流(Duplex)
双工流同时实现了可读流和可写流的接口,可以同时读取和写入数据。TCP socket 就是一个典型的双工流。
创建双工流
const { Duplex } = require('stream');
// 创建自定义双工流
const duplexStream = new Duplex({
read(size) {
// 可读端:生成数据
this.push('Hello ');
this.push('World');
this.push(null);
},
write(chunk, encoding, callback) {
// 可写端:处理写入的数据
console.log('接收到写入数据:', chunk.toString());
callback(); // 调用回调表示写入完成
}
});
// 可以同时读取和写入
duplexStream.on('data', (chunk) => {
console.log('读取:', chunk.toString());
});
duplexStream.write('Test');
duplexStream.end();
实际应用:TCP Socket
const net = require('net');
// TCP socket 是双工流
const server = net.createServer((socket) => {
console.log('客户端已连接');
// socket 是可读流
socket.on('data', (data) => {
console.log('接收到数据:', data.toString());
// socket 也是可写流
socket.write('Echo: ' + data);
});
socket.on('end', () => {
console.log('客户端断开连接');
});
});
server.listen(8080, () => {
console.log('服务器监听在 8080 端口');
});
转换流(Transform)
转换流是一种特殊的双工流,在数据从可写端写入后,经过转换处理,可以从可读端读取转换后的数据。zlib.createGzip() 就是一个转换流。
创建转换流
const { Transform } = require('stream');
// 创建自定义转换流:将输入转换为大写
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
// 转换数据
const upperChunk = chunk.toString().toUpperCase();
// 将转换后的数据推送到可读端
this.push(upperChunk);
callback(); // 调用回调表示处理完成
}
});
// 使用转换流
process.stdin
.pipe(upperCaseTransform)
.pipe(process.stdout);
// 输入: hello world
// 输出: HELLO WORLD
实际应用:数据加密转换流
const { Transform } = require('stream');
const crypto = require('crypto');
// 创建加密转换流
class EncryptTransform extends Transform {
constructor(password) {
super();
this.cipher = crypto.createCipher('aes192', password);
}
_transform(chunk, encoding, callback) {
const encrypted = this.cipher.update(chunk);
this.push(encrypted);
callback();
}
_flush(callback) {
this.push(this.cipher.final());
callback();
}
}
// 创建解密转换流
class DecryptTransform extends Transform {
constructor(password) {
super();
this.decipher = crypto.createDecipher('aes192', password);
}
_transform(chunk, encoding, callback) {
const decrypted = this.decipher.update(chunk);
this.push(decrypted);
callback();
}
_flush(callback) {
this.push(this.decipher.final());
callback();
}
}
// 使用示例
const fs = require('fs');
const password = 'my-secret-password';
// 加密文件
fs.createReadStream('input.txt')
.pipe(new EncryptTransform(password))
.pipe(fs.createWriteStream('encrypted.txt'));
// 解密文件
fs.createReadStream('encrypted.txt')
.pipe(new DecryptTransform(password))
.pipe(fs.createWriteStream('decrypted.txt'));
实际应用:JSON 解析转换流
const { Transform } = require('stream');
// 创建 JSON 解析转换流
class JSONParseTransform extends Transform {
constructor(options) {
super({ objectMode: true }); // 对象模式
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
// 尝试解析完整的 JSON 对象
let boundary = this.buffer.indexOf('\n');
while (boundary !== -1) {
const line = this.buffer.slice(0, boundary);
this.buffer = this.buffer.slice(boundary + 1);
try {
const obj = JSON.parse(line);
this.push(obj); // 推送解析后的对象
} catch (err) {
// 忽略解析错误
}
boundary = this.buffer.indexOf('\n');
}
callback();
}
_flush(callback) {
// 处理剩余数据
if (this.buffer.trim()) {
try {
const obj = JSON.parse(this.buffer);
this.push(obj);
} catch (err) {
// 忽略解析错误
}
}
callback();
}
}
// 使用示例
const fs = require('fs');
fs.createReadStream('data.jsonl')
.pipe(new JSONParseTransform())
.on('data', (obj) => {
console.log('解析的对象:', obj);
});
流性能优化
1. 使用对象模式提高性能
对于处理对象而不是 Buffer 的场景,使用对象模式可以提高性能。
const { Transform } = require('stream');
// 对象模式:直接传递对象,避免序列化/反序列化
const objectTransform = new Transform({
objectMode: true,
transform(obj, encoding, callback) {
// 直接处理对象
obj.processed = true;
this.push(obj);
callback();
}
});
// 非对象模式:需要处理 Buffer
const bufferTransform = new Transform({
transform(chunk, encoding, callback) {
// 需要将 Buffer 转换为对象,处理后再转换回 Buffer
const obj = JSON.parse(chunk.toString());
obj.processed = true;
this.push(Buffer.from(JSON.stringify(obj)));
callback();
}
});
2. 控制背压(Backpressure)
背压是流控制的重要机制,防止数据生产速度超过消费速度。
const fs = require('fs');
const readableStream = fs.createReadStream('large-file.txt');
const writableStream = fs.createWriteStream('output.txt');
// pipe() 自动处理背压
readableStream.pipe(writableStream);
// 手动处理背压
readableStream.on('data', (chunk) => {
const canContinue = writableStream.write(chunk);
if (!canContinue) {
// 缓冲区已满,暂停读取
readableStream.pause();
// 等待 drain 事件后继续读取
writableStream.once('drain', () => {
readableStream.resume();
});
}
});
readableStream.on('end', () => {
writableStream.end();
});
3. 使用高水位标记(High Water Mark)
高水位标记控制内部缓冲区的大小。
const fs = require('fs');
// 设置较大的高水位标记以提高性能(但会占用更多内存)
const readableStream = fs.createReadStream('large-file.txt', {
highWaterMark: 64 * 1024 // 64KB(默认是 16KB)
});
// 对于可写流
const writableStream = fs.createWriteStream('output.txt', {
highWaterMark: 64 * 1024
});
4. 批量处理数据
const { Transform } = require('stream');
class BatchTransform extends Transform {
constructor(options) {
super(options);
this.batchSize = options.batchSize || 10;
this.batch = [];
}
_transform(chunk, encoding, callback) {
this.batch.push(chunk);
if (this.batch.length >= this.batchSize) {
// 批量处理
this.processBatch();
}
callback();
}
_flush(callback) {
// 处理剩余数据
if (this.batch.length > 0) {
this.processBatch();
}
callback();
}
processBatch() {
// 批量处理逻辑
const batchData = this.batch.splice(0, this.batchSize);
this.push(Buffer.from(JSON.stringify(batchData)));
}
}
// 使用
const batchTransform = new BatchTransform({ batchSize: 100 });
5. 使用流池避免内存泄漏
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
async function processMultipleFiles(files) {
// 使用 Promise.all 并行处理,但限制并发数
const concurrency = 3;
for (let i = 0; i < files.length; i += concurrency) {
const batch = files.slice(i, i + concurrency);
await Promise.all(
batch.map(async (file) => {
try {
await pipelineAsync(
fs.createReadStream(file.input),
fs.createWriteStream(file.output)
);
console.log(`处理完成: ${file.input}`);
} catch (err) {
console.error(`处理失败: ${file.input}`, err);
}
})
);
}
}
6. 避免不必要的中间流
// 不好的做法:创建不必要的中间流
fs.createReadStream('input.txt')
.pipe(new Transform({ /* ... */ }))
.pipe(new Transform({ /* ... */ }))
.pipe(new Transform({ /* ... */ }))
.pipe(fs.createWriteStream('output.txt'));
// 好的做法:合并转换逻辑到一个流中
class CombinedTransform extends Transform {
_transform(chunk, encoding, callback) {
// 合并所有转换逻辑
let result = chunk;
result = this.transform1(result);
result = this.transform2(result);
result = this.transform3(result);
this.push(result);
callback();
}
}
fs.createReadStream('input.txt')
.pipe(new CombinedTransform())
.pipe(fs.createWriteStream('output.txt'));
Buffer 与 Stream 的关系及选择
Buffer 和 Stream 的关系
Buffer 和 Stream 在 Node.js 中经常一起使用,它们的关系如下:
-
Stream 使用 Buffer 作为数据单元
- Stream 在传输数据时,数据块(chunk)通常是 Buffer 对象
- 可读流读取的数据是 Buffer,可写流写入的数据也是 Buffer
- Stream 的缓冲区内部使用 Buffer 来存储数据
-
Buffer 是数据容器,Stream 是数据传输方式
- Buffer:处理二进制数据的容器,适合处理小块数据或需要精确控制字节的场景
- Stream:处理大量数据的方式,通过流式传输避免内存溢出
-
它们经常配合使用
const fs = require('fs');
// Stream 读取文件,数据块是 Buffer
const readStream = fs.createReadStream('file.txt');
readStream.on('data', (chunk) => {
// chunk 是 Buffer 对象
console.log(chunk instanceof Buffer); // true
console.log(chunk.length); // Buffer 的字节长度
});
如何选择使用 Buffer 还是 Stream?
使用 Buffer 的场景
✅ 适合使用 Buffer:
-
处理小文件或数据块
// 文件小于几 MB,可以直接加载到内存
const data = fs.readFileSync('small-file.txt'); // 返回 Buffer
-
需要精确控制字节
// 解析文件格式、网络协议等需要字节级操作
const header = buffer.slice(0, 4); // 提取文件头
if (header[0] === 0xFF && header[1] === 0xD8) {
console.log('这是 JPEG 文件');
}
-
数据编码转换
// Base64、Hex 等编码转换
const base64 = buffer.toString('base64');
-
处理图片、音频等二进制数据
// 图片处理、加密解密等
const imageBuffer = fs.readFileSync('photo.jpg');
-
数据量小且需要一次性处理
// 配置文件、小数据包等
const config = JSON.parse(fs.readFileSync('config.json', 'utf8'));
使用 Stream 的场景
✅ 适合使用 Stream:
-
处理大文件(> 10MB)
// 大文件处理,避免内存溢出
fs.createReadStream('large-file.txt')
.pipe(fs.createWriteStream('output.txt'));
-
实时数据处理
// 日志处理、实时监控等
fs.createReadStream('access.log')
.on('data', (chunk) => {
// 实时处理每一块数据
});
-
网络数据传输
// HTTP 请求/响应、文件上传/下载
http.createServer((req, res) => {
fs.createReadStream('video.mp4').pipe(res);
});
-
数据转换管道
// 压缩、加密、转换等多步骤处理
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('output.gz'));
-
需要处理的数据大小未知
// 用户上传、API 响应等大小不确定的数据
req.pipe(fs.createWriteStream('uploaded-file'));
组合使用的场景
✅ Buffer + Stream 组合使用:
-
流式处理中的 Buffer 操作
const { Transform } = require('stream');
const transformStream = new Transform({
transform(chunk, encoding, callback) {
// chunk 是 Buffer,可以进行 Buffer 操作
const modified = Buffer.concat([
Buffer.from('Header: '),
chunk,
Buffer.from('\nFooter')
]);
this.push(modified);
callback();
}
});
fs.createReadStream('input.txt')
.pipe(transformStream)
.pipe(fs.createWriteStream('output.txt'));
-
批量处理
// 使用 Stream 读取,Buffer 批量处理
const buffers = [];
readStream.on('data', (chunk) => {
buffers.push(chunk); // 收集 Buffer
if (buffers.length >= 100) {
const batch = Buffer.concat(buffers);
// 批量处理
buffers.length = 0;
}
});
决策流程图
需要处理数据
│
├─ 数据大小 < 10MB?
│ ├─ 是 → 使用 Buffer
│ │ ├─ 需要字节级操作? → Buffer
│ │ ├─ 需要编码转换? → Buffer
│ │ └─ 一次性处理? → Buffer
│ │
│ └─ 否 → 使用 Stream
│ ├─ 大文件处理? → Stream
│ ├─ 实时处理? → Stream
│ ├─ 网络传输? → Stream
│ └─ 数据转换管道? → Stream
│
└─ 需要组合使用?
└─ Stream + Buffer(在 Stream 的 transform 中使用 Buffer 操作)
性能对比示例
❌ 错误:大文件使用 Buffer
// 问题:大文件会导致内存溢出
const data = fs.readFileSync('2GB-file.mp4'); // 占用 2GB 内存
✅ 正确:大文件使用 Stream
// 优势:内存占用恒定(几 MB)
fs.createReadStream('2GB-file.mp4')
.pipe(fs.createWriteStream('copy.mp4'));
❌ 错误:小文件使用 Stream
// 问题:不必要的复杂性
fs.createReadStream('1KB-config.json')
.on('data', (chunk) => {
// 处理小块数据
});
✅ 正确:小文件使用 Buffer
// 优势:简单直接
const config = JSON.parse(fs.readFileSync('1KB-config.json', 'utf8'));
总结
Buffer 和 Stream 是 Node.js 中处理二进制数据和流式数据的核心概念:
-
Buffer:处理二进制数据的容器,适合小块数据、字节级操作、编码转换
-
Stream:流式数据传输方式,适合大文件、实时处理、网络传输
-
关系:Stream 使用 Buffer 作为数据单元,两者配合使用效果最佳
-
选择原则:数据量大或未知 → Stream;数据量小且确定 → Buffer