如何实现流式输出?一篇文章手把手教你!
本文代码github仓库地址——github.com/Objecteee/a…
一、什么是流式输出?
流式输出是一种数据传输模式,在这种模式下,数据不是作为一个完整的、单一的包裹在一次响应中发送给客户端,而是被分成许多小的数据块 (chunks) ,并在服务器端生成的同时,持续不断、逐块地推送到客户端。例如下面的Gemini回答过程——
二、为什么我们需要流式输出?
流式输出的核心价值在于改变了用户对延迟的感知,将原本漫长的等待转化为即时的内容消费。
1.极大地提升用户体验 (UX):
在传统的 请求-响应模型中,用户必须等待服务器生成并返回全部数据才能看到结果。例如,一个 AI 回复可能需要 5-10 秒。
流式输出则实现了内容逐块、逐字到达和显示。用户感知到的延迟从 “总生成时间” 缩短为 “首个数据块到达时间” 。这种实时反馈机制让用户感觉程序在即时响应,显著降低了等待的焦虑感。
在 AI/LLM 场景中, 流式输出是必需品,它将数秒的枯燥等待变成了持续的阅读过程,是用户留存和产品体验的基石。
2.提高系统和网络效率:
流式输出技术(尤其是 SSE 和 WebSocket)通过建立持久连接,减少了重复建立和关闭 HTTP 连接的开销。 数据生成多少推送多少,网络带宽得到更有效的利用,特别是在处理大量异步或长时间运行的任务时,效率优势更为明显。
简单来说,流式输出的重要性在于:它把 “等待” 变成了 “消费” ,是现代交互式应用和实时数据平台的标配。
三、主流的流式输出实现
流式输出主要有两种方案,第一种是SSE(基于HTTP)的单向流式输出;第二种是基于WebSocket的双向流式输出。
1.基于SSE的单向流式输出
(1)后端实现
SSE 流式输出的后端代码是有固定的格式和严格的要求。这种格式是 Server-Sent Events 规范的核心,也是客户端浏览器能够正确解析流的关键。
A. 基础格式
这是最常用、最核心的格式,用于传输数据块。
| 字段 | 格式 | 作用 | 示例 |
|---|---|---|---|
| Data | data: [内容] |
包含要发送的实际数据。客户端 event.data 接收到的就是 : 后面的内容。 |
data: Hello world\n\n |
| 分隔符 | \n\n |
至关重要! 必须以两个换行符标记一个事件块的结束。 |
B. 完整格式
SSE 规范还允许其他可选字段,用于更复杂的流控制:
| 字段 | 格式 | 作用 | 示例 |
|---|---|---|---|
| Event | event: [事件名] |
允许发送不同类型的事件,客户端可以通过 eventSource.addEventListener('事件名', ...) 监听。 |
event: update\n |
| ID | id: [唯一标识] |
允许为每个事件分配一个唯一 ID。如果客户端断线重连,它会发送 Last-Event-ID,服务器可以从该 ID 恢复推送。 |
id: 12345\n |
| Retry | retry: [毫秒数] |
全局设置。 客户端断开连接后,浏览器等待该毫秒数后再尝试重连。 | retry: 10000\n\n |
一个包含所有字段的完整事件块示例如下:
event: system-update
id: 999
retry: 5000
data: {"status": "ok", "progress": 80}
注意: 即使使用了多个字段,最后也必须以 \n\n 结束。
const express = require('express');
const app = express();
const PORT = 3000;
app.get('/api/stream-sse', (req, res) => {
// ------------------------------------------------------------------
// A. 固定的 HTTP 响应头设置 (关键!)
// ------------------------------------------------------------------
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 允许跨域访问
res.setHeader('Access-Control-Allow-Origin', '*');
// 可选:设置断线重连时间间隔(单位:毫秒)
// res.write('retry: 5000\n\n');
console.log('--- SSE Connection Established ---');
// 模拟数据流式生成
let counter = 0;
const intervalId = setInterval(() => {
if (counter >= 8) {
// 4. 结束流:发送 [DONE] 标记(常见实践)并关闭连接
res.write('data: [DONE]\n\n');
res.end();
clearInterval(intervalId);
console.log('--- SSE Stream Ended ---');
return;
}
counter++;
const payload = `这是第 ${counter} 块数据,时间:${new Date().toLocaleTimeString()}`;
// ------------------------------------------------------------------
// B. 固定的数据体格式 (严格要求!)
// ------------------------------------------------------------------
const sseData =
// 1. data 字段:必须以 "data: " 开头
`data: ${payload}\n` +
// 2. 两个换行符:必须以 "\n\n" 结尾,标志一个事件的结束
`\n`;
// 3. 实时写入数据到响应流
res.write(sseData);
console.log(`Pushed: ${payload}`);
}, 1500); // 每 1.5 秒推送一次
// 处理客户端断开连接的清理工作
req.on('close', () => {
clearInterval(intervalId);
console.log('Client closed connection.');
});
});
app.listen(PORT, () => {
console.log(`SSE Server running on http://localhost:${PORT}`);
});
-
设置正确的 Header: 必须设置
Content-Type: text/event-stream。 -
实时写入: 使用 Node.js 的
res.write()方法,而不是等待数据全部收集完毕后使用res.send()。res.send()会尝试关闭连接,不适用于流式输出。 -
遵守
data: ...\n\n格式: 任何偏离这个格式的输出都可能导致客户端EventSource无法正确触发onmessage事件。 -
连接清理: 监听
req.on('close', ...)事件,确保在客户端断开连接时,服务器能停止不必要的定时器或资源占用。
(2)前端实现
A.浏览器原生实现(EventSource)
EventSource 是浏览器专为 SSE 规范设计的高级 API。它自动处理了底层 HTTP 连接、数据格式解析、事件触发,以及断线重连等所有复杂逻辑。
优势
- 最简洁: 只需要几行代码即可开始监听流。
- 可靠性高: 内置自动重连机制,非常健壮。
-
无需手动解析: 自动将
data:字段的内容提取出来,作为event.data。
局限性
- 只能 GET 请求。
- 不支持自定义 Header: 无法在请求头中传递 Token(只能通过 URL query 参数)。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>EventSource 客户端示例</title>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; padding: 20px; }
h1 { color: #333; }
#status { font-weight: bold; margin-bottom: 15px; padding: 8px; border-radius: 4px; }
.status-connecting { color: orange; background-color: #fff3e0; }
.status-open { color: green; background-color: #e8f5e9; }
.status-closed { color: red; background-color: #ffebee; }
#output-area {
border: 1px solid #ddd;
padding: 15px;
min-height: 150px;
white-space: pre-wrap; /* 保持换行和空格 */
background-color: #fcfcfc;
overflow-y: auto;
}
.chunk { margin-right: 5px; color: #00796b; }
.divider { color: #bdbdbd; }
</style>
</head>
<body>
<h1>EventSource 实时流接收端</h1>
<p id="status" class="status-connecting">连接状态: 正在初始化...</p>
<h3>接收到的消息流:</h3>
<div id="output-area"></div>
<button onclick="closeStream()">🛑 停止接收流</button>
<script>
// 请确保这里的 URL 与您的 Node.js 后端 SSE 路由匹配
const STREAM_URL = 'http://localhost:3000/api/stream-sse';
const statusElement = document.getElementById('status');
const outputElement = document.getElementById('output-area');
let eventSource;
function updateStatus(text, className) {
statusElement.textContent = text;
statusElement.className = '';
statusElement.classList.add(className);
}
function initializeSSE() {
outputElement.innerHTML = ''; // 清空旧内容
// 1. 创建 EventSource 实例并建立连接
eventSource = new EventSource(STREAM_URL);
updateStatus('连接状态: 正在连接...', 'status-connecting');
// 2. 监听连接打开事件
eventSource.onopen = () => {
updateStatus('连接状态: ✅ 已建立', 'status-open');
console.log('SSE connection opened successfully.');
};
// 3. 监听接收到数据事件 (核心逻辑)
eventSource.onmessage = (event) => {
const chunk = event.data;
console.log('Received chunk:', chunk);
// 检查结束标记(与后端定义的 "[DONE]" 匹配)
if (chunk === '[DONE]') {
eventSource.close();
updateStatus('连接状态: 🟢 流已完成并关闭', 'status-closed');
console.log('Stream finished and closed by server.');
return;
}
// 实时追加数据到 UI
// 使用 innerHTML 追加,可以实现更丰富的样式
outputElement.innerHTML += `<span class="chunk">${chunk}</span><span class="divider"> | </span>`;
// 确保滚动到底部以查看最新内容
outputElement.scrollTop = outputElement.scrollHeight;
};
// 4. 监听错误事件(处理断线等)
eventSource.onerror = (error) => {
console.error('EventSource encountered an error:', error);
if (eventSource.readyState === EventSource.CLOSED) {
updateStatus('连接状态: ❌ 已关闭或断开', 'status-closed');
} else {
// EventSource 默认会尝试自动重连
updateStatus('连接状态: ⚠️ 发生错误,正在尝试重连...', 'status-connecting');
}
};
}
// 5. 客户端主动关闭流的函数
function closeStream() {
if (eventSource && eventSource.readyState !== EventSource.CLOSED) {
eventSource.close();
updateStatus('连接状态: 🛑 用户手动关闭', 'status-closed');
console.log('User manually closed the stream.');
}
}
// 页面加载完成后立即启动 SSE
window.onload = initializeSSE;
</script>
</body>
</html>
阶段一:连接初始化与建立
此阶段旨在建立一个持久化的 HTTP 连接,并开始监听。
| 步骤 | 动作描述 | 核心代码 | 关键点 |
|---|---|---|---|
1. 创建 EventSource 实例 |
使用 EventSource 构造函数传入 SSE 接口 URL,向服务器发起连接请求。 |
const es = new EventSource(url); |
浏览器自动处理 底层 HTTP GET 请求。 |
| 2. 监听连接打开 | 监听 onopen 事件,确认与服务器的连接已成功建立。 |
es.onopen = () => { ... } |
此时流式数据传输通道已打开。 |
| 3. 更新 UI 状态 | 在 onopen 中,更新界面状态,提示用户数据流已开始。 |
setStatus('已连接'); |
提升用户体验。 |
阶段二:数据接收与处理(核心)
此阶段是持续接收服务器推送的数据,并实时更新 UI。
| 步骤 | 动作描述 | 核心代码 | 关键点 |
|---|---|---|---|
| 4. 监听消息事件 | 监听 onmessage 事件。每当服务器推送一个完整的 data:...\n\n 事件块时,此回调函数就会触发。 |
es.onmessage = (event) => { ... } |
event.data 包含了服务器推送的实际内容。 |
| 5. 实时数据追加 | 将接收到的数据 (event.data) 追加到当前展示的文本末尾(而不是替换)。 |
currentMsg += event.data; |
这是实现“打字机”效果的关键。 |
| 6. 处理自定义事件 | 如果后端使用了 event: [name] 字段,前端可以通过 es.addEventListener('name', callback) 针对性地处理不同类型的事件。 |
es.addEventListener('update', ...) |
适用于需要区分不同业务类型数据的场景。 |
阶段三:连接维护、关闭与错误处理
此阶段处理流的正常结束、网络错误和重试逻辑。
| 步骤 | 动作描述 | 核心代码 | 关键点 |
|---|---|---|---|
| 7. 监听流结束标记 | 根据与后端约定的结束标记(如 [DONE]),判断流是否完成。 |
if (event.data === '[DONE]') { ... } |
流的正常结束, 避免无限等待。 |
| 8. 关闭连接 | 当流结束或用户主动点击“停止”按钮时,主动调用 close() 方法。 |
es.close(); |
释放客户端资源。 |
| 9. 监听错误与重连 | 监听 onerror 事件。当连接出错时,浏览器会根据服务器设置的 retry: 时间间隔自动尝试重连。 |
es.onerror = (error) => { ... } |
自动重试 是 SSE 相比于手动 fetch 的巨大优势。 |
B.Fetch API实现
当需要更强大的控制力、自定义请求头,或处理非标准 SSE 格式的流时,fetch 是最佳选择。
利用 fetch 返回的 response.body,它是一个 ReadableStream 对象。开发者需要获取这个流的 Reader,进入一个循环,手动读取数据块、解码,并根据 \n\n 规则手动解析 SSE 格式。
优势
- 控制力强: 可以发送 POST、PUT 等请求,并设置自定义 Header(用于鉴权)。
- 通用性: 可以处理任何基于 HTTP 的流式数据,不限于 SSE 格式。
- 新标准: 符合现代 Web 标准,浏览器支持良好。
挑战
- 实现复杂: 需要手动编写数据解析和错误重连的代码。
- 性能考量: 频繁的循环读取和字符串拼接/解码操作需要谨慎优化。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Fetch API SSE 客户端示例</title>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; padding: 20px; }
#status { font-weight: bold; margin-bottom: 15px; color: orange; }
#output-area {
border: 1px solid #ddd;
padding: 15px;
min-height: 150px;
white-space: pre-wrap;
background-color: #fcfcfc;
overflow-y: auto;
}
</style>
</head>
<body>
<h1>🔗 Fetch API 模拟 SSE 接收端</h1>
<p id="status">连接状态: 待启动</p>
<h3>接收到的消息流:</h3>
<div id="output-area"></div>
<button onclick="startStream()">▶️ 启动流</button>
<button onclick="stopStream()">🛑 停止接收流</button>
<script>
const STREAM_URL = 'http://localhost:3000/api/stream-sse'; // 确保 URL 正确
const statusElement = document.getElementById('status');
const outputElement = document.getElementById('output-area');
let controller = null; // 用于控制请求中止 (AbortController)
function updateStatus(text, color) {
statusElement.textContent = text;
statusElement.style.color = color;
}
async function startStream() {
if (controller) stopStream(); // 确保前一个流已停止
controller = new AbortController();
const signal = controller.signal;
let textDecoder = new TextDecoder('utf-8');
let buffer = ''; // 缓冲区,用于存储不完整的事件块
outputElement.innerHTML = '';
updateStatus('连接状态: 正在连接...', 'orange');
try {
// 1. 发起 Fetch 请求,设置 signal 用于中止
const response = await fetch(STREAM_URL, {
method: 'GET',
headers: { 'Accept': 'text/event-stream' },
signal: signal
});
if (!response.body) {
throw new Error("响应体不是一个可读流。");
}
// 2. 获取 ReadableStream 的 Reader
const reader = response.body.getReader();
// 3. 循环读取流数据
while (true) {
const { done, value } = await reader.read();
if (done) {
updateStatus('连接状态: 🟢 流已完成', 'green');
console.log('Stream finished.');
break;
}
// 4. 解码字节数据并追加到缓冲区
// { stream: true } 允许在流继续时进行解码
buffer += textDecoder.decode(value, { stream: true });
// 5. 手动解析 SSE 消息块
// SSE 消息以 \n\n 结束,分割缓冲区
let messages = buffer.split('\n\n');
buffer = messages.pop(); // 最后一个不完整的块留在缓冲区
messages.forEach(message => {
if (message) {
// 6. 提取 data: 字段内容
const dataMatch = message.match(/data: (.*)/);
if (dataMatch && dataMatch[1]) {
const data = dataMatch[1].trim();
if (data === '[DONE]') {
controller.abort(); // 接收到结束标记,主动中止请求
} else {
// 7. 处理接收到的数据
outputElement.innerHTML += `<span class="chunk">${data}</span><span class="divider"> | </span>`;
outputElement.scrollTop = outputElement.scrollHeight;
}
}
}
});
}
} catch (error) {
if (error.name === 'AbortError') {
updateStatus('连接状态: 🛑 流被用户/DONE标记中止', 'gray');
console.log('Fetch request aborted successfully.');
} else {
updateStatus(`连接状态: ❌ 错误: ${error.message}`, 'red');
console.error('Fetch SSE 错误:', error);
}
}
}
function stopStream() {
if (controller) {
controller.abort();
controller = null;
}
}
// 页面加载完成后,设置初始状态
window.onload = () => updateStatus('连接状态: 待启动', 'black');
</script>
</body>
</html>
Fetch API 实现流式输出是高度可定制但相对复杂的技术方案,它要求开发者手动处理数据的接收、解码和 SSE 格式的解析。整个流程可以清晰地划分为三个主要阶段。
阶段一:初始化请求与准备工作
此阶段的目标是发起请求并设置流读取所需的环境。
| 步骤 | 动作描述 | 关键技术点 |
|---|---|---|
| 1. 准备控制器 | 创建 AbortController 实例。用于在流结束或需要中断时,能够中止 fetch 请求。 |
new AbortController() |
| 2. 发起 Fetch 请求 | 发起 fetch 请求,将 controller.signal 附加到请求配置中。可在 headers 中设置鉴权信息。 |
fetch(url, { signal: controller.signal, headers: {...} }) |
| 3. 获取流读取器 | 检查响应是否成功 (response.ok),然后通过 response.body.getReader() 获取 reader 对象。 |
response.body.getReader() |
| 4. 准备解码器与缓冲区 | 实例化 TextDecoder 用于将字节解码为字符串,并初始化一个 buffer 变量用于暂存不完整的数据片段。 |
new TextDecoder('utf-8'), let buffer = ''
|
阶段二:循环读取、解码与解析(核心)
此阶段是整个流程最复杂的部分,需要持续从流中拉取数据并手动解析 SSE 格式。
| 步骤 | 动作描述 | 关键技术点 |
|---|---|---|
| 5. 启动读取循环 | 使用 while (true) 或类似的循环结构开始持续读取数据。 |
while (true) |
| 6. 读取数据块 | 调用 await reader.read()。它会暂停执行,等待服务器推送新的数据块 (value: Uint8Array)。 |
const { done, value } = await reader.read() |
| 7. 判断流结束 | 检查 done 属性。如果为 true,跳出循环,进入清理阶段。 |
if (done) break; |
| 8. 解码与缓冲 | 使用 TextDecoder 将原始字节 value 解码为字符串,并将其追加到缓冲区 (buffer) 中。 |
buffer += decoder.decode(value, { stream: true }) |
| 9. 手动解析 SSE | 使用 buffer.split('\n\n') 将缓冲区内容分割成潜在的 SSE 消息数组。然后将数组中最后一个不完整的块重新放回缓冲区。 |
buffer.split('\n\n'), buffer = messages.pop()
|
| 10. 提取数据 | 遍历其余完整的消息块,使用正则表达式(如 /data: (.*)/)手动提取 data: 字段后的实际内容。 |
message.match(/data: (.*)/) |
| 11. 实时处理 | 将提取到的数据追加到 UI 界面,并执行所需的业务逻辑。 | UI 更新、滚动到底部 |
阶段三:连接清理与错误处理
此阶段确保流的正确终止和资源释放。
| 步骤 | 动作描述 | 关键技术点 |
|---|---|---|
| 12. 处理结束标记 | 在数据处理逻辑中,如果检测到服务器推送的结束标记(如 [DONE]),则立即调用 controller.abort() 终止流。 |
controller.abort() |
| 13. 错误捕获 | 将整个 fetch 和流读取循环包裹在 try...catch 块中。捕获网络错误和因 controller.abort() 产生的 AbortError。 |
try...catch (error) |
| 14. 释放资源 | 无论是正常结束还是出错,确保所有相关资源(如 reader)得到释放。 |
AbortController 自动处理了请求的终止。 |
通过这种手动控制的方式,Fetch API 提供了对流的最高级别控制,但代价是代码实现更为复杂,且需要开发者手动处理自动重连等健壮性机制。
C.Axios API实现
使用 axios 实现流式输出主要依赖其配置项,但在浏览器环境下的表现不如 fetch 和 EventSource。
在 axios 的配置中,需要显式设置 responseType: 'stream'。在 Node.js 环境中,这会返回一个 Node.js 的流,处理起来比较方便。但在浏览器环境中,axios 对流的封装不如 fetch 原生。
优势
- Node.js 友好: 在 Node.js 环境中处理流式数据很方便。
-
代码统一: 如果应用大量依赖
axios,可以避免引入fetch。
挑战
-
浏览器兼容性/稳定性: 浏览器端对
axios流的稳定支持不如fetch和EventSource。 -
解析依然需要手动: 像
fetch一样,您仍然需要获取流并手动进行 SSE 格式的解析和解码。 - 潜在的内存问题: 如果流处理不当,可能会导致内存占用过高。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Axios 客户端模拟 SSE 示例</title>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; padding: 20px; }
#status { font-weight: bold; margin-bottom: 15px; color: orange; }
#output-area {
border: 1px solid #ddd;
padding: 15px;
min-height: 150px;
white-space: pre-wrap;
background-color: #fcfcfc;
overflow-y: auto;
}
.chunk { margin-right: 5px; color: #00796b; }
.divider { color: #bdbdbd; }
</style>
</head>
<body>
<h1>🛠️ Axios 模拟 SSE 接收端</h1>
<p id="status">连接状态: 待启动</p>
<h3>接收到的消息流:</h3>
<div id="output-area"></div>
<button onclick="startStream()">▶️ 启动流</button>
<button onclick="stopStream()">🛑 停止接收流</button>
<script>
const STREAM_URL = 'http://localhost:3000/api/stream-sse';
const statusElement = document.getElementById('status');
const outputElement = document.getElementById('output-area');
let cancelTokenSource = null; // 用于取消请求的 Axios Token
let lastProcessedIndex = 0; // 追踪上次处理到的数据索引
let buffer = ''; // 用于存放不完整的 SSE 事件块
let isConnected = false; // 标记是否已建立连接
function updateStatus(text, color) {
statusElement.textContent = text;
statusElement.style.color = color;
}
async function startStream() {
if (cancelTokenSource) stopStream();
cancelTokenSource = axios.CancelToken.source();
lastProcessedIndex = 0;
buffer = '';
isConnected = false;
outputElement.innerHTML = '';
updateStatus('连接状态: 正在连接...', 'orange');
try {
await axios.get(STREAM_URL, {
responseType: 'text',
cancelToken: cancelTokenSource.token,
onDownloadProgress: (progressEvent) => {
let xhr = progressEvent.currentTarget || progressEvent.target;
if (progressEvent.event && progressEvent.event.target) {
xhr = progressEvent.event.target;
}
if (!xhr || typeof xhr.responseText === 'undefined') {
return;
}
const responseText = xhr.responseText;
const newChunk = responseText.substring(lastProcessedIndex);
lastProcessedIndex = responseText.length;
buffer += newChunk;
// 收到数据时更新连接状态
if (!isConnected && newChunk.length > 0) {
isConnected = true;
updateStatus('连接状态: ✅ 已连接,正在接收数据...', 'green');
}
let messages = buffer.split('\n\n');
buffer = messages.pop() || '';
messages.forEach(message => {
if (message.trim()) {
const dataMatch = message.match(/data: (.*)/);
if (dataMatch && dataMatch[1]) {
const data = dataMatch[1].trim();
if (data === '[DONE]') {
stopStream(true);
} else {
outputElement.innerHTML += `<span class="chunk">${data}</span><span class="divider"> | </span>`;
outputElement.scrollTop = outputElement.scrollHeight;
}
}
}
});
}
});
} catch (error) {
if (axios.isCancel(error)) {
updateStatus('连接状态: 🛑 流已中止', 'gray');
} else {
updateStatus(`连接状态: ❌ 错误: ${error.message}`, 'red');
console.error('Axios SSE 错误:', error);
}
}
}
function stopStream(isDone = false) {
if (cancelTokenSource) {
cancelTokenSource.cancel('Stream terminated.');
cancelTokenSource = null;
if (isDone) {
updateStatus('连接状态: 🟢 流已完成并关闭', 'green');
}
}
}
window.onload = () => updateStatus('连接状态: 待启动', 'black');
</script>
</body>
</html>
Axios 并非为 SSE 设计的,因为它基于传统的 XMLHttpRequest (XHR) 模型,或者在 Node.js 中基于 http 模块。在浏览器环境中,Axios 没有原生 EventSource 或 ReadableStream 的支持。
因此,使用 Axios 实现流式输出的标准工作流程是利用 XHR 的 onprogress 事件监听机制(在 Axios 中表现为 onDownloadProgress 回调),手动获取每次新增的数据块,并手动进行 SSE 格式的解析。
-
没有自动解析: 必须手动解析
data:字段和\n\n分隔符。 -
手动追踪数据: 每次
onDownloadProgress触发时,返回的是当前已接收的全部数据,而不是新增的数据。您需要一个指针来追踪上次处理到了哪里,并提取出新的数据块。 - 不支持自动重连: 必须手动实现错误检测和重连逻辑(本示例不包含重连,但需注意)。
流式输出技术的选择
1. 🥇 EventSource API
这是实现 SSE 的标准和首选方案。
-
实现复杂度: 最低。 浏览器原生支持,代码量最少。
-
核心优势: 内置健壮性。 自动处理 SSE 数据格式解析、消息事件 (
onmessage) 触发,并内置了断线重连机制(支持服务器设置retry:间隔)。 -
主要局限:
- 只能 GET 请求。
- 不支持自定义 Header 鉴权。 只能通过 URL 参数传递 Token,安全性相对较低。
-
选择原则: 强烈推荐用于大多数场景,尤其是在安全性要求不涉及请求 Header(例如,使用 Session Cookie 或 URL Token 鉴权)且只需要单向数据推送时。
2. 🥈 Fetch API + ReadableStream
这是在需要高控制度时使用的现代标准方案。
-
实现复杂度: 中高。 需要手动获取流阅读器 (
getReader()),并进入循环,手动进行 SSE 数据解析(解码字节、查找\n\n、提取data:)。 -
核心优势: 控制力最强。
- 支持自定义 HTTP Header(适用于传递 JWT 等鉴权信息)。
- 支持 POST 等其他 HTTP 方法。
- 可以处理非标准的流格式。
-
主要局限: 必须手动编写复杂的 SSE 格式解析逻辑和错误重连机制。
-
选择原则: 当您必须在请求 Header 中传递鉴权信息(例如 JWT),或者需要处理非标准的流格式时,应选择此方案。
3. 🥉 Axios + onDownloadProgress
这是基于旧的 XHR 机制的模拟方案,通常不推荐用于现代 Web 应用的 SSE。
-
实现复杂度: 高。 依赖
onDownloadProgress事件,每次事件触发返回的是全部已接收数据,因此需要额外的逻辑来追踪上次处理的索引,以提取新增的数据块。 -
核心优势: 如果您的项目大量依赖 Axios,可以保持技术栈的统一。
-
主要局限:
- 实现流程复杂且易错。
- 性能不如原生 API,且不支持原生的自动重连。
- 在浏览器环境中,缺乏对流的标准化支持。
-
选择原则: 仅在需要兼容旧环境或因特殊限制无法使用
EventSource或fetch的情况下考虑。
2.基于WebSocket的双向流式输出实现
WebSocket 协议与 SSE(单向流)和传统的 HTTP 请求(一次性传输)有着本质的区别,它在客户端和服务器之间建立了一个持久的、全双工(双向) 的通信通道,非常适合需要高频率、低延迟交互的场景。
1. 协议升级
WebSocket 连接的建立始于一个标准的 HTTP 请求,这个请求包含了特殊的 Header,用于请求将连接从 HTTP 升级(Upgrade) 到 WebSocket 协议。一旦升级成功,连接就不再受限于传统的 HTTP 请求-响应模型。
2. 全双工通信
一旦连接建立,服务器和客户端可以独立、同时地互相发送数据帧。
- 客户端 服务器: 客户端可以随时发送输入、控制命令或心跳包。
- 服务器 客户端: 服务器可以随时发送流式数据、状态更新或响应。
这种双向性使得 WebSocket 不仅可以用于流式输出(服务器推数据),还可以用于实时接收用户输入,完美支持 实时聊天 或 协作编辑 等场景。
3. 基于帧的传输
WebSocket 的数据传输是基于帧的,而不是基于文本流或 HTTP 请求。这使得传输效率更高,延迟更低。帧可以包含文本数据(Text Frame)或二进制数据(Binary Frame)。
基于 WebSocket 的双向流式输出需要前端和后端分别使用专门的库和 API 进行实现。
1. 后端实现 (Node.js/ws 或 Socket.IO)
后端需要一个专门的 WebSocket 服务器来管理连接和发送数据帧。
| 步骤 | 动作描述 | 关键技术点 |
|---|---|---|
| 1. 握手与连接 | 监听 HTTP 升级请求,接受连接,并分配一个唯一的 WebSocket 连接对象。 | ws.on('connection', (socket) => { ... }) |
| 2. 接收客户端输入 | 监听客户端发送过来的数据帧,用于触发 AI 任务或控制流。 | socket.on('message', (input) => { // Process input }) |
| 3. 流式生成与推送 | 在 AI 模型生成数据的过程中,将数据块封装成 WebSocket 帧并实时发送。 | socket.send(data_chunk) |
| 4. 维护连接 | 维护连接状态,处理心跳包(Ping/Pong),并在客户端断开时清理资源。 | socket.on('close', ...) |
const WebSocket = require('ws');
// 创建 WebSocket 服务器,监听 8080 端口
const wss = new WebSocket.Server({ port: 8080 });
console.log('WebSocket Server running on ws://localhost:8080');
// 监听客户端连接事件
wss.on('connection', function connection(ws) {
console.log('--- Client Connected ---');
// 1. 监听客户端发来的消息(双向输入)
ws.on('message', function incoming(message) {
const clientMessage = message.toString();
console.log(`Received message from client: ${clientMessage}`);
try {
const data = JSON.parse(clientMessage);
if (data.command === 'START') {
// 客户端请求开始流式输出
startStreaming(ws, data.prompt);
} else if (data.command === 'STOP') {
// 客户端请求停止流式输出 (实时控制)
stopStreaming(ws);
ws.send(JSON.stringify({ status: 'info', message: 'Streaming stopped by client.' }));
}
} catch (e) {
console.error('Error parsing client message:', e);
ws.send(JSON.stringify({ status: 'error', message: 'Invalid JSON format.' }));
}
});
// 监听连接关闭事件
ws.on('close', function close() {
console.log('--- Client Disconnected ---');
// 清理资源,停止该连接上的所有流
stopStreaming(ws);
});
// 初始问候
ws.send(JSON.stringify({ status: 'ready', message: 'Welcome! Send {"command": "START", "prompt": "..."} to begin streaming.' }));
});
// 存储当前正在流式输出的连接和定时器
const activeStreams = new Map();
/**
* 模拟 AI 模型流式输出的函数
* @param {WebSocket} ws - 当前连接的 WebSocket 实例
* @param {string} prompt - 客户端提供的输入提示
*/
function startStreaming(ws, prompt) {
if (activeStreams.has(ws)) {
ws.send(JSON.stringify({ status: 'info', message: 'Stream is already active.' }));
return;
}
let counter = 0;
const intervalId = setInterval(() => {
counter++;
const chunk = `[Chunk ${counter}] Output for "${prompt.substring(0, 15)}...": Data block ${counter}.`;
// 2. 实时将数据块作为 WebSocket 帧发送给客户端
// 使用 JSON 格式封装数据,方便客户端解析
const dataToSend = JSON.stringify({
type: 'data',
content: chunk,
count: counter
});
// 检查连接是否仍然打开
if (ws.readyState === WebSocket.OPEN) {
ws.send(dataToSend);
console.log(`Pushed data chunk ${counter}`);
} else {
// 如果连接关闭,停止流
clearInterval(intervalId);
activeStreams.delete(ws);
}
if (counter >= 15) {
// 3. 流结束,发送结束标记
clearInterval(intervalId);
activeStreams.delete(ws);
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'done', message: 'Streaming complete.' }));
}
console.log('--- Streaming finished ---');
}
}, 500); // 每 500 毫秒发送一个数据块
activeStreams.set(ws, intervalId);
}
/**
* 停止指定连接的流式输出
*/
function stopStreaming(ws) {
if (activeStreams.has(ws)) {
clearInterval(activeStreams.get(ws));
activeStreams.delete(ws);
}
}
2. 前端实现 (Browser WebSocket API)
前端使用浏览器原生的 WebSocket API 来建立连接和处理双向通信。
| 步骤 | 动作描述 | 关键技术点 |
|---|---|---|
| 1. 建立连接 | 使用 new WebSocket(ws://url) 建立连接。注意协议是 ws:// 或 wss://。 |
new WebSocket('ws://localhost:8080') |
| 2. 监听连接状态 | 监听 onopen(连接成功)、onerror 和 onclose 事件。 |
ws.onopen = () => {...} |
| 3. 接收流式输出 | 监听 onmessage 事件,接收服务器推送的流式数据帧,并实时追加到 UI。 |
ws.onmessage = (event) => { // Append event.data } |
| 4. 发送客户端输入 | 当用户有新输入或需要发送控制命令时,通过 send() 方法发送数据帧到服务器。 |
ws.send(JSON.stringify({command: 'stop'})) |
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>WebSocket 双向流式输出</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
#status { font-weight: bold; margin-bottom: 15px; }
#output-area { border: 1px solid #ccc; padding: 15px; min-height: 150px; background-color: #f9f9f9; white-space: pre-wrap; overflow-y: scroll; }
.control-panel { margin-top: 20px; }
</style>
</head>
<body>
<h1>💬 WebSocket 双向流式交互</h1>
<p id="status" style="color: gray;">连接状态: 待连接...</p>
<h3>服务器输出流:</h3>
<div id="output-area"></div>
<div class="control-panel">
<label for="prompt">输入提示:</label>
<input type="text" id="prompt" value="请给我写一篇关于AI流式输出的文章" style="width: 300px;">
<button onclick="connectWebSocket()">建立连接</button>
<button onclick="startStream()">▶️ 启动流式输出</button>
<button onclick="stopStream()">🛑 实时停止流</button>
<button onclick="closeWebSocket()">断开连接</button>
</div>
<script>
const WS_URL = 'ws://localhost:8080';
let ws = null;
const statusElement = document.getElementById('status');
const outputElement = document.getElementById('output-area');
const promptInput = document.getElementById('prompt');
function log(message) {
outputElement.innerHTML += `<p style="margin: 0; padding: 2px 0;">${message}</p>`;
outputElement.scrollTop = outputElement.scrollHeight;
}
function updateStatus(text, color) {
statusElement.textContent = text;
statusElement.style.color = color;
}
// 1. 建立 WebSocket 连接
function connectWebSocket() {
if (ws && ws.readyState === WebSocket.OPEN) {
updateStatus('连接已存在', 'blue');
return;
}
ws = new WebSocket(WS_URL);
updateStatus('正在连接...', 'orange');
outputElement.innerHTML = '';
// 监听连接成功事件
ws.onopen = () => {
updateStatus('连接状态: ✅ 已建立', 'green');
log('--- WebSocket Connection Opened ---');
};
// 2. 监听接收到的数据帧 (核心)
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'data') {
// 实时追加流式输出的内容
log(`[Server Stream]: ${data.content}`);
} else if (data.type === 'done') {
log(`[INFO]: ${data.message}`);
updateStatus('流已完成', 'blue');
} else if (data.status === 'info') {
log(`[INFO]: ${data.message}`);
}
} catch (e) {
// 处理非 JSON 格式的消息
log(`[RAW MESSAGE]: ${event.data}`);
}
};
// 监听错误事件
ws.onerror = (error) => {
updateStatus('连接状态: ❌ 发生错误', 'red');
console.error('WebSocket Error:', error);
};
// 监听连接关闭事件 (连接断开)
ws.onclose = () => {
updateStatus('连接状态: 🛑 已断开', 'red');
log('--- WebSocket Connection Closed ---');
ws = null;
};
}
// 3. 客户端发送命令:启动流式输出
function startStream() {
if (ws && ws.readyState === WebSocket.OPEN) {
const prompt = promptInput.value || "Default prompt";
const command = {
command: 'START',
prompt: prompt
};
// 发送 JSON 格式的启动命令
ws.send(JSON.stringify(command));
updateStatus('流式输出已启动...', 'purple');
log(`[Client Command]: Starting stream for prompt: "${prompt}"`);
} else {
updateStatus('请先建立连接', 'red');
}
}
// 4. 客户端发送命令:实时停止流
function stopStream() {
if (ws && ws.readyState === WebSocket.OPEN) {
const command = {
command: 'STOP'
};
// 发送 JSON 格式的停止命令
ws.send(JSON.stringify(command));
updateStatus('发送停止命令', 'purple');
log(`[Client Command]: Sending STOP command.`);
} else {
updateStatus('请先建立连接', 'red');
}
}
function closeWebSocket() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.close();
}
}
// 页面加载后自动尝试连接
window.onload = connectWebSocket;
</script>
</body>
</html>
| 特性 | WebSocket 双向流 | SSE 单向流 |
|---|---|---|
| 通信方向 | 双向(全双工) | 单向(服务器 客户端) |
| 协议基础 | 独立于 HTTP 的 TCP 协议 | 基于 HTTP 协议 |
| 适用场景 | 实时聊天、协作编辑、AI 实时交互控制 | 纯粹的 AI 流式输出、新闻推送 |
| 实现复杂度 | 较高(需独立服务器支持) | 较低(基于标准 HTTP) |
| 自动重连 | 无原生支持,需要手动实现 | 浏览器原生支持 |
如果你的 AI 应用只需要将 LLM 的结果流式输出给用户,SSE 是更简单、更健壮的选择。但如果您的应用需要用户在流式输出过程中实时中断、修改输入、或者实现多人协作,WebSocket 是唯一的选择。