文档入库链路设计

作者:old wang 发布时间: 2025-06-05 阅读量:2 评论数:0

文档入库是将外部文件(PDF、Word、Markdown、网页等)转化为可检索的向量化文本块的完整流程。入口有两个:管理后台上传 和外部来源(S3、HTTP、飞书文档)。

1. 总体架构

创建任务(事务内)
  │
  ├─ 写入任务记录(t_ingestion_task,状态=RUNNING)
  ├─ 组装 IngestionContext(source + rawBytes + mimeType)
  └─ 交给 IngestionEngine 执行
        │
        ├─ 1. FetcherNode    → 从来源获取原始字节
        ├─ 2. ParserNode     → 解析为结构化文本
        ├─ 3. EnhancerNode   → 文档级 LLM 增强(可选)
        ├─ 4. ChunkerNode    → 文本分块 + 向量化
        ├─ 5. EnricherNode   → 块级 LLM 增强(可选)
        └─ 6. IndexerNode    → 写入向量数据库
              │
              ▼
更新任务状态(COMPLETED / FAILED)+ 写入节点日志

整个执行包在一个 @Transactional 事务内,任一节点失败立即停链,状态标记 FAILED,事务回滚。

2. 基础机制

2.1 链式编排引擎

IngestionEngine
 
数据摄取流水线执行引擎。
按节点连线关系驱动 pipeline 链式执行,负责校验拓扑、定位起始节点、执行条件判断、 记录节点日志并在失败时中断后续节点。
核心功能:
拓扑校验:检测环路、缺失节点等配置错误
起始节点定位:自动找到没有被其他节点引用的入口节点
条件评估:根据配置的条件表达式决定是否执行节点
链式执行:按照 nextNodeId 依次执行节点,支持提前终止
日志记录:记录每个节点的执行状态、耗时、输出快照
异常处理:捕获节点执行异常,标记任务为 FAILED 状态

ConditionEvaluator
 
条件评估器组件。
用于根据给定的 IngestionContext 上下文和 JsonNode 格式的条件配置来评估条件是否满足。 支持多种条件表达式类型,包括 SpEL 表达式、逻辑运算符(all/any/not)、字段比较等。
典型的使用场景:
简单布尔值:true 或 false
SpEL 表达式:"ctx.mimeType == 'application/pdf'"
逻辑组合:{"all": [条件1, 条件2]} 表示所有条件都满足
字段比较:{"field": "mimeType", "operator": "eq", "value": "application/pdf"}

IngestionEngine 是管道执行的中央调度器。它与常见的 for 循环遍历不同,采用**基于节点连线的 DAG 链式执行**:

起始节点发现:遍历所有节点的 nextNodeId 引用,没有被任何节点引用的就是起始节点。

环检测:执行前先遍历 nextNodeId 链,用路径集合检测循环引用。发现环直接抛异常,防止执行时无限循环。

链式执行:从起始节点开始,沿 nextNodeId 链逐个执行。每个节点执行前先做条件求值ConditionEvaluator,条件不满足则记录 skip 日志自动跳过。成功后检查 shouldContinue——节点可以返回 false 主动停链(如发现不需要继续处理)。

防死循环:维护已执行节点计数器,超过配置的节点总数直接抛异常。

节点日志:每个节点执行前后记录耗时、成功/失败、错误信息、输出摘要,写入 t_ingestion_task_node 表。

2.2 上下文传递

IngestionContext
 
数据摄取流水线上下文对象。
在节点执行过程中承载原始文件、解析结果、增强结果、分块、日志和异常等共享状态, 是整条 ingestion pipeline 的数据交换载体。各个节点通过读取和修改该上下文来实现数据的传递和处理。
典型的数据流转过程:
FetcherNode:读取 source,获取文档内容并写入 rawBytes
ParserNode:读取 rawBytes,解析后写入 rawText 或 document
EnhancerNode:读取 rawText,增强后写入 enhancedText、keywords、questions
ChunkerNode:读取 rawText 或 enhancedText,分块后写入 chunks
EnricherNode:读取 chunks,为每个分块添加元数据
IndexerNode:读取 chunks,向量化并写入向量数据库

所有节点通过 IngestionContext 共享状态:

- rawBytes → 原始字节(FetcherNode 产出,ParserNode 消费)

- rawText → 解析后的纯文本(ParserNode 产出,ChunkerNode 消费)

- document → 结构化文档对象(ParserNode 产出,包含章节和表格结构)

- enhancedText → LLM 增强后的文本(EnhancerNode 产出,ChunkerNode 优先消费)

- chunks → 分块列表,每块携带向量(ChunkerNode 产出,EnricherNode/IndexerNode 消费)

- metadata → 提取的元数据信息(EnhancerNode 产出,EnricherNode 消费)

- keywords / questions → 文档级关键词和问题(EnhancerNode/EnricherNode 产出)

上下文对象只在内存中,不持久化。事务回滚了它就没了,不会产生外部副作用。

2.3 幂等保护

FetcherNode 的幂等:如果 context.rawBytes 已有数据(比如 upload 接口直接传了字节),跳过获取步骤。这个设计让上传和外部拉取两套入口共用同一套管道。

RocketMQ 消费幂等:异步摄入场景下@IdempotentConsume 注解通过 Redis SET NX + TTL 防止消息重复消费。

3. 节点详解

3.1 FetcherNode —— 获取原始字节

职责:从多种来源获取文档的原始字节流。

采用策略模式,根据 SourceType 路由到不同实现:

来源类型

实现类

说明

FILE

LocalFileFetcher

本地文件系统读取

HTTP/HTTPS

HttpUrlFetcher

HTTP 下载,支持自定义请求头

S3

S3Fetcher

S3 兼容对象存储

FEISHU

FeishuFetcher

飞书文档 API

FetcherNode 在构造时扫描所有 DocumentFetcher 实现,按 supportedType() 建立映射。fetch 完成后将原始字节和推断出的 MIME 类型写入上下文,文件名回填到 DocumentSource

MIME 类型推断通过 MimeTypeDetector.detect() 实现,优先用魔数(文件头特征字节)判断,魔数无法判断时回退到文件名后缀。

幂等设计:如果 context.rawBytes 已有数据(比如 upload 接口直接传了字节),直接跳过。

3.2 ParserNode —— 解析为结构化文本

职责:将原始字节解析为结构化文本,保留章节层级和表格结构。

3.2.1 MIME 类型推断


MimeTypeDetector
 
MIME 类型探测工具,基于 Apache Tika 识别原始文件内容的媒体类型。
该工具通过分析文件的二进制内容和文件名后缀来准确判断文件的真实类型, 而不是仅依赖文件扩展名,可以有效防止文件类型伪装。
典型使用场景:
文档获取后:从 HTTP、S3、飞书等数据源下载文件后检测 MIME 类型
文件上传时:用户上传文件时验证文件类型是否符合要求
路由决策:根据 MIME 类型选择合适的解析器(PDF、Word、HTML等)
支持的常见类型:
application/pdf - PDF 文档
application/vnd.openxmlformats-officedocument.wordprocessingml.document - Word (.docx)
application/msword - Word (.doc)
text/html - HTML 文档
text/plain - 纯文本
image/jpeg, image/png - 图片文件

解析前先确定文件类型。优先使用上下文中已有的 mimeType,没有则通过 MimeTypeDetector 基于文件头魔数和文件名后缀推断。

支持的文件类型:PDF、Word(doc/docx)、Excel(xls/xlsx)、PPT(ppt/pptx)、Markdown、纯文本、图片。

3.2.2 白名单校验

ParserSettings 配置了允许的文件类型规则。文件类型不在白名单中的直接抛异常拒绝,防止垃圾文件进入知识库。配置 ALL 表示允许所有类型。

3.2.3 解析引擎

统一使用 Apache Tika 。Tika 一个库覆盖 PDF/Word/Excel/PPT/HTML 等几十种格式,不需要针对每种格式集成不同的解析器。

每个 MIME 类型可以配置独立的解析规则和选项参数(如 PDF 的 OCR 开关、Excel 的 Sheet 选择),通过 解析器配置对象 匹配。

3.2.4 结构化输出

解析结果不只是纯文本,而是构造为结构化文档对象:

- text:文档纯文本内容

- sections:章节层级结构(标题 → 层级(1-6) → 内容 → 起止偏移量)

- tables:表格结构(标题 → 行数据 → 起止偏移量)

- metadata:文档元数据(作者、创建时间、页数等)

保留结构的目的是:后续分块时可以根据章节边界切分,而不是粗暴地按固定字符数截断。

3.3 EnhancerNode —— 文档级 LLM 增强

PromptTemplateRenderer
 
轻量级提示词模板渲染器,按 `{{key}}` 占位符替换变量。
该工具用于 ingestion 节点里拼装 prompt,不引入完整模板引擎以保持依赖简单。
使用示例:
String template = "你是一个{{role}}助手。请回答以下问题:\n{{question}}";
Map<String, Object> variables = Map.of(
    "role", "知识问答",
    "question", "什么是 RAG?"
);
String result = PromptTemplateRenderer.render(template, variables);
// 输出:你是一个知识问答助手。请回答以下问题:\n什么是 RAG?
特性:
简单占位符:使用 {{key}} 语法,易于阅读和编写
null 安全:null 值会被替换为空字符串
未命中保留:如果模板中有变量在 variables 中不存在,占位符会保留原样,便于排查问题
无外部依赖:不引入 Thymeleaf、Freemarker 等重量级模板引擎
典型使用场景:
IntentClassifierNode:动态生成意图分类的 prompt
QueryRewriteNode:构建查询重写的提示词
其他需要 LLM 交互的节点:任何需要动态拼装 prompt 的场景

职责:在分块之前,对整篇文档做 LLM 增强处理。这是**可选节点**——管道配置可以跳过它。

支持四种增强任务类型:

类型

作用

应用位置

CONTEXT_ENHANCE

LLM 重写原文,补充上下文、消除指代不明

写入 context.enhancedText

KEYWORDS

提取文档级关键词列表

写入 context.keywords

QUESTIONS

自动生成文档能回答的问题列表

写入 context.questions

METADATA

提取作者、日期、主题等结构化元数据

写入 context.metadata

任务可配置多个,按序执行。每个任务可以指定自定义系统提示词和用户提示词模板(通过 PromptTemplateRenderer 做变量替换—{{text}} 替换为文档内容{{mimeType}} 替换为文件类型等)。

上下文增强的价值:原始文档中的指代("如上所述""该功能")脱离了文档上下文后无法理解。LLM 重写后把指代消解掉,"下载 dingtalk.com 的 SDK" 写成 "从钉钉官网 https://dingtalk.com 下载 SDK Resource Kit"——这样每个分块在脱离原文后仍然能独立表达完整语义,检索时命中率显著提升。

CONTEXT_ENHANCE 使用 原始文本 作为输入;其他任务优先使用 增强后的文本(如果已执行了上下文增强),否则用 原始文本

ChunkerNode 的配合:分块时会优先使用 context.enhancedText(如果存在),否则用 rawText。这意味着增强后的文本直接替代原文进入分块流程。

3.4 ChunkerNode —— 文本分块 + 向量化

职责:将文本切分为固定大小的块,并立即为每个块生成向量。这是整个管道中最核心的节点——分块质量直接决定检索质量。

3.4.1 分块策略

通过 文档切分策略工厂 获取配置的分块策略。当前支持两种:

固定大小切分(FIXED_SIZE)

- 默认 chunkSize=512 字符overlapSize=128 字符

- 512 字符约对应 500~800 个 token(中文场景下每字符约 1~2 token),是大多数 Embedding 模型效果最稳定的区间

- 128 重叠(25%)确保关键信息不落在分块边界上被切断

- 智能边界对齐:切分点优先对齐换行符,其次中文句号/感叹号/问号,最后英文句号(且后面必须跟空白,防止把 URL 中的点号误判为句子边界)

- 回退距离不超过 overlap,避免为了对齐导致当前块和上一块几乎完全重叠

结构感知切分(STRUCTURE_AWARE)

- 目标 targetChars=1400,上限 maxChars=1800,下限 minChars=600

- 先将文档扫描为块类型:标题# 开头)、代码围栏(```...```)、原子行(图片/链接)、段落(空行分隔)

- 然后按 min/target/max 预算做装箱——当前块不足 min 时宁可超过 max 也要多吸一个块,最后一块太小则和前一块合并

- 在块边界切分,保证标题、代码块、图片链接不会被截断

管道配置中可按文档类型指定使用哪种策略。

3.4.2 URL 断行修复

PDF 转文本时常见的问题:长 URL 被换行拆开dingtalk.\ncom)。固定大小切分策略在分块前先做文本归一化:

- 识别 http:// / https:// 开头的 URL 片段

- 判断换行后的内容是否属于同一个 URL(如 .com 的域名后缀/路径 的路径片段)

- 防御规则:换行后如果是 "2." 这种列表项开头,绝不合并,避免破坏正常的段落结构

- 同时修复中文词中间的软换行("商\n保通" → "商保通")

3.4.3 向量化

RoutingEmbeddingService

基于模型路由的 embedding 服务默认实现。
它与 ModelSelector、ModelRoutingExecutor 协作,先按配置和健康状态挑选可用模型, 再在 provider 客户端之间执行失败切换。该类本身不持有 provider 级别的路由规则,只负责把 “候选模型 -> 对应客户端 -> 实际 embedding 调用” 串起来。
核心功能:
智能路由:根据配置和健康状态自动选择最优模型
故障转移:主模型失败时自动切换到备选模型
指定模型:支持指定特定模型进行嵌入(不参路由)
批量处理:支持单文本和批量文本嵌入
路由流程:
通过 ModelSelector 获取候选模型列表(按优先级排序)
检查模型健康状态,跳过不健康的模型
通过 resolveClient() 找到对应的 EmbeddingClient
调用 client.embed() 或 client.embedBatch()
如果失败,切换到下一个候选模型
所有候选都失败时抛出 RemoteException
使用场景:
文档入库:批量向量化文档片段后存储到向量数据库
RAG 检索:将用户查询向量化后进行相似度搜索
知识库构建:大规模文本向量化
A/B 测试:对比不同模型的效果
注意事项:
该类是 @Primary 实现,会被优先注入
依赖 Spring 容器中的所有 EmbeddingClient Bean
clientsByProvider Map 在构造函数中初始化,之后不可变
指定 modelId 的方法不参与路由降级,失败时直接抛出异常

分块完成后立即为每个 chunk 调用 Embedding API 生成向量。合在同一个节点的原因:分块结果是向量化的唯一输入,分开两个节点需要在上下文里传大量未向量化的文本,增加内存占用。

向量化通过 ChunkEmbeddingService.embed() 完成,内部走 RoutingEmbeddingService——优先调用 SiliconFlow 的千问 Embedding 8B API(1536 维),失败时降级到本地 Ollama 或 AIHubMix。支持批量调用(batchSize=32)。

生成的向量以 float[] 形式存储在 VectorChunk.embedding 字段中,随上下文传递到后续节点。

3.5 EnricherNode —— 块级 LLM 增强

职责:对每个 chunk 单独做 LLM 增强。和 EnhancerNode 的区别在于工作粒度——Enhancer 是文档级(粗粒度),Enricher 是块级(细粒度)。

支持三种任务类型:

类型

作用

KEYWORDS

为每个 chunk 提取关键词,写入 chunk 的 metadata

SUMMARY

为每个 chunk 生成摘要,写入 chunk 的 metadata

METADATA

为每个 chunk 补充结构化元数据

任务可配置多个,按序执行。每个 chunk 独立调用一次 LLM,每次调用传入 chunk 内容和上下文信息。

性能注意:EnricherNode 逐个 chunk 调 LLM,大文档(几百个 chunk)会很慢。这适合"质量优先"的场景——文档入库频率低但检索精度要求高。高频摄入场景建议关掉或只对关键文档开启。

3.6 IndexerNode —— 写入向量数据库

职责:将处理完的分块写入向量存储,并创建向量索引。

3.6.1 集合管理

VectorStoreAdmin
 
向量空间元数据与索引管理接口。
负责向量空间的创建、存在性检查和兼容性校验,与检索逻辑解耦,提供统一的向量空间管理能力。
核心功能:
幂等创建:确保向量空间存在,不存在则按规格创建,已存在则跳过或校验兼容性
存在性检查:仅判断向量空间是否存在,不执行创建操作
跨引擎统一:定义统一的接口规范,支持 Milvus、pgvector 等多种向量数据库
实现类:
MilvusVectorStoreAdmin - Milvus 向量存储管理实现
PgVectorStoreAdmin - PostgreSQL pgvector 扩展管理实现

通过 VectorStoreAdmin.ensureVectorSpace() 确保向量空间存在:检查对应的集合/表是否已创建,不存在则根据 VectorSpaceSpec(包含空间 ID 和备注)自动创建。pgvector 模式下检查 HNSW 索引是否存在,不存在则创建。

3.6.2 维度校验

从配置rag.default.dimension = 1536 或第一个 chunk 的实际向量维度获取期望维度。写入前逐一校验每个 chunk 的向量维度与期望一致,不一致则抛异常。

3.6.3 元数据构建

每个 chunk 携带丰富的元数据写入向量库:

- chunk_index:块在文档中的序号

- task_id:摄入任务 ID

- pipeline_id:管道 ID

- source_type:文档来源类型(FILE/HTTP/S3/FEISHU)

- source_location:文档来源地址

- 通过配置的 metadataFields 选择性写入自定义元数据

元数据以 JSONB 格式存储在向量表的 metadata 字段中,检索时可以通过 metadata->>'collection_name' 做过滤。

3.6.4 批量写入

pgvector 模式下使用 JdbcTemplate.batchUpdate() 批量写入 t_knowledge_vector 表,一个文档的所有 chunk 在一次 SQL 批处理中完成。Milvus 模式下通过 MilvusVectorStoreService 调用 Milvus 客户端批量插入。

3.6.5 延迟写入

IngestionContext 有一个 skipIndexerWrite 标志。为 true 时,IndexerNode 只做校验和 chunkId/embedding 的填充,不实际写入向量库。由外层调用方在事务中统一完成向量写入。这个设计用于需要将向量写入和数据库元数据写入放在同一个事务中的场景。

3.6.6 内容长度保护

chunk 内容超过 65535 字符时自动截断,防止超出数据库字段长度限制。

4. 管道配置

管道通过 PipelineDefinitionNodeConfig 配置,存储在 t_ingestion_pipelinet_ingestion_pipeline_node 表中。可以自定义节点顺序和连线,例如:

- 标准流程:Fetcher → Parser → Enhancer → Chunker → Enricher → Indexer

- 简化流程(跳过增强):Fetcher → Parser → Chunker → Indexer

- 仅解析和分块(不做索引):Fetcher → Parser → Chunker(最后一个节点的 nextNodeId 为空)

每个节点可配置的参数:

- nodeType:节点类型(FETCHER/PARSER/ENHANCER/CHUNKER/ENRICHER/INDEXER)

- nextNodeId:下一个节点的 ID,为空表示管道终点

- condition:执行前条件求值(如"文档小于 100 字时跳过 EnhancerNode")

- settings:节点特有的配置参数(chunkSize、增强任务列表、元数据字段等)

5. 向量存储抽象

通过 VectorStoreServiceVectorStoreAdmin 两个接口抽象向量后端的差异:

VectorStoreService 数据的增删改——批量索引、单块更新、按文档/块删除

VectorStoreAdmin 元数据管理——确保向量空间存在、检查存在性

当前实现:

- pgvectorrag.vector.type=pg:主方案。向量和元数据在同一张 PostgreSQL 表中,天然事务保证。通过 @ConditionalOnProperty 条件装配

- Milvusrag.vector.type=milvus:可选方案。独立的向量数据库,支持分布式、索引分片。适合千万级以上向量规模

切换只需改一行配置,上层业务代码完全不受影响。

6. 模型调用与熔断

EnhancerNode、EnricherNode 的 LLM 调用和 ChunkerNode 的 Embedding 调用都走模型路由层,享有同样的熔断降级保障:

- 优先级路由:每个模型类型配置多个候选,按优先级排序

- 三态熔断:连续失败 2 次进入 OPEN 状态 30 秒,然后 HALF_OPEN 试探

- 故障转移:当前模型失败自动切下一个候选

这意味着即使 LLM API 临时不可用,由于模型路由层有故障转移,单次 API 调用失败会自动切到备选模型,管道层面不一定失败。但如果所有候选模型都不可用,节点会抛出异常,管道终止并触发事务回滚——因为增强节点失败意味着后续的 ChunkerNode 只能拿到原始文本而非增强文本,管道设计上选择"快速失败"而不是"静默降级",让运维感知到异常并决定是重试还是跳过增强节点重新配置管道。

7. 任务状态与日志

7.1 任务状态流转

RUNNING(创建即执行) → COMPLETED(成功) / FAILED(失败)

任务记录包含:管道 ID、文档来源类型和地址、分块数量、开始时间、完成时间、错误信息、创建人。

7.2 节点日志

每个节点的执行日志写入 t_ingestion_task_node,包含:

- nodeId:节点配置 ID

- nodeType:节点类型

- nodeOrder:节点在管道中的执行序号

- status:success / failed / skipped

- durationMs:执行耗时(毫秒)

- message:执行结果摘要

- errorMessage:失败时的错误信息

- outputJson:节点输出摘要(超过 1MB 自动截断)

通过节点日志可以精确定位摄入失败的原因——是 Fetcher 下载超时、Parser 解析失败、还是 Indexer 向量写入报错。

节点执行顺序通过 构建节点顺序映射 计算:从管道配置的起始节点开始沿 nextNodeId 链遍历,遇到环或查无此节点时终止。

8. 边界情况与容错

Fetcher 下载超时:抛出 IOException,管道终止,状态 FAILED。重试由上层调用方控制。

Parser 遇到不支持的格式:MIME 白名单校验在解析前拦截,直接抛异常。支持的文件格式列表可配置。

Enhancer/Enricher 的 LLM 调用失败:节点抛异常,管道终止。不会静默跳过——因为增强结果直接影响分块质量,失败应该让运维感知到,而不是悄悄降级。

Embedding API 调用失败ChunkEmbeddingService 内部走模型路由的故障转移,会尝试下一个候选模型。所有候选都失败才抛异常。

内容过长:IndexerNode 写入前自动截断超过 65535 字符的内容,防止数据库字段溢出。

输出 JSON 过大:节点输出超过 1MB 时自动截断并附加提示信息...[输出过大,已截断,原始大小: xxx 字节]),防止数据库 max_allowed_packet 限制导致写入失败。

事务回滚与任务恢复:管道执行失败后,事务回滚,任务记录和节点日志均随回滚清除,数据库中不残留任何记录。增加自动重试机制,失败任务由运维重新提交。RocketMQ 的延迟消息和消费重试能力已具备,作为后续异步摄入的自动重试基础。

评论