概述
SSE(Server-Sent Events)流式输出是 RAG 回答链路中直接面向用户的最后一环——将 LLM 逐 token 生成的回复实时推送到前端,让用户感知到"正在回答"而不是盯着白屏等待。
选择 SSE 而非 WebSocket 的核心考量:LLM 推理是服务端到客户端的单向数据流,客户端发完问题后不需要再往服务端发消息。SSE 是标准 HTTP 长连接,浏览器原生 EventSource 支持自动重连,穿透代理和负载均衡不需要额外配置。全双工能力在这个场景下用不上,但 WebSocket 的握手升级、心跳维持、帧协议解析的复杂度一个不少。
整个流式输出覆盖四个层面:
**连接管理**(SSE 长连接的生命周期)
**协议分层**(事件类型定义与前端交互契约)
**流控制**(LLM 的异步 token 流如何转为可控的推送)
**取消机制**(用户断开或超时后如何跨实例回收资源)
## 1. 总体架构
浏览器 EventSource
│
▼
Tomcat 请求线程 → RAGChatServiceImpl.streamChat()
│
├─ 生成 conversationId + taskId(Snowflake)
├─ 创建 StreamChatEventHandler(绑定 SSE 连接)
├─ ChatQueueLimiter.enqueue()(限流排队)
│ ├─ 拿到许可 → 进入流水线
│ └─ 超时 → REJECT 事件 → DONE → 关闭连接
│
▼
StreamChatPipeline.execute()
│
├─ ... 记忆加载、查询重写、意图识别、检索、提示词构建 ...
│
└─ RoutingLLMService.streamChat(request, callback)
│
├─ 候选模型遍历(优先级路由 + 熔断检查)
├─ 首包探活(60s 超时)
│ ├─ 成功 → 继续流式输出
│ └─ 失败 → 切换下一个候选
│
└─ AbstractOpenAIStyleChatClient.doStreamChat()
│
├─ OkHttp 异步 POST → LLM API(OpenAI 兼容协议)
├─ 逐行读取 BufferedSource
├─ SSE 格式解析 → callback.onContent()
└─ 收到 [DONE] → callback.onComplete()
│
▼
StreamChatEventHandler
│
├─ 分块聚合(5 字符一组)
├─ SseEmitterSender.sendEvent()
├─ StringBuilder 累积完整内容
└─ 完成时写入 t_message 表
│
▼
浏览器 EventSource.onmessage2. 为什么选 SSE
这个选择的核心逻辑是"够用就好,不为未来不需要的能力买单"。下面对比两种方案在 RAG 场景下的实际差异:
LLM 流式推理天然单向:用户发完问题后,剩下的就是服务端不断推送 token,客户端只接收和渲染。直到下一个问题之前,客户端不需要往服务端发任何东西。全双工的能力拿不到任何收益,但代价是实实在在的——要维护心跳、处理半开连接、配置负载均衡的 WebSocket 支持。
EventSource 的自动重连是隐形红利:用户网络抖了一下导致 SSE 断开,浏览器自动重连,不需要写一行重连代码。WebSocket 断开后要自己实现重试逻辑、指数退避、消息补偿。
唯一当前没用到但未来可能需要的能力是客户端中途打断:比如用户看到一半觉得不对,想点"停止生成"。目前是通过另一个 HTTP 请求/stop 接口)来通知服务端取消,而不是通过 SSE 通道本身。如果未来要做语音交互场景下的实时打断,WebSocket 的双向能力就变成刚需了。选型不是一成不变的,取决于场景演进。
3. 入口与上下文初始化
初始化阶段完成四件事:
生成标识:会话 ID(如果前端没传则新生成)和任务 ID,均使用 Snowflake 算法。任务 ID 是全链路追踪的主键,串联从排队到 SSE 断开的所有日志。
创建回调处理器StreamChatEventHandler 是实现 StreamCallback 的核心类,它绑定了 SseEmitter(Spring 的 SSE 封装)和会话信息。后续 LLM 的每个 token 都通过它转为 SSE 事件推给前端。构造时会立即发送 META 事件(会话 ID + 任务 ID),前端据此建立上下文。
限流排队:通过 ChatQueueLimiter.enqueue() 进入分布式排队系统。拿到许可后执行流水线,超时走 REJECT 路径。
绑定取消StreamTaskManager.register() 将 SSE 连接注册到任务管理器。如果用户在排队期间就关了页面,SSE 断开触发 cancelBinder 回调,清理排队位置。
4. SSE 连接管理
4.1 SseEmitterSender —— 线程安全的 SSE 封装
Spring 的 SseEmitter 本身不是线程安全的——多线程并发调用 send() 会抛异常。LLM 的 token 回调可能在不同线程中触发(OkHttp 的回调线程池),需要封装来保证线程安全。
package rag.agent.framework.web;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* SSE 发送器封装类。
*
* <p>用于对 Spring {@link SseEmitter} 进行轻量封装,统一处理事件发送、连接关闭、
* 超时和异常终止等行为,降低业务代码直接操作 {@code SseEmitter} 的复杂度。</p>
*
* <p>实现约束:</p>
* <ul>
* <li>连接关闭状态通过原子变量控制,避免并发场景下重复关闭。</li>
* <li>发送失败后仅关闭连接并记录日志,不再继续向上抛出异常。</li>
* <li>连接关闭后再次发送事件会被直接忽略。</li>
* </ul>
*
* @author Wang
*/
@Slf4j
public class SseEmitterSender {
/**
* Spring SSE 发送器实例。
*/
private final SseEmitter emitter;
/**
* 连接关闭状态标识。
*
* <p>{@code true} 表示连接已关闭,{@code false} 表示连接仍可继续发送数据。</p>
*/
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* 创建 SSE 发送器。
*
* <p>构造时会注册完成、超时和异常回调,用于同步维护连接关闭状态。</p>
*
* @param emitter Spring SSE 发送器
*/
public SseEmitterSender(SseEmitter emitter) {
this.emitter = emitter;
emitter.onCompletion(() -> closed.set(true));
emitter.onTimeout(() -> closed.set(true));
emitter.onError(e -> closed.set(true));
}
/**
* 发送 SSE 事件到客户端。
*
* <p>当事件名为空时发送默认事件;当事件名不为空时发送命名事件。
* 若连接已关闭,则直接忽略本次发送请求。</p>
*
* @param eventName 事件名称,为 {@code null} 时发送默认事件
* @param data 事件数据
*/
public void sendEvent(String eventName, Object data) {
if (closed.get()) {
return;
}
try {
if (eventName == null) {
emitter.send(data);
return;
}
emitter.send(SseEmitter.event().name(eventName).data(data));
} catch (Exception e) {
fail(e);
}
}
/**
* 正常关闭 SSE 连接。
*
* <p>该方法具备幂等性,多次调用仅首次调用会真正触发连接关闭。</p>
*/
public void complete() {
// 使用 CAS 原子操作,确保只关闭一次
if (closed.compareAndSet(false, true)) {
emitter.complete();
}
}
/**
* 以异常方式关闭 SSE 连接。
*
* <p>该方法用于在流式发送过程中出现异常时终止连接,并记录失败日志。
* 为避免流式响应开始后再次触发统一异常响应,方法内部不会继续抛出异常。</p>
*
* @param throwable 异常对象
*/
public void fail(Throwable throwable) {
closeWithError(throwable);
log.warn("SSE send failed", throwable);
}
/**
* 以异常方式关闭连接。
*
* <p>该方法为内部辅助方法,使用 CAS 保证连接仅关闭一次。</p>
*
* @param throwable 导致连接关闭的异常对象
*/
private void closeWithError(Throwable throwable) {
// 使用 CAS 原子操作,确保只关闭一次
if (closed.compareAndSet(false, true)) {
emitter.completeWithError(throwable);
}
}
}
做了两层保护:
连接状态保护:用 AtomicBoolean closed 标记连接是否已关闭。关闭的触发路径有三个—onCompletion(客户端主动断开)onTimeout(SSE 超时)onError(写入异常)。三个回调在构造时注册,任一触发就通过 CAS 置位 closed。所有 sendEvent() 调用前先检查 closed 标记,已关闭直接跳过,避免往断开的连接写数据。
幂等关闭complete() 和 fail() 都用 CAS 确保只执行一次—if (closed.compareAndSet(false, true))。重复调用不会产生异常。
public void sendEvent(String eventName, Object data) {
if (closed.get()) {
return; // 连接已关闭,跳过
}
try {
emitter.send(SseEmitter.event().name(eventName).data(data));
} catch (Exception e) {
fail(e); // 写入异常 → 标记关闭
}
}4.2 连接超时
SSE 连接的超时时间通过 rag.sse-timeout-ms 配置,默认 300000 毫秒(5 分钟)。这个时间需要覆盖一轮完整 LLM 推理的最长耗时——从首包到最后一个 token,加上前面的检索链路耗时。
设 5 分钟的原因:P99 的 LLM 推理耗时通常在 2 分钟以内,5 分钟覆盖了极端情况(复杂推理 + 长回复 + 排队等待)。如果设太短,正常的长回复会被误杀;设太长,静默断开的连接占着 Tomcat 线程不释放。
超时后 SseEmitter 触发 onTimeout 回调 → closed 置位 → StreamTaskManager 收到取消信号 → LLM 推理被取消 → 已累积内容落库。
4.3 连接断开的三条路径
三条路径的处理逻辑一致,都通过 StreamTaskManager 统一收口——cancelBinder 在入队时绑定到票据上,不管哪条路径触发断开,最终都走到同一个清理逻辑。
5. 流式事件协议
5.1 事件类型
前端通过 EventSource 接收 SSE 事件,按事件类型做不同 UI 处理:
5.2 事件顺序约定
正常流程META → (MESSAGE × N) → FINISH → DONE
取消流程META → (MESSAGE × N) → CANCEL → DONE
拒绝流程META → REJECT → DONE
DONE 事件永远是最后一条。前端收到 DONE 后关闭 EventSource,释放连接资源。这个约定让前端可以放心地在收到 DONE 之前保持 loading 状态。
5.3 MESSAGE 事件的 type 区分
type="think" 对应 LLM 的 reasoning_content(思考过程)type="response" 对应 content(正式回复)。前端可以用不同样式渲染——思考过程用折叠区域、灰色斜体,正式回复正常展示。
思考的开始和结束时间也被记录—StreamChatEventHandler 在收到第一个 thinking token 时记录开始时间戳,收到第一个 response token 时计算思考耗时(秒),最终写入消息记录的 thinkingDuration 字段。
---
## 6. 分块聚合与内容持久化
### 6.1 分块聚合
LLM 每个 token 可能只有 1~2 个字符。如果每个 token 都发一次 SSE 事件,1000 个 token 的回复就是 1000 次事件——前端的渲染频率太高,页面反而会卡。
分块聚合的逻辑在 StreamChatEventHandler.sendChunked() 中:每收到一个 token,按 Unicode 码点计数(正确处理多字节字符),攒够 messageChunkSize(默认 5 个字符)才合并为一次 SSE 事件发送。
// 按码点计数,兼容 emoji 等多字节字符
while (idx < length) {
int codePoint = content.codePointAt(idx);
buffer.appendCodePoint(codePoint);
idx += Character.charCount(codePoint);
count++;
if (count >= messageChunkSize) {
sender.sendEvent(SSEEventType.MESSAGE, new MessageDelta(type, buffer.toString()));
buffer.setLength(0);
}
}块大小 5 是经验值——太小则发送频率高、前端渲染压力大;太大则流式感减弱、用户觉得卡。如果网络环境差(高延迟),可以调大到 10 或 20,减少发送次数。
6.2 内容累积
流过程中用两个 StringBuilder 分别累积思考内容和回复内容。每个 onContent 或 onThinking 回调都会追加到对应的 buffer。
正常完成时(onComplete),将完整内容和思考耗时写入 t_message 表——一次数据库写入,不是每个 token 写一次。
用户取消时(cancelLocal),在取消回调里也执行同样的落库逻辑——先把已累积内容写入对话记录,再发 CANCEL + DONE 事件。取消不等于内容就丢了,用户在历史记录里能看到已生成的部分。
// onComplete 中的持久化
String thinkingContent = thinking.isEmpty() ? null : thinking.toString();
ChatMessage message = ChatMessage.assistant(answer.toString(), thinkingContent, thinkingDuration);
messageId = memoryService.append(conversationId, userId, message);7. 跨实例取消机制
7.1 问题场景
用户在前端点了"停止生成"或者关闭了页面。这个 HTTP 取消请求可能被负载均衡打到实例 A 上,但实际跑 LLM 推理的是实例 B。取消信号必须跨实例传递,否则实例 B 上的 LLM 推理还在白白消耗 API 配额。
7.2 实现方案
流式任务管理器 管理了取消信号的全生命周期:
第一步——写标记:取消请求到达后,先在 Redis 设一个取消标记 Keyragent:stream:cancel:{taskId}),值为 true,TTL 30 分钟。这个 TTL 覆盖了单次请求的全部生命周期,过期后自动清理。
第二步——广播:通过 Redisson 的 RTopic 发布取消消息到 ragent:stream:cancel 通道,消息体是 taskId。
第三步——本地消费:所有实例在启动时通过 @PostConstruct 订阅这个 Topic。收到消息后调用 cancelLocal(taskId):
// cancelLocal 核心逻辑
if (!taskInfo.cancelled.compareAndSet(false, true)) {
return; // CAS 保证只执行一次
}
if (taskInfo.handle != null) {
taskInfo.handle.cancel(); // 取消 LLM 推理
}
if (taskInfo.sender != null) {
sender.sendEvent(CANCEL, payload); // 通知前端
sender.sendEvent(DONE, "[DONE]");
sender.complete(); // 关闭 SSE 连接
}第四步——注册时回查:SSE 连接建立时(register),主动查一次 Redis 是否已有取消标记。这是处理"取消先于注册到达"的竞态——取消消息可能在 EventHandler 注册之前就到了,如果不回查,这个取消就丢了。
7.3 幂等保证
取消逻辑的幂等依赖两个机制:
CAS 终态互斥taskInfo.cancelled 是 AtomicBooleancompareAndSet(false, true) 确保只有第一个到达的取消信号(无论是本地调用、Redis 广播还是注册时回查)能执行清理逻辑。后续重复的取消信号直接 return。
本地缓存 + Redis 双层:取消标记同时存在本地 Guava Cachetasks,写入后 30 分钟过期,最大 10000 条)和 Redis。本地缓存提供快速的重复取消拦截,Redis 提供跨实例的取消信号传递。Key 过期后自动清理,不需要手动维护。
7.4 时序问题
取消存在两个竞态窗口:
取消请求先于 LLM 绑定到达:用户在流水线还在跑检索时就点了取消。此时 taskInfo.handle 为 null——LLM 推理还没开始,句柄还没绑定。cancel() 先执行,设置了 Redis 标记 + 广播。当 bindHandle() 后续执行时,发现 taskInfo.cancelled 已为 true,会立即调用 handle.cancel() 取消刚启动的 LLM 推理。
取消请求先于 register 到达cancel() → register() 的时序中,register 内部通过 isTaskCancelledInRedis() 回查 Redis 标记,发现已取消则直接走取消完成流程,发送 CANCEL + DONE 事件。
8. LLM 流式传输层
8.1 协议适配
所有 LLM 供应商统一走 OpenAI 兼容协议/v1/chat/completionsAccept: text/event-stream)AbstractOpenAIStyleChatClient 封装了 HTTP 层的流式调用逻辑。
子类(BaiLianChatClient、SiliconFlowChatClient、AIHubMixChatClient、OllamaChatClient)只需实现 provider() 返回供应商标识,有特殊需求的可以覆写 customizeRequestBody() 和 isReasoningEnabledForStream()。新增供应商只需加一个子类和一行配置。
8.2 OkHttp 异步流式读取
流式调用使用 OkHttp 的同步阻塞读取模型——在独立线程中执行 HTTP 请求,逐行读取响应体:
try (Response response = call.execute()) {
BufferedSource source = response.body().source();
while (!cancelled.get()) {
String line = source.readUtf8Line();
if (line == null) break; // 流结束
if (line.isBlank()) continue; // 跳过空行(SSE 分隔符)
ParsedEvent event = OpenAIStyleSseParser.parseLine(line);
if (event.hasReasoning()) callback.onThinking(event.reasoning());
if (event.hasContent()) callback.onContent(event.content());
if (event.completed()) callback.onComplete();
}
}为什么用同步阻塞而不是 OkHttp 的异步回调?
同步读取让取消逻辑更简单—while (!cancelled.get()) 一行就能控制整个读取循环的启停。异步回调模式下,取消需要同时管理回调链和底层 Call 的 cancel,状态机更复杂。
取消延迟:取消标志置位后,当前正在读取的一行 SSE 数据会继续处理完才退出循环。LLM 可能刚好发了一长串 token 在同一个 SSE 事件里,取消后还要多收几个 token。成本敏感的场景可以在 cancel 时直接调 call.cancel() 强制关闭底层 socket,立竿见影。
8.3 取消句柄的传递链
取消信号从用户浏览器一路传到 OkHttp 的 Socket,经过多层转发:
用户点击取消 → /stop 接口 → StreamTaskManager.cancel(taskId)
→ Redis 设标记 + RTopic 广播
→ cancelLocal(taskId)
→ taskInfo.handle.cancel() ← StreamCancellationHandle 接口
→ inner.cancel() ← StreamAsyncExecutor 内部句柄
→ cancelled.set(true) ← AtomicBoolean,读取循环在下一轮退出StreamCancellationHandle 是一个函数式接口(只有一个 cancel() 方法),各层取消句柄通过 Lambda 串联,不暴露内部实现细节。
9. 首包探活与故障转移
流式场景下的健康检查和普通 HTTP 不同——连接建立、状态码 200、响应头返回,都不代表模型真的在工作。LLM 可能在推理队列里排队、GPU 负载高导致 prefill 阶段卡住,第一个 token 迟迟不出。
9.1 ProbeStreamBridge
ProbeStreamBridge.java是一个插入在 OkHttp 流式响应和业务回调之间的桥接器。它的核心逻辑:
拦截第一个数据块:收到第一个 onContent 或 onThinking 回调时,立即通过 CompletableFuture.complete() 通知探测结果,然后才把后续内容透传给下游回调。
缓冲机制:首包到达前,后续到达的数据块暂存在 buffer 中。首包确认成功后才 commit——把 buffer 中的数据逐条 dispatch 给下游回调。这保证了首包探测期间下游不会收到任何内容,前端不会闪现"正在输入"然后又切换模型。
阻塞等待awaitFirstPacket(timeout) 通过 CompletableFuture.get(timeout, unit) 阻塞等待首包或超时。
9.2 故障转移流程
RoutingLLMService.streamChat() 按优先级遍历候选模型:
for (ModelTarget target : targets) {
1. 解析提供商客户端(如 BailianChatClient)
2. 健康检查(healthStore.allowCall → 检查是否被熔断)
3. 发起流式请求 → ProbeStreamBridge 桥接
4. 阻塞等待首包(60s 超时)
├─ 成功 → 标记健康,返回取消句柄,外层继续流式回调
└─ 失败 → 标记失败,handle.cancel(),continue 到下一个候选
}
全部失败 → callback.onError() 通知前端首包超时和 HTTP 错误分别计入失败统计。连续失败 2 次触发熔断(OPEN 状态 30 秒),之后的请求直接跳过该模型,直到 HALF_OPEN 试探成功才恢复。
10. 全链路追踪
流式调用有两个关键的 trace 节点:
- llm-stream-routing:记录模型路由和故障转移过程,包含首包耗时
- llm-first-packet:记录首包探活的独立耗时(TTFT——Time To First Token)
LlmFirstPacketProbe 是一个独立的 Spring Bean,作用是把 awaitFirstPacket 从 RoutingLLMService 的内部调用拆出来,让 AOP 能拦截到并记录 trace 节点。Spring AOP 不拦截类内部的 self-call,所以必须拆成外部 Bean 调用。
trace 数据写入 t_rag_trace_node 表,包含:模型 ID、首包耗时、是否发生切换、切换次数。首包耗时的 P99 是关键告警指标——突然变高说明模型 API 响应变慢或排队加剧。
11. 边界情况与容错
用户关闭浏览器:SSE 断开 → onCompletion/onError 回调 → cancelBinder 清理排队位置 → StreamTaskManager.cancel() 跨实例取消 LLM 推理 → 已累积内容落库。不会出现"用户关了页面但 LLM 还在跑"的资源浪费。
网络静默断开:用户网络切换(WiFi 切 4G)但 TCP 没有正常关闭。SSE 超时(5 分钟)兜底,到时间后 onTimeout 触发相同的清理流程。
LLM 首包超时:60 秒内未收到第一个 token → 标记失败 → 自动切到下一个候选模型。对用户的影响是多了几百毫秒到几秒的等待,但不会看到报错。
所有 LLM 候选失败:最后一个候选也失败后,通过 callback.onError() 通知前端。前端展示"生成失败,请稍后重试"。
排队超时:15 秒未拿到 LLM 调用许可 → REJECT 事件 → DONE。同时将用户问题写入对话记录,标记为系统拒绝回复,用户在历史记录里能看到。
模型输出失控:提示词设计不当或模型状态异常可能导致重复生成循环。流过程中检查累积内容长度,超过阈值(如 10000 字符)强制取消 LLM 推理,防止内存被打高。
SSE 写入异常:网络断开导致 send 失败 → SseEmitterSender.fail() 置位 closed 标记 → 后续 send 自动跳过 → 不抛异常到上层,避免触发全局异常处理器导致响应冲突。