Spring Cloud 熔断限流
1. 熔断限流概述
1.1 为什么需要熔断限流
- 熔断:防止故障蔓延,保护系统稳定性
- 限流:防止系统过载,保护系统资源
- 降级:提供兜底方案,保证用户体验
1.2 核心概念
| 概念 | 说明 |
|---|---|
| 熔断器 | 当错误率达到阈值时,快速失败 |
| 限流器 | 控制请求速率,防止过载 |
| 降级 | 服务不可用时返回默认响应 |
2. Resilience4j
2.1 添加依赖
xml
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot3</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>2.2.0</version>
</dependency>
2.2 熔断器配置
yaml
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 10
        slidingWindowType: COUNT_BASED
        failureRateThreshold: 50
        slowCallDurationThreshold: 2s
        slowCallRateThreshold: 50
        waitDurationInOpenState: 10s
        permittedNumberOfCallsInHalfOpenState: 5
        minimumNumberOfCalls: 5
        recordExceptions:
          - java.io.IOException
          - java.net.ConnectException
        ignoreExceptions:
          - com.example.BusinessException
    instances:
      user-service:
        baseConfig: default
        failureRateThreshold: 60
        waitDurationInOpenState: 30s
      order-service:
        baseConfig: default
        slidingWindowSize: 20
2.3 使用熔断器
java
@Service
public class UserService {
private final UserClient userClient;
/**
 * Looks up a user through {@code userClient}.
 *
 * <p>The method is annotated with the {@code user-service} circuit breaker and
 * names {@code getUserFallback} as its fallback method.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return the value returned by {@code userClient.getUser}
 */
@CircuitBreaker(name = "user-service", fallbackMethod = "getUserFallback")
public User getUser(Long id) {
    final User fetched = userClient.getUser(id);
    return fetched;
}
/**
 * Fallback for {@code getUser}: logs the failure message at WARN level and
 * returns a placeholder user carrying the requested id.
 *
 * @param id identifier of the user that could not be loaded
 * @param t  the failure that triggered the fallback; only its message is logged
 * @return a placeholder {@code User} built from {@code id} and fixed default values
 */
public User getUserFallback(Long id, Throwable t) {
    final String reason = t.getMessage();
    log.warn("获取用户失败,执行降级: {}", reason);
    final User placeholder = new User(id, "默认用户", "default@example.com");
    return placeholder;
}
/**
 * Looks up a user through {@code userClient}; annotated with both the
 * {@code user-service} circuit breaker and the {@code user-service} retry.
 * No fallback method is declared, so failures propagate to the caller.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return the value returned by {@code userClient.getUser}
 */
@CircuitBreaker(name = "user-service")
@Retry(name = "user-service")
public User getUserWithRetry(Long id) {
    final User fetched = userClient.getUser(id);
    return fetched;
}
}
2.4 熔断器状态
java
@RestController
@RequestMapping("/circuitbreaker")
public class CircuitBreakerController {
@Autowired
private CircuitBreakerRegistry registry;
/**
 * Reports the current state and call statistics of one circuit breaker.
 *
 * @param name circuit breaker name taken from the request path
 * @return a map holding the name, state, failure/slow-call rates and call counts
 * @throws org.springframework.web.server.ResponseStatusException
 *         with status 404 when no breaker with that name is registered
 */
@GetMapping("/state/{name}")
public Map<String, Object> getState(@PathVariable String name) {
    CircuitBreaker circuitBreaker = requireCircuitBreaker(name);
    CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
    return Map.of(
            "name", name,
            "state", circuitBreaker.getState().name(),
            "failureRate", metrics.getFailureRate(),
            "slowCallRate", metrics.getSlowCallRate(),
            "numberOfCalls", metrics.getNumberOfCalls(),
            "numberOfFailedCalls", metrics.getNumberOfFailedCalls(),
            "numberOfSlowCalls", metrics.getNumberOfSlowCalls()
    );
}

// NOTE(review): the three POST handlers below change breaker state and show no
// access control here; confirm they are protected elsewhere before exposing them.

/**
 * Resets the named circuit breaker.
 *
 * @param name circuit breaker name taken from the request path
 * @return the literal {@code "reset success"}
 */
@PostMapping("/reset/{name}")
public String reset(@PathVariable String name) {
    requireCircuitBreaker(name).reset();
    return "reset success";
}

/**
 * Forces the named circuit breaker into the OPEN state.
 *
 * @param name circuit breaker name taken from the request path
 * @return the literal {@code "open success"}
 */
@PostMapping("/open/{name}")
public String open(@PathVariable String name) {
    requireCircuitBreaker(name).transitionToOpenState();
    return "open success";
}

/**
 * Forces the named circuit breaker into the CLOSED state.
 *
 * @param name circuit breaker name taken from the request path
 * @return the literal {@code "close success"}
 */
@PostMapping("/close/{name}")
public String close(@PathVariable String name) {
    requireCircuitBreaker(name).transitionToClosedState();
    return "close success";
}

/**
 * Looks up an already-registered circuit breaker without creating one.
 *
 * <p>{@code registry.circuitBreaker(name)} creates a new breaker when the name
 * is unknown, so using it with a caller-supplied path value would let any
 * request register arbitrary breakers. {@code find} only returns existing ones.
 */
private CircuitBreaker requireCircuitBreaker(String name) {
    return registry.find(name)
            .orElseThrow(() -> new org.springframework.web.server.ResponseStatusException(
                    org.springframework.http.HttpStatus.NOT_FOUND,
                    "circuit breaker not found: " + name));
}
}
3. 限流器
3.1 限流器配置
yaml
resilience4j:
  ratelimiter:
    configs:
      default:
        limitForPeriod: 10
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
    instances:
      user-service:
        baseConfig: default
        limitForPeriod: 20
      order-service:
        baseConfig: default
        limitForPeriod: 50
3.2 使用限流器
java
@Service
public class UserService {
/**
 * Loads a user from {@code userRepository}; annotated with the
 * {@code user-service} rate limiter and {@code rateLimitFallback} as fallback.
 *
 * @param id identifier passed to {@code userRepository.findById}
 * @return the user that was found, or {@code null} when the lookup is empty
 */
@RateLimiter(name = "user-service", fallbackMethod = "rateLimitFallback")
public User getUser(Long id) {
    var lookup = userRepository.findById(id);
    return lookup.orElse(null);
}
/**
 * Fallback for the rate-limited {@code getUser}: logs the failure message at
 * WARN level and yields no user.
 *
 * @param id identifier of the requested user (unused)
 * @param t  the failure that triggered the fallback; only its message is logged
 * @return always {@code null}
 */
public User rateLimitFallback(Long id, Throwable t) {
    final String reason = t.getMessage();
    log.warn("触发限流: {}", reason);
    return null;
}
/**
 * Loads a user from {@code userRepository}; annotated with both the
 * {@code user-service} rate limiter and the {@code user-service} circuit breaker.
 * No fallback method is declared.
 *
 * @param id identifier passed to {@code userRepository.findById}
 * @return the user that was found, or {@code null} when the lookup is empty
 */
@RateLimiter(name = "user-service")
@CircuitBreaker(name = "user-service")
public User getUserWithProtection(Long id) {
    var lookup = userRepository.findById(id);
    return lookup.orElse(null);
}
}
4. 重试机制
4.1 重试配置
yaml
resilience4j:
  retry:
    configs:
      default:
        maxAttempts: 3
        waitDuration: 500ms
        enableExponentialBackoff: true
        exponentialBackoffMultiplier: 2
        retryExceptions:
          - java.io.IOException
          - java.net.ConnectException
        ignoreExceptions:
          - com.example.BusinessException
    instances:
      user-service:
        baseConfig: default
        maxAttempts: 5
        waitDuration: 1s
4.2 使用重试
java
@Service
public class UserService {
/**
 * Looks up a user through {@code userClient}; annotated with the
 * {@code user-service} retry and {@code retryFallback} as fallback method.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return the value returned by {@code userClient.getUser}
 */
@Retry(name = "user-service", fallbackMethod = "retryFallback")
public User getUser(Long id) {
    final User fetched = userClient.getUser(id);
    return fetched;
}
/**
 * Fallback for the retried {@code getUser}: logs the failure message at WARN
 * level and returns a placeholder user carrying the requested id.
 *
 * @param id identifier of the user that could not be loaded
 * @param t  the failure that triggered the fallback; only its message is logged
 * @return a placeholder {@code User} built from {@code id} and fixed default values
 */
public User retryFallback(Long id, Throwable t) {
    final String reason = t.getMessage();
    log.warn("重试失败: {}", reason);
    final User placeholder = new User(id, "默认用户", "default@example.com");
    return placeholder;
}
}
5. 舱壁隔离
5.1 舱壁配置
yaml
resilience4j:
  bulkhead:
    configs:
      default:
        maxConcurrentCalls: 25
        maxWaitDuration: 0
        writableStackTraceEnabled: true
    instances:
      user-service:
        baseConfig: default
        maxConcurrentCalls: 50
      order-service:
        baseConfig: default
        maxConcurrentCalls: 100
5.2 使用舱壁
java
@Service
public class UserService {
/**
 * Looks up a user through {@code userClient}; annotated with the
 * {@code user-service} bulkhead and {@code bulkheadFallback} as fallback method.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return the value returned by {@code userClient.getUser}
 */
@Bulkhead(name = "user-service", fallbackMethod = "bulkheadFallback")
public User getUser(Long id) {
    final User fetched = userClient.getUser(id);
    return fetched;
}
/**
 * Fallback for the bulkhead-guarded {@code getUser}: logs the failure message
 * at WARN level and yields no user.
 *
 * @param id identifier of the requested user (unused)
 * @param t  the failure that triggered the fallback; only its message is logged
 * @return always {@code null}
 */
public User bulkheadFallback(Long id, Throwable t) {
    final String reason = t.getMessage();
    log.warn("舱壁隔离: {}", reason);
    return null;
}
/**
 * Looks up a user asynchronously under a thread-pool bulkhead and a time limiter.
 *
 * <p>With {@code Bulkhead.Type.THREADPOOL} the Resilience4j aspect already
 * invokes this method on a bulkhead worker thread, so the remote call is made
 * directly here and wrapped in a completed future. Handing it to
 * {@code CompletableFuture.supplyAsync} would move the actual work onto the
 * shared common pool and leave the bulkhead thread merely waiting for it.
 *
 * <p>NOTE(review): thread-pool bulkheads are configured under
 * {@code resilience4j.thread-pool-bulkhead}, not {@code resilience4j.bulkhead};
 * confirm a matching {@code user-service} entry exists.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return a future holding the value returned by {@code userClient.getUser}
 */
@Bulkhead(name = "user-service", type = Bulkhead.Type.THREADPOOL)
@TimeLimiter(name = "user-service")
public CompletableFuture<User> getUserAsync(Long id) {
    return CompletableFuture.completedFuture(userClient.getUser(id));
}
}
6. 时间限制
6.1 超时配置
yaml
resilience4j:
  timelimiter:
    configs:
      default:
        timeoutDuration: 3s
        cancelRunningFuture: true
    instances:
      user-service:
        timeoutDuration: 5s
      order-service:
        timeoutDuration: 10s
6.2 使用超时
java
@Service
public class UserService {
/**
 * Starts an asynchronous user lookup via {@code CompletableFuture.supplyAsync};
 * annotated with the {@code user-service} time limiter and
 * {@code timeoutFallback} as fallback method.
 *
 * @param id identifier forwarded to {@code userClient.getUser}
 * @return a future that completes with the value returned by {@code userClient.getUser}
 */
@TimeLimiter(name = "user-service", fallbackMethod = "timeoutFallback")
public CompletableFuture<User> getUserAsync(Long id) {
    final CompletableFuture<User> pending = CompletableFuture.supplyAsync(() -> {
        return userClient.getUser(id);
    });
    return pending;
}
/**
 * Fallback for the time-limited {@code getUserAsync}: logs the failure message
 * at WARN level and returns an already-completed future with a placeholder user.
 *
 * @param id identifier of the user that could not be loaded
 * @param t  the failure that triggered the fallback; only its message is logged
 * @return a completed future holding a placeholder {@code User} built from {@code id}
 */
public CompletableFuture<User> timeoutFallback(Long id, Throwable t) {
    final String reason = t.getMessage();
    log.warn("请求超时: {}", reason);
    final User placeholder = new User(id, "默认用户", "default@example.com");
    return CompletableFuture.completedFuture(placeholder);
}
}
7. 组合使用
7.1 多重保护
java
@Service
public class OrderService {
/**
 * Creates an order through {@code orderClient} on an asynchronous task, with
 * all five {@code order-service} resilience annotations applied.
 *
 * <p>The annotations are listed in Resilience4j's documented default aspect
 * order, outermost first: Retry, CircuitBreaker, RateLimiter, TimeLimiter,
 * Bulkhead. The order in which annotations are written does not change that.
 *
 * <p>The fallback is declared on {@code @Retry}, the outermost aspect. A
 * fallback runs inside the aspect that declares it, so declaring it on
 * {@code @CircuitBreaker} would hand the retry aspect an already-successful
 * fallback result and no retry would ever happen.
 *
 * <p>NOTE(review): every retry calls {@code orderClient.createOrder} again;
 * confirm the remote operation is idempotent.
 *
 * @param request order payload forwarded to {@code orderClient.createOrder}
 * @return a future completing with the created order, or with the result of
 *         {@code fallback} once the retry aspect gives up
 */
@Retry(name = "order-service", fallbackMethod = "fallback")
@CircuitBreaker(name = "order-service")
@RateLimiter(name = "order-service")
@TimeLimiter(name = "order-service")
@Bulkhead(name = "order-service")
public CompletableFuture<Order> createOrder(OrderRequest request) {
    return CompletableFuture.supplyAsync(() -> orderClient.createOrder(request));
}
/**
 * Fallback for {@code createOrder}: logs the failure message at WARN level and
 * returns an already-completed future whose value is {@code null}.
 *
 * @param request the order request that could not be processed (unused)
 * @param t       the failure that triggered the fallback; only its message is logged
 * @return a completed future holding {@code null}
 */
public CompletableFuture<Order> fallback(OrderRequest request, Throwable t) {
    final String reason = t.getMessage();
    log.warn("订单服务降级: {}", reason);
    final Order none = null;
    return CompletableFuture.completedFuture(none);
}
}
7.2 注解顺序
java
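// 切面的执行顺序由 Resilience4j 的切面优先级决定,与注解的书写顺序无关。
// 默认顺序(从外到内):
// Retry -> CircuitBreaker -> RateLimiter -> TimeLimiter -> Bulkhead
// 注意:fallbackMethod 声明在哪个注解上,降级就在哪一层生效。这里声明在 CircuitBreaker 上,
// 外层的 Retry 拿到的已经是降级结果,因此不会触发重试;如需"重试耗尽后再降级",应把
// fallbackMethod 声明在 @Retry 上。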
@RateLimiter(name = "service")
@Bulkhead(name = "service")
@TimeLimiter(name = "service")
@Retry(name = "service")
@CircuitBreaker(name = "service", fallbackMethod = "fallback")
public CompletableFuture<String> protectedMethod() {
return CompletableFuture.supplyAsync(() -> {
// 业务逻辑
return "result";
});
}
8. 监控指标
8.1 指标配置
yaml
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  endpoint:
    health:
      show-details: always
  health:
    circuitbreakers:
      enabled: true
    ratelimiters:
      enabled: true
8.2 自定义指标
java
@Configuration
public class Resilience4jMetricsConfig {
/**
 * Exposes per-circuit-breaker gauges (state, failure rate, slow-call rate and
 * successful-call count) through Micrometer.
 *
 * <p>Gauges are registered for every breaker already in the registry and, via
 * the registry's entry-added event, for every breaker registered later.
 *
 * <p>NOTE(review): when {@code resilience4j-micrometer} is on the classpath it
 * publishes its own meters under {@code resilience4j.circuitbreaker.*}; confirm
 * these names and tag sets do not clash with them.
 *
 * @param registry source of the circuit breakers to instrument
 * @return a binder that registers the gauges on the given meter registry
 */
@Bean
public MeterBinder circuitBreakerMetrics(CircuitBreakerRegistry registry) {
    return meterRegistry -> {
        registry.getAllCircuitBreakers()
                .forEach(cb -> registerGauges(cb, meterRegistry));
        // Breakers that do not exist yet at bind time would otherwise get no meters.
        registry.getEventPublisher()
                .onEntryAdded(event -> registerGauges(event.getAddedEntry(), meterRegistry));
    };
}

/**
 * Registers the gauges for a single circuit breaker.
 *
 * <p>Each gauge holds the breaker itself and calls {@code getMetrics()} on
 * every sample, so values are always read live instead of from a metrics
 * object captured once at bind time.
 */
private static void registerGauges(CircuitBreaker cb, MeterRegistry meterRegistry) {
    final String breakerName = cb.getName();

    Gauge.builder("resilience4j.circuitbreaker.state",
                    cb, c -> c.getState().getOrder())
            .tag("name", breakerName)
            .register(meterRegistry);

    Gauge.builder("resilience4j.circuitbreaker.failure_rate",
                    cb, c -> c.getMetrics().getFailureRate())
            .tag("name", breakerName)
            .register(meterRegistry);

    Gauge.builder("resilience4j.circuitbreaker.slow_call_rate",
                    cb, c -> c.getMetrics().getSlowCallRate())
            .tag("name", breakerName)
            .register(meterRegistry);

    // Previously a Counter incremented once with the value at bind time, which
    // never changed afterwards. A gauge re-reads the current count on each sample.
    Gauge.builder("resilience4j.circuitbreaker.calls",
                    cb, c -> c.getMetrics().getNumberOfSuccessfulCalls())
            .tag("name", breakerName)
            .tag("kind", "successful")
            .register(meterRegistry);
}
}
9. 小结
本章学习了 Spring Cloud 熔断限流的核心内容:
| 内容 | 要点 |
|---|---|
| 熔断器 | 状态转换、配置、使用 |
| 限流器 | 令牌桶、滑动窗口 |
| 重试机制 | 重试策略、退避策略 |
| 舱壁隔离 | 并发控制、线程池隔离 |
| 时间限制 | 超时控制 |
| 组合使用 | 多重保护、执行顺序 |
| 监控指标 | Prometheus 指标 |
下一章将学习 Spring Cloud 消息驱动。