Skip to content

Spring Boot 异步处理

1. 异步处理概述

1.1 为什么需要异步

  • 提高系统吞吐量
  • 减少用户等待时间
  • 解耦耗时操作
  • 充分利用系统资源

1.2 异步处理方式

方式说明应用场景
@Async方法级异步简单异步任务
CompletableFuture异步编排复杂异步流程
消息队列解耦异步分布式系统
虚拟线程轻量线程高并发场景

2. @Async 异步方法

2.1 启用异步

java
@SpringBootApplication
@EnableAsync
public class MyappApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyappApplication.class, args);
    }
}

2.2 异步方法

java
@Service
public class EmailService {
    
    @Async
    public void sendEmail(String to, String subject, String content) {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
            log.info("发送邮件到: {}", to);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    @Async
    public CompletableFuture<String> sendEmailAsync(String to, String subject, String content) {
        try {
            Thread.sleep(2000);
            log.info("发送邮件到: {}", to);
            return CompletableFuture.completedFuture("发送成功");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
    }
}

2.3 自定义线程池

java
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("异步方法异常: {} - {}", method.getName(), ex.getMessage(), ex);
    }
}

2.4 指定线程池

java
@Service
public class NotificationService {
    
    @Async("taskExecutor")
    public void sendNotification(String message) {
        // 使用指定的线程池
    }
    
    @Async("emailExecutor")
    public void sendEmail(String to, String content) {
        // 使用邮件专用线程池
    }
}

3. CompletableFuture

3.1 基本使用

java
@Service
public class UserService {
    
    public CompletableFuture<User> getUserAsync(Long id) {
        return CompletableFuture.supplyAsync(() -> {
            return userRepository.findById(id).orElse(null);
        });
    }
    
    public CompletableFuture<List<Order>> getOrdersAsync(Long userId) {
        return CompletableFuture.supplyAsync(() -> {
            return orderRepository.findByUserId(userId);
        });
    }
}

3.2 异步编排

java
@Service
public class UserDashboardService {
    
    private final UserService userService;
    private final OrderService orderService;
    private final ProductService productService;
    
    public CompletableFuture<UserDashboard> getDashboard(Long userId) {
        CompletableFuture<User> userFuture = userService.getUserAsync(userId);
        CompletableFuture<List<Order>> ordersFuture = orderService.getOrdersAsync(userId);
        CompletableFuture<List<Product>> recommendationsFuture = 
            productService.getRecommendationsAsync(userId);
        
        return CompletableFuture.allOf(userFuture, ordersFuture, recommendationsFuture)
            .thenApply(v -> {
                UserDashboard dashboard = new UserDashboard();
                dashboard.setUser(userFuture.join());
                dashboard.setOrders(ordersFuture.join());
                dashboard.setRecommendations(recommendationsFuture.join());
                return dashboard;
            });
    }
}

3.3 异步组合

java
@Service
public class CompositeService {
    
    public CompletableFuture<String> chainOperations(Long id) {
        return CompletableFuture
            .supplyAsync(() -> getUser(id))
            .thenApplyAsync(user -> processUser(user))
            .thenComposeAsync(processed -> saveResult(processed))
            .exceptionally(ex -> {
                log.error("操作失败", ex);
                return "默认值";
            });
    }
    
    public CompletableFuture<String> parallelOperations(Long id) {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> task1(id));
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> task2(id));
        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> task3(id));
        
        return CompletableFuture.allOf(task1, task2, task3)
            .thenApply(v -> {
                String r1 = task1.join();
                String r2 = task2.join();
                String r3 = task3.join();
                return r1 + r2 + r3;
            });
    }
    
    public CompletableFuture<Object> anyOf(Long id) {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> task1(id));
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> task2(id));
        
        return CompletableFuture.anyOf(task1, task2);
    }
}

3.4 超时处理

java
@Service
public class TimeoutService {
    
    public CompletableFuture<String> withTimeout(Long id) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "结果";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "中断";
            }
        });
        
        return future.orTimeout(3, TimeUnit.SECONDS)
            .exceptionally(ex -> "超时默认值");
    }
    
    public CompletableFuture<String> completeOnTimeout(Long id) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "结果";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "中断";
            }
        });
        
        return future.completeOnTimeout("超时默认值", 3, TimeUnit.SECONDS);
    }
}

4. 定时任务

4.1 启用定时任务

java
@SpringBootApplication
@EnableScheduling
public class MyappApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyappApplication.class, args);
    }
}

4.2 定时任务定义

java
@Service
public class ScheduledService {
    
    @Scheduled(fixedRate = 5000)
    public void fixedRateTask() {
        log.info("固定频率任务: {}", LocalDateTime.now());
    }
    
    @Scheduled(fixedDelay = 5000)
    public void fixedDelayTask() {
        log.info("固定延迟任务: {}", LocalDateTime.now());
    }
    
    @Scheduled(initialDelay = 10000, fixedRate = 5000)
    public void initialDelayTask() {
        log.info("初始延迟任务: {}", LocalDateTime.now());
    }
    
    @Scheduled(cron = "0 0 2 * * ?")
    public void cronTask() {
        log.info("Cron 表达式任务: {}", LocalDateTime.now());
    }
}

4.3 Cron 表达式

秒 分 时 日 月 周
*  *  *  *  *  *

0 0 2 * * ?     每天凌晨2点
0 0/30 * * * ?  每30分钟
0 0 9-17 * * ?  9点到17点每小时
0 0 0 * * MON   每周一零点

4.4 异步定时任务

java
@Service
public class AsyncScheduledService {
    
    @Async
    @Scheduled(fixedRate = 5000)
    public CompletableFuture<Void> asyncScheduledTask() {
        log.info("异步定时任务: {}", LocalDateTime.now());
        return CompletableFuture.completedFuture(null);
    }
}

4.5 动态定时任务

java
@Service
public class DynamicScheduledService implements SchedulingConfigurer {
    
    private String cronExpression = "0 0 2 * * ?";
    
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(
            () -> log.info("动态定时任务: {}", LocalDateTime.now()),
            triggerContext -> {
                CronTrigger trigger = new CronTrigger(cronExpression);
                return trigger.nextExecutionTime(triggerContext);
            }
        );
    }
    
    public void updateCron(String newCron) {
        this.cronExpression = newCron;
    }
}

5. 虚拟线程

5.1 启用虚拟线程

yaml
spring:
  threads:
    virtual:
      enabled: true

5.2 Tomcat 虚拟线程

java
@Configuration
public class VirtualThreadConfig {
    
    @Bean
    public TomcatProtocolHandlerCustomizer<?> virtualThreadExecutor() {
        return protocolHandler -> {
            protocolHandler.setExecutor(
                Executors.newVirtualThreadPerTaskExecutor()
            );
        };
    }
}

5.3 异步虚拟线程

java
@Configuration
public class AsyncVirtualThreadConfig {
    
    @Bean(name = "virtualThreadExecutor")
    public Executor virtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
    
    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }
}

@Service
public class VirtualThreadService {
    
    @Async("virtualThreadExecutor")
    public void virtualThreadTask() {
        log.info("虚拟线程任务: {}", Thread.currentThread());
    }
}

6. 异步事件

6.1 定义事件

java
public record UserCreatedEvent(
    Long userId,
    String username,
    LocalDateTime createdAt
) {}

public record OrderCompletedEvent(
    Long orderId,
    Long userId,
    BigDecimal amount
) {}

6.2 发布事件

java
@Service
public class UserService {
    
    private final ApplicationEventPublisher eventPublisher;
    
    @Transactional
    public User createUser(UserRequest request) {
        User user = new User();
        user.setUsername(request.username());
        user.setEmail(request.email());
        
        User saved = userRepository.save(user);
        
        eventPublisher.publishEvent(new UserCreatedEvent(
            saved.getId(),
            saved.getUsername(),
            LocalDateTime.now()
        ));
        
        return saved;
    }
}

6.3 监听事件

java
@Component
public class UserEventListener {
    
    @EventListener
    public void handleUserCreated(UserCreatedEvent event) {
        log.info("用户创建事件: {}", event);
    }
    
    @Async
    @EventListener
    public void handleUserCreatedAsync(UserCreatedEvent event) {
        // 异步处理
        emailService.sendWelcomeEmail(event.username());
    }
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleAfterCommit(UserCreatedEvent event) {
        // 事务提交后处理
    }
}

7. 异步请求处理

7.1 Callable

java
@RestController
public class AsyncController {
    
    @GetMapping("/callable")
    public Callable<String> callableRequest() {
        return () -> {
            Thread.sleep(2000);
            return "callable-result";
        };
    }
}

7.2 DeferredResult

java
@RestController
public class DeferredController {
    
    private final Queue<DeferredResult<String>> resultQueue = new ConcurrentLinkedQueue<>();
    
    @GetMapping("/deferred")
    public DeferredResult<String> deferredRequest() {
        DeferredResult<String> result = new DeferredResult<>(5000L, "timeout");
        resultQueue.add(result);
        
        result.onCompletion(() -> resultQueue.remove(result));
        result.onTimeout(() -> result.setResult("超时"));
        
        return result;
    }
    
    @GetMapping("/complete")
    public void completeAll() {
        resultQueue.forEach(result -> result.setResult("completed"));
        resultQueue.clear();
    }
}

7.3 ResponseBodyEmitter

java
@RestController
public class EmitterController {
    
    @GetMapping("/events")
    public ResponseBodyEmitter handleEvents() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter(60000L);
        
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.send("Event " + i + "\n");
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
}

7.4 Server-Sent Events

java
@RestController
public class SseController {
    
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60000L);
        
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .data("Event data " + i)
                        .id(String.valueOf(i))
                        .name("message");
                    emitter.send(event);
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
}

8. 线程池监控

8.1 线程池指标

java
@Configuration
public class ThreadPoolMetricsConfig {
    
    @Bean
    public MeterBinder threadPoolMetrics(ThreadPoolTaskExecutor taskExecutor) {
        return new MeterBinder() {
            @Override
            public void bindTo(MeterRegistry registry) {
                ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
                
                Gauge.builder("thread.pool.active", executor, ThreadPoolExecutor::getActiveCount)
                    .description("Active threads")
                    .register(registry);
                
                Gauge.builder("thread.pool.queue.size", executor, 
                    e -> e.getQueue().size())
                    .description("Queue size")
                    .register(registry);
                
                Gauge.builder("thread.pool.core.size", executor, 
                    ThreadPoolExecutor::getCorePoolSize)
                    .description("Core pool size")
                    .register(registry);
                
                Gauge.builder("thread.pool.max.size", executor, 
                    ThreadPoolExecutor::getMaximumPoolSize)
                    .description("Max pool size")
                    .register(registry);
            }
        };
    }
}

9. 小结

本章学习了 Spring Boot 异步处理的核心内容:

内容要点
@Async异步方法、线程池配置
CompletableFuture异步编排、组合、超时
定时任务@Scheduled、Cron 表达式
虚拟线程启用配置、线程池
异步事件ApplicationEvent、@EventListener
异步请求Callable、DeferredResult、SSE
线程池监控Micrometer 指标

下一章将学习 Spring Boot 消息队列。