Spring Boot 异步处理
1. 异步处理概述
1.1 为什么需要异步
- 提高系统吞吐量
- 减少用户等待时间
- 解耦耗时操作
- 充分利用系统资源
1.2 异步处理方式
| 方式 | 说明 | 应用场景 |
|---|---|---|
| @Async | 方法级异步 | 简单异步任务 |
| CompletableFuture | 异步编排 | 复杂异步流程 |
| 消息队列 | 解耦异步 | 分布式系统 |
| 虚拟线程 | 轻量线程 | 高并发场景 |
2. @Async 异步方法
2.1 启用异步
java
@SpringBootApplication
@EnableAsync
public class MyappApplication {
public static void main(String[] args) {
SpringApplication.run(MyappApplication.class, args);
}
}2.2 异步方法
java
@Service
public class EmailService {
@Async
public void sendEmail(String to, String subject, String content) {
// 模拟耗时操作
try {
Thread.sleep(2000);
log.info("发送邮件到: {}", to);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Async
public CompletableFuture<String> sendEmailAsync(String to, String subject, String content) {
try {
Thread.sleep(2000);
log.info("发送邮件到: {}", to);
return CompletableFuture.completedFuture("发送成功");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
}
}2.3 自定义线程池
java
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("异步方法异常: {} - {}", method.getName(), ex.getMessage(), ex);
}
}2.4 指定线程池
java
@Service
public class NotificationService {
@Async("taskExecutor")
public void sendNotification(String message) {
// 使用指定的线程池
}
@Async("emailExecutor")
public void sendEmail(String to, String content) {
// 使用邮件专用线程池
}
}3. CompletableFuture
3.1 基本使用
java
@Service
public class UserService {
public CompletableFuture<User> getUserAsync(Long id) {
return CompletableFuture.supplyAsync(() -> {
return userRepository.findById(id).orElse(null);
});
}
public CompletableFuture<List<Order>> getOrdersAsync(Long userId) {
return CompletableFuture.supplyAsync(() -> {
return orderRepository.findByUserId(userId);
});
}
}3.2 异步编排
java
@Service
public class UserDashboardService {
private final UserService userService;
private final OrderService orderService;
private final ProductService productService;
public CompletableFuture<UserDashboard> getDashboard(Long userId) {
CompletableFuture<User> userFuture = userService.getUserAsync(userId);
CompletableFuture<List<Order>> ordersFuture = orderService.getOrdersAsync(userId);
CompletableFuture<List<Product>> recommendationsFuture =
productService.getRecommendationsAsync(userId);
return CompletableFuture.allOf(userFuture, ordersFuture, recommendationsFuture)
.thenApply(v -> {
UserDashboard dashboard = new UserDashboard();
dashboard.setUser(userFuture.join());
dashboard.setOrders(ordersFuture.join());
dashboard.setRecommendations(recommendationsFuture.join());
return dashboard;
});
}
}3.3 异步组合
java
@Service
public class CompositeService {
public CompletableFuture<String> chainOperations(Long id) {
return CompletableFuture
.supplyAsync(() -> getUser(id))
.thenApplyAsync(user -> processUser(user))
.thenComposeAsync(processed -> saveResult(processed))
.exceptionally(ex -> {
log.error("操作失败", ex);
return "默认值";
});
}
public CompletableFuture<String> parallelOperations(Long id) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> task1(id));
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> task2(id));
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> task3(id));
return CompletableFuture.allOf(task1, task2, task3)
.thenApply(v -> {
String r1 = task1.join();
String r2 = task2.join();
String r3 = task3.join();
return r1 + r2 + r3;
});
}
public CompletableFuture<Object> anyOf(Long id) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> task1(id));
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> task2(id));
return CompletableFuture.anyOf(task1, task2);
}
}3.4 超时处理
java
@Service
public class TimeoutService {
public CompletableFuture<String> withTimeout(Long id) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "结果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "中断";
}
});
return future.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(ex -> "超时默认值");
}
public CompletableFuture<String> completeOnTimeout(Long id) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "结果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "中断";
}
});
return future.completeOnTimeout("超时默认值", 3, TimeUnit.SECONDS);
}
}4. 定时任务
4.1 启用定时任务
java
@SpringBootApplication
@EnableScheduling
public class MyappApplication {
public static void main(String[] args) {
SpringApplication.run(MyappApplication.class, args);
}
}4.2 定时任务定义
java
@Service
public class ScheduledService {
@Scheduled(fixedRate = 5000)
public void fixedRateTask() {
log.info("固定频率任务: {}", LocalDateTime.now());
}
@Scheduled(fixedDelay = 5000)
public void fixedDelayTask() {
log.info("固定延迟任务: {}", LocalDateTime.now());
}
@Scheduled(initialDelay = 10000, fixedRate = 5000)
public void initialDelayTask() {
log.info("初始延迟任务: {}", LocalDateTime.now());
}
@Scheduled(cron = "0 0 2 * * ?")
public void cronTask() {
log.info("Cron 表达式任务: {}", LocalDateTime.now());
}
}4.3 Cron 表达式
秒 分 时 日 月 周
* * * * * *
0 0 2 * * ? 每天凌晨2点
0 0/30 * * * ? 每30分钟
0 0 9-17 * * ? 9点到17点每小时
0 0 0 * * MON 每周一零点4.4 异步定时任务
java
@Service
public class AsyncScheduledService {
@Async
@Scheduled(fixedRate = 5000)
public CompletableFuture<Void> asyncScheduledTask() {
log.info("异步定时任务: {}", LocalDateTime.now());
return CompletableFuture.completedFuture(null);
}
}4.5 动态定时任务
java
@Service
public class DynamicScheduledService implements SchedulingConfigurer {
private String cronExpression = "0 0 2 * * ?";
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(
() -> log.info("动态定时任务: {}", LocalDateTime.now()),
triggerContext -> {
CronTrigger trigger = new CronTrigger(cronExpression);
return trigger.nextExecutionTime(triggerContext);
}
);
}
public void updateCron(String newCron) {
this.cronExpression = newCron;
}
}5. 虚拟线程
5.1 启用虚拟线程
yaml
spring:
threads:
virtual:
enabled: true5.2 Tomcat 虚拟线程
java
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> virtualThreadExecutor() {
return protocolHandler -> {
protocolHandler.setExecutor(
Executors.newVirtualThreadPerTaskExecutor()
);
};
}
}5.3 异步虚拟线程
java
@Configuration
public class AsyncVirtualThreadConfig {
@Bean(name = "virtualThreadExecutor")
public Executor virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
@Bean
public AsyncTaskExecutor applicationTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
}
@Service
public class VirtualThreadService {
@Async("virtualThreadExecutor")
public void virtualThreadTask() {
log.info("虚拟线程任务: {}", Thread.currentThread());
}
}6. 异步事件
6.1 定义事件
java
public record UserCreatedEvent(
Long userId,
String username,
LocalDateTime createdAt
) {}
public record OrderCompletedEvent(
Long orderId,
Long userId,
BigDecimal amount
) {}6.2 发布事件
java
@Service
public class UserService {
private final ApplicationEventPublisher eventPublisher;
@Transactional
public User createUser(UserRequest request) {
User user = new User();
user.setUsername(request.username());
user.setEmail(request.email());
User saved = userRepository.save(user);
eventPublisher.publishEvent(new UserCreatedEvent(
saved.getId(),
saved.getUsername(),
LocalDateTime.now()
));
return saved;
}
}6.3 监听事件
java
@Component
public class UserEventListener {
@EventListener
public void handleUserCreated(UserCreatedEvent event) {
log.info("用户创建事件: {}", event);
}
@Async
@EventListener
public void handleUserCreatedAsync(UserCreatedEvent event) {
// 异步处理
emailService.sendWelcomeEmail(event.username());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleAfterCommit(UserCreatedEvent event) {
// 事务提交后处理
}
}7. 异步请求处理
7.1 Callable
java
@RestController
public class AsyncController {
@GetMapping("/callable")
public Callable<String> callableRequest() {
return () -> {
Thread.sleep(2000);
return "callable-result";
};
}
}7.2 DeferredResult
java
@RestController
public class DeferredController {
private final Queue<DeferredResult<String>> resultQueue = new ConcurrentLinkedQueue<>();
@GetMapping("/deferred")
public DeferredResult<String> deferredRequest() {
DeferredResult<String> result = new DeferredResult<>(5000L, "timeout");
resultQueue.add(result);
result.onCompletion(() -> resultQueue.remove(result));
result.onTimeout(() -> result.setResult("超时"));
return result;
}
@GetMapping("/complete")
public void completeAll() {
resultQueue.forEach(result -> result.setResult("completed"));
resultQueue.clear();
}
}7.3 ResponseBodyEmitter
java
@RestController
public class EmitterController {
@GetMapping("/events")
public ResponseBodyEmitter handleEvents() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(60000L);
CompletableFuture.runAsync(() -> {
try {
for (int i = 0; i < 10; i++) {
emitter.send("Event " + i + "\n");
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}7.4 Server-Sent Events
java
@RestController
public class SseController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(60000L);
CompletableFuture.runAsync(() -> {
try {
for (int i = 0; i < 10; i++) {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("Event data " + i)
.id(String.valueOf(i))
.name("message");
emitter.send(event);
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}8. 线程池监控
8.1 线程池指标
java
@Configuration
public class ThreadPoolMetricsConfig {
@Bean
public MeterBinder threadPoolMetrics(ThreadPoolTaskExecutor taskExecutor) {
return new MeterBinder() {
@Override
public void bindTo(MeterRegistry registry) {
ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
Gauge.builder("thread.pool.active", executor, ThreadPoolExecutor::getActiveCount)
.description("Active threads")
.register(registry);
Gauge.builder("thread.pool.queue.size", executor,
e -> e.getQueue().size())
.description("Queue size")
.register(registry);
Gauge.builder("thread.pool.core.size", executor,
ThreadPoolExecutor::getCorePoolSize)
.description("Core pool size")
.register(registry);
Gauge.builder("thread.pool.max.size", executor,
ThreadPoolExecutor::getMaximumPoolSize)
.description("Max pool size")
.register(registry);
}
};
}
}9. 小结
本章学习了 Spring Boot 异步处理的核心内容:
| 内容 | 要点 |
|---|---|
| @Async | 异步方法、线程池配置 |
| CompletableFuture | 异步编排、组合、超时 |
| 定时任务 | @Scheduled、Cron 表达式 |
| 虚拟线程 | 启用配置、线程池 |
| 异步事件 | ApplicationEvent、@EventListener |
| 异步请求 | Callable、DeferredResult、SSE |
| 线程池监控 | Micrometer 指标 |
下一章将学习 Spring Boot 消息队列。