Spring Boot 批量插入大量数据

作者:old wang 发布时间: 2022-04-04 阅读量:10 评论数:0

Spring Boot 使用 ThreadPoolTaskExecutor 批量插入大量数据

在业务系统中,经常会遇到批量导入数据的场景。

例如:

  • Excel 批量导入;

  • 日志数据入库;

  • 第三方数据同步;

  • 历史数据迁移;

  • 大批量初始化数据。

如果数据量只有几千条,单线程批量插入通常就够了。

但如果数据量达到几十万、上百万级,单线程处理耗时会明显增加。这个时候,可以考虑使用线程池对数据进行分片,然后并发批量插入。

记录一种基于 Spring Boot、MyBatis-Plus 和 ThreadPoolTaskExecutor 的批量插入实现方式。

一、实现思路

整体思路比较简单:

1. 准备待插入的数据集合
2. 按固定大小切分成多个子集合
3. 每个子集合交给一个异步任务处理
4. 使用 ThreadPoolTaskExecutor 执行异步任务
5. 使用 CountDownLatch 等待所有任务执行完成
6. 最后返回处理结果

例如有 200 万条数据,可以按每 100 条一组进行切分。

每一组数据通过一个异步任务执行批量插入。

主线程负责提交任务,并等待所有任务执行完成。

二、线程池配置

先在配置文件中添加线程池参数。

# 异步线程配置

# 核心线程数
async.executor.thread.core_pool_size=30

# 最大线程数
async.executor.thread.max_pool_size=30

# 队列大小
async.executor.thread.queue_capacity=99988

# 线程名称前缀
async.executor.thread.name.prefix=async-importDB-

这里配置了:

  • 核心线程数:30;

  • 最大线程数:30;

  • 队列容量:99988;

  • 线程名前缀:async-importDB-

实际项目中,这些参数不要直接照抄,需要结合机器配置、数据库连接池大小、数据库承载能力和单批数据大小进行调整。

三、注册 ThreadPoolTaskExecutor

接下来创建线程池配置类。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;

    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;

    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;

    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("init asyncServiceExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(namePrefix);

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.initialize();

        return executor;
    }
}

这里使用了:

@EnableAsync

开启 Spring 异步能力。

然后通过:

@Bean(name = "asyncServiceExecutor")

注册一个指定名称的线程池。

后续业务方法可以通过:

@Async("asyncServiceExecutor")

指定使用这个线程池执行异步任务。

四、创建异步插入任务

定义异步任务方法,用来执行单个分片的数据插入。

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<LogOutputResult> logOutputResults,
                             LogOutputResultMapper logOutputResultMapper,
                             CountDownLatch countDownLatch) {
        try {
            log.info("start executeAsync, size={}", logOutputResults.size());

            logOutputResultMapper.addLogOutputResultBatch(logOutputResults);

            log.info("end executeAsync, size={}", logOutputResults.size());
        } finally {
            countDownLatch.countDown();
        }
    }
}

这里有一个关键点:

finally {
    countDownLatch.countDown();
}

countDown() 必须放在 finally 中。

否则某个异步任务执行异常后,如果没有调用 countDown(),主线程会一直阻塞在:

countDownLatch.await();

最终导致接口或任务无法结束。

五、批量插入主流程

主流程中先准备数据,然后切分集合,再提交异步任务。

@Override
public int testMultiThread() {
    List<LogOutputResult> logOutputResults = getTestData();

    // 每 100 条数据切分为一组
    List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);

    CountDownLatch countDownLatch = new CountDownLatch(lists.size());

    for (List<LogOutputResult> listSub : lists) {
        asyncService.executeAsync(listSub, logOutputResultMapper, countDownLatch);
    }

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("批量插入等待中断", e);
    }

    return logOutputResults.size();
}

这里的关键步骤是:

List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);

将大集合切成多个小集合。

然后每个小集合提交给异步线程池:

asyncService.executeAsync(listSub, logOutputResultMapper, countDownLatch);

主线程最后通过:

countDownLatch.await();

等待所有异步任务执行完成。

六、集合切分示例

如果项目中没有现成的集合切分工具,可以自己写一个简单方法。

import java.util.ArrayList;
import java.util.List;

public class ConvertHandler {

    public static <T> List<List<T>> splitList(List<T> source, int batchSize) {
        List<List<T>> result = new ArrayList<>();

        if (source == null || source.isEmpty()) {
            return result;
        }

        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be greater than 0");
        }

        int size = source.size();

        for (int i = 0; i < size; i += batchSize) {
            int end = Math.min(i + batchSize, size);
            result.add(source.subList(i, end));
        }

        return result;
    }
}

比如总共有 2000003 条数据,每 100 条一组,大约会切成 20001 个任务。

七、Mapper 批量插入示例

Mapper 层可以使用 MyBatis 的 foreach 批量插入。

示例:

<insert id="addLogOutputResultBatch">
    INSERT INTO log_output_result (
        id,
        content,
        create_time
    )
    VALUES
    <foreach collection="list" item="item" separator=",">
        (
            #{item.id},
            #{item.content},
            #{item.createTime}
        )
    </foreach>
</insert>

对应 Mapper 接口:

import org.apache.ibatis.annotations.Param;

import java.util.List;

public interface LogOutputResultMapper {

    int addLogOutputResultBatch(@Param("list") List<LogOutputResult> list);
}

实际项目中,字段需要按自己的表结构调整。

八、测试结果记录

测试数据量:

2000003 条

测试方式:

每 100 条数据为一批
使用 30 个线程并发插入

测试结果:

插入方式

数据量

耗时

单线程批量插入

2000003 条

约 5.75 分钟

多线程批量插入

2000003 条

约 1.67 分钟

从这组测试结果看,多线程批量插入相比单线程有明显提升。

不过这个结果只代表当时测试环境下的表现。

实际性能会受到很多因素影响,例如:

  • 机器 CPU;

  • 内存;

  • 数据库连接池大小;

  • 数据库服务器性能;

  • 网络延迟;

  • 表索引数量;

  • 单批次数据大小;

  • 是否开启事务;

  • 数据库写入压力。

因此,线程数和批次大小不能固定照搬,需要结合具体环境压测。

九、数据正确性检查

批量并发插入后,需要检查两个问题:

1. 是否重复插入

可以按主键或业务唯一键分组检查。

示例:

SELECT id, COUNT(*)
FROM log_output_result
GROUP BY id
HAVING COUNT(*) > 1;

如果查询结果为空,说明没有发现重复主键数据。

如果使用的是业务唯一键,也可以按业务唯一键检查。

2. 数据是否完整

可以检查总数量:

SELECT COUNT(*)
FROM log_output_result;

确认入库数量是否和原始数据量一致。

例如原始数据是:

2000003 条

那么最终数据库中的数量也应该是:

2000003 条

十、线程数不是越多越好

多线程确实可以提升插入效率,但线程数不是越多越好。

线程数过多可能导致:

  • CPU 上下文切换增加;

  • 数据库连接不够用;

  • 数据库写入压力过大;

  • 锁竞争变严重;

  • 事务提交压力变大;

  • 应用内存占用上升。

有一种常见估算方式:

CPU 核心数 * 2 + 2

但这只能作为初始参考。

对于批量入库这种场景,更重要的是结合数据库连接池和数据库写入能力调整。

例如:

  • 如果数据库连接池最大连接数只有 20,线程池开 100 个线程意义不大;

  • 如果单批数据太小,任务数量过多,线程调度成本会上升;

  • 如果单批数据太大,单条 SQL 过长,也可能影响数据库执行效率。

比较稳妥的做法是从较小线程数开始压测,例如:

5
10
20
30

同时观察:

  • 应用 CPU;

  • JVM 内存;

  • 数据库连接数;

  • 数据库 CPU;

  • 慢 SQL;

  • 插入耗时。

最终选择一个整体吞吐和稳定性都比较好的配置。

十一、需要注意的几个问题

1. 注意数据库连接池大小

线程池并发执行插入任务时,每个任务都可能占用数据库连接。

如果线程池线程数大于数据库连接池最大连接数,任务可能会阻塞等待连接。

所以线程池大小要和数据库连接池配置一起看。

2. 注意事务边界

如果每个异步任务单独执行批量插入,那么每个任务通常是独立事务。

如果希望所有数据要么全部成功,要么全部失败,这种多线程拆分方式就不适合直接使用。

因为跨多个线程统一控制一个大事务比较复杂,也不推荐这么做。

这类场景更适合:

  • 先落临时表;

  • 再做校验;

  • 最后通过单事务合并;

  • 或者设计补偿和重试机制。

3. 注意异常收集

示例中 CountDownLatch 只负责等待任务完成,不负责收集异常。

如果异步任务中插入失败,主线程不一定能直接感知到具体错误。

可以增加一个线程安全集合记录异常。

示例:

List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());

异步任务中捕获异常后写入:

try {
    logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
} catch (Exception e) {
    errors.add(e);
    log.error("批量插入失败", e);
} finally {
    countDownLatch.countDown();
}

主线程等待结束后判断:

if (!errors.isEmpty()) {
    throw new RuntimeException("部分批量任务执行失败");
}

4. 注意批次大小

示例中每 100 条一批。

这个值不是固定答案。

批次太小,任务数量太多,线程调度和 SQL 执行次数会增加。

批次太大,单条 SQL 过长,可能导致数据库压力过大。

需要结合实际环境调整,例如:

100
500
1000
2000

逐步压测。

5. 注意幂等性

批量插入过程中,如果某些任务成功、某些任务失败,再次重试时可能产生重复数据。

因此批量导入场景最好有幂等设计,例如:

  • 使用业务唯一键;

  • 使用唯一索引;

  • 插入前清理批次数据;

  • 使用导入批次号;

  • 失败后可按批次回滚或重试。

十二、完整流程回顾

整个流程可以概括为:

1. 配置 ThreadPoolTaskExecutor
2. 使用 @Async 指定线程池
3. 准备待插入数据
4. 将大集合按 batchSize 切分
5. 每个分片提交一个异步任务
6. 每个异步任务执行批量插入
7. CountDownLatch 等待所有任务完成
8. 检查数据总量和重复数据
9. 根据压测结果调整线程数和批次大小

结论

使用 ThreadPoolTaskExecutor 可以提升大量数据批量插入的处理效率。

在测试中,2000003 条数据:

  • 单线程耗时约 4.34 分钟;

  • 30 个线程并发插入耗时约 1.1 分钟。

不过,多线程批量插入不是简单地把线程数调大。

真正需要关注的是:

  • 线程池大小;

  • 数据库连接池大小;

  • 单批次数据量;

  • 数据库写入能力;

  • 异常处理;

  • 事务边界;

  • 数据幂等性;

  • 压测结果。

评论