← 返回 JUC 列表

Spring Boot / Spring Cloud 中的线程池实战场景

Spring Boot / Spring Cloud 中的线程池实战场景

本文聚焦线程池在 Spring Boot / Spring Cloud 微服务开发中的真实业务场景,每个场景包含业务背景、技术背景、实现方案、代码示例和方案评价。


场景一:@Async 注解异步执行

1.1 业务背景

微服务中有大量非核心、可异步执行的任务:

  • 用户注册成功后发邮件/短信通知
  • 操作审计日志记录
  • 消息推送(App Push / WebSocket)
  • 生成报表导出 Excel

这些任务如果同步执行,响应时间被拉长,影响用户体验。

1.2 技术背景

Spring 3.0 提供了 @Async 注解,但默认使用 SimpleAsyncTaskExecutor——它为每个任务都创建一个新线程,不重用线程,生产环境会 OOM。因此必须配置自定义线程池。

1.3 实现思路

请求进来 → Controller → Service(同步执行业务逻辑)
                         ↓
                   @Async 异步方法
                         ↓
                   ThreadPoolExecutor 处理
                         ↓
                   主线程直接返回,不阻塞

1.4 代码示例

第一步:配置线程池

java
@Configuration
@EnableAsync // 开启异步支持
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-task-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            // 异步方法无返回值时的异常处理(有返回值通过 Future 捕获)
            log.error("异步方法执行异常: {}#{}, 参数: {}",
                method.getDeclaringClass().getName(), method.getName(), params, ex);
        };
    }
}

第二步:使用 @Async

java
@Service
@Slf4j
public class UserService {

    // 同步注册
    public User register(UserRegisterReq req) {
        // 1. 写入数据库(同步)
        User user = userMapper.insert(req.toUser());
        // 2. 发邮件/短信(异步,不阻塞)
        sendNotificationAsync(user);
        return user;
    }

    @Async // 使用配置的线程池异步执行
    public void sendNotificationAsync(User user) {
        mailService.sendWelcomeMail(user.getEmail());
        smsService.sendRegisterSms(user.getPhone());
        log.info("异步通知发送完成: userId={}", user.getId());
    }

    @Async
    public Future<String> generateReportAsync(Long reportId) {
        // 有返回值的异步任务
        String reportUrl = reportService.generate(reportId);
        return new AsyncResult<>(reportUrl);
    }
}

第三步:调用方获取异步结果

java
// Controller
@PostMapping("/report")
public ResponseEntity<?> createReport(@RequestBody ReportReq req) {
    Future<String> future = userService.generateReportAsync(req.getReportId());
    // 立即返回,不阻塞
    return ResponseEntity.accepted().body("报表正在生成中");
}

// 或异步回调
future.thenAccept(url -> log.info("报表生成完成: {}", url));

1.5 优缺点

维度 说明
降低接口延迟 耗时操作异步化,主线程快速返回
解耦业务 核心逻辑与非核心逻辑分离
Spring 原生支持 注解方式,无侵入,使用简单
事务隔离 异步方法的事务与调用方不在同一个事务上下文
上下文丢失 RequestContextHolder、SecurityContext 默认不传递
异常处理麻烦 无返回值异步方法的异常不会抛到调用方
线程池配置被忽略 很多人忘记自定义线程池,使用默认的 SimpleAsyncTaskExecutor 导致 OOM

解决上下文丢失:

java
// 自定义 TaskDecorator 传递上下文
public class ContextCopyingDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 获取主线程上下文
        RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
        Map<String, String> contextMap = TraceContext.traceId();
        return () -> {
            try {
                // 传递给异步线程
                RequestContextHolder.setRequestAttributes(requestAttributes);
                TraceContext.setTraceId(contextMap);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
                TraceContext.clear();
            }
        };
    }
}

// 配置时设置装饰器
executor.setTaskDecorator(new ContextCopyingDecorator());

场景二:CompletableFuture 并行查询微服务

2.1 业务背景

微服务架构中,前端一个页面往往需要聚合多个下游服务的数据:

商品详情页需要:
  ├── 商品基本信息  → 调用商品服务(product-service)
  ├── 实时库存      → 调用库存服务(stock-service)
  ├── 价格信息      → 调用价格服务(price-service)
  ├── 商家信息      → 调用商家服务(shop-service)
  └── 用户评价      → 调用评价服务(review-service)

如果串行调用,总耗时 = 所有服务耗时之和(可能 5 × 200ms = 1s+)。
如果并行调用,总耗时 ≈ 最慢服务的耗时(可能 200ms)。

2.2 技术背景

Spring Cloud 微服务调用通常是网络 IO 密集型(HTTP/RPC),等待 IO 时 CPU 空闲。使用 CompletableFuture + 自定义线程池可以将串行等待改为并行等待,大幅降低接口响应时间。

2.3 实现思路

请求到达
    │
    ├──→ CompletableFuture.supplyAsync(() → 商品服务, threadPool)
    ├──→ CompletableFuture.supplyAsync(() → 库存服务, threadPool)
    ├──→ CompletableFuture.supplyAsync(() → 价格服务, threadPool)
    ├──→ CompletableFuture.supplyAsync(() → 商家服务, threadPool)
    └──→ CompletableFuture.supplyAsync(() → 评价服务, threadPool)
                              │
                              ▼
                    allOf(...) 等待所有完成
                              │
                              ▼
                    thenApply 合并结果 → 返回 VO

2.4 代码示例

java
@Service
@Slf4j
public class ProductAggService {

    // IO 密集型,线程数适当放大
    private final ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
        20, 40, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(500),
        r -> {
            Thread t = new Thread(r);
            t.setName("parallel-io-" + t.getId());
            return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    @Autowired private ProductClient productClient;
    @Autowired private StockClient stockClient;
    @Autowired private PriceClient priceClient;
    @Autowired private ShopClient shopClient;
    @Autowired private ReviewClient reviewClient;

    /**
     * 并行查询商品详情(5 个下游服务并行调用)
     */
    public ProductDetailVO getProductDetail(Long productId) {
        long start = System.currentTimeMillis();

        // 1. 并行发起 5 个异步调用
        CompletableFuture<ProductInfo> infoFuture =
            CompletableFuture.supplyAsync(() -> productClient.getInfo(productId), ioPool);

        CompletableFuture<Integer> stockFuture =
            CompletableFuture.supplyAsync(() -> stockClient.getStock(productId), ioPool);

        CompletableFuture<PriceVO> priceFuture =
            CompletableFuture.supplyAsync(() -> priceClient.getPrice(productId), ioPool);

        CompletableFuture<ShopVO> shopFuture =
            CompletableFuture.supplyAsync(() -> shopClient.getShop(productId), ioPool);

        CompletableFuture<List<ReviewVO>> reviewFuture =
            CompletableFuture.supplyAsync(() -> reviewClient.getReviews(productId), ioPool);

        // 2. 等待全部完成,合并结果
        ProductDetailVO result = CompletableFuture
            .allOf(infoFuture, stockFuture, priceFuture, shopFuture, reviewFuture)
            .thenApply(v -> {
                ProductDetailVO vo = new ProductDetailVO();
                vo.setProductInfo(infoFuture.join());
                vo.setStock(stockFuture.join());
                vo.setPrice(priceFuture.join());
                vo.setShop(shopFuture.join());
                vo.setReviews(reviewFuture.join());
                return vo;
            })
            .exceptionally(ex -> {
                log.error("查询商品详情失败: productId={}", productId, ex);
                throw new BizException("商品信息获取异常");
            })
            .join(); // 这里等待是为了同步返回给 Controller

        log.info("商品详情查询完成, productId={}, 耗时={}ms",
            productId, System.currentTimeMillis() - start);
        return result;
    }
}

Controller 层:

java
@RestController
@RequestMapping("/product")
public class ProductController {

    @Autowired private ProductAggService productAggService;

    @GetMapping("/detail/{id}")
    public Result<ProductDetailVO> detail(@PathVariable Long id) {
        // 同步等待(但内部是并行的,已在 AggService 中 join)
        ProductDetailVO vo = productAggService.getProductDetail(id);
        return Result.success(vo);
    }
}

2.5 优缺点

维度 说明
大幅降低响应时间 串行 1s → 并行 200ms,提升 5 倍
CPU 利用率提升 IO 等待时 CPU 处理其他任务
故障隔离 某个服务异常只影响它的 Future,可单独降级
线程池资源消耗 并发请求高时线程数暴增
异常处理复杂 一个服务异常需要降级策略
内存开销 大量 Future 对象占用堆内存

常见降级策略:

java
// 单个服务异常 + 降级
CompletableFuture.supplyAsync(() -> stockClient.getStock(productId), ioPool)
    .exceptionally(ex -> {
        log.warn("库存查询失败,使用默认值", ex);
        return 0; // 降级返回 0
    });

场景三:多业务线程池隔离

3.1 业务背景

微服务中不同业务类型对线程池的需求不同:

业务 类型 特点 风险
订单处理 IO 密集型 调用支付、物流等多个服务 慢,容易积压
数据计算 CPU 密集型 处理大量数据计算 CPU 高,卡住其他任务
异步通知推送 IO 密集型 发给大量用户 任务量大
日志上报 可丢弃 非核心,丢了也没事 量大但不重要

如果所有业务共享一个线程池:

  • 计算任务占满线程 → 订单处理排队超时
  • 通知推送队列堆积 → 日志上报被阻塞
  • 一个业务出问题 → 拖垮整个应用

3.2 技术背景

线程池隔离是资源隔离的一种实现方式。不同业务使用不同的线程池,一个线程池出问题不会影响其他业务。这是 Hystrix 舱壁模式(Bulkhead)的核心思想。

3.3 实现思路

应用
 ├── 订单线程池(core=10, max=20, 有界队列 200)
 │    └── 订单处理、支付回调
 │
 ├── 计算线程池(core=CPU+1, max=CPU+1, 队列 100)
 │    └── 报表生成、数据聚合
 │
 ├── 推送线程池(core=5, max=10, 队列 1000)
 │    └── 消息推送、通知发送
 │
 └── 日志线程池(core=2, max=5, 队列 10000, DiscardPolicy)
      └── 操作日志、审计记录

3.4 代码示例

java
@Configuration
public class ThreadPoolConfig {

    // ========== 1. 订单业务线程池(IO 密集型) ==========
    @Bean("orderExecutor")
    public ThreadPoolExecutor orderExecutor() {
        return new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            threadFactory("order-pool-"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    // ========== 2. 计算业务线程池(CPU 密集型) ==========
    @Bean("computeExecutor")
    public ThreadPoolExecutor computeExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cores + 1, cores + 1, 0L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            threadFactory("compute-pool-"),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }

    // ========== 3. 推送业务线程池 ==========
    @Bean("pushExecutor")
    public ThreadPoolExecutor pushExecutor() {
        return new ThreadPoolExecutor(
            5, 10, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            threadFactory("push-pool-"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    // ========== 4. 日志线程池(可丢弃) ==========
    @Bean("logExecutor")
    public ThreadPoolExecutor logExecutor() {
        return new ThreadPoolExecutor(
            2, 5, 30L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            threadFactory("log-pool-"),
            new ThreadPoolExecutor.DiscardPolicy() // 日志可丢弃
        );
    }

    private ThreadFactory threadFactory(String prefix) {
        return new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(prefix + counter.getAndIncrement());
                t.setDaemon(false);
                return t;
            }
        };
    }
}

业务中使用:

java
@Service
public class OrderService {

    @Autowired @Qualifier("orderExecutor")
    private ThreadPoolExecutor orderExecutor;

    @Autowired @Qualifier("pushExecutor")
    private ThreadPoolExecutor pushExecutor;

    @Autowired @Qualifier("logExecutor")
    private ThreadPoolExecutor logExecutor;

    // 订单处理
    public void processOrder(Order order) {
        orderExecutor.execute(() -> {
            // 调用支付、物流等 IO 服务
            paymentService.pay(order);
            logisticsService.ship(order);
        });

        // 订单完成后推送通知
        pushExecutor.execute(() -> {
            pushService.notifyUser(order.getUserId(), "订单已处理");
        });
    }

    // 操作日志(可丢弃)
    public void recordLog(LogRecord log) {
        logExecutor.execute(() -> {
            logMapper.insert(log);
        });
    }
}

3.5 优缺点

维度 说明
故障隔离 一个业务线程池阻塞不影响其他业务
精准调优 每个线程池可按业务特性独立调参
可观测性 每个线程池独立监控,快速定位问题
资源浪费 线程池过多增加线程上下文切换
配置复杂 需要维护多个线程池参数
内存开销 每个线程池占用独立内存和线程资源

场景四:定时任务线程池隔离

4.1 业务背景

Spring Boot 应用中常用 @Scheduled 执行定时任务:

java
@Scheduled(cron = "0 0/1 * * * ?")  // 每分钟执行
public void checkHealth() { ... }

@Scheduled(cron = "0 0 2 * * ?")    // 每天凌晨 2 点
public void dailyReport() { ... }

@Scheduled(fixedRate = 5000)       // 每 5 秒
public void refreshCache() { ... }

4.2 技术背景

Spring @Scheduled 默认使用单线程池ScheduledThreadPoolExecutor corePoolSize = 1)。多个定时任务使用同一个线程,前一个任务没执行完,后一个任务就得等着。如果某个定时任务阻塞(如网络超时、死锁),所有定时任务都会被拖死。

4.3 实现思路

配置专用的 TaskScheduler,使用多线程的 ScheduledThreadPoolExecutor,并为不同优先级的任务分配不同的线程池。

4.4 代码示例

java
@Configuration
public class SchedulerConfig {

    // ========== 核心定时任务线程池 ==========
    @Bean("coreScheduler")
    public TaskScheduler coreScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);
        scheduler.setThreadNamePrefix("scheduler-core-");
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(30);
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return scheduler;
    }

    // ========== 报表定时任务线程池(CPU 密集型) ==========
    @Bean("reportScheduler")
    public TaskScheduler reportScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(2);
        scheduler.setThreadNamePrefix("scheduler-report-");
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(60);
        return scheduler;
    }
}

使用不同线程池:

java
@Component
public class ScheduleService {

    // ===== 核心定时任务(核心线程池) =====

    @Scheduled(fixedRate = 5000)
    public void refreshCache() {
        // 每 5 秒刷新缓存,使用默认调度器
        cacheService.evictAll();
    }

    @Scheduled(cron = "0/30 * * * * ?")
    public void healthCheck() {
        // 每 30 秒健康检查
        healthService.checkAll();
    }

    // ===== 报表定时任务(独立线程池) =====

    @Scheduled(cron = "0 0 2 * * ?")
    public void dailyReport() {
        // 每天凌晨 2 点生成日报
        reportService.generateDaily();
    }
}

通过 SchedulingConfigurer 指定不同调度器:

java
@Configuration
public class SchedulingConfig implements SchedulingConfigurer {

    @Autowired @Qualifier("coreScheduler")
    private TaskScheduler coreScheduler;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // 设置默认调度器
        taskRegistrar.setTaskScheduler(coreScheduler);
    }
}

4.5 优缺点

维度 说明
避免单线程阻塞 多线程调度,一个任务卡住不影响其他任务
任务隔离 重任务(报表)和轻任务(心跳)用不同线程池
关闭安全 waitForTasksToCompleteOnShutdown 确保关闭前完成正在执行的任务
配置增加 需要额外的 @Bean 定义
线程占用 定时任务线程池空闲时也占用线程资源

场景五:线程池监控与动态调参

5.1 业务背景

生产环境中线程池是黑盒——出了问题才会被发现(接口超时、拒绝异常)。需要实时了解线程池的运行状态:

想实时知道:
  - 当前活跃线程数
  - 队列积压了多少任务
  - 已拒绝了多少任务
  - 线程池是否快满了
  - 历史任务执行耗时分布

5.2 技术背景

Spring Boot Actuator 提供了 ThreadPoolExecutor 的监控端点,配合 Micrometer 可以将指标推送到 Prometheus + Grafana。同时可以结合 Apollo/Nacos 配置中心实现动态调参

5.3 实现方案

方案 A:ThreadPoolExecutor 包装 + Actuator 导出

java
@Configuration
public class ThreadPoolMonitorConfig {

    @Bean("monitoredExecutor")
    public ThreadPoolExecutor monitoredExecutor() {
        return new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            threadFactory("monitor-pool-"),
            new ThreadPoolExecutor.AbortPolicy()
        ) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                // 记录开始时间
                MDC.put("taskStart", String.valueOf(System.currentTimeMillis()));
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // 计算任务耗时
                String startStr = MDC.get("taskStart");
                if (startStr != null) {
                    long cost = System.currentTimeMillis() - Long.parseLong(startStr);
                    // 记录耗时到指标(Prometheus / Micrometer)
                    Metrics.timer("threadpool.task.cost",
                        "pool", "business"
                    ).record(Duration.ofMillis(cost));
                    MDC.remove("taskStart");
                }
                // 记录异常
                if (t != null) {
                    Metrics.counter("threadpool.task.error",
                        "pool", "business"
                    ).increment();
                }
            }
        };
    }
}

方案 B:Spring Boot Actuator 监控端点

yaml
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: threadpool, metrics
  metrics:
    export:
      prometheus:
        enabled: true
java
@Component
public class ThreadPoolMetricsBinder {

    @Autowired @Qualifier("orderExecutor")
    private ThreadPoolExecutor orderExecutor;

    @Autowired @Qualifier("computeExecutor")
    private ThreadPoolExecutor computeExecutor;

    @PostConstruct
    public void bindMetrics() {
        // 注册到 Micrometer
        monitor("order.pool", orderExecutor);
        monitor("compute.pool", computeExecutor);
    }

    private void monitor(String prefix, ThreadPoolExecutor executor) {
        // 活跃线程数
        Metrics.gauge(prefix + ".active.threads", executor,
            e -> (double) e.getActiveCount());
        // 核心线程数
        Metrics.gauge(prefix + ".core.pool.size", executor,
            e -> (double) e.getCorePoolSize());
        // 最大线程数
        Metrics.gauge(prefix + ".max.pool.size", executor,
            e -> (double) e.getMaximumPoolSize());
        // 当前线程总数
        Metrics.gauge(prefix + ".pool.size", executor,
            e -> (double) e.getPoolSize());
        // 队列积压
        Metrics.gauge(prefix + ".queue.size", executor,
            e -> (double) e.getQueue().size());
        // 已完成任务数
        Metrics.gauge(prefix + ".completed.tasks", executor,
            e -> (double) e.getCompletedTaskCount());
        // 拒绝任务数(需要自定义统计)
    }
}

方案 C:动态线程池(结合配置中心)

java
@Component
@Slf4j
public class DynamicThreadPool {

    private volatile ThreadPoolExecutor executor;

    @PostConstruct
    public void init() {
        // 从配置中心读取初始配置(如 Nacos / Apollo)
        ThreadPoolConfig config = remoteConfig.getThreadPoolConfig();
        refreshExecutor(config);
        // 监听配置变化
        remoteConfig.addListener("threadpool", configChange -> {
            refreshExecutor(configChange.toConfig());
        });
    }

    private void refreshExecutor(ThreadPoolConfig config) {
        ThreadPoolExecutor newExecutor = new ThreadPoolExecutor(
            config.getCorePoolSize(),
            config.getMaximumPoolSize(),
            config.getKeepAliveTime(), TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(config.getQueueCapacity()),
            threadFactory("dynamic-pool-"),
            createRejectedHandler(config.getRejectPolicy())
        );
        // 先记下旧池
        ThreadPoolExecutor old = this.executor;
        this.executor = newExecutor;
        // 优雅关闭旧池
        if (old != null) {
            old.shutdown();
        }
        log.info("线程池配置已更新: core={}, max={}, queue={}",
            config.getCorePoolSize(), config.getMaximumPoolSize(),
            config.getQueueCapacity());
    }

    public void execute(Runnable task) {
        executor.execute(task);
    }
}

Nacos 配置中心线程池配置:

yaml
# nacos: threadpool-config.yaml
threadpool:
  order:
    core-pool-size: 10
    maximum-pool-size: 20
    queue-capacity: 500
    keep-alive-time: 60
    reject-policy: CallerRuns
  compute:
    core-pool-size: 8
    maximum-pool-size: 8
    queue-capacity: 100
    keep-alive-time: 0
    reject-policy: Abort

5.4 优缺点

维度 说明
可观测性 实时掌握线程池运行状态
快速定位 队列积压、拒绝异常可第一时间发现
动态调优 无需重启即可调整参数,灰度验证
监控开销 频繁采集指标有一定性能损耗
动态替换风险 新旧线程池切换过程中的任务丢失风险

总结:五个场景选型对照

场景 核心痛点 解决方案 关键配置
@Async 异步执行 同步阻塞、响应慢 @Async + 自定义线程池 CallerRunsPolicy 拒绝策略
并行查询微服务 串行调用响应慢 CompletableFuture + IO 池 线程数 = CPU × 2
多业务隔离 业务相互影响 多个独立 ThreadPoolExecutor 按业务特性分别调参
定时任务隔离 单线程阻塞所有任务 独立 ScheduledThreadPoolExecutor poolSize > 1
监控与动态调参 线程池黑盒 Actuator + Micrometer + 配置中心 结合 Prometheus/Grafana