RocketMQ 在项目中用于两个场景:文档分块异步处理和用户反馈异步持久化。
选型上,事务消息是核心刚需——文档分块需要保证"数据库写 + 消息发"的原子性,Kafka 的事务消息面向流处理场景不太匹配,RabbitMQ 没有原生事务消息需要自己实现可靠投递。延迟消息能力作为后续摄入失败自动重试的基础。
使用 RocketMQ 5.2.0,通过 Spring Boot Starter 集成。配置简洁:NameServer 地址、生产者组、发送超时三个参数即可启动。
1. 基础能力封装
1.1 统一的生产者接口
MessageQueueProducer 定义了两种消息发送模式:
package rag.framework.mq.producer;
import org.apache.rocketmq.client.producer.SendResult;
import java.util.function.Consumer;
/**
* 消息队列生产者接口。
*
* <p>用于统一封装业务系统向消息队列发送消息的入口,屏蔽底层消息中间件实现差异,
* 当前主要提供普通消息发送与事务消息发送两类能力。</p>
*
* <p>使用约束:</p>
* <ul>
* <li>topic 应与消费端约定一致,避免因路由错误导致消息不可达。</li>
* <li>keys 建议使用订单号、任务 ID 等业务唯一标识,便于链路追踪和幂等控制。</li>
* <li>body 为业务消息体,建议使用可序列化且语义明确的数据对象。</li>
* </ul>
*
* @author Wang
*/
public interface MessageQueueProducer {
/**
* 同步发送普通消息。
*
* <p>适用于不依赖本地事务一致性的消息投递场景,例如事件通知、异步触发、日志采集等。
* 调用成功仅表示消息已完成生产端发送,不代表消费端已经处理完成。</p>
*
* @param topic 目标主题
* @param keys 消息业务键,建议传入具备唯一性的业务标识
* @param bizDesc 业务描述,用于日志打印和问题排查
* @param body 消息体对象
* @return 发送结果,包含消息 ID、发送状态等信息
*/
SendResult send(String topic, String keys, String bizDesc, Object body);
/**
* 发送事务消息。
*
* <p>适用于本地事务与消息投递需要保持最终一致性的场景。典型流程为:先发送半消息,
* 再执行本地事务,最后根据本地事务执行结果提交或回滚消息。</p>
*
* <p>事务回查逻辑由按 topic 注册的 {@link TransactionChecker} 负责,调用前需通过
* {@link DelegatingTransactionListener#registerChecker(String, TransactionChecker)}
* 完成回查器注册。</p>
*
* @param topic 目标主题
* @param keys 消息业务键,建议传入具备唯一性的业务标识
* @param bizDesc 业务描述,用于日志打印和问题排查
* @param body 消息体对象
* @param localTransaction 本地事务执行逻辑,在半消息发送成功后触发;执行异常时应回滚消息
*/
void sendInTransaction(String topic, String keys, String bizDesc, Object body,
Consumer<Object> localTransaction);
}
- send(topic, keys, bizDesc, body) —— 普通消息,同步发送,返回发送结果
- sendInTransaction(topic, keys, bizDesc, body, localTransaction) —— 事务消息,发送半消息后执行本地事务,根据结果提交或回滚
接口不暴露 RocketMQ 的原生 API,业务方只关心主题、业务标识和消息体。这个抽象层的价值在于:如果未来切换消息队列(比如迁移到 Pulsar),只需替换实现类,业务代码不受影响。
1.2 RocketMQProducerAdapter
package rag.framework.mq.producer;
import cn.hutool.core.util.StrUtil;
import rag.framework.mq.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
import java.util.function.Consumer;
/**
* 基于 RocketMQ 的消息生产者适配器。
*
* <p>实现 {@link MessageQueueProducer},负责将业务消息统一封装为
* {@link MessageWrapper} 后发送到 RocketMQ,并对普通消息与事务消息提供统一入口。</p>
*
* <p>实现说明:</p>
* <ul>
* <li>普通消息通过同步发送方式返回发送结果,便于调用方感知投递状态。</li>
* <li>事务消息通过事务监听器协调本地事务执行和回查处理。</li>
* <li>当调用方未显式传入业务键时,自动生成默认唯一标识。</li>
* </ul>
*
* @author Wang
*/
@Slf4j
@RequiredArgsConstructor
public class RocketMqProducerAdapter implements MessageQueueProducer {
/**
* RocketMQ 操作模板。
*/
private final RocketMQTemplate rocketMQTemplate;
/**
* 事务消息监听器。
*/
private final DelegatingTransactionListener transactionListener;
/**
* 同步发送普通消息。
*
* <p>该方法会先构造统一消息包装体,再通过 RocketMQ 同步发送到指定 topic。
* 如果调用方未传入业务键,则自动生成默认唯一标识。</p>
*
* @param topic 目标主题
* @param keys 业务键,为空时自动生成默认值
* @param bizDesc 业务描述,用于日志打印
* @param body 业务载荷
* @return 发送结果,包含消息 ID、发送状态和队列信息
* @throws Throwable 消息发送异常
* @see MessageQueueProducer#send(String, String, String, Object)
*/
@Override
public SendResult send(String topic, String keys, String bizDesc, Object body) {
// 如果 keys 为空,生成 UUID 作为默认值
keys = StrUtil.isEmpty(keys) ? UUID.randomUUID().toString() : keys;
// 构建消息:包装业务数据并设置 keys
Message<MessageWrapper<Object>> message = MessageBuilder
.withPayload(MessageWrapper.builder().keys(keys).body(body).build())
.setHeader(MessageConst.PROPERTY_KEYS, keys)
.build();
SendResult sendResult;
try {
// 同步发送消息到 Broker
sendResult = rocketMQTemplate.syncSend(topic, message);
} catch (Throwable ex) {
log.error("[生产者] {} - 消息发送失败,topic: {}, keys: {}", bizDesc, topic, keys, ex);
throw ex;
}
log.info("[生产者] {} - 发送结果: {}, 消息ID: {}, Keys: {}", bizDesc, sendResult.getSendStatus(), sendResult.getMsgId(), keys);
return sendResult;
}
/**
* 发送事务消息。
*
* <p>该方法会先注册本地事务逻辑,再发送 RocketMQ 半消息,随后由
* {@link DelegatingTransactionListener} 完成本地事务执行与回查处理。</p>
*
* @param topic 目标主题
* @param keys 业务键,为空时自动生成默认值
* @param bizDesc 业务描述,用于日志打印
* @param body 业务载荷
* @param localTransaction 本地事务逻辑
* @throws Throwable 半消息发送异常
* @see MessageQueueProducer#sendInTransaction(String, String, String, Object, Consumer)
* @see DelegatingTransactionListener#executeLocalTransaction(org.springframework.messaging.Message, Object)
*/
@Override
public void sendInTransaction(String topic, String keys, String bizDesc, Object body,
Consumer<Object> localTransaction) {
// 如果 keys 为空,生成 UUID 作为默认值
keys = StrUtil.isEmpty(keys) ? UUID.randomUUID().toString() : keys;
// 生成事务 ID,用于关联本地事务逻辑
String txId = UUID.randomUUID().toString();
// 注册本地事务逻辑到监听器
transactionListener.registerLocalTransaction(txId, localTransaction);
// 构建事务消息:包装业务数据并设置 keys、txId、topic
Message<MessageWrapper<Object>> message = MessageBuilder
.withPayload(MessageWrapper.builder().keys(keys).body(body).build())
.setHeader(MessageConst.PROPERTY_KEYS, keys)
.setHeader(DelegatingTransactionListener.HEADER_TX_ID, txId)
.setHeader(DelegatingTransactionListener.HEADER_TOPIC, topic)
.build();
TransactionSendResult sendResult;
try {
// 发送 Half 消息到 Broker(第一阶段)
sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, null);
} catch (Throwable ex) {
log.error("[生产者] {} - 事务消息发送失败,topic: {}, keys: {}", bizDesc, topic, keys, ex);
throw ex;
}
log.info("[生产者] {} - 事务消息发送结果: {}, 本地事务状态: {}, 消息ID: {}, Keys: {}",
bizDesc, sendResult.getSendStatus(), sendResult.getLocalTransactionState(), sendResult.getMsgId(), keys);
}
}RocketMQProducerAdapter是 MessageQueueProducer 的 RocketMQ 实现,依赖 RocketMQTemplate(Spring Boot Starter 提供)。
普通消息发送:
Message<MessageWrapper<Object>> message = MessageBuilder
.withPayload(MessageWrapper.builder().keys(keys).body(body).build())
.setHeader(MessageConst.PROPERTY_KEYS, keys)
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);消息体包装在 MessageWrapper 中,携带 keys(业务标识,用于幂等判断和消息追踪)和 body(业务载荷)。发送结果包含 sendStatusmsgId,全部记日志方便排查。
如果 keys 为空,自动生成 UUID 作为消息标识,保证每条消息可追溯。
事务消息发送:
// 1. 注册本地事务逻辑(仅在当前实例有效)
transactionListener.registerLocalTransaction(txId, localTransaction);
// 2. 构建消息,注入事务 ID 到消息头
Message<MessageWrapper<Object>> message = MessageBuilder
.withPayload(...)
.setHeader(HEADER_TX_ID, txId) // 事务 ID,用于 executeLocalTransaction 找回本地逻辑
.setHeader(HEADER_TOPIC, topic) // 主题名,用于 checkLocalTransaction 找回回查器
.build();
// 3. 发送事务消息(半消息 → 执行本地事务 → commit/rollback)
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, null);1.3 DelegatingTransactionListener —— 事务消息的核心
package rag.framework.mq.producer;
import rag.framework.mq.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
/**
* 通用 RocketMQ 事务消息监听器。
*
* <p>用于承接 RocketMQ 事务消息的本地事务执行与事务状态回查逻辑,统一协调消息发送端
* 与业务本地事务之间的最终一致性处理。</p>
*
* <p>职责说明:</p>
* <ul>
* <li>根据事务上下文标识定位并执行对应的本地事务逻辑。</li>
* <li>根据 topic 定位对应的事务回查器,返回事务最终状态。</li>
* <li>在事务执行完成后及时清理进程内临时上下文,避免无界增长。</li>
* </ul>
*
* <p>设计约束:</p>
* <ul>
* <li>本地事务映射仅在当前实例内有效,不适用于跨实例共享状态。</li>
* <li>事务回查可能落到任意实例,因此回查结果必须依赖持久化数据来源。</li>
* <li>topic 与回查器的注册关系应在应用启动阶段完成初始化。</li>
* </ul>
*
* @author Wang
*/
@Slf4j
@RocketMQTransactionListener
public class DelegatingTransactionListener implements RocketMQLocalTransactionListener {
/**
* 事务上下文标识消息头。
*
* <p>用于在执行本地事务时关联发送阶段注册的事务执行逻辑。</p>
*/
static final String HEADER_TX_ID = "TRANSACTION_CONTEXT_ID";
/**
* 事务主题消息头。
*
* <p>用于在事务回查阶段定位对应的 {@link TransactionChecker}。</p>
*/
static final String HEADER_TOPIC = "TRANSACTION_TOPIC";
/**
* 本地事务执行逻辑映射表。
*
* <p>Key 为事务上下文标识,Value 为待执行的本地事务逻辑。该映射仅在当前应用实例内生效,
* 事务执行完成后会立即移除对应项。</p>
*/
private final ConcurrentMap<String, Consumer<Object>> localTransactionMap = new ConcurrentHashMap<>();
/**
* 事务回查器映射表。
*
* <p>Key 为 topic,Value 为对应的事务回查器。回查时 Broker 可能将请求路由到任意实例,
* 因此回查器实现必须能够基于持久化状态独立判断事务结果。</p>
*/
private final ConcurrentMap<String, TransactionChecker> checkerMap = new ConcurrentHashMap<>();
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 注册本地事务执行逻辑。
*
* <p>在发送事务消息前调用,将事务上下文标识与业务本地事务逻辑建立关联。
* 每个事务上下文标识仅应使用一次。</p>
*
* @param txId 事务上下文标识
* @param localTransaction 本地事务执行逻辑
* @see #executeLocalTransaction(Message, Object)
*/
public void registerLocalTransaction(String txId, Consumer<Object> localTransaction) {
localTransactionMap.put(txId, localTransaction);
}
/**
* 注册事务回查器。
*
* <p>用于为指定 topic 绑定对应的事务回查实现。通常在应用启动阶段完成注册。</p>
*
* @param topic 主题名称
* @param checker 事务回查器
* @see #checkLocalTransaction(Message)
*/
public void registerChecker(String topic, TransactionChecker checker) {
checkerMap.put(topic, checker);
}
/**
* 执行业务本地事务。
*
* <p>该方法由 RocketMQ 在半消息发送成功后回调。方法内部会提取事务上下文标识,
* 获取对应的本地事务逻辑,并在 Spring 事务模板中执行。</p>
*
* @param message 半消息对象,包含事务上下文等消息头信息
* @param arg 业务参数
* @return 返回本地事务执行结果对应的 RocketMQ 事务状态
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
String txId = (String) message.getHeaders().get(HEADER_TX_ID);
// 获取并移除本地事务逻辑(确保只执行一次)
Consumer<Object> localTransaction = txId != null ? localTransactionMap.remove(txId) : null;
if (localTransaction == null) {
log.error("[事务消息] 未找到本地事务逻辑, txId={}", txId);
return RocketMQLocalTransactionState.ROLLBACK;
}
try {
// 在 Spring 事务中执行本地事务
new TransactionTemplate(transactionManager).executeWithoutResult(status -> localTransaction.accept(arg));
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("[事务消息] 本地事务执行失败, txId={}", txId, e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 回查本地事务状态。
*
* <p>当 Broker 未收到事务确认结果时,会回调该方法重新确认本地事务最终状态。
* 方法内部会根据消息头中的 topic 查找对应回查器,并依据业务持久化状态返回结果。</p>
*
* @param message 半消息对象,包含主题和业务载荷信息
* @return {@code COMMIT} 表示提交消息,{@code ROLLBACK} 表示回滚消息,
* {@code UNKNOWN} 表示当前无法确认,等待后续继续回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String topic = (String) message.getHeaders().get(HEADER_TOPIC);
TransactionChecker checker = topic != null ? checkerMap.get(topic) : null;
if (checker == null) {
log.warn("[事务消息] 回查时未找到 topic={} 对应的 checker, 默认 ROLLBACK", topic);
return RocketMQLocalTransactionState.ROLLBACK;
}
try {
MessageWrapper<?> wrapper = (MessageWrapper<?>) message.getPayload();
boolean committed = checker.check(wrapper);
RocketMQLocalTransactionState state = committed
? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
log.info("[事务消息] 回查结果: topic={}, state={}", topic, state);
return state;
} catch (Exception e) {
log.error("[事务消息] 回查异常, topic={}", topic, e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}DelegatingTransactionListener是事务消息的两阶段处理器,通过 @RocketMQTransactionListener 注册为全局监听器。
两阶段处理:
阶段一——executeLocalTransaction:半消息发送成功后,RocketMQ 回调此方法。从消息头取出 HEADER_TX_ID,在 localTransactionMap 中查找对应的本地事务逻辑并执行。执行过程包装在 Spring 的 TransactionTemplate 中,保证本地事务的数据库操作与消息状态一致:
new TransactionTemplate(transactionManager).executeWithoutResult(
status -> localTransaction.accept(arg)
);
return RocketMQLocalTransactionState.COMMIT;本地事务抛异常则返回 ROLLBACK,消息丢弃。
阶段二——checkLocalTransaction:如果阶段一的 commit/rollback 请求因为网络抖动没到达 Broker,Broker 会定期回调此方法。从消息头取出 HEADER_TOPIC,在 checkerMap 中查找对应的回查器,由回查器根据数据库状态判断事务是否已提交:
TransactionChecker checker = checkerMap.get(topic);
boolean committed = checker.check(wrapper);
return committed ? COMMIT : ROLLBACK;为什么需要 checkLocalTransaction:阶段一的本地事务逻辑存储在 localTransactionMap 中,这是一个 JVM 内存的 ConcurrentHashMap。如果发送半消息后、执行本地事务前 JVM 崩溃了localTransactionMap 里的数据丢失,RocketMQ 回查时找不到本地逻辑。此时 checkLocalTransaction 作为第二道防线,通过 TransactionChecker 查询数据库——数据库里的状态是持久化的,不会因为 JVM 崩溃而丢失。
两个 Map 的生命周期差异:
- localTransactionMap:JVM 内存,仅当前实例有效,消息处理完即移除
- checkerMap:Spring Bean 注册,所有实例共享@PostConstruct 时注册),处理回查请求
1.4 TransactionChecker 接口
package rag.framework.mq.producer;
import rag.framework.mq.MessageWrapper;
/**
* 事务消息回查接口。
*
* <p>用于在 RocketMQ 事务消息回查阶段,根据消息内容判断对应本地事务的最终状态。
* 实现类通常按 topic 维度注册到 {@link DelegatingTransactionListener},供 Broker 回查时调用。</p>
*
* <p>实现约束:</p>
* <ul>
* <li>回查逻辑应基于数据库、缓存或其他持久化状态判断事务结果。</li>
* <li>禁止依赖进程内存中的临时状态,否则在多实例部署场景下可能得到错误结果。</li>
* <li>实现类应保证幂等和线程安全,避免并发回查时产生副作用。</li>
* </ul>
*
* @author Wang
*/
public interface TransactionChecker {
/**
* 检查本地事务是否已提交。
*
* @param message 消息包装对象,包含业务键和业务载荷,可据此查询事务状态
* @return {@code true} 表示本地事务已提交,可以提交消息;
* {@code false} 表示本地事务未提交或已回滚,应回滚消息
*/
boolean check(MessageWrapper<?> message);
}
回查器的实现必须基于数据库查询——因为 Broker 的回查请求可能打到任意实例,不能依赖内存状态。返回 true 表示本地事务已提交(消息可投递),false 表示已回滚(消息丢弃)。
1.5 自动装配
package rag.framework.config;
import rag.framework.mq.producer.DelegatingTransactionListener;
import rag.framework.mq.producer.MessageQueueProducer;
import rag.framework.mq.producer.RocketMqProducerAdapter;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RocketMQ 消息队列自动装配配置
*
* <p>自动创建 RocketMQ 相关的 Spring Bean:
* <ul>
* <li>DelegatingTransactionListener:事务消息监听器,管理本地事务和回查逻辑</li>
* <li>MessageQueueProducer:消息生产者,提供普通消息和事务消息发送能力</li>
* </ul>
*
* @author Wang
*/
@Configuration
public class RocketMqAutoConfiguration {
/**
* 创建事务消息监听器 Bean
*
* <p>该监听器负责处理 RocketMQ 事务消息的两阶段提交:
* <ul>
* <li>执行本地事务(第二阶段)</li>
* <li>回查本地事务状态(Broker 定期回调)</li>
* </ul>
*
* @return 事务消息监听器实例
* @see rag.framework.mq.producer.DelegatingTransactionListener
*/
@Bean
public DelegatingTransactionListener delegatingTransactionListener() {
return new DelegatingTransactionListener();
}
/**
* 创建消息生产者 Bean
*
* <p>基于 RocketMQTemplate 和 DelegatingTransactionListener 构建消息生产者,
* 提供普通消息和事务消息的发送能力。</p>
*
* @param rocketMqTemplate RocketMQ 模板,由 rocketmq-spring-boot-starter 自动配置
* @param transactionListener 事务消息监听器,用于管理事务消息的本地事务
* @return 消息生产者实例
* @see rag.framework.mq.producer.RocketMqProducerAdapter
*/
@Bean
public MessageQueueProducer messageQueueProducer(RocketMQTemplate rocketMqTemplate,
DelegatingTransactionListener transactionListener) {
return new RocketMqProducerAdapter(rocketMqTemplate, transactionListener);
}
}RocketMQAutoConfiguration负责 Bean 的创建和注入:
2. 场景一:文档分块异步处理
2.1 为什么用消息队列
文档上传后的分块处理(解析 → 拆分 → 向量化 → 写入)是耗时操作,大文档可能需要几十秒。如果同步处理,用户上传后要盯着进度条等很久。用消息队列异步化后,上传接口立即返回,分块在后台消费。
更重要的是,异步处理天然支持削峰——多个用户同时上传时,消息在队列中排队,消费者按自己的节奏逐一处理,不会把 Embedding API 的打满。
2.2 为什么用事务消息
分块处理涉及两个动作:更新文档状态为"处理中" + 发送分块消息。这两步必须原子——不能出现"状态更新了但消息没发出去"(文档卡在处理中永不被消费),也不能出现"消息发出去了但状态没更新"(消费者拿到消息但发现文档状态不对,无法处理)。
普通消息解决不了这个问题:先发消息再更新状态,如果状态更新失败消息已发出去了;先更新状态再发消息,如果消息发送失败状态已改了。
RocketMQ 事务消息的两阶段提交正好解决这个原子性问题:
1. 发送半消息(消费者不可见)
2. 执行本地事务:更新文档状态 + 其他数据库操作
3. 本地事务成功 → 提交消息(消费者可见,开始消费)
4. 本地事务失败 → 回滚消息(消费者永远看不到这条消息)
2.3 生产者侧
文档上传后,通过 MessageQueueProducer.sendInTransaction() 发送事务消息到 knowledge-document-chunk_topic:
// 本地事务逻辑:更新文档状态
Consumer<Object> localTx = (arg) -> {
KnowledgeDocumentDO doc = documentMapper.selectById(docId);
doc.setStatus(DocumentStatus.RUNNING.getCode());
documentMapper.updateById(doc);
};
// 事务消息发送
producer.sendInTransaction(
chunkTopic,
docId, // 用文档 ID 作为消息 key,方便关联追踪
"文档分块",
event, // KnowledgeDocumentChunkEvent { docId, kbId, operator }
localTx
);本地事务执行在 Spring 的 TransactionTemplate 中,如果文档状态更新失败,事务回滚,消息也会被 rollback。
2.4 消费者侧
package rag.knowledge.mq;
import rag.framework.context.LoginUser;
import rag.framework.context.UserContext;
import rag.framework.mq.MessageWrapper;
import rag.knowledge.mq.event.KnowledgeDocumentChunkEvent;
import rag.knowledge.service.KnowledgeDocumentService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 文档分块事务消息消费者。
* <p>
* 在 RocketMQ 事务消息提交后异步拉起文档分块流程,并补齐消费线程所需的用户上下文,
* 让后续审计字段和业务日志仍然保留触发人的身份信息。
* <p>
* 核心特性:
* <ul>
* <li><b>异步处理</b>:解耦文档上传和分块逻辑,提升接口响应速度</li>
* <li><b>事务保证</b>:基于 RocketMQ 事务消息,确保本地事务和消息发送的原子性</li>
* <li><b>上下文补全</b>:在消费线程中显式设置 {@link UserContext},保证审计字段的正确性</li>
* <li><b>异常隔离</b>:finally 块确保 UserContext 清理,避免线程复用时的上下文污染</li>
* </ul>
* <p>
* 消息配置:
* <pre>{@code
* Topic: knowledge-document-chunk_topic${unique-name:}
* Consumer Group: knowledge-document-chunk_cg${unique-name:}
* }</pre>
* <p>
* 处理流程:
* <ol>
* <li>接收 {@link KnowledgeDocumentChunkEvent} 事件</li>
* <li>从事件中提取操作人信息,构建 {@link LoginUser} 并设置到 {@link UserContext}</li>
* <li>调用 {@link rag.knowledge.service.KnowledgeDocumentService#executeChunk} 执行分块</li>
* <li>finally 块清理 {@link UserContext},防止内存泄漏</li>
* </ol>
* <p>
* 注意事项:
* <ul>
* <li>消费失败时 RocketMQ 会自动重试,需保证分块逻辑的幂等性</li>
* <li>长时间运行的分块任务应配合超时机制和进度监控</li>
* <li>UserContext 必须在线程结束时清理,否则会影响后续消息的消费</li>
* </ul>
*
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "knowledge-document-chunk_topic${unique-name:}",
consumerGroup = "knowledge-document-chunk_cg${unique-name:}"
)
public class KnowledgeDocumentChunkConsumer implements RocketMQListener<MessageWrapper<KnowledgeDocumentChunkEvent>> {
private final KnowledgeDocumentService documentService;
/**
* 处理文档分块消息。
* <p>
* 此方法由 RocketMQ 消费线程调用,负责解析消息、设置用户上下文并执行分块逻辑。
*
* @param message 消息包装对象,包含 {@link KnowledgeDocumentChunkEvent} 事件数据
*/
@Override
public void onMessage(MessageWrapper<KnowledgeDocumentChunkEvent> message) {
KnowledgeDocumentChunkEvent event = message.getBody();
log.info("[消费者] 开始消费文档分块任务,docId={}, keys={}", event.getDocId(), message.getKeys());
// RocketMQ 消费线程没有 Web 请求上下文,这里显式补充操作者信息供服务层复用。
UserContext.set(LoginUser.builder().username(event.getOperator()).build());
try {
documentService.executeChunk(event.getDocId());
} finally {
UserContext.clear();
}
}
}KnowledgeDocumentChunkConsumer监听 knowledge-document-chunk_topic,收到消息后执行分块逻辑:
用户上下文的恢复:消息体中携带了 operator(操作人),消费者在处理时将其恢复到 UserContext 中。这是因为分块过程中会产生节点日志和 trace 记录,这些记录需要关联到具体操作人。不依赖 TTL 传递是因为 RocketMQ 消费线程和 HTTP 请求线程属于完全独立的请求边界——TTL 可以在同一次请求的异步线程间传递上下文,但无法跨越不同请求的线程边界。消息体中携带操作人信息是唯一可靠的跨边界传递方式。
2.5 事务回查
package rag.knowledge.mq;
import cn.hutool.json.JSONUtil;
import rag.framework.mq.MessageWrapper;
import rag.framework.mq.producer.DelegatingTransactionListener;
import rag.framework.mq.producer.TransactionChecker;
import rag.knowledge.dao.entity.KnowledgeDocumentDO;
import rag.knowledge.dao.mapper.KnowledgeDocumentMapper;
import rag.knowledge.enums.DocumentStatus;
import rag.knowledge.mq.event.KnowledgeDocumentChunkEvent;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 文档分块事务消息回查器。
* <p>
* 用于 RocketMQ 事务消息的回查机制,当 Broker 未收到本地事务的提交/回滚确认时,
* 会定期调用此回查器判断本地事务是否已成功提交。
* <p>
* 核心功能:
* <ul>
* <li><b>事务一致性保证</b>:通过查询数据库中文档状态,判断本地事务是否已提交</li>
* <li><b>自动注册</b>:在 {@link PostConstruct} 阶段按 topic 注册到 {@link DelegatingTransactionListener}</li>
* <li><b>分布式支持</b>:Broker 回查时可路由到任意实例,无需关心消息最初由哪个节点发送</li>
* </ul>
* <p>
* 回查逻辑:
* <pre>{@code
* 1. 从消息体中提取 docId
* 2. 查询 knowledge_document 表获取文档记录
* 3. 判断文档状态是否为 RUNNING
* - 是 → 返回 true,表示本地事务已提交,Broker 应 commit 消息
* - 否 → 返回 false,表示本地事务未提交或已回滚,Broker 应 rollback 消息
* }</pre>
* <p>
* 状态说明:
* <ul>
* <li><b>RUNNING</b>:文档已进入分块流程,本地事务已提交,应 commit 消息</li>
* <li><b>PENDING/FAILED/SUCCESS</b>:文档未进入分块流程或已结束,应 rollback 消息</li>
* <li><b>记录不存在</b>:文档已被删除或从未创建,应 rollback 消息</li>
* </ul>
* <p>
* 注意事项:
* <ul>
* <li>回查频率由 RocketMQ Broker 配置决定,通常为每分钟一次</li>
* <li>回查方法应保持轻量,避免复杂查询和长时间锁定</li>
* <li>幂等性保证:即使多次回查,结果应保持一致</li>
* </ul>
*
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class KnowledgeDocumentChunkTransactionChecker implements TransactionChecker {
private final KnowledgeDocumentMapper documentMapper;
private final DelegatingTransactionListener transactionListener;
/** 文档分块任务的 Topic 名称。 */
@Value("knowledge-document-chunk_topic${unique-name:}")
private String chunkTopic;
/**
* 初始化方法,将当前回查器注册到事务监听器代理中。
* <p>
* 在 Spring Bean 初始化完成后自动调用,确保回查器能够接收 Broker 的回查请求。
*/
@PostConstruct
public void init() {
transactionListener.registerChecker(chunkTopic, this);
}
/**
* 执行事务回查,判断本地事务是否已提交。
* <p>
* 此方法由 RocketMQ Broker 在以下场景调用:
* <ul>
* <li>本地事务执行完成后,未及时向 Broker 发送 commit/rollback 确认</li>
* <li>网络抖动导致确认消息丢失</li>
* <li>应用重启导致未完成的事务需要恢复</li>
* </ul>
* <p>
* 回查策略:
* <ol>
* <li>从消息体中提取 {@link KnowledgeDocumentChunkEvent} 事件</li>
* <li>根据 docId 查询 {@code knowledge_document} 表</li>
* <li>判断文档状态是否为 {@link DocumentStatus#RUNNING}</li>
* <li>返回 true(已提交)或 false(未提交/已回滚)</li>
* </ol>
*
* @param message 待回查的消息包装对象
* @return true 表示本地事务已提交,Broker 应 commit 消息;false 表示应 rollback 消息
*/
@Override
public boolean check(MessageWrapper<?> message) {
log.info("[事务回查] 文档分块,消息体:{}", JSONUtil.toJsonStr(message));
KnowledgeDocumentChunkEvent event = (KnowledgeDocumentChunkEvent) message.getBody();
String docId = event.getDocId();
KnowledgeDocumentDO documentDO = documentMapper.selectById(docId);
return documentDO != null
&& DocumentStatus.RUNNING.getCode().equals(documentDO.getStatus());
}
}
KnowledgeDocumentChunkTransactionChecker实现了 TransactionChecker 接口,在 @PostConstruct 时注册到 DelegatingTransactionListener:
回查逻辑的核心是:查数据库中文档的状态。如果状态是 RUNNING,说明 localTx 已经执行成功,返回 COMMIT;否则返回 ROLLBACK。这个判断基于持久化的数据库状态,不依赖 JVM 内存,所以即使回查请求打到了另一个实例上,也能正确判断。
2.6 消息重复消费的兜底
即使事务消息保证了正常路径下的精确一次投递,极端情况下仍可能出现重复投递——消费者处理完了但 offset 没提交,或者回查期间重复投递@IdempotentConsume 注解提供了消费端的幂等保护(详见第 5 节)。
3. 场景二:用户反馈异步持久化
3.1 为什么用消息队列
用户对回答的点赞/点踩是一个高频操作,但不需要同步处理——用户不关心反馈是否已经写入数据库。异步化的好处是:反馈接口立即返回,不阻塞用户继续提问。
这和场景一的事务消息不同——反馈场景不需要事务保证,因为反馈是独立操作,不涉及"先写 A 再发消息"的原子性问题。用普通消息即可。
3.2 生产者侧
package rag.controller;
import rag.controller.request.MessageFeedbackRequest;
import rag.framework.convention.Result;
import rag.framework.web.Results;
import rag.service.MessageFeedbackService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* 会话消息反馈控制器。
* <p>
* 负责接收用户对助手回答的点赞、点踩和原因说明,并异步投递到反馈处理链路。
* <p>
* 核心功能:
* <ul>
* <li><b>反馈收集</b>:接收用户的点赞(vote=1)或点踩(vote=-1)</li>
* <li><b>原因记录</b>:支持用户填写不满意的具体原因</li>
* <li><b>异步处理</b>:通过 MQ 异步投递反馈数据,避免阻塞主流程</li>
* </ul>
*/
@RestController
@RequiredArgsConstructor
public class MessageFeedbackController {
private final MessageFeedbackService feedbackService;
/**
* 提交消息反馈。
* <p>
* 处理流程:
* <ol>
* <li>校验消息 ID 是否存在且属于当前用户</li>
* <li>更新消息的 vote 字段(1=点赞、-1=点踩)</li>
* <li>如果有点踩原因,保存到 feedback_reason 字段</li>
* <li>异步发送反馈事件到 MQ,供后续分析和优化使用</li>
* <li>立即返回成功响应,不等待异步处理完成</li>
* </ol>
* <p>
* 说明:采用异步处理方式,确保用户体验不受影响。
*
* @param messageId 助手消息 ID,路径变量
* @param request 反馈内容,包含 vote(点赞/点踩)和 reason(原因说明)
* @return 空结果,表示反馈已接收
*/
@PostMapping("/conversations/messages/{messageId}/feedback")
public Result<Void> submitFeedback(@PathVariable String messageId,
@RequestBody MessageFeedbackRequest request) {
feedbackService.submitFeedbackAsync(messageId, request);
return Results.success();
}
}MessageFeedbackController 接收用户反馈后,通过 MessageQueueProducer.send() 发送普通消息到 message-feedback_topic。
3.3 消费者侧
package rag.mq;
import rag.framework.mq.MessageWrapper;
import rag.mq.event.MessageFeedbackEvent;
import rag.service.MessageFeedbackService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 消息反馈 MQ 消费者,负责将点赞/点踩事件异步持久化到数据库
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "message-feedback_topic${unique-name:}",
consumerGroup = "message-feedback_cg${unique-name:}"
)
public class MessageFeedbackConsumer implements RocketMQListener<MessageWrapper<MessageFeedbackEvent>> {
private final MessageFeedbackService feedbackService;
@Override
public void onMessage(MessageWrapper<MessageFeedbackEvent> message) {
MessageFeedbackEvent event = message.getBody();
log.info("[消费者] 开始处理点赞/点踩事件,messageId: {}, userId: {}, vote: {}, keys: {}",
event.getMessageId(), event.getUserId(), event.getVote(), message.getKeys());
feedbackService.submitFeedbackByEvent(event);
}
}
MessageFeedbackConsumer监听 message-feedback_topic,收到消息后调用 feedbackService.submitFeedbackByEvent(event) 处理。
MessageFeedbackEvent 包含:消息 ID、用户 ID、反馈值(1=点赞,-1=点踩)、反馈原因、提交时间戳。提交时间戳用于保证最终一致性——多节点消费时,总是以最近一次提交的反馈为准。
3.4 为什么没有用事务消息
反馈场景满足"先到先得"——不需要和其他数据库操作绑定。用户点完赞后,反馈被异步消费写入 t_message_feedback 表。消费者挂了,消息还在队列里,重启后继续消费。反馈没有"同步必须成功"的强要求,普通消息的至少一次投递已经够用。
4. 延迟消息场景:消费重试与定时任务
4.1 当前状态
RocketMQ 的延迟消息能力项目已具备MessageQueueProducer 接口预留了扩展空间。以下是两个明确的延迟消息使用场景。
4.2 场景一:消费失败后的指数退避重试
文档分块消费失败时,当前的处理方式是抛异常让 RocketMQ 按默认重试策略重新投递。RocketMQ 的默认重试是固定间隔的——第 1 次 10 秒、第 2 次 30 秒、第 3 次 1 分钟……逐步拉长,但这套策略是全局的,所有 Topic 共享,不够灵活。
延迟消息可以做到更精细的控制:消费失败后,不抛异常让 RocketMQ 自动重试,而是主动发送一条延迟消息到同一 Topic,延迟级别根据失败次数递增。比如:
第 1 次失败 → 延迟 30 秒 → 第 2 次 → 延迟 2 分钟 → 第 3 次 → 延迟 10 分钟 → 超过次数上限 → 写死信表 + 告警RocketMQ 内置了 18 个延迟级别(1s / 5s / 10s / 30s / 1m / 2m / 3m / 4m / 5m / 6m / 7m / 8m / 9m / 10m / 20m / 30m / 1h / 2h),覆盖了从秒级到小时级的重试间隔。如果需要更精细的间隔,可以通过定时任务扫表来弥补。
这个场景的价值在于:Embedding API 偶发超时通常是临时的——网络抖动或 API 短暂过载——等几十秒再试大概率成功。固定间隔的 RocketMQ 默认重试虽然也能用,但延迟消息可以把重试间隔和业务场景对齐,减少无效重试。
4.3 场景二:定时文档摄入调度
项目中 t_ingestion_pipeline 表存储了管道配置,部分场景可能需要定时执行——比如"每天凌晨 2 点从 S3 拉取最新文档并重新索引"或"每周一同步飞书知识库"。
这种场景可以用延迟消息实现:每次消费完成后,发送一条新的延迟消息指向下一次执行时间。这样不需要引入独立的定时任务框架(如 Quartz),也不需要额外的分布式锁来协调多实例的定时任务执行。
当前项目已通过 KnowledgeDocumentScheduleJob + Redis 锁实现了定时调度,但如果调度需求增多(多个知识库、不同频率),延迟消息的"消费完自动排下次"模式比定时扫表更轻量——不需要维护 cron 表达式,每条消息自带下一次执行时间。
4.4 为什么不直接启用
延迟消息没有被正式集成到主链路的原因有三个:
1. 当前同步摄入流程的失败处理已经由事务回滚 + 人工重试覆盖,消费端的幂等保护也保证了 RocketMQ 默认重试不会产生重复数据。延迟消息带来的改进(更精细的重试间隔)在当前量级下不是刚需
2. RocketMQ 的延迟级别是固定的 18 个,不能自定义任意秒数。如果需要"第 1 次 45 秒、第 2 次 3 分 15 秒"这种精确间隔,延迟消息做不到,得配合定时任务或外部调度
3. 消息堆积风险:如果大量消息被延迟到未来某个时间点,Broker 的延迟队列可能会堆积,影响普通消息的投递时效
这三个限制在实际场景中都有办法缓解(18 个级别基本够用、业务上不需要精确到秒、延迟消息量不大),但在当前量级下,收益不足以覆盖引入新模式的复杂度。
5. 消费幂等保护
5.1 问题
RocketMQ 在以下情况下可能投递重复消息:
- 消费者处理完成,但在提交 offset 前崩溃
- 事务消息的回查期间重复投递
- 网络分区导致 ACK 丢失
如果不处理重复消费,文档分块场景下同一份文档会被分块两次、向量化两次——浪费 Embedding API 配额和存储空间。
5.2 实现方案
@IdempotentConsume 注解 + AOP 切面IdempotentConsumeAspect,通过 Redis Lua 脚本实现原子性的防重复消费。
注解定义:
- keyPrefix:Redis Key 前缀
- key:SpEL 表达式,从方法参数中提取唯一标识(如消息 ID 或文档 ID)
- keyTimeout:Redis Key 过期时间(秒)
Lua 脚本(在 Redis 服务端单线程执行,天然原子):
local key = KEYS[1]
local value = ARGV[1]
local expire_time_ms = ARGV[2]
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)SET NX GET 的语义是:如果 Key 不存在,设置值为 CONSUMING 并返回 nil(第一次消费);如果 Key 已存在,返回当前值(可能是 CONSUMING 或 CONSUMED)。
状态流转:
首次消费 → SET NX → 返回 nil → 执行消费 → SET CONSUMED
重复消费 → SET NX → 返回 CONSUMING → 抛异常(让 MQ 稍后重试)
重复消费 → SET NX → 返回 CONSUMED → 直接跳过(已消费完成)消费成功后:将 Redis Key 的值更新为 CONSUMED,保留 keyTimeout 秒。这段时间内相同消息再次到达会直接跳过。
消费失败后:删除 Redis Key。这样 MQ 重试投递时,下次可以重新消费。
注意事项:
- Key 的过期时间必须大于消息的最大处理时间。如果处理还在进行中 Key 就过期了,重复消息到达时会重新执行——这是 SET NX 防不住的情况,也是为什么异常时删 Key 而不是保留 CONSUMING 的原因
- 最终防线是数据库唯一索引。即使 Redis SET NX 被绕过(比如 Redis 挂了),数据库的唯一约束也能拦下真正的重复数据
6. RocketMQ 配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: ragent-producer${unique-name:}_pg
send-message-timeout: 2000- name-server:NameServer 地址,集群模式用分号分隔
- group:生产者组名${unique-name:} 支持环境变量注入,多实例部署时避免组名冲突
- send-message-timeout:同步发送超时 2 秒。这个值设得比较短——摄入场景的本地事务只做数据库操作,通常毫秒级完成,2 秒足够。如果超时,RocketMQ 会触发回查
消费者的 Topic 和 ConsumerGroup 由 @RocketMQMessageListener 注解声明,同样支持 ${unique-name:} 占位符。
7. 为什么不选 Kafka / RabbitMQ
Kafka:事务消息机制是为流处理场景设计的(精确一次语义),适合"消费一条消息、经过转换、产生一条新消息"的模式。我们的场景是"发送一条消息 + 执行一个数据库事务"的原子绑定,Kafka 的事务不适合这种业务级别的分布式事务。另外 Kafka 没有原生的延迟消息,重试机制需要自行实现。
RabbitMQ:有事务机制,但是基于 AMQP 协议的同步确认模型,吞吐量比 RocketMQ 的异步事务消息低一个数量级。延迟消息通过死信队列 + TTL 实现,属于 workaround,不是为延迟消息设计的原语。而且 RabbitMQ 在单队列吞吐和堆积能力上跟 RocketMQ 有差距,批量文档摄入时消息堆积量可能很大。
RocketMQ:事务消息和延迟消息都是原生能力,不需要自己造轮子。同时项目技术栈偏阿里生态(百炼、Sa-Token),团队对 RocketMQ 的运维和调优有经验积累,出了问题能快速排查。
8. 边界情况与容错
Broker 宕机:生产者侧同步发送会抛异常,调用方可以感知并决定重试或降级。消费者侧 offset 持久化在 Broker,重启后从未提交的 offset 继续消费。
半消息超时:如果本地事务执行时间超过了半消息的超时时间,Broker 会触发事务回查。这就是为什么 TransactionChecker 必须基于数据库查询——本地事务可能已经完成了但 commit 请求丢包了,回查时通过数据库状态判断。
消费者处理失败:抛异常后 RocketMQ 会按重试策略重新投递@IdempotentConsume 保证了重试不会导致重复处理——第一次处理失败时 Redis Key 已被删除,重试时可以重新获取锁并执行。如果反复失败,达到最大重试次数后进入死信队列,防止一条坏消息拖死整个消费组。
Redis 不可用时的幂等降级IdempotentConsumeAspect 依赖 Redis 做防重复。如果 Redis 不可用,Lua 脚本执行失败,AOP 切面会抛异常。这意味着 Redis 不可用会导致所有 MQ 消费中断——这是一个需要权衡的设计。如果消费量不大,可以降级为不加幂等防护直接消费。
Topic 隔离${unique-name:} 占位符支持不同环境使用不同后缀的 Topic,避免开发环境和生产环境的消息串扰。