Spring Boot 使用 Function 和异步线程池处理列表拆分任务

作者:old wang 发布时间: 2022-03-28 阅读量:4 评论数:0

在业务开发中,经常会遇到批量处理数据的场景。

例如:

  • 批量导入 Excel 数据;

  • 批量调用第三方接口;

  • 批量处理订单;

  • 批量同步用户数据;

  • 大列表分片计算;

  • 批量生成报表数据。

如果直接用单线程遍历整个列表,数据量较大时处理时间会比较长。

一种常见做法是:

将大列表拆分成多个小列表,每个小列表交给线程池异步处理,最后再汇总所有子任务结果。

本文记录一种基于 Spring Boot、ThreadPoolTaskExecutorCompletableFutureFunction<List<T>, ProcessResult> 的实现方式。

一、实现思路

整体流程如下:

1. 准备一个大列表
2. 按指定 chunkSize 拆分成多个子列表
3. 每个子列表交给 Function 处理
4. 使用异步线程池并发执行子任务
5. 每个子任务返回 ProcessResult
6. 主线程等待所有任务完成
7. 汇总所有子任务结果

这样可以把“数据拆分、异步调度、业务处理、结果汇总”拆开。

其中:

  • 列表拆分逻辑由 ListProcessor 负责;

  • 子任务处理逻辑由调用方通过 Function 传入;

  • 线程池由 Spring 管理;

  • 子任务结果由 ProcessResult 封装。

二、结果封装类 ProcessResult

先定义一个结果对象,用于保存每个子任务的处理结果。

public class ProcessResult {

    private int sum;

    private StringBuilder msg;

    public ProcessResult(int sum, StringBuilder msg) {
        this.sum = sum;
        this.msg = msg;
    }

    public int getSum() {
        return sum;
    }

    public StringBuilder getMsg() {
        return msg;
    }
}

这里示例中保存了两个字段:

  • sum:用于统计子任务的计算结果;

  • msg:用于保存子任务处理过程中的消息。

实际业务中可以根据需要调整。

例如可以改成:

private int successCount;
private int failCount;
private List<String> errorMessages;

也可以封装成更通用的业务结果对象。

三、配置独立线程池

为了避免和项目中其他异步任务互相影响,建议单独配置一个线程池。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "customThreadPool")
    public Executor customThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("custom-thread-");

        executor.initialize();

        return executor;
    }
}

这里配置了:

核心线程数:5
最大线程数:10
队列容量:25
线程名前缀:custom-thread-

实际项目中不建议把这些参数写死在代码中。

更推荐放到配置文件里,例如:

async:
  task:
    core-pool-size: 5
    max-pool-size: 10
    queue-capacity: 25
    thread-name-prefix: custom-thread-

线程池参数需要结合业务处理耗时、CPU、数据库连接池、第三方接口限流等因素综合调整。

四、列表处理器 ListProcessor

核心处理类负责三件事:

  1. 拆分列表;

  2. 提交异步任务;

  3. 收集任务结果。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

@Component
public class ListProcessor<T> {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * 异步处理主方法。
     * 负责拆分列表、异步调用处理方法,并收集处理结果。
     */
    public List<ProcessResult> processList(List<T> list,
                                           int chunkSize,
                                           Function<List<T>, ProcessResult> function)
            throws InterruptedException, ExecutionException {

        List<List<T>> chunks = splitList(list, chunkSize);

        List<CompletableFuture<ProcessResult>> futures = new ArrayList<>();

        ListProcessor<T> self = applicationContext.getBean(this.getClass());

        for (List<T> chunk : chunks) {
            futures.add(self.processChunkAsync(chunk, function));
        }

        List<ProcessResult> results = new ArrayList<>();

        for (CompletableFuture<ProcessResult> future : futures) {
            results.add(future.get());
        }

        return results;
    }

    /**
     * 异步处理单个子列表。
     */
    @Async("customThreadPool")
    public CompletableFuture<ProcessResult> processChunkAsync(List<T> chunk,
                                                              Function<List<T>, ProcessResult> function) {
        return CompletableFuture.completedFuture(function.apply(chunk));
    }

    /**
     * 按指定大小拆分列表。
     */
    private List<List<T>> splitList(List<T> list, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();

        for (int i = 0; i < list.size(); i += chunkSize) {
            chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
        }

        return chunks;
    }
}

这里有一个比较关键的写法:

ListProcessor<T> self = applicationContext.getBean(this.getClass());

因为 @Async 是基于 Spring AOP 代理实现的。

如果在同一个类中直接调用:

this.processChunkAsync(...)

不会经过 Spring 代理,@Async 不会生效。

所以这里通过 Spring 容器拿到当前 Bean 的代理对象,再调用异步方法。

五、为什么用 Function

Function<List<T>, ProcessResult> 的作用是把业务处理逻辑交给调用方。

列表处理器只关心:

  • 如何拆分;

  • 如何异步执行;

  • 如何收集结果。

它不关心每个子列表具体怎么处理。

例如:

Function<List<Integer>, ProcessResult> function = chunk -> {
    int sum = 0;
    StringBuilder msg = new StringBuilder();

    for (int num : chunk) {
        sum += num;
        msg.append(num).append(" ");
    }

    return new ProcessResult(sum, msg);
};

这样 ListProcessor 就可以复用于不同业务场景。

例如:

批量计算
批量入库
批量调用接口
批量校验数据
批量生成结果

调用方只需要传入不同的 Function 即可。

六、主程序测试示例

启用异步能力:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class MainApplication {

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }
}

测试逻辑可以写在 CommandLineRunner 中。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;

@Component
public class TestRunner implements CommandLineRunner {

    @Autowired
    private ListProcessor<Integer> listProcessor;

    @Override
    public void run(String... args) throws Exception {
        List<Integer> list = new ArrayList<>();

        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }

        int chunkSize = 3;

        Function<List<Integer>, ProcessResult> function = chunk -> {
            int sum = 0;
            StringBuilder msg = new StringBuilder();

            for (int num : chunk) {
                sum += num;
                msg.append(num).append(" ");
            }

            return new ProcessResult(sum, msg);
        };

        List<ProcessResult> results = listProcessor.processList(list, chunkSize, function);

        int totalSum = 0;
        StringJoiner joiner = new StringJoiner(", ");

        for (ProcessResult result : results) {
            totalSum += result.getSum();
            joiner.add(result.getMsg().toString());
        }

        System.out.println("sum = " + totalSum);
        System.out.println("msg = " + joiner);
    }
}

列表:

1,2,3,4,5,6,7,8,9,10

按照每 3 个元素拆分后:

[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]

每个子列表单独处理,最后汇总结果。

七、不需要返回值时的处理方式

有些场景不需要子任务返回结果。

例如:

  • 批量发送消息;

  • 批量写日志;

  • 批量更新状态;

  • 批量调用没有返回值的接口。

这种情况下,可以使用 Consumer<List<T>>CountDownLatch

public <T> void processListWithoutResult(List<T> list,
                                         int chunkSize,
                                         Consumer<List<T>> consumer)
        throws InterruptedException {

    if (list == null || list.isEmpty()) {
        return;
    }

    if (list.size() <= chunkSize) {
        consumer.accept(list);
        return;
    }

    List<List<T>> chunks = splitList(list, chunkSize);

    CountDownLatch latch = new CountDownLatch(chunks.size());

    ListProcessor<T> self = applicationContext.getBean(this.getClass());

    for (List<T> chunk : chunks) {
        self.processChunkAsync(chunk, consumer, latch);
    }

    latch.await();
}

异步执行方法:

@Async("customThreadPool")
public <T> void processChunkAsync(List<T> chunk,
                                  Consumer<List<T>> consumer,
                                  CountDownLatch latch) {
    try {
        consumer.accept(chunk);
    } finally {
        latch.countDown();
    }
}

这里必须把:

latch.countDown();

放在 finally 中。

否则某个子任务异常时,主线程可能一直阻塞在:

latch.await();

八、列表拆分方法

列表拆分是整个流程的基础。

private List<List<T>> splitList(List<T> list, int chunkSize) {
    List<List<T>> chunks = new ArrayList<>();

    for (int i = 0; i < list.size(); i += chunkSize) {
        chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
    }

    return chunks;
}

这里使用:

Math.min(i + chunkSize, list.size())

是为了处理最后一个子列表不足 chunkSize 的情况。

例如列表长度是 10,chunkSize 是 3:

[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]

最后一个子列表只有 1 个元素。

九、需要注意的问题

1. chunkSize 要合理设置

chunkSize 太小,会产生大量异步任务,线程调度成本变高。

chunkSize 太大,每个任务处理时间变长,并发效果下降。

需要结合实际业务压测。

例如:

100
500
1000
2000

都可以作为初始测试值。

2. 线程池大小不要盲目调大

线程池大小需要结合任务类型判断。

如果任务主要是 CPU 计算,线程数不宜远大于 CPU 核数。

如果任务主要是 IO,例如数据库、网络接口,可以适当增加线程数,但也要考虑:

  • 数据库连接池大小;

  • 第三方接口限流;

  • JVM 内存;

  • 下游服务承载能力。

3. future.get() 会阻塞

在结果汇总阶段:

future.get()

会阻塞等待子任务完成。

如果子任务异常,get() 会抛出 ExecutionException

实际项目中建议对异常进行统一处理。

例如:

try {
    results.add(future.get());
} catch (ExecutionException e) {
    // 记录失败任务
}

4. subList 是原列表视图

list.subList() 返回的是原列表的一段视图,不是完全独立的新列表。

如果后续原列表会被修改,可能引发问题。

更稳妥的写法是复制一份:

chunks.add(new ArrayList<>(list.subList(i, Math.min(i + chunkSize, list.size()))));

如果只是只读处理,直接使用 subList() 问题不大。

5. @Async 同类方法调用不会生效

如果在同一个类里直接调用异步方法:

this.processChunkAsync(...)

不会触发异步。

需要通过 Spring 代理对象调用。

本文示例中使用:

applicationContext.getBean(this.getClass())

来获取当前 Bean 代理对象。

也可以把异步方法拆到另一个 Bean 中,这样结构更清晰。

十、适用场景

这种方式适合:

  • 大列表拆分处理;

  • 子任务之间互不依赖;

  • 每个子任务可以独立执行;

  • 最终需要汇总处理结果;

  • 业务逻辑希望通过函数式接口传入。

例如:

批量校验 Excel 数据
批量插入数据库
批量调用远程接口
批量生成统计结果
批量处理订单状态

十一、不适合的场景

这种方式不适合:

  1. 子任务之间有强依赖关系;

  2. 所有子任务必须在同一个数据库事务中完成;

  3. 任务执行顺序必须严格保证;

  4. 子任务数量巨大但没有限流控制;

  5. 下游系统无法承受并发压力。

特别是数据库操作场景,需要额外注意事务边界。

如果多个子任务分别在线程池中执行,它们通常不在同一个事务中。

如果业务要求“全部成功或全部失败”,就不能简单用这种方式拆分并发执行。

十二、可以改进的地方

1. 使用 allOf 等待所有任务

除了逐个 future.get(),也可以使用:

CompletableFuture.allOf(...)

例如:

CompletableFuture<Void> allFuture = CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[0])
);

allFuture.join();

List<ProcessResult> results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

这种方式更适合统一等待所有任务完成。

2. 增加异常收集

可以定义一个更完整的结果对象,例如:

public class ProcessResult {

    private int successCount;

    private int failCount;

    private List<String> errorMessages;
}

这样更适合批量任务处理。

3. 支持超时控制

如果某些子任务可能执行很久,可以增加超时控制,避免主线程无限等待。

例如使用:

future.get(30, TimeUnit.SECONDS)

或者在 CompletableFuture 链路中配置超时策略。

结论

本文记录了一种在 Spring Boot 中处理大列表拆分任务的方式。

核心思路是:

  1. 使用 splitList 将大列表拆成多个小列表;

  2. 使用 Function<List<T>, ProcessResult> 抽象子任务处理逻辑;

  3. 使用 @Async 和独立线程池并发执行子任务;

  4. 使用 CompletableFuture 获取子任务结果;

  5. 主线程汇总所有子任务结果。

这种方式的优点是:

  • 拆分逻辑和业务逻辑分离;

  • 子任务处理逻辑可以通过 Function 灵活传入;

  • 可以并发处理大列表;

  • 结果可以统一汇总;

  • 代码结构比较清晰。

当一个大列表可以被拆分成多个互不依赖的小任务时,可以使用 Function + @Async + CompletableFuture 实现并发处理和结果汇总;但需要注意线程池大小、异常处理、事务边界和下游系统承载能力。

评论