这条链路是 RAG 的核心——从用户输入到流式回答的全过程。整体耗时预算在 5 到 15 秒(从用户发消息到首字出现),其中 LLM 推理占大头,前面的检索链路需要控制在 1 到 3 秒内完成。
请求进入后先过限流排队拿 LLM 调用许可,然后统一编排 8 个阶段串行推进,但阶段内部有并行和短路设计。
记忆加载 -> 改写拆分 -> 意图解析 -> 歧义引导 -> 系统响应 / 检索 -> Prompt 组装 -> 流式输出
1. 基础机制:上下文传递与限流排队
1.1 用户上下文传递
每个请求进入时从 SaToken 中提取登录用户 ID,查数据库拿到完整用户信息,放入 UserContext 中供全链路使用。
这里的关键设计是用 TTL(TransmittableThreadLocal)而不是普通 ThreadLocal。普通 ThreadLocal 在线程池复用时上下文会丢失——检索阶段多通道是并行提交到线程池执行的,主线程里的用户信息传递不到异步线程。TTL 由阿里开源,专门解决线程池场景下的上下文传递问题,异步线程可以自动继承主线程的用户信息。
请求完成后 UserContext.clear() 清理上下文,防止线程复用时上一个用户的身份残留到下一个请求。
UserContext
用户上下文容器(基于 TTL 传递当前线程的登录用户)。
该类提供线程隔离的用户信息存储与访问能力,支持在异步线程池中透传用户上下文。 基于阿里巴巴的 TransmittableThreadLocal(TTL)实现,相比普通 ThreadLocal, TTL 能在线程池异步任务中自动传递上下文,适用于 @Async、CompletableFuture 等异步场景。
核心功能:
设置用户:set(LoginUser) - 在拦截器中设置当前登录用户
获取用户:get() / requireUser() - 获取当前线程的登录用户信息
快捷方法:getUserId() / getUsername() / getRole() / getAvatar() - 获取用户的特定字段
清理上下文:clear() - 在请求结束时清理,防止内存泄漏
判断登录:hasUser() - 判断当前线程是否存在用户上下文
典型使用场景:
拦截器中设置:UserContextInterceptor.preHandle 从 Sa-Token 获取用户信息后调用 set()
Service 层获取:业务逻辑中通过 requireUser() 获取当前操作用户 ID/角色
请求结束清理:UserContextInterceptor.afterCompletion 调用 clear() 避免内存泄漏
异步场景透传:在 @Async 或 CompletableFuture 中自动传递用户上下文
使用示例:
// 1. 在拦截器中设置用户上下文
LoginUser loginUser = LoginUser.builder()
.userId("1234567890")
.username("admin")
.role("admin")
.avatar("https://...")
.build();
UserContext.set(loginUser);
// 2. 在 Service 层获取当前用户
LoginUser user = UserContext.requireUser();
String userId = user.getUserId();
String role = user.getRole();
// 3. 或者使用快捷方法
String userId = UserContext.getUserId();
String username = UserContext.getUsername();
// 4. 在请求结束时清理(通常在拦截器的 finally 块中)
try {
// 业务逻辑
} finally {
UserContext.clear();
}
// 5. 异步场景中自动透传(无需额外处理)
@Async
public void asyncMethod() {
// 这里可以正常获取用户上下文
LoginUser user = UserContext.get();
}
重要注意事项:
必须清理:必须在请求结束时调用 clear() 清理上下文,否则在线程池复用场景下可能导致用户信息错乱
线程安全:所有方法都是线程安全的,不同线程之间的上下文互不影响
异步透传:使用 TTL 后,在 @Async、CompletableFuture 等异步场景中会自动传递上下文
空值处理:get() 可能返回 null,如果业务要求必须登录,应使用 requireUser()
不可变性:LoginUser 对象创建后不应修改,保证上下文的一致性1.2 限流排队
请求进入流水线之前,先过 ChatQueueLimiter.enqueue() 排队拿 LLM 调用许可。LLM API 有并发上限,多个服务实例共享配额,必须跨实例协调。
限流器设计为公平排队:基于 Redis ZSET 做先到先得的排队队列(score 为全局递增序列号),Redisson 的可过期信号量做许可池(许可数等于 LLM API 并发上限),Lua 脚本把"检查队头 + 出队 + 删标记"合并为一次原子操作。跨实例唤醒通过 RTopic 发布订阅实现——任一实例释放许可后广播消息,其他实例的等待者收到通知后立即检查能否拿到许可。
排队超时(默认 15 秒)走 REJECT 路径:向前端发送"系统繁忙"提示,同时把用户问题和系统回复写入对话记录,保证用户在历史记录里能看到自己问过什么。
许可租约 30 秒,覆盖单次 LLM 推理的最大耗时。请求处理完成后在 finally 块释放许可,用户中途关闭页面通过 SSE 的回调触发取消,清理排队位置。
ChatQueueLimiter
SSE 流式对话全局并发限流与排队处理器。
基于 Redis 实现公平队列和信号量限流,保证系统在高并发场景下的稳定性。
核心功能:
全局限流:通过 Redis PermitExpirableSemaphore 控制全局最大并发数
公平排队:使用 Redis SortedSet 实现 FIFO 队列,按入队顺序分配 permit
原子抢锁:通过 Lua 脚本保证队列检查和 permit 获取的原子性,避免竞态条件
超时拒绝:等待超过最大时长后返回友好提示,并记录到对话历史
实时通知:permit 释放时通过 Redis Topic 广播,通知排队中的请求立即重试
优雅关闭:应用关闭时清理监听器、定时任务和资源
技术架构:
Redis 数据结构:
Semaphore: rag:global:chat - 控制并发数的信号量
SortedSet: rag:global:chat:queue - 排队队列,score 为入队序号
AtomicLong: rag:global:chat:queue:seq - 生成单调递增的入队序号
Topic: rag:global:chat:queue:notify - permit 释放时的广播通道
Lua 脚本:lua/queue_claim_atomic.lua - 原子性地检查队列头部并移除
定时任务:ScheduledThreadPoolExecutor - 定期轮询队列,尝试获取 permit
通知机制:PollNotifier - 收到广播后立即触发所有排队请求重试2. 总体架构
整体流程(8 个阶段 + 3 个短路分支)
1. 记忆加载 —— 读历史 + 追加当前消息,两级记忆(滑动窗口原文 + 摘要压缩)
2. 查询重写 —— LLM 改写 + 规则兜底,输出改写结果和子问题列表
3. 意图识别 —— 意图树 + LLM 分类 + 置信度门控,输出每个子问题的意图匹配
├─ 短路1:歧义反问 —— 最高置信度低于阈值时生成澄清问题反问用户,不继续后续阶段
├─ 短路2:系统直答 —— 所有意图均为 SYSTEM 类型时跳过检索,用节点模板直接生成回复
4. 多通道检索 —— 外层按子问题并行编排(KB 检索 + MCP 工具调用),内层两通道并行
├─ 短路3:空结果 —— 检索返回空时直接回复"未检索到相关文档",不浪费 LLM 调用
5. 后处理链 —— 去重 + Rerank 精排(在 RetrievalEngine 内部完成)
6. 提示词构建 —— 系统指令 + 长期记忆 + 短期记忆 + 检索上下文 + 用户问题
7. LLM 流式推理 —— 模型路由 + 首包探活 + 故障转移,逐 token 通过 SSE 推送前端
短路设计
三个短路分支用 handleXxx 方法实现,返回 true 表示"已处理,流程终止", 返回 false 表示"不满足短路条件,继续后续阶段"。这个模式避免了深层嵌套的 if-else,每个短路分支的触发条件和处理逻辑都在独立方法内,新增短路路径只需加一个 handle 方法并在 execute 中调用。
上下文传递
所有中间状态通过 StreamChatContext 在阶段间传递——它既是入参容器(用户问题、 会话ID、深度思考开关),也是出参容器(历史消息、重写结果、意图列表、检索上下文)。 每个阶段从 ctx 读取上一阶段的输出,处理后将结果写回 ctx,下一阶段继续消费。 这样做的好处是阶段之间零耦合——任一阶段的实现变更不影响其他阶段。
容错策略
不在此类内部做 try-catch。每个阶段的异常由调用方统一捕获并处理。这样编排器保持简洁,错误处理集中管理。2.1 阶段一:记忆加载
记忆加载做了两件事:读历史 + 追加当前消息。合在一起的好处是在一次操作中完成"加载上下文"和"写入新消息",不会出现历史读到了但当前消息还没写进去导致下一阶段拿到不完整上下文的间隙。
记忆本身分两层:
短期记忆:滑动窗口保留最近 N 轮原文。窗口大小是可配置的rag.memory.history-keep-turns,默认 4 轮。
为什么是 4 轮而不是更多?
因为每多一轮就多几百个 token,而对话超过 4 轮之后,最早的那几轮通常跟当前问题已经没有直接关联了。保留它们是浪费上下文窗口。
长期记忆:超过阈值(默认 5 轮)后触发摘要压缩。后台异步调 LLM 把早期对话压缩成一段摘要,存到 t_conversation_summary 表。摘要不会阻塞当前请求——如果摘要还没生成完,先用原文顶一次,下次请求就能用上摘要了。
两种记忆同时注入提示词。先放摘要(全局背景),再放近期原文(精确上下文)。这个顺序是有意为之——LLM 倾向于更关注靠后的内容,近期对话应该比摘要更有影响力。
2.2 阶段二:查询重写
直接用用户原始问题去查向量库,效果通常很差。用户会说"上次那个怎么样了""A 和 B 有什么区别",这些指代模糊、语义混合的表达,向量匹配根本对不上。
重写解决三类问题:
指代消解:"上次那个" → 从对话历史里找到指代的具体实体,替换进去。
多义拆分:"A 和 B 有什么区别" → 拆成"A 是什么""B 是什么""A 和 B 有什么区别"三个子问题,各自独立检索。
口语规范化:"那个日志服务一天能写多少条" → "SLS 日志服务写入配额",更接近文档的表述方式。
实现上走 LLM 主路径 + 规则兜底。LLM 调用传最近 2 轮历史(为什么是 2 轮?再多历史对指代消解帮助不大,反而增加 token 消耗)。温度设 0.1,要求 JSON 格式输出。LLM 挂了或者返回格式解析失败,按标点符号做简单拆分,保证链路不断。
2.3 阶段三:意图识别
意图识别不是通用的语义分类,而是**检索路由器的前置决策**——判断用户问题应该走哪个知识库、调哪个 MCP 工具、还是系统直接回复。
2.3.1 意图树设计
意图以树形结构组织,但分类时不是逐层调用 LLM,而是一次调用对所有叶子节点同时打分。树的层级结构是为了可维护性和可读性——业务域、子域、具体场景三层嵌套,比扁平标签更容易理解和配置。只有叶子节点才挂实际配置(知识库或工具),非叶子节点仅作为结构容器。
一次调用的原因:当前意图规模下(叶子节点通常几十个以内),LLM 完全有能力一次性准确区分。如果逐层分类,每层都是一次 LLM 调用,延时叠加反而更慢。当意图规模膨胀到上百个叶子节点时,再考虑分层分类或先用向量相似度做粗筛。
节点结构(数据库表 t_intent_node):
- intentCode:业务唯一标识,如 group-hrbiz-oa-intro
- kind:节点类型,0=KB(知识库),1=SYSTEM(系统回复),2=MCP(工具调用)
- collectionName:KB 类型指向的向量集合
- mcpToolId:MCP 类型指向的工具
- topK:节点级检索数量,没配就用全局默认值
- examples:示例问题,帮助 LLM 理解这个节点的覆盖范围
只有叶子节点才挂实际配置(知识库或工具),非叶子节点只做分类路由。这个设计保证了意图树的每一层职责清晰:中间层只管分类方向,叶子层负责执行动作。
2.3.2 分类流程
树形意图分类器 做一次 LLM 调用完成全部分类。把所有叶子节点的 id、路径、描述、示例问题列在提示词里,要求 LLM 返回每个节点的置信度分数。输出格式是 JSON 数组[{"id": "...", "score": 0.9, "reason": "..."}]。
解析响应时做了白名单校验——LLM 返回的 id 不在意图树里就直接丢弃。这是防幻觉的兜底,LLM 可能编造不存在的节点 ID。
分类结果按分数降序排列,低于最低阈值minIntentScore,默认 0.4 的节点被过滤掉。每个子问题输出一个 List<NodeScore>。
2.3.3 意图树缓存
意图树从数据库加载后缓存到 RedisIntentTreeCacheManager,7 天过期。
为什么是 7 天而不是永久?
管理后台改了意图配置后主动删缓存,但如果删缓存的操作因为网络抖动没执行成功,永久缓存意味着永远读到旧数据。7 天过期是安全网。
每次意图分类请求都从 Redis 重新读取,保证管理后台改完配置后最多等一次缓存重建就能生效。
2.4 阶段四:分支路由
意图识别结果不一定都要走检索。三种分支:
歧义反问:最高置信度低于阈值时,不硬猜。通过 意图引导服务 负责检测用户问题的歧义性,当检测到多个系统意图得分接近时, 生成一个澄清问题,直接通过流式回调发给用户。比如用户问"那个系统怎么用",意图识别拿到两个低分候选(OA 系统 0.45、CRM 系统 0.41),系统反问"你是指 OA 系统还是 CRM 系统?"。
这个短路的价值在于:避免了一次大概率错误的检索和一次浪费 token 的 LLM 推理。反问用户的代价远小于给错答案的代价。
系统直答:所有子问题意图都是 SYSTEM 类型时(比如"你是谁""你能做什么"),跳过检索,直接用节点的提示词模板调 LLM 生成回复。这类问题不需要知识库,省掉检索的延时。
继续检索:有 KB 或 MCP 意图,进入检索阶段。
2.5 阶段五:多通道检索
检索分两层:外层按子问题并行编排,内层按通道并行检索。
2.5.1 外层编排
RetrievalEngine
检索引擎。
负责协调多通道检索(知识库)和 MCP(模型控制协议)工具的调用,并对检索结果进行重排序和格式化,最终生成用于 LLM 的上下文。
核心功能:
多子问题并行:对多个子问题并发执行检索,提升效率
知识库检索:调用 MultiChannelRetrievalEngine 执行多通道检索
MCP 工具调用:并行执行多个 MCP 工具获取动态数据
上下文格式化:将检索结果格式化为 LLM 可读的 Markdown 文本
异常容错:单个子问题或工具失败不影响整体流程
MultiChannelRetrievalEngine
多通道检索引擎。
负责协调多个检索通道和后置处理器,实现智能的多路召回策略。
核心功能:
并行检索:并发执行所有启用的检索通道,提升检索效率
后置处理链:依次执行去重、Rerank 等后置处理器
异常容错:单个通道或处理器失败不影响整体流程
详细统计:记录每个通道和处理器的执行情况RetrievalEngine 对每个子问题并行处理:
- KB 意图 → 走 MultiChannelRetrievalEngine 做多通道检索
- MCP 意图 → 走 MCP 工具调用链路
子问题之间没有依赖,用 CompletableFuture 提交到独立的线程池 ragContextExecutor 并行执行。子问题的实际检索数量取该子问题匹配的所有 KB 节点中 topK 字段的最大值。
MCP 工具调用链路分为三步:
MCPParameterExtractor
MCP 参数提取器接口。
定义从用户问题中提取 MCP 工具调用所需参数的标准方法,支持默认提示词和自定义提示词两种模式。
核心功能:
参数提取:根据工具定义从用户问题中提取参数
自定义提示词:支持传入自定义参数提取提示词模板
MCPToolExecutor
MCP 工具执行器接口。
定义 MCP 工具的执行标准,包括获取工具定义、执行调用和请求匹配等功能。
核心功能:
工具定义:获取工具的元数据信息
工具执行:根据请求执行工具调用并返回响应
请求匹配:判断是否支持某个请求
MCPToolRegistry
MCP 工具注册表接口。
定义 MCP 工具执行器的注册、注销和查询标准,支持动态管理工具生命周期。
核心功能:
工具注册:注册和注销工具执行器
工具查询:按 toolId 查询、列表查询、存在性检查
统计信息:获取已注册工具数量参数提取McpParameterExtractor:根据意图节点配置的工具 ID 获取工具定义(参数名、类型、描述),调用 LLM 从用户问题中提取参数值。如果意图节点配置了自定义的参数提取提示词模板,优先使用节点配置;否则使用通用模板。
工具执行McpToolExecutor:通过 McpToolRegistry 找到对应的工具执行器,传入提取的参数执行调用。工具执行器通过 MCP 协议与外部工具服务通信(MCP Java SDK 1.1.2),支持同步调用和流式响应。每个 MCP 意图独立提交到 mcpBatchExecutor 线程池并行执行。
结果格式化ContextFormatter.formatMcpContext():将工具返回的结构化结果转为 LLM 可读的文本格式,和 KB 检索结果按相同的上下文格式拼接,确保 KB 检索和 MCP 工具调用的结果在提示词中呈现方式一致。
KB 检索和 MCP 调用的结果最终在 buildSubQuestionContext() 中合并:KB 上下文和 MCP 上下文分别按子问题组装,多子问题时各自附加序号和原始问题标签。
2.5.2 内层通道:意图定向 vs 全局向量
SearchChannel
检索通道抽象接口。
每个通道代表一种独立召回策略,例如全局向量检索或意图定向检索。多通道检索引擎会并发执行它们, 再统一合并结果。
核心功能:
通道标识:提供名称、优先级和类型用于调度和监控
启用判断:根据上下文动态决定是否参与本次检索
执行检索:返回当前通道的召回结果及统计信息
IntentDirectedSearchChannel
意图定向检索通道。
基于意图识别结果,在特定知识库中进行定向检索。这是最精确的检索方式,优先级最高。
核心功能:
意图过滤:只提取 KB 类型的意图节点
并行检索:对多个意图对应的知识库并发执行检索
动态 TopK:根据配置倍率扩大检索范围
异常容错:检索失败时返回空结果,不影响其他通道
VectorGlobalSearchChannel
向量全局检索通道。
在所有知识库中进行向量检索,作为意图定向检索的兜底策略。当没有识别出意图或意图置信度较低时启用。
核心功能:
全量检索:在所有 KB collection 中并行执行向量检索
智能启用:根据意图置信度动态决定是否启用
异常容错:检索失败时返回空结果,不影响其他通道
AbstractParallelRetriever<T>
并行检索抽象模板类。
封装通用的并行检索逻辑,采用模板方法模式,子类只需实现具体的检索任务创建逻辑。
核心功能:
并行执行:使用 CompletableFuture 并发提交多个检索任务到线程池
结果收集:等待所有任务完成并合并结果
异常容错:单个任务失败不影响其他任务,记录失败日志
统计信息:记录成功/失败数量和总 Chunk 数
子类需要实现三个抽象方法:
createRetrievalTask - 创建单个检索任务
getTargetIdentifier - 获取目标标识(用于日志)
getStatisticsName - 获取统计名称(用于日志)
类型形参:
<T> – 检索目标类型(如 NodeScore、String)两个通道互补,用策略模式解耦,通过 SearchChannel 接口统一。
意图定向通道(IntentDirectedSearchChannel,优先级 1):
配置 intentDirected.enabled=true 且存在 KB 意图时启用。从匹配的意图节点中提取 collectionName,在对应的向量集合中检索。每个意图召回 baseTopK × topKMultiplier(默认 ×2)个候选。
为什么是 ×2 而不是精准召回 TopK?
因为要给后续精排模型留足候选池。向量检索是粗筛,只能保证相关文档大概率在结果里,但不能保证排序最优。多拿一些候选,让精排模型去挑最合适的。
全局向量通道(VectorGlobalSearchChannel,优先级 10):
配置 vectorGlobal.enabled=true 时,按以下条件判断是否启用:
- 意图定向没开 → 必须用全局兜底,没别的选择了
- 没有识别出任何意图 → 全局兜底
- 最高意图置信度 < 0.6confidenceThreshold→ 意图不够可靠,全局补充
- 只有一个意图且置信度在 0.6 到 0.8 之间singleIntentSupplementThreshold→ 中等把握,全局补充安全网
检索范围是所有 KB 类型的向量集合,每个集合召回 topK × topKMultiplier(默认 ×3)。全局的倍数比意图定向大,因为它没有意图过滤,噪声多,需要更宽的召回面来保证精度。
两个通道并行执行,任一通道失败不影响另一个。通道内部对多个集合的并行检索统一走 AbstractParallelRetriever 模板类——子类只需实现"怎么查单个集合",并行编排、结果收集、统计日志都在父类里。
2.5.3 pgvector 检索原理
PgRetrieverService
PostgreSQL pgVector 向量检索服务实现。
基于 PostgreSQL 的 pgVector 扩展实现向量相似度检索,支持 HNSW 索引和余弦距离计算。
核心功能:
向量嵌入:使用 EmbeddingService 将文本转换为向量
向量归一化:对嵌入向量进行 L2 归一化
HNSW 检索:设置 ef_search 参数提升召回率
余弦相似度:使用 <=> 运算符计算余弦距离项目主用 pgvector + HNSW 索引PgRetrieverService.retrieveByVector() 每次检索前执行 SET hnsw.ef_search = 200。
默认的 ef_search 是 40,在百万级向量库下召回率掉得很明显。调到 200 是在延时和召回率之间取平衡——再往上加精度提升很有限,但查询延时线性增长。如果数据量上到千万级,需要继续往上调。
向量先做 L2 归一化,再用 pgvector 的 <=> 操作符计算余弦距离。必须归一化是因为 <=> 算的是余弦距离的前提是向量已经是单位向量,没归一化算出来的不是余弦距离,检索结果没意义。相似度公式是 score = 1 - cosine_distance。
2.6 阶段六:后处理链
两个通道的结果合并后,进入串行后处理链。处理器按 order 属性排序执行,任一处理器失败不影响后继(跳过继续)。
2.6.1 去重
始终启用,是后处理链的第一步。
去重的关键是**同一个文本块被两个通道同时命中时,保留分数最高的版本**。通道按优先级排序遍历:意图定向通道优先级最高,全局向量通道优先级最低。以 chunkId 为 key,后来的如果分数更高会覆盖先到的。这样定向通道的高分不会被全局通道的低分覆盖。
用 chunkId 而不是内容哈希做去重 key 是有考虑的:两个不同的文本块即使内容一样,也应该都保留——它们可能来自不同的文档,携带不同的元数据,对 LLM 回答的价值不同。只有同一个 chunk(同一条数据库记录)被多个通道重复命中才需要去重。
2.6.2 精排
通过系统配置控制开关。启用时调用精排模型(默认百炼 Qwen3-Reranker)对去重后的候选做交叉注意力计算。
精排和向量检索的分工不同。向量检索是把文本压成固定长度的向量,用向量距离做相似度判断。这个压缩过程会丢信息——"怎么配置防火墙"和"怎么关闭防火墙"在向量空间里几乎重叠,但它们的意思完全相反。
精排模型不做压缩,它把问题和每个候选文本成对输入,做词级别的交叉注意力计算,能感知到细微的语义差异。代价是计算量大,不能在全库上跑。所以两者的配合是:向量检索从全库快速粗筛出 30 个候选,精排从中挑出最好的 5 个。粗筛保证不漏,精排保证不偏。
精排不可用时(模型故障或开关关闭),直接按向量得分截断,降级但不中断服务。
2.7 阶段七:提示词构建
提示词组装按固定顺序注入:
1. 系统指令:从 chat-system-prompt.st 模板加载,定义回答风格和约束
2. 长期记忆:摘要内容,提供全局对话背景
3. 短期记忆:最近 N 轮原文,提供精确上下文
4. 检索上下文:KB 检索结果和 MCP 工具调用结果。多子问题时附带序号和原始问题标签,帮助 LLM 组织回答结构
5. 当前问题:改写后的问题
温度策略:纯 KB 场景温度设 0,确保回答严格基于文档内容;MCP 混合场景温度 0.3,因为工具调用结果的结构化程度差一些,需要模型稍微灵活地组织语言。
2.8. 阶段八:流式推理与输出
2.8.1 模型路由和故障转移
ModelSelector
模型选择器,负责按能力类型、优先级、思考模式和健康状态筛选可用候选。
核心功能:
能力分类:支持 Chat、Embedding、Rerank 三种能力的模型选择
深度思考:根据是否需要深度思考,选择不同的聊天模型
优先级排序:按首选模型、优先级分数、ID 顺序进行多级排序
健康过滤:自动跳过被熔断或不可用的模型
配置校验:检查提供商配置是否存在,缺失时记录警告并跳过
选择流程:
从 AIModelProperties 获取对应能力的模型组配置
解析首选模型 ID(深度思考时使用 deepThinkingModel,否则使用 defaultModel)
过滤候选列表:排除已禁用的模型,深度思考模式下只保留支持思考的模型
排序候选列表:首选模型排第一,然后按优先级降序,最后按 ID 升序
构建 ModelTarget:为每个候选创建包含完整配置的 ModelTarget 对象
健康检查:跳过已被判定为不可用的模型
提供商校验:检查提供商配置是否存在,缺失时记录警告
使用场景:
Chat 服务:RoutingChatService 调用 selectChatCandidates() 获取聊天模型列表
Embedding 服务:RoutingEmbeddingService 调用 selectEmbeddingCandidates() 获取嵌入模型列表
Rerank 服务:RoutingRerankService 调用 selectRerankCandidates() 获取重排序模型列表
设计原则:
灵活性:支持通过配置动态调整候选列表和优先级,无需修改代码
容错性:配置缺失或模型不可用时自动降级,不会导致整个服务失败
可扩展性:新增能力类型只需添加对应的 selectXXXCandidates() 方法
性能优化:在构建阶段就过滤掉不可用模型,避免在执行阶段快速失败
典型用法:
// 1. 选择聊天模型候选(非深度思考模式)
List<ModelTarget> chatCandidates = modelSelector.selectChatCandidates(false);
// 2. 选择聊天模型候选(深度思考模式)
List<ModelTarget> deepChatCandidates = modelSelector.selectChatCandidates(true);
// 3. 选择 embedding 模型候选
List<ModelTarget> embeddingCandidates = modelSelector.selectEmbeddingCandidates();
// 4. 选择 rerank 模型候选
List<ModelTarget> rerankCandidates = modelSelector.selectRerankCandidates();
// 5. 遍历候选列表,按顺序尝试调用
for (ModelTarget target : chatCandidates) {
String modelId = target.id();
ModelCandidate candidate = target.candidate();
ProviderConfig provider = target.provider();
// 构建客户端并调用模型...
}
ProbeStreamBridge
流式首包探测桥接器。
在真正把内容转发给下游前,先短暂缓冲首批事件,用于判断当前模型是否“成功开始输出”。 路由层据此决定是认定本次流调用成功,还是切到下一个候选模型重试,避免首包前失败直接暴露给前端。
核心功能:
首包探测:等待第一个内容或思考片段到达,确认模型正常响应
事件缓冲:在首包确认前缓冲所有事件,避免下游看到半截输出
故障转移支持:如果首包超时或出错,路由层可以切换到备选模型
线程安全:使用 synchronized 保证缓冲和提交的原子性
工作流程:
创建 ProbeStreamBridge,包装下游回调
将桥接器传递给 ChatClient.streamChat()
当收到 onContent() 或 onThinking() 时,标记首包成功并缓冲事件
路由层调用 awaitFirstPacket() 等待首包结果
如果首包成功,调用 commit() 将缓冲事件按顺序转发给下游
如果首包失败/超时,路由层可以选择切换到备选模型
使用场景:
多模型路由:主模型无响应时自动切换到备选模型
用户体验优化:避免前端长时间等待后显示错误
容错处理:在网络不稳定或服务异常时提供降级方案
设计模式:
装饰器模式:包装下游 StreamCallback,添加首包探测功能
桥接模式:连接上游模型调用和下游业务回调
观察者模式:通过 CompletableFuture 通知等待者首包结果
注意事项:
该类是包级私有,仅供内部路由逻辑使用
必须在独立的线程中调用 streamChat(),避免阻塞主线程
awaitFirstPacket() 会阻塞当前线程,直到首包到达或超时
一旦 commit() 被调用,后续事件会直接转发给下游,不再缓冲
如果首包失败,缓冲的事件不会被转发,路由层应该切换到备选模型LLM API 不是百分之百可靠——会超时、会限流、会无响应。路由层做了三级保障:
优先级路由:按配置文件中的优先级排序遍历候选模型ModelSelector 的排序逻辑是先按是否为 defaultModel 排序(defaultModel 排最前),再按 priority 升序。
实际降级顺序:**qwen3-max(首选)→ qwen-plus(百炼 priority=1)→ Ollama 本地(priority=2)→ GLM-4.7(SiliconFlow priority=4)→ GPT-5.4(AIHubMix priority=5)**。
这里有一个值得注意的点:qwen3-max 和 qwen-plus 都走百炼 API,如果百炼整体故障,前两次尝试都会失败,到第三次 Ollama 才接通。Ollama 本地部署不走外部网络,是百炼的独立降级路径。当前顺序下,百炼故障时会多浪费一次 qwen-plus 的尝试,但由于三态熔断机制,qwen-plus 很快会被熔断跳过,实际影响有限。
三态熔断:连续失败 2 次进入 OPEN 状态,拒绝请求 30 秒。然后转 HALF_OPEN 放一个请求试探,成功就回 CLOSED,失败继续 OPEN。阈值设为 2 而不是更大的数,是因为 LLM API 的故障模式是"要么通要么不通",很少出现间歇性抖动。设 2 次快速触发熔断,减少无效等待。
首包探活:这是专门为流式场景设计的健康检查。HTTP 200 不代表模型真的在工作——连接通了、状态码对了、响应头也到了,但内容迟迟不发,HTTP 层面看不出来ProbeStreamBridge 在流开启后阻塞等待第一个内容块,60 秒超时。首包到了才标记模型健康,首包不到就切下一个候选。
2.8.2 HTTP 层实现
AbstractOpenAIStyleChatClient
OpenAI 兼容协议聊天客户端抽象基类。
用于沉淀基于 OpenAI 风格 HTTP 协议的大模型调用公共逻辑,包括请求体构建、 鉴权头设置、同步响应解析、流式 SSE 读取以及异常封装。不同提供商只需覆写少量钩子方法 即可复用整体调用链路。
核心功能:
模板方法模式:定义同步和流式调用的标准流程,子类只需实现特定钩子方法
请求体构建:自动构建符合 OpenAI 协议的 JSON 请求体
鉴权处理:自动添加 Authorization 头(Bearer Token)
同步调用:支持阻塞式问答,返回完整响应内容
流式调用:支持 SSE 流式响应,实时推送思考内容和回答内容
异常封装:统一将 HTTP 错误、网络异常封装为 ModelClientException
Thinking 支持:支持解析 reasoning_content 字段(如 DeepSeek-R1)
设计模式:
模板方法模式:doChat() 和 doStreamChat() 定义算法骨架,子类可定制部分步骤
钩子方法:isReasoningEnabledForStream()、customizeRequestBody()、requiresApiKey()
策略模式:通过 StreamCallback 接口支持不同的流式处理策略
使用示例:
// 1. 实现具体提供商客户端
public class DeepSeekChatClient extends AbstractOpenAIStyleChatClient {
public DeepSeekChatClient(OkHttpClient syncClient, OkHttpClient streamClient, Executor executor) {
super(syncClient, streamClient, executor);
}
@Override
public String provider() {
return "deepseek";
}
@Override
protected void customizeRequestBody(JsonObject body, ChatRequest request) {
super.customizeRequestBody(body, request);
// 添加 DeepSeek 特有参数
if (request.getTemperature() != null) {
body.addProperty("temperature", request.getTemperature());
}
}
}
// 2. 同步调用
ChatRequest request = ChatRequest.builder()
.messages(List.of(
ChatMessage.system("你是一个助手"),
ChatMessage.user("你好")
))
.build();
ModelTarget target = new ModelTarget("deepseek-chat", List.of("deepseek-chat"));
String response = client.chat(request, target);
// 3. 流式调用
client.streamChat(request, new StreamCallback() {
@Override
public void onThinking(String thinking) {
System.out.print(thinking); // 打印思考过程
}
@Override
public void onContent(String content) {
System.out.print(content); // 打印回答内容
}
@Override
public void onComplete() {
System.out.println("\n完成");
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
}, target);
注意事项:
该抽象类不是线程安全的,但每个实例可以被多个线程共享使用
同步和流式调用使用不同的 OkHttpClient 实例,避免连接池冲突
流式调用在独立的线程池中执行,不会阻塞主线程
子类必须实现 provider() 方法返回提供商名称
如果提供商不需要 API Key,可以覆写 requiresApiKey() 返回 falseAbstractOpenAIStyleChatClient 用 OkHttp 异步发起 POST 请求,所有供应商统一走 OpenAI 兼容协议/v1/chat/completions)。请求头设 Accept: text/event-stream 开启流式返回。
响应处理逐行读取:
- 解析 SSE 格式的 data: {...} 行
- 提取 choices[0].delta.content(回复内容)和 delta.reasoning_content(思考内容)
- 收到 finish_reason=stop 或者 [DONE] 标记时回调完成
取消通过一个原子标志位实现——上层调用 cancel 后将标志置为 true,读取循环在每轮迭代时检查该标志,一旦置位就退出循环。
2.8.3 SSE 输出与事件协议
前端通过 EventSource 接收 SSE 事件。服务端 StreamChatEventHandler 将 LLM 的 token 流转为结构化事件:
内容输出做了分块聚合:不是每收到一个 token 就推一次 SSE 事件,而是凑够 5 个字符messageChunkSize 可配置 才推一次。一个中文 token 可能只有 1 个字,如果每个字都推,前端的渲染频率太高,反而卡。
流过程中用 StringBuilder 累积完整内容。正常结束时写入 t_message 表,用户取消时在取消回调里也落库——取消不等于内容就丢了。
2.8.4 跨实例取消
用户取消请求可能落在实例 A,但 LLM 推理跑在实例 B。取消信号需要跨实例传递。
StreamTaskManager 的实现:取消时先在 Redis 设一个取消标记 Key(30 分钟过期),再通过 RTopic 广播。所有实例订阅了取消主题,收到消息后本地 CAS 取消任务。取消执行包括:停止 LLM 的 HTTP 请求、持久化已累积内容、发 CANCEL 和 DONE 事件、关闭 SSE 连接。
幂等保护通过 AtomicBoolean 的 CAS 实现——取消逻辑只执行一次,即使重复收到取消消息也不会重复写数据库或发重复事件。
3. 边界情况与容错
记忆加载失败:返回空历史,后续阶段用纯用户问题继续。降级不中断。
查询重写 LLM 失败:规则拆分兜底。原始问题用标点符号拆分后直接走意图识别。
意图识别 LLM 幻觉:返回了不存在的节点 ID → 白名单校验丢弃,降级到全局检索。
全部通道检索返回空handleEmptyRetrieval() 短路,回复"未检索到相关文档",不浪费 LLM 调用。
LLM 首包超时:切下一个候选模型重试。所有候选失败 → callback.onError() 通知前端。
用户中途关闭浏览器:SSE 断开 → SseEmitter 触发回调 → 取消 LLM 请求 → 已生成内容落库。
模型输出失控:在流过程中检查累积长度,超过阈值强制取消。这个边界在生产上是一个重要的保护——提示词设计不当或模型状态异常可能进入重复生成循环。
4. 全链路追踪
每个阶段都有独立的 trace 节点,通过 @RagTraceNode 注解 + AOP 实现。记录内容:入参、出参、耗时(毫秒)。
完整追踪链路
load-memory → rewrite-query → intent-classify → retrieval-engine → multi-channel-retrieval → deduplication → rerank → llm-stream-routing → llm-first-packet
数据写入 t_rag_trace_run 和 t_rag_trace_node 表,通过 taskId 查询完整调用链。出问题时,先看哪个阶段耗时突增,再针对性排查。