熔断与限流
概述
熔断与限流是微服务架构中保护系统稳定性的重要机制。熔断器防止故障蔓延,限流器防止系统过载。在 CloudWeGo 生态中,Kitex 提供了完善的熔断和限流能力,帮助构建高可用的分布式系统。
为什么需要熔断与限流?
在分布式系统中存在多种风险:
- 服务依赖故障导致级联失败
- 突发流量超过系统处理能力
- 资源耗尽导致服务不可用
- 故障恢复时间过长
熔断与限流通过主动控制流量,保护系统核心功能,提升整体稳定性。
核心内容
1. 熔断器原理
基本概念
- 熔断器(Circuit Breaker):类似电路熔断器,当错误率达到阈值时自动熔断
- 熔断状态:关闭(Closed)、打开(Open)、半开(Half-Open)
- 错误率阈值:触发熔断的错误比例
- 恢复时间:熔断后尝试恢复的等待时间
状态转换
关闭(Closed)
↓ 错误率超过阈值
打开(Open)
↓ 等待恢复时间
半开(Half-Open)
↓ 成功 → 关闭
↓ 失败 → 打开2. Kitex 熔断配置
基本使用
Kitex 通过中间件实现熔断功能:
go
package main
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)
func main() {
cb := circuitbreak.NewCBSuite(
circuitbreak.WithServiceRatio(
"echo",
circuitbreak.ErrRateConfig{
Enable: true,
Threshold: 0.5,
MinSamples: 100,
},
),
)
c, err := echo.NewClient(
"echo",
client.WithMiddleware(cb.CBMiddleware()),
)
if err != nil {
panic(err)
}
ctx := context.Background()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call failed: %v", err)
return
}
log.Println(resp.Message)
}熔断策略配置
go
package main
import (
"github.com/cloudwego/kitex/pkg/circuitbreak"
)
func main() {
cb := circuitbreak.NewCBSuite(
circuitbreak.WithServiceRatio(
"echo",
circuitbreak.ErrRateConfig{
Enable: true,
Threshold: 0.3,
MinSamples: 50,
HalfOpenSuc: 5,
HalfOpenFail: 1,
},
),
)
}配置说明:
Threshold:错误率阈值,超过则熔断MinSamples:最小采样数,达到后才计算错误率HalfOpenSuc:半开状态成功次数阈值HalfOpenFail:半开状态失败次数阈值
3. 限流器原理
基本概念
- 限流器(Limiter):控制请求速率,防止系统过载
- 限流算法:计数器、滑动窗口、令牌桶、漏桶等
- 限流维度:QPS、并发数、资源使用率等
- 限流策略:拒绝、排队、降级等
常见算法
计数器算法:
固定时间窗口内请求数超过阈值则拒绝
滑动窗口算法:
将时间窗口分为多个小格,滑动统计请求数
令牌桶算法:
以固定速率生成令牌,请求消耗令牌,无令牌则拒绝
漏桶算法:
请求进入漏桶,以固定速率流出,桶满则拒绝4. Kitex 限流配置
客户端限流
Kitex 客户端通过 limiter 扩展点实现限流:
go
package main
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/limiter"
)
func main() {
c, err := echo.NewClient(
"echo",
client.WithLimiter(
limiter.NewQPSLimiter(1000),
),
)
if err != nil {
panic(err)
}
ctx := context.Background()
for i := 0; i < 2000; i++ {
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call %d failed: %v", i, err)
continue
}
log.Printf("call %d success: %s", i, resp.Message)
}
}服务端限流
服务端限流保护服务不被过载:
go
package main
import (
"github.com/cloudwego/kitex/server"
"github.com/cloudwego/kitex/pkg/limiter"
)
func main() {
svr := echo.NewServer(
new(EchoImpl),
server.WithLimiter(
limiter.NewQPSLimiter(10000),
),
server.WithConcurrencyLimiter(
limiter.NewConcurrencyLimiter(1000),
),
)
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
}
}5. 自定义限流器
实现自定义限流器:
go
package mylimiter
import (
"github.com/cloudwego/kitex/pkg/limiter"
"golang.org/x/time/rate"
)
type RateLimiter struct {
limiter *rate.Limiter
}
func NewRateLimiter(rps int) *RateLimiter {
return &RateLimiter{
limiter: rate.NewLimiter(rate.Limit(rps), rps),
}
}
func (l *RateLimiter) Allow() bool {
return l.limiter.Allow()
}
func (l *RateLimiter) Acquire() error {
return l.limiter.Wait(context.Background())
}使用自定义限流器
go
package main
import (
"mylimiter"
"github.com/cloudwego/kitex/client"
)
func main() {
limiter := mylimiter.NewRateLimiter(1000)
c, err := echo.NewClient(
"echo",
client.WithLimiter(limiter),
)
if err != nil {
panic(err)
}
ctx := context.Background()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
panic(err)
}
log.Println(resp.Message)
}6. 并发限流
控制同时处理的请求数量:
go
package main
import (
"github.com/cloudwego/kitex/server"
"github.com/cloudwego/kitex/pkg/limiter"
)
func main() {
svr := echo.NewServer(
new(EchoImpl),
server.WithConcurrencyLimiter(
limiter.NewConcurrencyLimiter(500),
),
)
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
}
}7. 分布式限流
使用 Redis 实现分布式限流:
go
package distributed
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type RedisLimiter struct {
client *redis.Client
key string
limit int
window time.Duration
}
func NewRedisLimiter(client *redis.Client, key string, limit int, window time.Duration) *RedisLimiter {
return &RedisLimiter{
client: client,
key: key,
limit: limit,
window: window,
}
}
func (l *RedisLimiter) Allow(ctx context.Context) (bool, error) {
now := time.Now().UnixNano()
min := now - int64(l.window)
pipe := l.client.Pipeline()
pipe.ZRemRangeByScore(ctx, l.key, "0", fmt.Sprintf("%d", min))
pipe.ZAdd(ctx, l.key, &redis.Z{Score: float64(now), Member: now})
pipe.ZCard(ctx, l.key)
pipe.Expire(ctx, l.key, l.window)
cmds, err := pipe.Exec(ctx)
if err != nil {
return false, err
}
count := cmds[2].(*redis.IntCmd).Val()
return count <= int64(l.limit), nil
}8. 熔断与限流组合
同时使用熔断和限流:
go
package main
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/limiter"
)
func main() {
cb := circuitbreak.NewCBSuite(
circuitbreak.WithServiceRatio(
"echo",
circuitbreak.ErrRateConfig{
Enable: true,
Threshold: 0.5,
MinSamples: 100,
},
),
)
c, err := echo.NewClient(
"echo",
client.WithMiddleware(cb.CBMiddleware()),
client.WithLimiter(limiter.NewQPSLimiter(1000)),
)
if err != nil {
panic(err)
}
ctx := context.Background()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call failed: %v", err)
return
}
log.Println(resp.Message)
}9. 降级策略
熔断触发后的降级处理:
go
package main
import (
"context"
"github.com/cloudwego/kitex/pkg/circuitbreak"
)
type FallbackService struct {
client *echo.Client
}
func (s *FallbackService) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
resp, err := s.client.Echo(ctx, req)
if err != nil {
if circuitbreak.IsCircuitBreakerError(err) {
return s.fallback(req)
}
return nil, err
}
return resp, nil
}
func (s *FallbackService) fallback(req *api.Request) (*api.Response, error) {
return &api.Response{
Message: "Service temporarily unavailable, please try again later",
}, nil
}最佳实践
1. 熔断配置建议
| 参数 | 推荐值 | 说明 |
|---|---|---|
| Threshold | 0.3-0.5 | 错误率阈值 |
| MinSamples | 50-100 | 最小采样数 |
| HalfOpenSuc | 3-5 | 半开成功阈值 |
| HalfOpenFail | 1-2 | 半开失败阈值 |
2. 限流配置建议
go
type RateLimitConfig struct {
QPS int
Concurrency int
Strategy string
}
func NewRateLimitConfig() *RateLimitConfig {
return &RateLimitConfig{
QPS: 10000,
Concurrency: 1000,
Strategy: "token_bucket",
}
}3. 监控指标
go
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
circuitBreakerGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "circuit_breaker_state",
Help: "Circuit breaker state",
},
[]string{"service"},
)
rateLimitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rate_limit_total",
Help: "Rate limit counter",
},
[]string{"service", "action"},
)
)小结
本章介绍了 CloudWeGo 生态中的熔断与限流机制:
- 熔断器原理:理解熔断器的状态转换和工作机制
- Kitex 熔断配置:掌握熔断器的配置和使用方法
- 限流器原理:了解常见限流算法的特点和适用场景
- Kitex 限流配置:掌握 QPS 限流和并发限流的使用
- 自定义扩展:学会实现自定义限流器和分布式限流
- 组合使用:了解熔断与限流的组合策略
- 降级处理:掌握熔断触发后的降级方案
- 最佳实践:学会合理配置熔断和限流参数
熔断与限流是保障系统稳定性的重要手段,合理配置和使用这些机制对于构建高可用微服务系统至关重要。