项目:国土空间基础信息平台
当前状态:服务间通信靠 Feign(同步 HTTP)+ Redis(异步中转)+ Quartz(定时轮询)
选型:RocketMQ(阿里系生态,与项目技术栈一致,支持事务消息和顺序消息)
1. 现状与痛点
1.1 当前服务间通信方式
服务 A 服务 B
│ │
├─ Feign HTTP 同步调用 ────→ 紧耦合,B 挂了 A 直接报错
├─ Redis String 中间结果 ──→ 无持久化,进程崩溃则数据丢失
├─ Quartz 定时轮询 ────────→ 无效轮询浪费 DB 查询
└─ CompletableFuture 异步链 → JVM 本地,跨实例不可见1.2 核心痛点
2. 选型:RocketMQ
2.1 选型理由
为什么选 RocketMQ,而不是 RabbitMQ 或 Kafka?
三个候选都可以完成基本的异步解耦,但场景需求做了筛选:
事务消息是硬需求。审批流程中"写审批记录"和"触发下一节点"必须原子——DB 写成功则消息必达,DB 回滚则消息不可见。Kafka 不支持事务消息(它的"事务"是生产者到分区的原子写入,不是 DB + MQ 的分布式事务)。RabbitMQ 可以通过"Publisher Confirm + DB 本地事务 + 补偿"模拟,但代码复杂度高,补偿逻辑容易出错。RocketMQ 的"半消息 + 本地事务 + 回查"是原生实现,不需要自己造补偿。
延时消息大量使用。GA 任务状态检查和文件下载都需要"等一段时间再消费",不是即时投递。RabbitMQ 需要安装 rabbitmq_delayed_message_exchange 插件才能支持延时——内网环境装插件要审批流程。Kafka 完全不支持,只能靠消费者自身 sleep 模拟。RocketMQ 的 18 个延时级别(1s 到 2h)开箱即用。
运维复杂度要考虑。平台部署在自然资源管理部门内网,运维团队规模有限。Kafka 依赖 ZooKeeper,ZK 本身的运维(选举、脑裂、磁盘清理)是额外负担。RabbitMQ 基于 Erlang/OTP 构建,内存模型、进程调度、崩溃恢复机制和 Java 完全不同。团队里会 Erlang 的人几乎没有——出了问题看日志看不懂源码,调参数不知道底层影响,只能靠官方文档盲猜。相比之下,RocketMQ 的 Broker 和 NameServer 都是纯 Java,线程模型、堆内存、GC 行为、线程 Dump 都是 Java 团队日常打交道的东西,出问题能自己查。RocketMQ 的 NameServer 是无状态轻量进程,Broker 是纯 Java——部署和排查都在团队的技能范围内。
技术栈一致性的实际价值。项目已经用了 FastJSON(阿里)、XXL-Job(阿里系)、Hutool——引入 RocketMQ 后,社区文档、故障案例、版本兼容性都在阿里技术栈内,不会出现"Spring Boot 2.1.4 + Kafka 2.1.x 的版本错配"这类问题。
不需要 Kafka 的吞吐量。Kafka 的优势是百万 TPS 级别的日志流处理,但 平台 的消息量是"每分钟几次到几十次服务变更通知、每天几百个下载任务"——这个量级下 Kafka 的优势发挥不出来,反而是维护 ZK 的成本成了净开销。
综合看:RabbitMQ 因为 Erlang 运维门槛和事务消息缺失出局,Kafka 因为重运维和延时消息缺失出局,RocketMQ 在三个硬需求(事务消息、延时消息、轻量运维)上全部满足。
2.2 Maven 依赖
<!-- 根 pom.xml 新增 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>3. 优化场景
3.1 GIS 服务变更通知——从定时轮询到事件驱动
当前代码路径:
Proxy DataSynchronizationJob (Quartz, cron: 0/{cacheCron} * * * * ?)
→ ServiceProxyCacheManager.load()
→ DataServiceService.findAll() // 全量查询
→ 构建新的 Hashtable → 替换 serviceProxyCache问题:GIS 服务注册/下架/修改后,最长需要等一个轮询周期(默认 60 秒)Proxy 才能感知变更。每次轮询都是 findAll() 全量查询,无论是否有数据变更。
MQ 方案:
Resources 模块(数据源)
DataServiceController.save/update/delete
→ 写 DB
→ 发送 MQ 消息 service.change 新增
│
▼
┌─────────────────────────────┐
│ RocketMQ Topic: giscloud │
│ Tag: service.change │
└─────────────────────────────┘
│
├── Proxy 消费者:收到消息 → ServiceProxyCacheManager.reloadSingle(serviceId)
├── Monitor 消费者:收到消息 → 更新监控配置
└── 后续可扩展:数据同步消费者、审计日志消费者实现:
// Resources 模块 — 生产者
@Service
public class DataServiceEventPublisher {
private final RocketMQTemplate rocketMQTemplate;
public DataService save(DataService entity) {
DataService saved = dataServiceRepository.save(entity);
// DB 写成功后发送 MQ。此场景消息丢失可接受(Quartz 兜底),用 asyncSend 不阻塞主流程
// null callback 意味着发送失败时静默丢弃——投递可靠性由 Quartz 定时兜底保证
rocketMQTemplate.asyncSend("giscloud:service.change",
new ServiceChangeEvent(saved.getId(), "CREATE"), null);
return saved;
}
public void delete(String id) {
dataServiceRepository.deleteById(id);
rocketMQTemplate.asyncSend("giscloud:service.change",
new ServiceChangeEvent(id, "DELETE"), null);
}
}
// Proxy 模块 — 消费者
@Component
@RocketMQMessageListener(
topic = "giscloud",
selectorExpression = "service.change",
consumerGroup = "proxy-cache-refresh")
public class ServiceChangeConsumer implements RocketMQListener<ServiceChangeEvent> {
private final ServiceProxyCacheManager cacheManager;
@Override
public void onMessage(ServiceChangeEvent event) {
if ("DELETE".equals(event.getOperation())) {
// remove() 为 MQ 引入后新增方法——从 Hashtable 中移除指定服务
cacheManager.remove(event.getServiceId());
} else {
// reloadSingle() 为 MQ 引入后新增方法——增量更新单个服务,无需全量 load()
cacheManager.reloadSingle(event.getServiceId());
}
}
}收益:
- 服务变更后**实时**刷新 Proxy 缓存,延迟从"最长 60 秒"降到"毫秒级"
- 增量刷新替代全量 findAll(),DB 查询量从 O(n) 降到 O(1)
- Monitor 模块可订阅同一 Topic 更新监控配置,无需新增 Feign 接口
- Quartz DataSynchronizationJob 保留为兜底——MQ 消息丢失时仍能通过定时刷新修正
3.2 文件下载任务——从定时扫描到任务队列
当前代码路径:
GeneratFilesJob (Quartz 定时触发)
→ 扫描 data_service_apply 表中待下载记录
→ 逐条处理:ArcGIS MapServer 查询 → 文件生成 → 落盘问题:
- 所有下载任务在同一 Job 执行周期内串行处理,任务量大时超时
- 无优先级——所有下载请求一起排队
- 无失败重试——任务执行失败后只能等下次扫描
MQ 方案:
用户提交下载请求 → Resources Controller
→ 写 DB(状态 = PENDING)
→ 发送 MQ 延时消息(Level 3 = 10s,留时间给数据准备) ← 新增
│
▼
RocketMQ 延时消息
│
▼
giscloud-job 消费者
→ 拉取最新数据 → ArcGIS 查询 → 文件生成 → 写 DB(状态 = COMPLETED)
→ 失败 → RocketMQ 自动重试 → 写 DB(状态 = FAILED)
→ 发送通知 MQ → Portal 站内信 / 短信核心代码:
// 生产者:下载请求提交
@PostMapping("/service-apply/download")
public ResponseEntity<?> submitDownload(@RequestBody DownloadRequest req) {
String taskId = UUID.randomUUID().toString();
String userId = AuthenticationUtil.getCurrentUserId();
// 写 DB
dataServiceApplyService.updateDownloadStatus(req.getApplyId(), "PENDING", taskId);
// 延时消息(Level 3 = 10s,留时间给数据准备)
DownloadTask task = new DownloadTask(taskId, req.getApplyId(), userId);
Message<DownloadTask> message = MessageBuilder.withPayload(task)
.setDelayTimeLevel(3) // 延时级别:1=1s, 2=5s, 3=10s, 4=30s, 5=1m ...
.build();
rocketMQTemplate.syncSend("giscloud:download-task", message, 3000);
return ResponseResult.success(taskId);
}
// 消费者:任务执行
@Component
@RocketMQMessageListener(topic = "giscloud", selectorExpression = "download-task",
consumerGroup = "job-download-executor")
public class DownloadTaskConsumer implements RocketMQListener<DownloadTask> {
@Override
public void onMessage(DownloadTask task) {
try {
FileInfo file = fileDownloadService.execute(task);
dataServiceApplyService.updateDownloadStatus(task.getApplyId(), "COMPLETED", file.getId());
} catch (Exception e) {
// 抛异常触发 RocketMQ 重试(默认最多 16 次)
throw new RuntimeException("下载任务执行失败, taskId=" + task.getTaskId(), e);
}
}
}收益:
- 下载任务解耦——Controller 提交后立即返回,用户不用等文件生成
- RocketMQ 自带重试机制——失败自动重试,不需要在代码中手动维护
- 延时消息避免用户"提交瞬间就想下载"导致的数据未就绪问题
3.3 操作日志异步解耦
当前代码路径:
Controller 方法
→ @SystemOperationLog 注解
→ SystemOperationLogAspect(@AfterReturning + @AfterThrowing)
→ GisLogger.logOperation() → 同步写入 ES问题:日志写入 ES 的延迟叠加到业务请求的响应时间上。ES 不可用时切面抛异常,不影响业务@AfterReturning 中异常被吞,但日志丢失。
MQ 方案:
Controller 返回前
→ @SystemOperationLog 切面
→ 构建 OperationLog 对象 ← 保留
→ rocketMQTemplate.sendOneWay(...) ← 改为异步发送
│
▼
RocketMQ (one-way 模式,最快)
│
▼
Logger 消费者(批量消费)
→ ES BulkIndex 批量写入 ← 新增收益:
- 日志响应时间从"ES 写入延迟"降到"MQ 发送延迟"(one-way 模式下近乎零开销)
- ES 不可用时日志缓存在 MQ 中,恢复后回放——日志零丢失
- 批量消费 + ES BulkIndex 替代逐条 IndexRequest,ES 写入吞吐量提升 10 倍以上
3.4 大数据分析任务生命周期管理
当前代码路径:
geoanalytics: BigDataAnalysisItemController 接收分析请求
→ CompletableFuture.supplyAsync()
→ BigDataAnalysis*RestService(HTTP POST ArcGIS GA Server)
→ 轮询 GA Server 任务状态(每 5 秒 GET job/{jobId})
→ 完成 → 结果写入 ES → 创建切片
→ 失败 → 标记 BigDataAnalysisItem.status = FAILED问题:
- CompletableFuture 在 JVM 本地,服务重启则任务状态丢失
- 轮询 GA Server 浪费网络带宽——大部分轮询返回"still running"
- 无失败重试——失败后需要人工重新提交
- 多实例环境下无法感知其他实例正在执行的任务
MQ 方案:
Controller 提交分析请求
→ 写 DB(BigDataAnalysisItem.status = SUBMITTED)
→ 发送 MQ 消息 ga.task.submitted ← 新增
│
▼
消费者:提交任务到 GA Server
→ HTTP POST GA Server → 获取 jobId → 更新 DB(status = RUNNING, jobId)
→ 发送延时消息 ga.task.check:{jobId}(延时 30 秒)
│
▼
消费者:检查任务状态
→ GET GA Server /job/{jobId}
├─ 完成 → 结果写 ES → 更新 DB(status = COMPLETED) → 创建切片
├─ 失败 → 更新 DB(status = FAILED) → 发送告警
└─ 运行中 → 再次发送延时消息 ga.task.check:{jobId}(延时 30 秒)← 替代轮询收益:
- 任务状态持久化在 DB + MQ,服务重启不丢任务
- 延时消息替代轮询——不需要定时 CRON 每 5 秒扫一遍所有 Running 任务,每个任务自己管理自己的检查节奏
- GA Server 的压力从"每 5 秒 × Running 任务数"降到"每 30 秒 × 仅当前批次",请求量骤降
3.5 行政审批流程解耦
当前代码路径:
ServiceApplyController 提交申请
→ ServiceApplyService.save()
→ 查下一个审批节点的审批人(AuditProcessNode)
→ 写审批记录 → 返回
AuditProcessController 审批
→ AuditProcessService.approve()
→ 更新审批状态
→ 判断是否终审
→ 是:调用 ServiceItemsController 开通服务权限
→ 否:查下一个审批节点 → 更新待审批人问题:审批流程硬编码在 Service 层,"开通服务权限"和"审批状态流转"耦合在一起。新增审批节点类型(如会签、转办)需要改代码。
MQ 方案——事务消息保证 DB 写 + MQ 发的原子性:
审批人提交审批意见
→ @Transactional
1. 写审批记录 DB(audit_status = APPROVED)
2. 发送 RocketMQ 事务消息 audit.approved
RocketMQ 事务消息机制:
┌─ 半消息(half message):先发到 MQ,消费者不可见
│ ├─ 执行本地事务:写 DB
│ │ ├─ DB 写成功 → Commit → 消费者可见
│ │ └─ DB 写失败 → Rollback → 消费者不可见
│ └─ 超时未收到 Commit/Rollback → MQ 回查本地事务状态
消费者:
audit.approved 消息到达
→ 判断审批流是否终审
├─ 是 → 发送 service.activated 消息 → Resources 开通 GIS 服务权限
└─ 否 → 查下一个审批节点 → 发送 user.notify 消息 → Portal 站内信通知收益:
- 审批流各步骤解耦——新增审批节点类型只需要新增一个消费者,不碰原有代码
- 事务消息保证"审批记录写入 DB"和"触发下一节点"的原子性——DB 写失败则消息不会投递
- 超时自动处理:延时消息 audit.timeout:{auditId}(24 小时)→ 自动驳回或转上级
4. Topic 与 Consumer Group 设计
设计要点:
- 同一个 Topic giscloud,用 Tag 区分子业务——减少 Topic 数量,降低运维复杂度
- 广播模式用于"所有实例都必须知道的"事件(如服务变更:所有 Proxy 实例都要刷新缓存)
- 集群模式用于"只需要一个实例执行"的任务(如文件下载:不允许多实例同时生成同一个文件)
5. 与现有组件的共存策略
6. 容错设计
6.1 消息丢失防护
6.2 消息重复消费
RocketMQ 的 at least once 投递语义意味着消费者必须幂等:
6.3 RocketMQ 不可用降级
RocketMQ 故障
├─ 服务变更通知 → 降级到 Quartz 定时刷新(延迟回到分钟级,但功能不中断)
├─ 文件下载任务 → 降级到 GeneratFilesJob 定时扫描(保留为备用方案)
├─ 操作日志 → 降级到本地 Logback 文件(恢复后手动回放)
├─ GA 任务调度 → 降级到 CompletableFuture 本地轮询
└─ 审批流程 → 降级到同步调用链(无事务消息保障,但功能可用)每个场景都有一套降级路径——MQ 是加速和优化,不是唯一通道。这样即使 MQ 集群长时间不可用,核心业务也不中断。