MQ 使用方案

作者:old wang 发布时间: 2021-01-10 阅读量:1 评论数:0

项目:国土空间基础信息平台

当前状态:服务间通信靠 Feign(同步 HTTP)+ Redis(异步中转)+ Quartz(定时轮询)

选型:RocketMQ(阿里系生态,与项目技术栈一致,支持事务消息和顺序消息)

1. 现状与痛点

1.1 当前服务间通信方式

服务 A                     服务 B
  │                          │
  ├─ Feign HTTP 同步调用 ────→ 紧耦合,B 挂了 A 直接报错
  ├─ Redis String 中间结果 ──→ 无持久化,进程崩溃则数据丢失
  ├─ Quartz 定时轮询 ────────→ 无效轮询浪费 DB 查询
  └─ CompletableFuture 异步链 → JVM 本地,跨实例不可见

1.2 核心痛点

痛点

当前实现

问题

GIS 服务变更通知

Proxy 的 DataSynchronizationJob(Quartz)定时从 DB 全量刷新服务列表

变更后最长等待一个轮询周期才生效;每次刷新全量加载,即使无变化也查 DB

大数据分析任务调度

geoanalytics 模块同步提交任务到 GA Server,CompletableFuture 异步轮询结果

任务生命周期管理复杂;无失败重试;轮询浪费 GA Server 资源

文件下载任务

GeneratFilesJob(Quartz)定时扫描下载请求表

下载量大时单次 Job 执行超时;无优先级区分

操作日志写入

logger 模块通过 SystemOperationLogAspect 切面同步写 ES

日志写入的延迟叠加到业务请求延迟上;ES 不可用时日志丢失

审批流程串行耦合

ServiceApplyController → Service → AuditService 同步调用链

审批节点变更需要改代码;无超时自动处理

服务间紧耦合

Feign 同步调用(Proxy → Resources、Monitor → Resources)

下游挂了上游直接报错;无重试;无削峰

2. 选型:RocketMQ

2.1 选型理由

维度

RocketMQ

RabbitMQ

Kafka

事务消息

✅ 原生支持(半消息 + 回查)

❌ 需插件

❌ 不支持

顺序消息

✅ 队列级顺序

❌ 不支持

分区内有序(限制大)

定时/延时消息

✅ 18 个延时级别

❌ 需插件

集群部署

✅ 轻量 NameServer

✅ 依赖 ZK

技术栈匹配

✅ 阿里系,与 FastJSON/XXL-Job 一致

Spring Boot 2.1.x 兼容

✅ rocketmq-spring-boot-starter 2.1.x

为什么选 RocketMQ,而不是 RabbitMQ 或 Kafka?

三个候选都可以完成基本的异步解耦,但场景需求做了筛选:

事务消息是硬需求。审批流程中"写审批记录"和"触发下一节点"必须原子——DB 写成功则消息必达,DB 回滚则消息不可见。Kafka 不支持事务消息(它的"事务"是生产者到分区的原子写入,不是 DB + MQ 的分布式事务)。RabbitMQ 可以通过"Publisher Confirm + DB 本地事务 + 补偿"模拟,但代码复杂度高,补偿逻辑容易出错。RocketMQ 的"半消息 + 本地事务 + 回查"是原生实现,不需要自己造补偿。

延时消息大量使用。GA 任务状态检查和文件下载都需要"等一段时间再消费",不是即时投递。RabbitMQ 需要安装 rabbitmq_delayed_message_exchange 插件才能支持延时——内网环境装插件要审批流程。Kafka 完全不支持,只能靠消费者自身 sleep 模拟。RocketMQ 的 18 个延时级别(1s 到 2h)开箱即用。

运维复杂度要考虑。平台部署在自然资源管理部门内网,运维团队规模有限。Kafka 依赖 ZooKeeper,ZK 本身的运维(选举、脑裂、磁盘清理)是额外负担。RabbitMQ 基于 Erlang/OTP 构建,内存模型、进程调度、崩溃恢复机制和 Java 完全不同。团队里会 Erlang 的人几乎没有——出了问题看日志看不懂源码,调参数不知道底层影响,只能靠官方文档盲猜。相比之下,RocketMQ 的 Broker 和 NameServer 都是纯 Java,线程模型、堆内存、GC 行为、线程 Dump 都是 Java 团队日常打交道的东西,出问题能自己查。RocketMQ 的 NameServer 是无状态轻量进程,Broker 是纯 Java——部署和排查都在团队的技能范围内。

技术栈一致性的实际价值。项目已经用了 FastJSON(阿里)、XXL-Job(阿里系)、Hutool——引入 RocketMQ 后,社区文档、故障案例、版本兼容性都在阿里技术栈内,不会出现"Spring Boot 2.1.4 + Kafka 2.1.x 的版本错配"这类问题。

不需要 Kafka 的吞吐量。Kafka 的优势是百万 TPS 级别的日志流处理,但 平台 的消息量是"每分钟几次到几十次服务变更通知、每天几百个下载任务"——这个量级下 Kafka 的优势发挥不出来,反而是维护 ZK 的成本成了净开销。

综合看:RabbitMQ 因为 Erlang 运维门槛和事务消息缺失出局,Kafka 因为重运维和延时消息缺失出局,RocketMQ 在三个硬需求(事务消息、延时消息、轻量运维)上全部满足。

2.2 Maven 依赖

<!-- 根 pom.xml 新增 -->

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

3. 优化场景

3.1 GIS 服务变更通知——从定时轮询到事件驱动

当前代码路径

Proxy DataSynchronizationJob (Quartz, cron: 0/{cacheCron} * * * * ?)
  → ServiceProxyCacheManager.load()
    → DataServiceService.findAll()  // 全量查询
    → 构建新的 Hashtable → 替换 serviceProxyCache

问题:GIS 服务注册/下架/修改后,最长需要等一个轮询周期(默认 60 秒)Proxy 才能感知变更。每次轮询都是 findAll() 全量查询,无论是否有数据变更。

MQ 方案

Resources 模块(数据源)
  DataServiceController.save/update/delete
    → 写 DB
    → 发送 MQ 消息 service.change   新增
         │
         ▼
    ┌─────────────────────────────┐
    │ RocketMQ Topic: giscloud    │
    │ Tag: service.change         │
    └─────────────────────────────┘
         │
         ├── Proxy 消费者:收到消息 → ServiceProxyCacheManager.reloadSingle(serviceId)
         ├── Monitor 消费者:收到消息 → 更新监控配置
         └── 后续可扩展:数据同步消费者、审计日志消费者

实现

// Resources 模块 — 生产者

@Service

public class DataServiceEventPublisher {

    private final RocketMQTemplate rocketMQTemplate;

    public DataService save(DataService entity) {

        DataService saved = dataServiceRepository.save(entity);

        // DB 写成功后发送 MQ。此场景消息丢失可接受(Quartz 兜底),用 asyncSend 不阻塞主流程

        // null callback 意味着发送失败时静默丢弃——投递可靠性由 Quartz 定时兜底保证

        rocketMQTemplate.asyncSend("giscloud:service.change",

            new ServiceChangeEvent(saved.getId(), "CREATE"), null);

        return saved;

    }

    public void delete(String id) {

        dataServiceRepository.deleteById(id);

        rocketMQTemplate.asyncSend("giscloud:service.change",

            new ServiceChangeEvent(id, "DELETE"), null);

    }

}

// Proxy 模块 — 消费者
@Component
@RocketMQMessageListener(
    topic = "giscloud",
    selectorExpression = "service.change",
    consumerGroup = "proxy-cache-refresh")
public class ServiceChangeConsumer implements RocketMQListener<ServiceChangeEvent> {

    private final ServiceProxyCacheManager cacheManager;

    @Override
    public void onMessage(ServiceChangeEvent event) {
        if ("DELETE".equals(event.getOperation())) {
            // remove() 为 MQ 引入后新增方法——从 Hashtable 中移除指定服务
            cacheManager.remove(event.getServiceId());
        } else {
            // reloadSingle() 为 MQ 引入后新增方法——增量更新单个服务,无需全量 load()
            cacheManager.reloadSingle(event.getServiceId());
        }
    }
}

收益

- 服务变更后**实时**刷新 Proxy 缓存,延迟从"最长 60 秒"降到"毫秒级"

- 增量刷新替代全量 findAll(),DB 查询量从 O(n) 降到 O(1)

- Monitor 模块可订阅同一 Topic 更新监控配置,无需新增 Feign 接口

- Quartz DataSynchronizationJob 保留为兜底——MQ 消息丢失时仍能通过定时刷新修正

3.2 文件下载任务——从定时扫描到任务队列

当前代码路径

GeneratFilesJob (Quartz 定时触发)
  → 扫描 data_service_apply 表中待下载记录
    → 逐条处理:ArcGIS MapServer 查询 → 文件生成 → 落盘

问题

- 所有下载任务在同一 Job 执行周期内串行处理,任务量大时超时

- 无优先级——所有下载请求一起排队

- 无失败重试——任务执行失败后只能等下次扫描

MQ 方案

用户提交下载请求 → Resources Controller
  → 写 DB(状态 = PENDING)
  → 发送 MQ 延时消息(Level 3 = 10s,留时间给数据准备)        ← 新增
       │
       ▼
  RocketMQ 延时消息
       │
       ▼
  giscloud-job 消费者
    → 拉取最新数据 → ArcGIS 查询 → 文件生成 → 写 DB(状态 = COMPLETED)
    → 失败 → RocketMQ 自动重试 → 写 DB(状态 = FAILED)
    → 发送通知 MQ → Portal 站内信 / 短信

核心代码

// 生产者:下载请求提交
@PostMapping("/service-apply/download")
public ResponseEntity<?> submitDownload(@RequestBody DownloadRequest req) {
    String taskId = UUID.randomUUID().toString();
    String userId = AuthenticationUtil.getCurrentUserId();
    // 写 DB
    dataServiceApplyService.updateDownloadStatus(req.getApplyId(), "PENDING", taskId);
    // 延时消息(Level 3 = 10s,留时间给数据准备)
    DownloadTask task = new DownloadTask(taskId, req.getApplyId(), userId);
    Message<DownloadTask> message = MessageBuilder.withPayload(task)
        .setDelayTimeLevel(3)  // 延时级别:1=1s, 2=5s, 3=10s, 4=30s, 5=1m ...
        .build();
    rocketMQTemplate.syncSend("giscloud:download-task", message, 3000);
    return ResponseResult.success(taskId);
}

// 消费者:任务执行
@Component
@RocketMQMessageListener(topic = "giscloud", selectorExpression = "download-task",
    consumerGroup = "job-download-executor")
public class DownloadTaskConsumer implements RocketMQListener<DownloadTask> {

    @Override
    public void onMessage(DownloadTask task) {
        try {
            FileInfo file = fileDownloadService.execute(task);
            dataServiceApplyService.updateDownloadStatus(task.getApplyId(), "COMPLETED", file.getId());
        } catch (Exception e) {
            // 抛异常触发 RocketMQ 重试(默认最多 16 次)
            throw new RuntimeException("下载任务执行失败, taskId=" + task.getTaskId(), e);
        }
    }
}

收益

- 下载任务解耦——Controller 提交后立即返回,用户不用等文件生成

- RocketMQ 自带重试机制——失败自动重试,不需要在代码中手动维护

- 延时消息避免用户"提交瞬间就想下载"导致的数据未就绪问题

3.3 操作日志异步解耦

当前代码路径

Controller 方法
  → @SystemOperationLog 注解
    → SystemOperationLogAspect(@AfterReturning + @AfterThrowing)
      → GisLogger.logOperation() → 同步写入 ES

问题:日志写入 ES 的延迟叠加到业务请求的响应时间上。ES 不可用时切面抛异常,不影响业务@AfterReturning 中异常被吞,但日志丢失。

MQ 方案

Controller 返回前
  → @SystemOperationLog 切面
    → 构建 OperationLog 对象              ← 保留
    → rocketMQTemplate.sendOneWay(...)    ← 改为异步发送
         │
         ▼
  RocketMQ (one-way 模式,最快)
         │
         ▼
  Logger 消费者(批量消费)
    → ES BulkIndex 批量写入               ← 新增

收益

- 日志响应时间从"ES 写入延迟"降到"MQ 发送延迟"(one-way 模式下近乎零开销)

- ES 不可用时日志缓存在 MQ 中,恢复后回放——日志零丢失

- 批量消费 + ES BulkIndex 替代逐条 IndexRequest,ES 写入吞吐量提升 10 倍以上

3.4 大数据分析任务生命周期管理

当前代码路径

geoanalytics: BigDataAnalysisItemController 接收分析请求
  → CompletableFuture.supplyAsync()
    → BigDataAnalysis*RestService(HTTP POST ArcGIS GA Server)
      → 轮询 GA Server 任务状态(每 5 秒 GET job/{jobId})
        → 完成 → 结果写入 ES → 创建切片
        → 失败 → 标记 BigDataAnalysisItem.status = FAILED

问题

- CompletableFuture 在 JVM 本地,服务重启则任务状态丢失

- 轮询 GA Server 浪费网络带宽——大部分轮询返回"still running"

- 无失败重试——失败后需要人工重新提交

- 多实例环境下无法感知其他实例正在执行的任务

MQ 方案

Controller 提交分析请求
  → 写 DB(BigDataAnalysisItem.status = SUBMITTED)
  → 发送 MQ 消息 ga.task.submitted                       ← 新增
       │
       ▼
  消费者:提交任务到 GA Server
    → HTTP POST GA Server → 获取 jobId → 更新 DB(status = RUNNING, jobId)
    → 发送延时消息 ga.task.check:{jobId}(延时 30 秒)
       │
       ▼
  消费者:检查任务状态
    → GET GA Server /job/{jobId}
      ├─ 完成 → 结果写 ES → 更新 DB(status = COMPLETED) → 创建切片
      ├─ 失败 → 更新 DB(status = FAILED) → 发送告警
      └─ 运行中 → 再次发送延时消息 ga.task.check:{jobId}(延时 30 秒)← 替代轮询

收益

- 任务状态持久化在 DB + MQ,服务重启不丢任务

- 延时消息替代轮询——不需要定时 CRON 每 5 秒扫一遍所有 Running 任务,每个任务自己管理自己的检查节奏

- GA Server 的压力从"每 5 秒 × Running 任务数"降到"每 30 秒 × 仅当前批次",请求量骤降

3.5 行政审批流程解耦

当前代码路径

ServiceApplyController 提交申请
  → ServiceApplyService.save()
    → 查下一个审批节点的审批人(AuditProcessNode)
    → 写审批记录 → 返回
AuditProcessController 审批
  → AuditProcessService.approve()
    → 更新审批状态
    → 判断是否终审
      → 是:调用 ServiceItemsController 开通服务权限
      → 否:查下一个审批节点 → 更新待审批人

问题:审批流程硬编码在 Service 层,"开通服务权限"和"审批状态流转"耦合在一起。新增审批节点类型(如会签、转办)需要改代码。

MQ 方案——事务消息保证 DB 写 + MQ 发的原子性

审批人提交审批意见
  → @Transactional
    1. 写审批记录 DB(audit_status = APPROVED)
    2. 发送 RocketMQ 事务消息 audit.approved
  RocketMQ 事务消息机制:
    ┌─ 半消息(half message):先发到 MQ,消费者不可见
    │   ├─ 执行本地事务:写 DB
    │   │   ├─ DB 写成功 → Commit → 消费者可见
    │   │   └─ DB 写失败 → Rollback → 消费者不可见
    │   └─ 超时未收到 Commit/Rollback → MQ 回查本地事务状态
  消费者:
    audit.approved 消息到达
      → 判断审批流是否终审
        ├─ 是 → 发送 service.activated 消息 → Resources 开通 GIS 服务权限
        └─ 否 → 查下一个审批节点 → 发送 user.notify 消息 → Portal 站内信通知

收益

- 审批流各步骤解耦——新增审批节点类型只需要新增一个消费者,不碰原有代码

- 事务消息保证"审批记录写入 DB"和"触发下一节点"的原子性——DB 写失败则消息不会投递

- 超时自动处理:延时消息 audit.timeout:{auditId}(24 小时)→ 自动驳回或转上级

4. Topic 与 Consumer Group 设计

Topic

Tag

生产者

消费者

Consumer Group

消息模式

giscloud

service.change

Resources

Proxy, Monitor

proxy-cache-refresh, monitor-config

广播(每个实例都收)

giscloud

download-task

Resources

giscloud-job

job-download-executor

集群(同一个任务只一个实例执行)

giscloud

audit.approved || audit.timeout

Management

Management, Portal

audit-flow, audit-notify

集群

giscloud

ga.task.submitted || ga.task.check

geoanalytics

geoanalytics

ga-task-executor

集群

giscloud

log.operation

所有模块

giscloud-logger

log-writer

集群

giscloud

user.notify

Management

Portal

portal-notify

集群

设计要点

- 同一个 Topic giscloud,用 Tag 区分子业务——减少 Topic 数量,降低运维复杂度

- 广播模式用于"所有实例都必须知道的"事件(如服务变更:所有 Proxy 实例都要刷新缓存)

- 集群模式用于"只需要一个实例执行"的任务(如文件下载:不允许多实例同时生成同一个文件)

5. 与现有组件的共存策略

现有组件

MQ 引入后

关系

Quartz DataSynchronizationJob

保留,改为兜底

MQ 实时通知 + Quartz 定时兜底(防 MQ 消息丢失)

Quartz GeneratFilesJob

废除

MQ 任务队列完全替代定时扫描

Redis SOE 中间结果

保留

MQ 传递 key 引用,Redis 存大 JSON——不把大 JSON 放入 MQ 消息体

Feign 同步调用

逐步替换

通知类场景(缓存刷新、配置更新)用 MQ;查询类场景保留 Feign

CompletableFuture

替换 GA 任务调度

GA 任务生命周期改用 MQ 延时消息管理;简单并行计算保留 CompletableFuture

6. 容错设计

6.1 消息丢失防护

措施

说明

同步发送 + 确认

关键消息(审批、任务下发)使用 syncSend + 等待 SendResult

One-way 发送

日志类消息用 sendOneWay(允许少量丢失,优先吞吐)

Quartz 定时兜底

服务列表缓存等场景保留定时全量刷新,弥补 MQ 丢失

DB 状态机

分析任务和下载任务在 DB 中记录状态,MQ 消息丢失可通过定时扫描 PENDING/RUNNING 状态恢复

6.2 消息重复消费

RocketMQ 的 at least once 投递语义意味着消费者必须幂等:

场景

幂等方案

服务列表增量刷新

reloadSingle(serviceId) 覆盖写入,天然幂等

文件下载任务

DB 状态机:COMPLETED 状态的任务跳过执行

GA 任务检查

DB 状态机:COMPLETED/FAILED 的任务跳过检查

审批流转

DB 审批记录唯一约束:UNIQUE(audit_id, node_id) 防重复流转

6.3 RocketMQ 不可用降级

RocketMQ 故障
  ├─ 服务变更通知 → 降级到 Quartz 定时刷新(延迟回到分钟级,但功能不中断)
  ├─ 文件下载任务 → 降级到 GeneratFilesJob 定时扫描(保留为备用方案)
  ├─ 操作日志 → 降级到本地 Logback 文件(恢复后手动回放)
  ├─ GA 任务调度 → 降级到 CompletableFuture 本地轮询
  └─ 审批流程 → 降级到同步调用链(无事务消息保障,但功能可用)

每个场景都有一套降级路径——MQ 是加速和优化,不是唯一通道。这样即使 MQ 集群长时间不可用,核心业务也不中断。

8. 场景汇总

#

场景

当前方案

MQ 方案

收益

风险

1

服务变更通知

Quartz 定时轮询

MQ 实时事件 + Quartz 兜底

延迟从分钟→毫秒,DB 全量→增量

2

文件下载任务

Quartz 串行扫描

MQ 任务队列 + 重试

解耦、削峰、优先级

3

操作日志写入

切面同步写 ES

MQ 异步 + 批量写入

吞吐量 10x,日志零丢失

4

GA 分析任务调度

CompletableFuture + 轮询

MQ 延时消息 + DB 状态机

服务重启不丢任务,减少无效轮询

5

审批流程

Service 层同步调用链

MQ 事务消息 + 消费者拆分

审批节点解耦,超时自动处理

评论