别把语音 Agent 当成“接两个 API”——用 NestJS 搭一套 ASR + LLM + 流式 TTS 的实时语音助手
我们现在看到的大多数 AI 助手,已经默认具备语音能力:你说一句话,它先把语音转成文字;大模型理解问题后,边生成文字边输出答案;最后,再把这段答案用自然语音朗读出来。
从表面看,这件事像是把三个能力串起来:
- ASR(Automatic Speech Recognition,语音识别)
- LLM(大模型推理)
- TTS(Text To Speech,语音合成)
但真正做过这类系统,你会发现问题根本不在“有没有接上接口”,而在链路能不能协同工作。
很多 Demo 的问题不是不能跑,而是体验不对:
- 录音能识别,但只能整段上传,交互很生硬
- 大模型能流式返回,但语音要等整段文本结束后才开始播放
- 前端能显示文字,但音频播放一顿一顿
- 文本和语音各走各的,最后很容易出现“字已经出完了,音频还没开始”
- 一旦中间某条连接断掉,整条语音链路就会失去同步
所以,这篇文章我不打算把它写成“如何分别调用腾讯云 ASR、腾讯云 TTS 和大模型 API”的资料拼盘。我想讲清楚一个更关键的结论:
语音版 AI 助手真正的难点,不是单独把 ASR、LLM、TTS 跑通,而是把“上传式语音识别、SSE 文本流、WebSocket 二进制音频流、服务端事件桥接、前端流式播放”组织成一条低耦合、可持续输出的实时链路。
本文基于一个真实可运行的 NestJS 项目来展开,项目里已经具备这几部分能力:
- 浏览器录音并上传到
/speech/asr - 服务端调用腾讯云 ASR 做语音转文字
- 文本问题进入
/ai/chat/stream,以 SSE 形式流式输出 - 服务端将大模型输出通过事件桥接给流式 TTS
- 腾讯云流式 TTS 返回二进制音频,通过 WebSocket 推给前端
- 前端用
MediaSource + SourceBuffer做边收边播
这套设计不一定是生产级语音系统的终点,但非常适合作为一个工程上讲得通、链路上闭得上、博客里讲得清的默认方案。
一、先别急着写代码:语音 AI 助手其实是三条链路
如果你一开始就把语音助手理解成“录音之后调一次接口”,你大概率会把结构做歪。
从工程上看,至少要先拆出三条职责不同的链路:
- 输入链路:浏览器录音 -> 上传音频 -> ASR -> 文本
- 推理链路:文本问题 -> LLM -> 流式文本输出
- 播报链路:流式文本 -> TTS -> 二进制音频 -> 边收边播
这三条链路的通信方式、时序要求、数据形态都不一样:
- 录音上传是文件型请求,适合
multipart/form-data - 大模型文本输出是连续文本流,适合 SSE
- 音频是二进制数据流,更适合 WebSocket
也就是说,这不是“一个接口做三件事”的问题,而是“多条流如何协作”的问题。
如果把这三段混成一个大接口,通常会出现两个后果:
- 业务代码耦合严重,后面任何一段升级都很痛苦
- 文本和音频时序失控,体验会非常差
所以我建议你先把这件事理解成一个多流协同系统,再去看具体实现。
二、这套项目的核心架构是什么
先看整条链路的全貌。本文分析的项目里,NestJS 并不是简单的 API 网关,而是把三种协议和两种外部能力组织起来的中枢。
flowchart LR
subgraph Browser[浏览器端]
A[录音采集<br/>MediaRecorder]
B[上传音频<br/>POST /speech/asr]
O[文字逐字显示]
M[语音通道<br/>GET /speech/tts/ws]
N[流式播放<br/>MediaSource + SourceBuffer]
P[边收边播]
end
subgraph Server[NestJS 服务端]
C[SpeechController / SpeechService]
F[AiController<br/>SSE /ai/chat/stream]
G[AiService + LangChain]
I[事件桥接<br/>AI_TTS_STREAM_EVENT]
J[TtsRelayService]
end
subgraph Cloud[外部云服务]
D[腾讯云 ASR]
K[腾讯云流式 TTS WebSocket]
end
A --> B
B --> C
C --> D
D --> E[识别文本]
E --> F
F --> G
G --> H[大模型流式文本]
H --> O
H --> I
I --> J
J --> K
K --> L[二进制 MP3 音频帧]
L --> M
M --> N
N --> P
这张图里最值得注意的,不是腾讯云,也不是模型,而是中间这几个“看起来不起眼”的节点:
SSEWebSocketAI_TTS_STREAM_EVENTMediaSourceSourceBuffer
这几个点决定了语音链路到底是“实时协同”,还是“能跑但体验别扭”。
再看一次时序,你会更直观一些:
sequenceDiagram
participant U as 用户
participant FE as 浏览器前端
participant ASR as NestJS /speech/asr
participant TCASR as 腾讯云 ASR
participant AI as NestJS /ai/chat/stream
participant LLM as 大模型
participant RELAY as TtsRelayService
participant TCTTS as 腾讯云流式TTS
U->>FE: 录音并停止
FE->>ASR: 上传音频文件
ASR->>TCASR: SentenceRecognition
TCASR-->>ASR: 返回识别文本
ASR-->>FE: text
FE->>RELAY: 建立 /speech/tts/ws
FE->>AI: 发起 /ai/chat/stream?ttsSessionId=xxx
AI->>LLM: 流式生成回答
LLM-->>AI: 文本 chunk
AI-->>FE: SSE 文本 chunk
AI->>RELAY: 发出 chunk 事件
RELAY->>TCTTS: ACTION_SYNTHESIS 分段文本
TCTTS-->>RELAY: 二进制音频帧
RELAY-->>FE: WebSocket 音频数据
FE->>FE: SourceBuffer appendBuffer
FE-->>U: 边显示文字边播放语音
理解了这张图,后面的代码就不再是“API 堆砌”,而是各自承担某个链路角色。
三、先看项目结构:这个仓库为什么这么拆
这个项目的 README 仍然是 NestJS 默认模板,真正的信息都在源码里。核心结构大致如下:
src/
ai/
ai.config.ts
ai.controller.ts
ai.module.ts
ai.service.ts
speech/
speech.config.ts
speech.controller.ts
speech.module.ts
speech.service.ts
tts-relay.service.ts
tts-text-segmentation.ts
common/
stream-events.ts
main.ts
public/
asr.html
asr-stream.html
这套拆分是合理的:
-
ai/:只关心文本问答链路 -
speech/:只关心语音输入、语音输出以及第三方语音服务接入 -
common/stream-events.ts:作为事件约定,让 AI 和 TTS 解耦 -
public/asr.html:单独验证 ASR 上传链路 -
public/asr-stream.html:完整验证“录音 -> 识别 -> AI -> TTS”链路
四、为什么 ASR 这里我更推荐“录完再识别”而不是一上来就做流式识别
很多人一聊语音系统,就默认“必须实时流式 ASR”。这其实是个常见误区。
要不要流式识别,不应该由技术潮流决定,而应该由交互目标决定。
在这个项目里,语音输入的目标不是做电话机器人,也不是做毫秒级打断对话,而是做一个像豆包那样的单轮语音提问:
- 用户说完一段话
- 系统识别成文本
- 大模型开始回答
- 回答边生成边播报
在这种场景里,先录完再识别,其实是非常合理的默认方案:
- 实现复杂度明显更低
- 浏览器端更容易兼容
- 后端不需要先处理麦克风实时分片上送
- 对“问一句 -> 回一句”的交互已经够用
换句话说,流式 ASR 不是默认最优,而是更高阶、更高成本的能力。
如果你的目标只是做一个能交互、可演示、可扩展的语音 AI 助手,把复杂度优先放在“输出链路流式化”上,通常是更划算的。
五、ASR 后端是怎么接的:/speech/asr 这条链路的职责非常清晰
先看控制器:
// src/speech/speech.controller.ts
@Controller('speech')
export class SpeechController {
constructor(private readonly speechService: SpeechService) {}
@Post('asr')
@UseInterceptors(FileInterceptor('audio'))
async recognize(
@UploadedFile()
file?: {
buffer: Buffer;
originalname: string;
mimetype: string;
size: number;
},
) {
if (!file?.buffer?.length) {
throw new BadRequestException(
'请通过 FormData 的 audio 字段上传音频文件',
);
}
const text = await this.speechService.recognizeBySentence(file);
return { text };
}
}
这个接口做得很干净:
- 它不负责录音
- 不负责转码
- 不负责前端 UI
- 只负责接收
audio文件并转交给SpeechService
这就是好接口的样子:边界清楚,职责单一。
再看识别逻辑:
// src/speech/speech.service.ts
@Injectable()
export class SpeechService {
constructor(@Inject('ASR_CLIENT') private readonly asrClient: AsrClient) {}
async recognizeBySentence(file: UploadedAudio): Promise<string> {
const audioBase64 = file.buffer.toString('base64');
const result = await this.asrClient.SentenceRecognition({
EngSerViceType: '16k_zh',
SourceType: 1,
Data: audioBase64,
DataLen: file.buffer.length,
VoiceFormat: 'ogg-opus',
});
return result.Result ?? '';
}
}
这里有几个参数值得讲透,而不是只说“它们怎么填”:
1)为什么要 buffer -> base64
因为这里调用的是云厂商 SDK 的句子识别接口,音频数据需要以指定格式进入请求体。浏览器上传到服务端后,Nest 拿到的是内存中的二进制 Buffer,而腾讯云接口在这个模式下要吃的是 Base64 文本。
也就是说,这一步不是“多余的转换”,而是协议适配。
2)EngSerViceType: '16k_zh' 在表达什么
这个参数不是“中文模式”这么简单,它实际上约束了:
- 识别语种
- 采样率预期
- 模型适配方向
如果你音频本身和服务类型不匹配,识别结果就容易变差,甚至直接报错。很多人觉得“ASR 效果不好”,其实不是模型差,而是输入格式、采样率、编码方案就没对齐。
3)为什么 VoiceFormat 这里用 ogg-opus
因为前端 MediaRecorder 优先录的是:
const preferredMimeType = "audio/ogg;codecs=opus";
前后端格式是对齐的。这个细节非常关键。
如果你前端录出来的是 webm/opus,服务端却告诉云厂商它是 ogg-opus,结果要么识别失败,要么内容异常。音频链路里,格式匹配远比“我觉得差不多”更重要。
六、前端录音链路的设计,为什么比“点一下录音按钮”复杂
在 public/asr.html 里,项目专门做了一个 ASR 验证页。这个页面的意义并不只是演示,而是把“录音 -> 上传 -> 识别”单独拆出来验证。
核心代码是这段:
mediaStream = await navigator.mediaDevices.getUserMedia({ audio: true });
chunks = [];
const mimeType = MediaRecorder.isTypeSupported(preferredMimeType)
? preferredMimeType
: fallbackMimeType;
mediaRecorder = new MediaRecorder(mediaStream, { mimeType });
mediaRecorder.ondataavailable = (event) => {
if (event.data && event.data.size > 0) {
chunks.push(event.data);
}
};
mediaRecorder.onstop = async () => {
const blob = new Blob(chunks, {
type: mediaRecorder.mimeType || fallbackMimeType,
});
const data = await uploadRecording(blob);
resultEl.textContent = data.text || '(空结果)';
};
mediaRecorder.start(250);
这段代码在整条链路里的位置,是语音输入采集层。它解决的不是识别,而是三个更基础的问题:
- 如何向浏览器申请麦克风权限
- 如何把录音分片收集起来
- 如何在停止录音后整合成可上传的 Blob
注意这里 start(250) 的意义:虽然当前链路是“录完再识别”,但录音过程中仍然是按 250ms 分片收集的。这么做的好处是:
- 浏览器侧更平滑
- 后续如果要升级成更实时的链路,基础采集方式不用推翻
- 可以更容易做波形、计时、录音中状态等 UI
也就是说,这个前端实现虽然现在走的是上传式 ASR,但它没有把自己写死在“纯离线式”的思路里。
七、为什么大模型输出要走 SSE,而不是普通 HTTP 返回一整段文本
语音助手如果只返回整段文本,问题不止是“慢”,而是整个系统没法形成实时反馈。
用户体验上的关键差别在这里:
- 普通 HTTP:用户必须等待全部生成完成
- SSE:前端可以随着 chunk 逐步展示答案
在语音场景里,这个差异会进一步放大。因为 TTS 的输入来源就是大模型流式文本。如果文本不流,语音就没法流。
也就是说,TTS 能不能边播,根上取决于 LLM 文本能不能边出。
项目里的 SSE 接口非常简洁:
// src/ai/ai.controller.ts
@Controller('ai')
export class AiController {
constructor(
private readonly aiService: AiService,
private readonly eventEmitter: EventEmitter2,
) {}
@Sse('chat/stream')
chatStream(
@Query('query') query: string,
@Query('ttsSessionId') ttsSessionId?: string,
): Observable<{ data: string }> {
const sessionId = ttsSessionId?.trim();
if (sessionId) {
const startEvent: AiTtsStreamEvent = { type: 'start', sessionId, query };
this.eventEmitter.emit(AI_TTS_STREAM_EVENT, startEvent);
}
return from(this.aiService.streamChain(query, sessionId)).pipe(
map((chunk) => ({ data: chunk })),
);
}
}
这里有两个关键点:
第一,接口同时服务了两种消费方
- 前端通过 SSE 消费文本
- TTS 服务通过事件总线消费同一份文本流
这意味着这不是一个“只给前端看的接口”,而是整个问答输出链路的上游。
第二,ttsSessionId 把文本流和音频流关联起来了
为什么这里要额外传 ttsSessionId?
因为前端和后端之间其实维护着两条通道:
- 一条是
EventSource文本流 - 一条是
WebSocket音频流
如果没有一个会话 ID 把二者绑定起来,你根本没法知道“这一段文本应该送到哪条 TTS WebSocket 会话里去”。
这就是语音系统和普通文本聊天系统的本质差别之一:你必须处理跨协议、跨通道的会话一致性。
八、AI 模块看起来简单,但其实承担的是“文本流标准化”的职责
再看 AiService:
// src/ai/ai.service.ts
@Injectable()
export class AiService {
private readonly chain: Runnable;
constructor(
@Inject('CHAT_MODEL') model: ChatOpenAI,
private readonly eventEmitter: EventEmitter2,
) {
const prompt = PromptTemplate.fromTemplate('请回答以下问题:\n\n{query}');
this.chain = prompt.pipe(model).pipe(new StringOutputParser());
}
async *streamChain(
query: string,
ttsSessionId?: string,
): AsyncGenerator<string> {
try {
const stream = (await this.chain.stream({ query })) as AsyncIterable<unknown>;
for await (const rawChunk of stream) {
let chunk = '';
if (typeof rawChunk === 'string') {
chunk = rawChunk;
} else if (
typeof rawChunk === 'number' ||
typeof rawChunk === 'boolean' ||
typeof rawChunk === 'bigint'
) {
chunk = String(rawChunk);
}
if (!chunk) continue;
if (ttsSessionId) {
this.eventEmitter.emit(AI_TTS_STREAM_EVENT, {
type: 'chunk',
sessionId: ttsSessionId,
chunk,
});
}
yield chunk;
}
if (ttsSessionId) {
this.eventEmitter.emit(AI_TTS_STREAM_EVENT, {
type: 'end',
sessionId: ttsSessionId,
});
}
} catch (error) {
if (ttsSessionId) {
this.eventEmitter.emit(AI_TTS_STREAM_EVENT, {
type: 'error',
sessionId: ttsSessionId,
error: error instanceof Error ? error.message : String(error),
});
}
throw error;
}
}
}
很多文章写到这里,就会开始说“看,这里用了 LangChain”。但真正有价值的,不是它用了哪个框架,而是这个 Service 做了什么抽象。
我认为这段代码承担的是三层职责:
1)把模型输出统一成字符串流
模型底层返回的 chunk 未必永远是字符串,代码里显式对 number / boolean / bigint 做了兼容转换。这是很实在的工程写法:不要假设上游永远完美,尽量把消费层看到的输出标准化。
2)把文本流同时暴露给两类消费者
-
yield chunk给 SSE -
eventEmitter.emit(...)给 TTS 侧
注意,这里不是“生成两次”,而是一份文本流,多方消费。这在实时系统里非常重要,否则你很容易出现“前端看到的内容”和“语音朗读的内容”不一致。
3)在流的生命周期上补全事件
除了 chunk 之外,它还显式发出了:
startenderror
这意味着 TTS 侧不只是“收到一点字就说一点字”,而是知道:
- 什么时候会话开始
- 什么时候应该收尾
- 什么时候要终止并清理资源
这就是完整流生命周期管理,而不是简单回调。
九、为什么流式 TTS 不能和 SSE 混在一起
很多第一次做这个系统的人会问:既然已经有 SSE 了,为什么不直接在 SSE 里把音频也发回来?
原因很简单:SSE 适合文本,不适合二进制音频。
如果你强行用 SSE 传音频,一般只有两条路:
- 把音频转 Base64 再发
- 伪装成文本分块传输
这两种路都不太好:
- Base64 体积会膨胀
- 前端要自己解码
- 时序会更难控制
- 对播放器非常不友好
而 WebSocket 天然适合持续传二进制帧,所以这里单独开一条 /speech/tts/ws 通道,是一个非常明确的工程判断:
文本输出归 SSE,音频输出归 WebSocket。
这不是“多开一个接口显得复杂”,而是“按数据类型选协议”。
十、真正决定这套系统可扩展性的,是事件桥接而不是 API 调用
这套项目里,我最认可的一点是没有把 TTS 逻辑硬塞进 AiController 或 AiService 里,而是通过事件来桥接。
事件定义很简单:
// src/common/stream-events.ts
export const AI_TTS_STREAM_EVENT = 'ai.tts.stream';
export type AiTtsStreamEvent =
| { type: 'start'; sessionId: string; query: string }
| { type: 'chunk'; sessionId: string; chunk: string }
| { type: 'end'; sessionId: string }
| { type: 'error'; sessionId: string; error: string };
这段代码看似不复杂,但它非常值钱。因为它明确告诉你:AI 模块不关心 TTS 怎么连腾讯云、怎么推前端、怎么关连接,它只负责把“文本流生命周期”以事件方式发出去。
这带来两个工程收益:
1)低耦合
未来你要把腾讯云 TTS 换成别家,实现新的 listener 就行,AI 侧不用改。
2)可演进
今天事件被 TtsRelayService 消费,明天也可以被日志系统、审计系统、字幕系统、消息持久化系统消费。
这就是为什么我说,语音系统的核心不在“调 API”,而在“如何组织流”。
十一、TtsRelayService 才是这套方案最核心的后端实现
如果让我选一个最值得反复讲的文件,那一定是:
src/speech/tts-relay.service.ts
它的职责不是“做 TTS”这么简单,而是做了三层中继:
- 管理浏览器和服务端之间的 TTS 会话
- 管理服务端和腾讯云流式 TTS 之间的 WebSocket 连接
- 在文本分段、发送节奏、二进制转发之间做协调
先看客户端会话注册:
registerClient(clientWs: WebSocket, wantedSessionId?: string): string {
const sessionId = wantedSessionId?.trim() || randomUUID();
const existing = this.sessions.get(sessionId);
if (existing) {
this.closeSession(sessionId, 'client reconnected');
}
this.sessions.set(sessionId, {
sessionId,
clientWs,
ready: false,
pendingChunks: [],
textBuffer: '',
closed: false,
});
this.sendClientJson(clientWs, { type: 'session', sessionId });
return sessionId;
}
这里不是简单“建立一个 ws 就完事”,而是创建了一个完整的 session 对象。里面几个字段都非常有用:
-
ready:腾讯云流式 TTS 是否已经可以收文本 -
pendingChunks:还没来得及送出去的待合成文本 -
textBuffer:当前累计但尚未完成分段的文本 -
closed:避免已关闭 session 继续写数据
这说明作者不是把 WebSocket 当成“收发消息”的黑盒,而是把它当成一个有状态流会话来管理。
再看事件消费:
@OnEvent(AI_TTS_STREAM_EVENT)
handleAiStreamEvent(event: AiTtsStreamEvent): void {
const session = this.sessions.get(event.sessionId);
if (!session) return;
switch (event.type) {
case 'start': {
this.ensureTencentConnection(session);
this.sendClientJson(session.clientWs, {
type: 'tts_started',
sessionId: session.sessionId,
query: event.query,
});
break;
}
case 'chunk': {
const chunk = event.chunk;
if (!chunk) return;
this.queueSpeakableSegments(session, chunk);
break;
}
case 'end': {
this.queueSpeakableSegments(session, '', true);
this.flushPendingChunks(session);
if (session.tencentWs && session.tencentWs.readyState === WebSocket.OPEN) {
session.tencentWs.send(JSON.stringify({
session_id: session.sessionId,
action: 'ACTION_COMPLETE',
}));
}
break;
}
case 'error': {
this.sendClientJson(session.clientWs, {
type: 'tts_error',
message: event.error,
});
this.closeSession(session.sessionId, 'ai stream error');
break;
}
}
}
这段逻辑的价值在于:它把 TTS 的行为建立在流生命周期事件之上,而不是建立在“文本一下子全来了”之上。
尤其是 end 分支非常重要:
- 先强制把剩余文本分段刷出去
- 再把待发送队列 flush
- 最后给腾讯云发
ACTION_COMPLETE
这表示“输入流结束”的协议语义被显式处理了。如果少了这一步,常见后果就是最后一段语音永远不出。
十二、流式 TTS 最大的坑,不是连接,而是文本分段
很多人第一次做流式 TTS,会把模型每次吐出的 chunk 原封不动地送去合成。结果通常很糟糕:
- 语音频繁断句
- 一两个字就触发一次合成
- 朗读节奏极不自然
- 网络与云服务调用次数暴涨
所以,这个项目专门做了一个分段器:src/speech/tts-text-segmentation.ts。
核心逻辑是:
const SENTENCE_END_RE = /[。!?!?;;\n]/;
const FORCE_SPLIT_RE = /[,,、::\s]/g;
const MIN_FORCE_SPLIT_LENGTH = 18;
const MAX_BUFFER_LENGTH = 48;
export function extractTtsSegments(
input: string,
forceFlush = false,
): { segments: string[]; rest: string } {
let rest = input;
const segments: string[] = [];
while (rest) {
const sentenceMatch = rest.match(SENTENCE_END_RE);
if (sentenceMatch?.index !== undefined) {
const endIndex = sentenceMatch.index + sentenceMatch[0].length;
const segment = finalizeSegment(rest.slice(0, endIndex));
if (segment) segments.push(segment);
rest = rest.slice(endIndex).trimStart();
continue;
}
const forcedSplitIndex = findForcedSplitIndex(rest);
if (forcedSplitIndex > 0) {
const segment = finalizeSegment(rest.slice(0, forcedSplitIndex));
if (segment) segments.push(segment);
rest = rest.slice(forcedSplitIndex).trimStart();
continue;
}
break;
}
if (forceFlush) {
const finalSegment = finalizeSegment(rest);
if (finalSegment) segments.push(finalSegment);
rest = '';
}
return { segments, rest };
}
这里体现了一个很重要的工程判断:
流式 TTS 追求的不是“字一出来马上说”,而是“在足够低延迟的前提下,让语音仍然像人在说话”。
这段策略基本分三层:
1)优先按句末标点切分
像 。!?; 这种天然句边界,最适合合成。因为朗读的停顿也会更自然。
2)句末标点迟迟不来时,允许在逗号、顿号、空格附近强制切分
这是为了控制首包延迟。否则模型一直生成长句,但迟迟不出句号,用户就会觉得“怎么还不说话”。
3)最后在流结束时强制 flush
如果最后一段没等到标点,也不能丢,必须在 forceFlush 时兜底发出去。
很多系统之所以语音体验差,不是模型不行,而是文本分段策略太粗糙。
十三、腾讯云流式 TTS 的真正接入点,不是 SDK,而是 WebSocket 协议管理
TTS 中继服务还有一个核心职责:维护服务端到腾讯云的流式 WebSocket 连接。
例如这个签名 URL 构建:
private buildTencentTtsWsUrl(sessionId: string): string {
const now = Math.floor(Date.now() / 1000);
const params: Record<string, string | number> = {
Action: 'TextToStreamAudioWSv2',
AppId: this.appId,
Codec: 'mp3',
Expired: now + 3600,
SampleRate: 16000,
SecretId: this.secretId,
SessionId: sessionId,
Speed: 0,
Timestamp: now,
VoiceType: this.voiceType,
Volume: 5,
};
const signStr = Object.keys(params)
.sort()
.map((k) => `${k}=${params[k]}`)
.join('&');
const rawStr = `GETtts.cloud.tencent.com/stream_wsv2?${signStr}`;
const signature = createHmac('sha1', this.secretKey)
.update(rawStr)
.digest('base64');
return `wss://tts.cloud.tencent.com/stream_wsv2?...`;
}
这段代码告诉你两件事:
- 流式 TTS 本质上是一个 WebSocket 协议接入问题
- 真正的复杂度不在“调某个函数”,而在“参数、签名、时序、收尾”这些协议细节
再比如文本发送逻辑:
private sendTencentChunk(session: ClientSession, text: string): void {
if (!session.tencentWs || session.tencentWs.readyState !== WebSocket.OPEN) {
session.pendingChunks.push(text);
return;
}
session.tencentWs.send(
JSON.stringify({
session_id: session.sessionId,
message_id: `msg_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`,
action: 'ACTION_SYNTHESIS',
data: text,
}),
);
}
为什么这里不是“收到文本就立刻发”?因为发送之前必须确认:
- 连接是否建立
- 会话是否 ready
- 上游是否还有待合成文本未处理
这就是为什么 pendingChunks 队列存在。它解决的是流的生产速度和消费速度不一致的问题。
十四、前端为什么要用 MediaSource + SourceBuffer,而不是直接拿到 MP3 再播
如果你只是做“整段 TTS 合成后再播放”,那确实可以直接把一个完整音频文件地址塞进 <audio>。
但这个项目要解决的是:后端持续推音频二进制,前端边收到边播放。
这时,普通 audio 标签就不够了,你需要一个可以持续追加媒体数据的机制。这就是 MediaSource。
看前端核心代码:
function prepareStreamingAudio () {
if (ttsMediaSource && ttsSourceBuffer) return true;
if (!window.MediaSource || !MediaSource.isTypeSupported("audio/mpeg")) {
return false;
}
resetTtsPlayer();
ttsMediaSource = new MediaSource();
ttsObjectUrl = URL.createObjectURL(ttsMediaSource);
ttsAudioEl.src = ttsObjectUrl;
ttsMediaSource.addEventListener("sourceopen", () => {
ttsSourceBuffer = ttsMediaSource.addSourceBuffer("audio/mpeg");
ttsSourceBuffer.mode = "sequence";
ttsSourceBuffer.addEventListener("updateend", flushTtsBufferQueue);
flushTtsBufferQueue();
}, { once: true });
return true;
}
这段代码的含义是:
-
MediaSource提供一个可动态追加媒体内容的容器 -
SourceBuffer负责真正追加二进制音频片段 -
sequence模式意味着按顺序拼接音频流
接下来是关键的队列刷新逻辑:
function flushTtsBufferQueue () {
if (!ttsSourceBuffer || !ttsMediaSource) return;
if (ttsSourceBuffer.updating) return;
if (ttsPendingBuffers.length > 0) {
const next = ttsPendingBuffers.shift();
if (next) {
ttsSourceBuffer.appendBuffer(next);
if (ttsAudioEl.paused) {
ttsAudioEl.play().catch(() => {
setStatus("语音已就绪,请点击播放器开始播报");
});
}
}
return;
}
if (ttsStreamFinal && ttsMediaSource.readyState === "open") {
try {
ttsMediaSource.endOfStream();
} catch {
// ignore
}
}
}
这一段是前端流式音频播放的关键:
- 如果
SourceBuffer还在更新,就不能继续 append - 如果还有待处理音频帧,就一帧一帧追加
- 如果流结束了并且队列空了,再
endOfStream
很多人流式播放做不顺,问题就出在这里:不是收不到数据,而是 append 时序没管好。
十五、为什么前端要先建立 TTS WebSocket,再发起 AI SSE 请求
在 public/asr-stream.html 里,有一个很重要的设计顺序:
await ensureTtsConnection();
await streamAiReply(trimmed);
这个顺序不是随便写的。
原因是:SSE 一旦启动,大模型文本可能马上就开始输出,而文本一输出,服务端就会尝试把 chunk 转发给 TTS。如果这时候前端的 TTS WebSocket 还没准备好,就会出现:
- 文字已经开始显示
- 语音通道 session 还没拿到
- 结果最前面几段音频可能丢失或延迟很大
所以更稳妥的做法是:
- 先准备好 TTS WS 通道和
ttsSessionId - 再发起
EventSource文本流请求 - 文本流和音频流用同一个 session 关联
这就是实时系统里的经典原则:
先准备消费端,再启动生产端。
十六、配置项不是“填上就行”,它们决定了系统的行为边界
这个项目里有两组配置文件:
src/ai/ai.config.tssrc/speech/speech.config.ts
比如模型配置:
const apiKey =
configService.get<string>('DASHSCOPE_API_KEY') ??
configService.get<string>('OPENAI_API_KEY');
const baseURL =
configService.get<string>('DASHSCOPE_BASE_URL') ??
configService.get<string>('OPENAI_BASE_URL') ??
'https://dashscope.aliyuncs.com/compatible-mode/v1';
const model = configService.get<string>('MODEL_NAME') ?? 'qwen-plus';
这段写法的工程意义是:
- 兼容 OpenAI 风格 SDK
- 允许底层实际使用通义千问兼容接口
- 模型供应商可替换,但上层调用保持稳定
这比把云厂商调用细节直接写死在业务逻辑里要强得多。
再比如语音配置:
const secretId =
configService.get<string>('TENCENT_SECRET_ID') ??
configService.get<string>('SECRET_ID');
const secretKey =
configService.get<string>('TENCENT_SECRET_KEY') ??
configService.get<string>('SECRET_KEY');
const appId =
configService.get<string>('TENCENT_APP_ID') ??
configService.get<string>('APP_ID');
这个 fallback 设计也挺实用:既兼容语音模块自己的命名,又兼容已有环境变量命名,方便从脚本实验迁移到 NestJS 服务。
此外,这些参数背后都有实际含义:
-
MODEL_NAME:决定回答质量、速度与成本 -
TTS_VOICE_TYPE:决定音色,不只是“换个声音”这么简单,也会影响风格一致性 -
SampleRate:影响音频兼容性与传输成本 -
Codec: mp3:直接决定浏览器流式播放的适配路线
参数不是配置表,而是系统行为控制面。
十七、这套方案做对了什么
如果从技术博客的视角总结,这个项目最值得肯定的地方有五个。
1. 先把 ASR 和完整语音链路分开验证
asr.html 专注于验证录音上传与识别,asr-stream.html 才负责完整交互。这样调试效率非常高。
2. 给文本和音频分别选了合适的协议
- 文本:SSE
- 音频:WebSocket
这是正确的边界划分。
3. 用事件总线把 AI 和 TTS 解耦
这一步让整个系统有了继续演化的空间。
4. 文本分段策略考虑了“可听性”
这意味着作者不是只想“跑通”,而是在意真实体验。
5. 前端流式播放没有偷懒
很多 Demo 到 TTS 就退回“整段音频播放”,而这个项目真的做了 MediaSource + SourceBuffer,这是它最接近真实产品体验的地方。
十八、但如果你要把它推进到真实业务,还差哪些东西
这套方案已经很适合作为教程和演示项目,但离生产级还有明显距离。
1. 目前 ASR 仍然是上传式,不是流式
这意味着:
- 用户必须说完再等识别
- 无法做边说边识别
- 无法做更自然的打断式交互
如果你的场景是客服、通话机器人、实时陪练,这会成为瓶颈。
2. 缺少 VAD(静音检测)
目前录音停止主要靠用户点击按钮。真实产品里通常会结合静音检测、自动截断、超时策略,减少用户操作成本。
3. 缺少更完整的错误恢复
比如:
- 腾讯云 TTS 中途断开如何重连
- SSE 断流后是否要补偿
- session 超时如何清理
- 客户端页面刷新后旧连接如何回收
4. 缺少鉴权与限流
语音系统很容易被滥用,因为它天然涉及高成本外部服务。如果不加认证、配额和频率限制,线上风险会很高。
5. 缺少可观测性
真正的语音体验优化,一定离不开下面这些指标:
- 录音时长
- ASR 耗时
- LLM 首 token 延迟
- TTS 首包延迟
- 前端开始播放时间
- 整体对话完成耗时
如果没有这些指标,你只能凭感觉优化,最后会越改越盲。
十九、这套方案最容易踩的坑,我建议你提前避开
坑 1:前端录音格式和云端识别格式不一致
这是最常见的问题之一。一定要确认:
- 浏览器录了什么格式
-
Blob.type是什么 - 服务端告诉 ASR 的
VoiceFormat是什么
这三者必须对齐。
坑 2:把每个 token 都立即送进 TTS
这样会让语音像卡壳一样,一两个字就停一次。分段策略一定要做。
坑 3:没有处理浏览器自动播放限制
前端已经做了:
ttsAudioEl.play().catch(() => {
setStatus("语音已就绪,请点击播放器开始播报");
});
这就是在兜 autoplay 被拦截的情况。很多人本地测得好好的,上线后却发现“没声音”,原因就在这。
坑 4:文本流和音频流没有共享 session
如果没有 ttsSessionId 这层关联,多人并发时非常容易串音。
坑 5:流结束时没有明确收尾
不管是 SSE、TTS 还是前端 MediaSource,你都不能只处理“进行中”,还必须处理:
- 何时 flush 剩余数据
- 何时发送 complete
- 何时
endOfStream - 何时 close session
实时系统最怕的不是报错,而是没报错但尾巴没收干净。
二十、如果是我继续演进这套系统,我会怎么做
如果你的目标不是只做 Demo,而是把它向更真实的产品推进,我会建议按下面的顺序演进。
第一步:补观测,而不是先改架构
先量化链路延迟,知道瓶颈在哪。
第二步:把上传式 ASR 升级成流式 ASR
这样可以减少等待感,让语音输入更像自然对话。
第三步:增加打断能力
比如:
- 用户说新问题时,当前 TTS 立刻中断
- 前端停止播放并清空后续 buffer
- 服务端结束当前 session
第四步:把 TTS Relay 独立成可复用的语音网关
当前它是项目内 service,但未来完全可以变成一个独立语音中继层,为多个 AI 应用复用。
第五步:把知识库/RAG 接进来
当语音链路稳定后,真正决定业务价值的就不再是“会不会说话”,而是“回答是否可靠”。
到那一步,系统的重点就会从“多流协同”继续延伸到“检索增强 + 语音交互”的组合能力。
二十一、我的最终判断:这类语音 Agent 的默认方案,应该怎么选
如果你的目标是:
- 做一个可演示、可讲解、可扩展的语音 AI 助手
- 让用户获得“能说、能看、能听”的闭环体验
- 不想一上来就被全双工实时语音系统的复杂度拖死
那么我认为这套方案是非常合适的默认起点:
- 输入:上传式 ASR
- 文本输出:SSE
- 语音输出:WebSocket 流式 TTS
- 服务端协同:事件桥接
- 前端播放:MediaSource + SourceBuffer
它不是终极架构,但它在复杂度、教学价值和可演进性之间取得了很好的平衡。
反过来说,如果你的目标是:
- 电话机器人
- 实时会议同传
- 全双工强交互语音陪练
- 极低延迟语音中断
那这套方案就只是过渡阶段,你迟早会走向:
- 流式 ASR
- 更强的状态机
- 更复杂的音频管线
- 更严格的会话与时延控制
所以,不要把“是否先进”当成选型标准,而要把“当前场景最需要解决什么问题”当成标准。
总结
回到文章开头的那个结论。
很多人做语音 AI 助手时,会把注意力放在:
- 接哪家 ASR
- 接哪家大模型
- 接哪家 TTS
这些当然重要,但它们不是核心矛盾。
真正决定体验和工程质量的,是你能不能把下面这几件事组织成一个协同系统:
- 录音上传与格式适配
- 文本流式生成
- 文本到语音的分段策略
- SSE 与 WebSocket 的职责分离
- 服务端事件桥接
- 前端流式音频播放
- 生命周期、会话和收尾处理
换句话说,语音 Agent 的本质不是“多接两个接口”,而是“多条流如何在正确的时机,用正确的协议,完成一次完整协作”。
而这,正是这份 NestJS 项目最值得学习的地方。
如果你后面还想继续扩展,我建议下一篇就顺着这条线往下写:
- 把上传式 ASR 升级成流式 ASR
- 增加打断、重说与会话中止能力
- 接入 RAG,让它从“会说话”升级成“会回答业务问题”
到那时,这就不只是一个语音 Demo,而会成为一个真正有业务形态的 AI 应用底座。