文档入库不是上传完文件就结束,后面还有解析、分块、Embedding、向量写入、状态更新这些步骤。这里面解析可能失败,Embedding 可能超时,向量库可能写入失败,用户也可能在任务执行中删除文档或取消任务。所以我不会把它做成一个同步接口,而是设计成 上传接口 + 任务状态机 + MQ 异步消费 + 节点日志 + 幂等恢复。
我会这样拆:
用户上传文档后,接口只做三件事:
1.保存原始文件、创建文档记录、创建入库任务,然后发送 MQ 消息。
2.真正的解析、分块、Embedding、向量写入由消费者异步执行。
3.每个节点执行前都检查任务状态,执行后记录节点日志。如果某个节点失败,就把任务停在对应失败状态,并记录失败原因,后续可以从失败节点重试,而不是整篇文档从头开始。
实际使用中容易出现的问题是:如果只用一个 PROCESSING / SUCCESS / FAILED 大状态,失败后不知道卡在哪里;如果没有幂等,MQ 重复消费会重复写 chunk 或重复写向量;如果没有取消逻辑,用户删除知识库后,后台任务可能还在继续写向量。
排查时,看任务表当前状态,再看节点日志定位失败节点,最后结合 MQ 消费日志、文档表、chunk 表和向量库状态判断是解析失败、分块失败、Embedding 失败还是索引写入失败。
优化上,可以从状态机细化、节点日志、幂等键、失败重试、死信队列、取消检查、向量写入补偿几个方向处理。
这里的取舍是:状态机和节点日志会增加表设计和开发复杂度,但能换来失败可恢复和问题可追踪。所以我不会把文档入库做成一个同步大方法,而是拆成异步任务系统。
1. 核心表设计
1)文档表:knowledge_document
文档表记录用户上传的文档本身。
CREATE TABLE knowledge_document (
id BIGINT PRIMARY KEY,
knowledge_base_id BIGINT NOT NULL,
file_name VARCHAR(255) NOT NULL,
file_type VARCHAR(50),
file_size BIGINT,
file_hash VARCHAR(128),
storage_url VARCHAR(512),
version INT NOT NULL DEFAULT 1,
status VARCHAR(50) NOT NULL,
created_by BIGINT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
deleted BOOLEAN DEFAULT FALSE
);
关键字段说明:
knowledge_base_id:属于哪个知识库
file_hash:用于判断重复上传
version:文档版本,重建索引时很重要
status:文档状态,比如 UPLOADED、INDEXING、AVAILABLE、FAILED、DELETED
storage_url:原始文件存储位置
常见索引:
CREATE INDEX idx_doc_kb_status ON knowledge_document(knowledge_base_id, status);
CREATE UNIQUE INDEX uk_doc_kb_hash_version ON knowledge_document(knowledge_base_id, file_hash, version);
2)入库任务表:document_ingestion_task
任务表记录一次文档入库流程。
CREATE TABLE document_ingestion_task (
id BIGINT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL UNIQUE,
document_id BIGINT NOT NULL,
knowledge_base_id BIGINT NOT NULL,
version INT NOT NULL,
status VARCHAR(50) NOT NULL,
current_node VARCHAR(50),
retry_count INT DEFAULT 0,
max_retry_count INT DEFAULT 3,
error_code VARCHAR(100),
error_message TEXT,
cancel_reason VARCHAR(255),
created_by BIGINT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
started_at TIMESTAMP,
finished_at TIMESTAMP
);
关键字段说明:
task_id:任务唯一 ID
document_id:对应文档
version:对应文档版本
status:任务状态
current_node:当前执行到哪个节点
retry_count:当前重试次数
error_message:失败原因
常见索引:
CREATE INDEX idx_task_doc_version ON document_ingestion_task(document_id, version);
CREATE INDEX idx_task_status_updated ON document_ingestion_task(status, updated_at);
CREATE INDEX idx_task_kb_status ON document_ingestion_task(knowledge_base_id, status);
3)分片表:document_chunk
分片表记录解析和切分后的 chunk。
CREATE TABLE document_chunk (
id BIGINT PRIMARY KEY,
chunk_id VARCHAR(64) NOT NULL UNIQUE,
document_id BIGINT NOT NULL,
knowledge_base_id BIGINT NOT NULL,
version INT NOT NULL,
chunk_index INT NOT NULL,
chunk_hash VARCHAR(128) NOT NULL,
title_path VARCHAR(512),
content TEXT NOT NULL,
token_count INT,
embedding_status VARCHAR(50),
vector_status VARCHAR(50),
created_at TIMESTAMP,
updated_at TIMESTAMP
);
关键唯一约束:
CREATE UNIQUE INDEX uk_chunk_doc_version_index
ON document_chunk(document_id, version, chunk_index);
CREATE UNIQUE INDEX uk_chunk_doc_version_hash
ON document_chunk(document_id, version, chunk_hash);
这里的幂等关键点是:
documentId + version + chunkIndex
documentId + version + chunkHash
这样 MQ 重复消费或者任务重试时,不会重复写入相同分片。
4)节点日志表:document_ingestion_node_log
节点日志记录每一步执行情况。
CREATE TABLE document_ingestion_node_log (
id BIGINT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
document_id BIGINT NOT NULL,
node_name VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL,
input_summary TEXT,
output_summary TEXT,
error_message TEXT,
retry_count INT DEFAULT 0,
started_at TIMESTAMP,
finished_at TIMESTAMP,
duration_ms BIGINT
);
节点类型可以是:
FETCHER
PARSER
CHUNKER
EMBEDDING
INDEXER
CLEANUP
常见索引:
CREATE INDEX idx_node_task ON document_ingestion_node_log(task_id);
CREATE INDEX idx_node_doc ON document_ingestion_node_log(document_id);
CREATE INDEX idx_node_status ON document_ingestion_node_log(status);
节点日志的价值是:失败时能知道具体失败在哪一步,不是只看到一个 FAILED。
2. 状态机设计
我会把状态分成 任务状态 和 节点状态。
任务状态
INIT
PENDING
PROCESSING
SUCCESS
FAILED
CANCELING
CANCELED
DEAD
含义:
INIT:任务刚创建
PENDING:等待 MQ 消费
PROCESSING:处理中
SUCCESS:全部入库成功
FAILED:可重试失败
CANCELING:正在取消
CANCELED:已取消
DEAD:超过最大重试次数,进入人工处理
节点状态
WAITING
RUNNING
SUCCESS
FAILED
SKIPPED
CANCELED
3. 状态流转
正常流程:
INIT
-> PENDING
-> PROCESSING
-> PARSE_SUCCESS
-> CHUNK_SUCCESS
-> EMBEDDING_SUCCESS
-> INDEX_SUCCESS
-> SUCCESS
更工程化一点,可以把节点状态写到节点日志里,任务表只保留总状态和当前节点:
PENDING
-> PROCESSING(current_node=PARSER)
-> PROCESSING(current_node=CHUNKER)
-> PROCESSING(current_node=EMBEDDING)
-> PROCESSING(current_node=INDEXER)
-> SUCCESS
失败流程:
PROCESSING
-> FAILED
-> PENDING
-> PROCESSING
超过最大重试:
FAILED
-> DEAD
取消流程:
PENDING / PROCESSING
-> CANCELING
-> CANCELED
4. MQ Topic / Tag 设计
我会按任务粒度和节点粒度选择。
简单版本:一个 Topic,一个 Tag
Topic: RAG_DOCUMENT_INGESTION
Tag: INGEST_DOCUMENT
消息体:
{
"taskId": "task_001",
"documentId": 1001,
"knowledgeBaseId": 10,
"version": 1,
"operator": 123,
"traceId": "trace_xxx"
}
消费者拿到消息后,按状态机顺序执行 Parser、Chunker、Embedding、Indexer。
更细版本:按节点拆 Tag
Topic: RAG_DOCUMENT_INGESTION
Tag:
PARSE_DOCUMENT
CHUNK_DOCUMENT
EMBEDDING_CHUNK
INDEX_VECTOR
CLEANUP_DOCUMENT
优点是每个节点可以单独扩容和重试;缺点是任务编排更复杂。
第一版我会用一个文档入库 Topic,消费者内部按 Pipeline 执行节点。等到文档量变大、Embedding 或向量写入成为瓶颈后,再把 Embedding 和 Indexer 拆成单独 Tag 或单独消费者组。
5. 幂等键怎么设计?
幂等要覆盖三层。
1)任务幂等
documentId + version
同一个文档版本只允许有一个有效入库任务。
CREATE UNIQUE INDEX uk_task_doc_version
ON document_ingestion_task(document_id, version);
2)chunk 幂等
documentId + version + chunkIndex
documentId + version + chunkHash
避免重复消费导致重复分片。
3)向量幂等
向量 ID 必须稳定,不能每次随机生成:
vectorId = documentId + "_" + version + "_" + chunkIndex
或者:
vectorId = chunkId
这样重复写入时可以 upsert,也可以先查后写,不会生成多份向量。
6. 消费幂等怎么做?
消费者执行前先做状态判断:
如果任务是 SUCCESS:直接跳过
如果任务是 CANCELED:直接跳过
如果知识库已删除:标记 CANCELED
如果任务是 PROCESSING 但更新时间很新:说明可能有其他消费者在处理,跳过
如果任务是 FAILED 且允许重试:继续执行
更新状态时用条件更新:
UPDATE document_ingestion_task
SET status = 'PROCESSING',
current_node = 'PARSER',
started_at = now()
WHERE task_id = ?
AND status IN ('PENDING', 'FAILED');
只有更新成功的消费者才有执行权。
7. 重试策略
我会区分 临时失败 和 永久失败。
临时失败:可以重试
Embedding 服务超时
向量库连接失败
数据库短暂异常
网络抖动
模型限流
处理方式:
retry_count + 1
状态改为 FAILED
记录失败节点和原因
重新投递 MQ 或等待定时任务扫描重试
永久失败:不应该反复重试
文件损坏
文件格式不支持
文件不存在
知识库已删除
用户无权限
处理方式:
状态改为 FAILED 或 CANCELED
记录明确失败原因
不再自动重试
最大重试次数可以配置,例如:
maxRetryCount = 3
超过后:
状态改为 DEAD
进入死信处理或人工处理
8. 如果 Embedding 成功但向量写入失败怎么办?
我的处理方式是:状态停在向量写入失败节点,重试时从 Indexer 节点恢复。
前提是 chunk 已经落库,Embedding 结果或者可重算文本还在。
如果 embedding 向量已经持久化:
直接读取 embedding 结果,重新写向量库
如果 embedding 没有持久化:
读取 chunk 文本,重新计算 embedding,再写向量库
关键是不能把任务直接标记成功,也不能每次都从解析开始。
节点日志记录:
PARSER SUCCESS
CHUNKER SUCCESS
EMBEDDING SUCCESS
INDEXER FAILED
下次恢复时:
从 INDEXER 开始
9. 任务取消逻辑
任务取消主要发生在:
用户取消入库
用户删除文档
用户删除知识库
管理员下架知识库
取消流程:
第一,把任务状态改为 CANCELING。
UPDATE document_ingestion_task
SET status = 'CANCELING',
cancel_reason = 'USER_DELETE_DOCUMENT'
WHERE document_id = ?
AND status IN ('PENDING', 'PROCESSING', 'FAILED');
第二,消费者每个节点执行前检查任务状态。
如果 status = CANCELING / CANCELED,则停止执行
第三,清理已经产生的数据。
删除 chunk
删除向量
删除临时文件
更新文档状态
第四,状态改为 CANCELED。
CANCELING -> CANCELED
注意:已经在执行中的 Embedding 请求可能无法立刻取消,但执行完后不能继续写向量,必须再次检查任务状态。
10. 用户删除知识库时,还有任务在跑怎么办?
我会这样处理:
第一,知识库先标记为 DELETING,不直接物理删除。
第二,禁止新的入库任务进入。
第三,把该知识库下所有未完成任务标记为 CANCELING。
第四,消费者执行前检查 knowledgeBase 状态。
如果 knowledgeBase = DELETING / DELETED
直接停止任务
第五,异步清理该知识库下的:
document
chunk
vector
task
node_log
原始文件
缓存
第六,清理完成后标记知识库为 DELETED。
这里的重点是:删除是一个状态流转,不是简单 delete 一行数据。
11. 查询接口怎么设计?
1)查询文档入库任务状态
GET /api/knowledge/documents/{documentId}/ingestion-task
返回:
{
"taskId": "task_001",
"documentId": 1001,
"status": "PROCESSING",
"currentNode": "EMBEDDING",
"progress": 65,
"retryCount": 1,
"errorMessage": null
}2)查询节点日志
GET /api/knowledge/ingestion-tasks/{taskId}/nodes
返回:
[
{
"nodeName": "PARSER",
"status": "SUCCESS",
"durationMs": 1200,
"outputSummary": "pages=10, chars=23000"
},
{
"nodeName": "CHUNKER",
"status": "SUCCESS",
"durationMs": 800,
"outputSummary": "chunks=45"
},
{
"nodeName": "EMBEDDING",
"status": "RUNNING",
"durationMs": 5000
}
]
3)手动重试任务
POST /api/knowledge/ingestion-tasks/{taskId}/retry
限制:
只有 FAILED / DEAD 状态可以重试
知识库不能是 DELETING / DELETED
文档不能是 DELETED
4)取消任务
POST /api/knowledge/ingestion-tasks/{taskId}/cancel
5)查询知识库下任务列表
GET /api/knowledge-bases/{kbId}/ingestion-tasks?status=FAILED&page=1&pageSize=20
12. 流程图
用户上传文档
|
v
保存原始文件
|
v
写 document 表
|
v
写 ingestion_task 表:INIT / PENDING
|
v
发送 MQ:RAG_DOCUMENT_INGESTION
|
v
Consumer 消费
|
v
检查任务状态 / 知识库状态
|
v
Parser 解析文档
|
v
写节点日志:PARSER SUCCESS
|
v
Chunker 分块
|
v
写 chunk 表,唯一键保证幂等
|
v
Embedding 生成向量
|
v
Indexer 写向量库
|
v
更新任务 SUCCESS
失败分支:
任意节点失败
|
v
写节点日志 FAILED
|
v
更新 task FAILED + errorMessage
|
v
retryCount < maxRetry ? 重新投递 MQ : DEAD
取消分支:
用户删除文档 / 知识库
|
v
任务状态 CANCELING
|
v
消费者执行前检查到取消
|
v
停止执行
|
v
清理 chunk / vector / 临时文件
|
v
任务状态 CANCELED