fetch-event-source源码解读
SSE协议规范
SSE协议实际上是在 HTTP 之上定义了一套严格的数据组织规范,是属于应用层的协议。
-
固定请求头类型:必须是
Content-Type: text/event-stream -
固定文本格式:数据必须以
field: value\n的形式返回 -
固定结束符:消息之间必须以
\n\n分隔。
{
id: '', // 消息唯一标识,实现“断点续传”的关键
event: '', // 事件类型,缺失则默认触发 onmessage
data: '', // 消息内容
retry: undefined, // 重连时间
}
fetchEventSource 执行流程
fetch-event-source底层是使用 fetch 配合 ReadableStream 实现的
1.建立 Fetch 长连接
fetch 读取到的是 Uint8Array(原始二进制字节码),需要手动转换为 SSE 字段。
自动补全 Accept: text/event-stream,确保后端返回正确格式。
2.流式读取
通过 response.body.getReader() 开启循环读取字节流。
3.字节解码与“缓冲区”拼装
网络传输中,数据包可能在任何地方断开(例如:data: hello 被切成了 da 和 ta: hello)。 源码中维护了一个 buffer (Uint8Array) 和一个 position (当前偏移量),这是该库最核心的逻辑。
-
缓冲区逻辑:库内部维护一个
buffer字符串。新到的片段会拼接到buffer后。- 扫描
buffer中是否存在\n\n。 - 如果有,说明消息完整,切出来进行解析。
- 如果没有,继续等待下一个数据块。
- 扫描
4.字段解析与分发
一旦识别出完整的消息块,它会按行拆分,解析出 data:, event:, id: 等字段,并手动调用你在配置中传入的 onmessage, onopen 等回调函数。
核心方法解析
1、fetchEventSource
export interface FetchEventSourceInit extends RequestInit {
headers?: Record<string, string>,
onopen?: (response: Response) => Promise<void>,
onmessage?: (ev: EventSourceMessage) => void;
onclose?: () => void;
onerror?: (err: any) => number | null | undefined | void,
openWhenHidden?: boolean;
fetch?: typeof fetch; // 要使用的 Fetch 函数。默认为 window.fetch
}
export function fetchEventSource(input: RequestInfo, {
signal: inputSignal,
headers: inputHeaders,
onopen: inputOnOpen,
onmessage,
onclose,
onerror,
openWhenHidden,
fetch: inputFetch,
...rest
}: FetchEventSourceInit) {
return new Promise<void>((resolve, reject) => {
// 复制请求头,确保 accept 为 text/event-stream。
const headers = { ...inputHeaders };
if (!headers.accept) {
headers.accept = EventStreamContentType;
}
// 当前请求的 AbortController
let curRequestController: AbortController;
function onVisibilityChange() {
curRequestController.abort(); // 页面隐藏时中断请求
if (!document.hidden) {
create(); // 页面恢复可见时重新建立连接
}
}
// 如果openWhenHidden为false,则监听文档可见性变化事件
if (!openWhenHidden) {
document.addEventListener('visibilitychange', onVisibilityChange);
}
// 资源清理
let retryInterval = DefaultRetryInterval;
let retryTimer = 0;
function dispose() {
// 清理事件监听
document.removeEventListener('visibilitychange', onVisibilityChange);
// 清理定时器和中断请求
window.clearTimeout(retryTimer);
// 中断当前请求
curRequestController.abort();
}
// 如果外部传入了 abort 信号,响应中断并清理资源。
inputSignal?.addEventListener('abort', () => {
dispose();
resolve(); // 不要浪费资源在重试上
});
// 使用传入的 fetch 实现(若有),否则使用全局的 window.fetch。
const fetch = inputFetch ?? window.fetch;
// 使用传入的 onopen 回调(用于验证/处理响应),没有则使用默认的 content-type 校验函数。
const onopen = inputOnOpen ?? defaultOnOpen;
async function create() {
// 为本次请求创建 AbortController,用于后续中止当前请求(可见性变化或外部 abort)。
curRequestController = new AbortController();
try {
// 发起 fetch 请求,合并剩余 init 配置、headers,并将当前 controller 的 signal 传入以便可中止。
const response = await fetch(input, {
...rest,
headers,
signal: curRequestController.signal,
});
await onopen(response);
// 链式处理响应流:
// getMessages返回按行组装成EventSourceMessage对象的online函数
// 第一个参数是id,如果有id就写入headers中,用于在下次重连发送last-event-id
// 第二个参数是retry,如果有retry就更新retryInterval,用于下次重连等待时间
// 第三个参数是onmessage回调,用于处理完整的EventSourceMessage消息
// getLines负责把响应体字节流按行切分,调用上面的online函数
// getBytes负责从response.body中读取响应体的字节流,并把每块传入getLines进行处理,直到结束或终止
await getBytes(response.body!, getLines(getMessages(id => {
if (id) {
// store the id and send it back on the next retry:
headers[LastEventId] = id;
} else {
// don't send the last-event-id header anymore:
delete headers[LastEventId];
}
}, retry => {
retryInterval = retry;
}, onmessage)));
onclose?.(); // 流正常结束后,调用可选的 onclose 回调
dispose(); // 清理(移除可见性监听、清除定时器、abort 当前 controller 等资源)
resolve(); // 完成外部 Promise(表示工作完成,不再重试)。
} catch (err) {
// 只有在不是主动中止的情况下才考虑重试(如果是主动 abort,就不重试)
if (!curRequestController.signal.aborted) {
try {
// 调用用户的 onerror 回调,允许其返回一个重试间隔(毫秒);若未提供或返回 undefined,则使用当前的 retryInterval(来自服务器 retry 字段或默认值)
const interval: any = onerror?.(err) ?? retryInterval;
window.clearTimeout(retryTimer); // 清除之前可能存在的重试定时器。
retryTimer = window.setTimeout(create, interval);
} catch (innerErr) {
dispose(); // 清理资源。
reject(innerErr); // 拒绝外部 Promise,结束整个流程并向调用者报告错误
}
}
}
}
create();
});
}
2、 getBytes() :循环读取流信息
函数接收两个参数:stream和onChunk
-
stream:代表一个可读取的二进制数据流 -
onChunk: 是一个回调函数,每当从流中读取到一块数据时,就会调用这个函数,并将读取到的数据作为参数传递给这个函数。
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {
const reader = stream.getReader(); // 创建一个流的阅读器 reader
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) { // 循环读取数据块,直到流结束
onChunk(result.value); // 对每个数据块调用回调, onChunk是 getLines()方法的返回值
}
}
3、 getLines() :将字节块解析为EventSource行信息
接收一个回调函数 onLine 作为参数,并返回一个新的函数 onChunk。
-
onLine:每当检测到一行数据时就会调用它规定以`\r`、`\n` 或 `\r\n`作为一行结束的标志 规定以`\n\n`或` \r\n\r\n ` 作为一个消息结束的标志 -
onChunk:用于处理传入的字节块。逐个解析传入的字节块,找到数据中的行结束符。将字节块解析为 EventSource 行缓冲区,并在检测到完整行时调用onLine回调。
它解决了这样一个问题:网络传输的数据可能不是一行一行到达的,而是分块到达的,甚至一行可能被拆成多个块。这个函数负责把这些块拼起来,遇到换行符(\r、\n 或 \r\n)就认为是一行,然后把这一行交给 onLine 处理。
// 模拟字节块的返回
// 块1:data: hello\r\ndata: wo
// 块2:rld\r\n\r\nevent: update\r\ndata: 123\r\n
export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {
let buffer: Uint8Array | undefined;
let position: number; // 当前读取位置
let fieldLength: number; // 当前行中有效“字段”部分的长度
let discardTrailingNewline = false; // 标记是否需要跳过紧跟在\r后的\n
// 返回一个函数,处理每个字节块
return function onChunk(arr: Uint8Array) {
if (buffer === undefined) { // 初始化buffer、position、fieldLength,如果未定义也就是意味着这是第一次调用或者前一个缓存区已完全处理完毕
buffer = arr;
position = 0;
fieldLength = -1;
} else {
// 如果buffer已定义(既正在处理一个较大的数据块或连续的数据块),将新的数据块arr追加到现有的buffer后面,主要处理前一个字节处理完还有剩余字节的情况
buffer = concat(buffer, arr);
}
const bufLength = buffer.length;
let lineStart = 0; // 当前行的起始位置
while (position < bufLength) { // 遍历buffer,使用position指针来追踪当前读取的位置
if (discardTrailingNewline) { // 如果设置了discardTrailingNewline标志,则跳过行结束符之后的新行字符,如果上次遇到\r,这次要跳过\n
if (buffer[position] === ControlChars.NewLine) {
lineStart = ++position; // 跳过\n
}
discardTrailingNewline = false;
}
// 查找本行的结束符
let lineEnd = -1;
for (; position < bufLength && lineEnd === -1; ++position) {
switch (buffer[position]) {
case ControlChars.Colon:
if (fieldLength === -1) { // 记录第一个冒号的位置
fieldLength = position - lineStart;
}
break;
case ControlChars.CarriageReturn: // \r
discardTrailingNewline = true; // 标记下次要跳过\n
case ControlChars.NewLine: // \n
lineEnd = position; // 行结束
break;
}
}
if (lineEnd === -1) {
// 没找到行结束符,等下一个字节块
break;
}
// 取出完整的一行,调用 onLine,onLine是 getMessages()方法的返回值
onLine(buffer.subarray(lineStart, lineEnd), fieldLength); // 获取完整的行,并调用onLine回调函数,处理这一行数据
lineStart = position; // 下一行的起始位置
fieldLength = -1; // 更新 fieldLength 为 -1,准备处理下一行的 field 部分
}
if (lineStart === bufLength) {
buffer = undefined; // 全部处理完
} else if (lineStart !== 0) {
// 还有未处理的内容
buffer = buffer.subarray(lineStart); // 把 buffer 变成还没处理完的部分,丢弃已经处理过的内容。这样下次新数据块到来时,可以直接拼接到剩余部分后面。
position -= lineStart; // 更新 position 指针,保证它指向新的 buffer 的正确位置。其实可以直接置为0,因为新的 buffer 是从 lineStart 开始的,但是这样写更通用一些。防止极端情况下 position 指向错误。
}
}
}
4、getMessages():把 EventSource 行组装成完整的 SSE 消息对象
接收三个回调为参数:onId、onRetry、onMessage,并返回一个新的函数onLine
-
onId:回调,在每次检测到消息 ID 时调用,传递 ID 字符串作为参数 -
onRetry:回调,在每次检测到重试时间时调用,传递重试时间的数值作为参数 -
onMessage:回调,在每次消息结束时调用,传递完整的消息对象作为参数 -
onLine:处理每一行的数据
export function getMessages(
onId: (id: string) => void,
onRetry: (retry: number) => void,
onMessage?: (msg: EventSourceMessage) => void
) {
let message = newMessage(); // 初始化一个空消息对象
const decoder = new TextDecoder(); // 用于把字节数组解码为字符串。
// 返回一个函数,每当解析出一行 EventSource 行时就会调用,(由 getLines 传入)
return function onLine(line: Uint8Array, fieldLength: number) {
if (line.length === 0) { // 如果是空行表示消息结束
onMessage?.(message); // 调用 onMessage 回调,将完整的消息对象传递出去
message = newMessage(); // 重置 message 对象
} else if (fieldLength > 0) { // 如果这一行包含有效数据(即不是注释或空行),继续处理。
// 解析字段名(field)和字段值(value)
// 字段名是从行开头到冒号前的部分,字段值是冒号后面,可能有一个空格(协议允许),所以判断是否有空格决定偏移量。
const field = decoder.decode(line.subarray(0, fieldLength));
const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
const value = decoder.decode(line.subarray(valueOffset));
switch (field) {
case 'data':
// 如果字段名是 data,把 value 加到 message.data 上
// 如果已经有 data,追加一行(\n),否则直接赋值
message.data = message.data
? message.data + '\n' + value
: value;
break;
case 'event':
// 如果字段名是 event,设置消息的事件类型
message.event = value;
break;
case 'id':
// 如果字段名是 id,设置消息的 id,并调用 onId 回调。
onId(message.id = value);
break;
case 'retry':
// 如果字段名是 retry,尝试解析为整数,合法则设置消息的 retry 并调用 onRetry 回调
const retry = parseInt(value, 10);
if (!isNaN(retry)) { // per spec, ignore non-integers
onRetry(message.retry = retry);
}
break;
}
}
}
}
源码流程详细图解
![]()
总结
SSE 的本质是:一种基于文本行的、约定俗成的 HTTP Body 消费方式。 原生 API 是将这种消费方式『硬件化』在了浏览器里, 而 fetch-event-source 则是用 JS 工具将其『软件化』实现了一遍。