Skip to content

熔断与限流

概述

熔断与限流是微服务架构中保护系统稳定性的重要机制。熔断器防止故障蔓延,限流器防止系统过载。在 CloudWeGo 生态中,Kitex 提供了完善的熔断和限流能力,帮助构建高可用的分布式系统。

为什么需要熔断与限流?

在分布式系统中存在多种风险:

  • 服务依赖故障导致级联失败
  • 突发流量超过系统处理能力
  • 资源耗尽导致服务不可用
  • 故障恢复时间过长

熔断与限流通过主动控制流量,保护系统核心功能,提升整体稳定性。

核心内容

1. 熔断器原理

基本概念

  • 熔断器(Circuit Breaker):类似电路熔断器,当错误率达到阈值时自动熔断
  • 熔断状态:关闭(Closed)、打开(Open)、半开(Half-Open)
  • 错误率阈值:触发熔断的错误比例
  • 恢复时间:熔断后尝试恢复的等待时间

状态转换

关闭(Closed)
    ↓ 错误率超过阈值
打开(Open)
    ↓ 等待恢复时间
半开(Half-Open)
    ↓ 成功 → 关闭
    ↓ 失败 → 打开

2. Kitex 熔断配置

基本使用

Kitex 通过中间件实现熔断功能:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/circuitbreak"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
)

func main() {
    cb := circuitbreak.NewCBSuite(
        circuitbreak.WithServiceRatio(
            "echo",
            circuitbreak.ErrRateConfig{
                Enable: true,
                Threshold: 0.5,
                MinSamples: 100,
            },
        ),
    )
    
    c, err := echo.NewClient(
        "echo",
        client.WithMiddleware(cb.CBMiddleware()),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        log.Printf("call failed: %v", err)
        return
    }
    log.Println(resp.Message)
}

熔断策略配置

go
package main

import (
    "github.com/cloudwego/kitex/pkg/circuitbreak"
)

func main() {
    cb := circuitbreak.NewCBSuite(
        circuitbreak.WithServiceRatio(
            "echo",
            circuitbreak.ErrRateConfig{
                Enable:      true,
                Threshold:   0.3,
                MinSamples:  50,
                HalfOpenSuc: 5,
                HalfOpenFail: 1,
            },
        ),
    )
}

配置说明:

  • Threshold:错误率阈值,超过则熔断
  • MinSamples:最小采样数,达到后才计算错误率
  • HalfOpenSuc:半开状态成功次数阈值
  • HalfOpenFail:半开状态失败次数阈值

3. 限流器原理

基本概念

  • 限流器(Limiter):控制请求速率,防止系统过载
  • 限流算法:计数器、滑动窗口、令牌桶、漏桶等
  • 限流维度:QPS、并发数、资源使用率等
  • 限流策略:拒绝、排队、降级等

常见算法

计数器算法:
固定时间窗口内请求数超过阈值则拒绝

滑动窗口算法:
将时间窗口分为多个小格,滑动统计请求数

令牌桶算法:
以固定速率生成令牌,请求消耗令牌,无令牌则拒绝

漏桶算法:
请求进入漏桶,以固定速率流出,桶满则拒绝

4. Kitex 限流配置

客户端限流

Kitex 客户端通过 limiter 扩展点实现限流:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/limiter"
)

func main() {
    c, err := echo.NewClient(
        "echo",
        client.WithLimiter(
            limiter.NewQPSLimiter(1000),
        ),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    for i := 0; i < 2000; i++ {
        resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
        if err != nil {
            log.Printf("call %d failed: %v", i, err)
            continue
        }
        log.Printf("call %d success: %s", i, resp.Message)
    }
}

服务端限流

服务端限流保护服务不被过载:

go
package main

import (
    "github.com/cloudwego/kitex/server"
    "github.com/cloudwego/kitex/pkg/limiter"
)

func main() {
    svr := echo.NewServer(
        new(EchoImpl),
        server.WithLimiter(
            limiter.NewQPSLimiter(10000),
        ),
        server.WithConcurrencyLimiter(
            limiter.NewConcurrencyLimiter(1000),
        ),
    )
    
    if err := svr.Run(); err != nil {
        log.Println("server stopped with error:", err)
    }
}

5. 自定义限流器

实现自定义限流器:

go
package mylimiter

import (
    "github.com/cloudwego/kitex/pkg/limiter"
    "golang.org/x/time/rate"
)

type RateLimiter struct {
    limiter *rate.Limiter
}

func NewRateLimiter(rps int) *RateLimiter {
    return &RateLimiter{
        limiter: rate.NewLimiter(rate.Limit(rps), rps),
    }
}

func (l *RateLimiter) Allow() bool {
    return l.limiter.Allow()
}

func (l *RateLimiter) Acquire() error {
    return l.limiter.Wait(context.Background())
}

使用自定义限流器

go
package main

import (
    "mylimiter"
    "github.com/cloudwego/kitex/client"
)

func main() {
    limiter := mylimiter.NewRateLimiter(1000)
    
    c, err := echo.NewClient(
        "echo",
        client.WithLimiter(limiter),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        panic(err)
    }
    log.Println(resp.Message)
}

6. 并发限流

控制同时处理的请求数量:

go
package main

import (
    "github.com/cloudwego/kitex/server"
    "github.com/cloudwego/kitex/pkg/limiter"
)

func main() {
    svr := echo.NewServer(
        new(EchoImpl),
        server.WithConcurrencyLimiter(
            limiter.NewConcurrencyLimiter(500),
        ),
    )
    
    if err := svr.Run(); err != nil {
        log.Println("server stopped with error:", err)
    }
}

7. 分布式限流

使用 Redis 实现分布式限流:

go
package distributed

import (
    "context"
    "fmt"
    "time"
    
    "github.com/go-redis/redis/v8"
)

type RedisLimiter struct {
    client *redis.Client
    key    string
    limit  int
    window time.Duration
}

func NewRedisLimiter(client *redis.Client, key string, limit int, window time.Duration) *RedisLimiter {
    return &RedisLimiter{
        client: client,
        key:    key,
        limit:  limit,
        window: window,
    }
}

func (l *RedisLimiter) Allow(ctx context.Context) (bool, error) {
    now := time.Now().UnixNano()
    min := now - int64(l.window)
    
    pipe := l.client.Pipeline()
    pipe.ZRemRangeByScore(ctx, l.key, "0", fmt.Sprintf("%d", min))
    pipe.ZAdd(ctx, l.key, &redis.Z{Score: float64(now), Member: now})
    pipe.ZCard(ctx, l.key)
    pipe.Expire(ctx, l.key, l.window)
    
    cmds, err := pipe.Exec(ctx)
    if err != nil {
        return false, err
    }
    
    count := cmds[2].(*redis.IntCmd).Val()
    return count <= int64(l.limit), nil
}

8. 熔断与限流组合

同时使用熔断和限流:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/circuitbreak"
    "github.com/cloudwego/kitex/pkg/limiter"
)

func main() {
    cb := circuitbreak.NewCBSuite(
        circuitbreak.WithServiceRatio(
            "echo",
            circuitbreak.ErrRateConfig{
                Enable:     true,
                Threshold:  0.5,
                MinSamples: 100,
            },
        ),
    )
    
    c, err := echo.NewClient(
        "echo",
        client.WithMiddleware(cb.CBMiddleware()),
        client.WithLimiter(limiter.NewQPSLimiter(1000)),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        log.Printf("call failed: %v", err)
        return
    }
    log.Println(resp.Message)
}

9. 降级策略

熔断触发后的降级处理:

go
package main

import (
    "context"
    
    "github.com/cloudwego/kitex/pkg/circuitbreak"
)

type FallbackService struct {
    client *echo.Client
}

func (s *FallbackService) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
    resp, err := s.client.Echo(ctx, req)
    if err != nil {
        if circuitbreak.IsCircuitBreakerError(err) {
            return s.fallback(req)
        }
        return nil, err
    }
    return resp, nil
}

func (s *FallbackService) fallback(req *api.Request) (*api.Response, error) {
    return &api.Response{
        Message: "Service temporarily unavailable, please try again later",
    }, nil
}

最佳实践

1. 熔断配置建议

参数推荐值说明
Threshold0.3-0.5错误率阈值
MinSamples50-100最小采样数
HalfOpenSuc3-5半开成功阈值
HalfOpenFail1-2半开失败阈值

2. 限流配置建议

go
type RateLimitConfig struct {
    QPS         int
    Concurrency int
    Strategy    string
}

func NewRateLimitConfig() *RateLimitConfig {
    return &RateLimitConfig{
        QPS:         10000,
        Concurrency: 1000,
        Strategy:    "token_bucket",
    }
}

3. 监控指标

go
import (
    "github.com/prometheus/client_golang/prometheus"
)

var (
    circuitBreakerGauge = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "circuit_breaker_state",
            Help: "Circuit breaker state",
        },
        []string{"service"},
    )
    
    rateLimitCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rate_limit_total",
            Help: "Rate limit counter",
        },
        []string{"service", "action"},
    )
)

小结

本章介绍了 CloudWeGo 生态中的熔断与限流机制:

  1. 熔断器原理:理解熔断器的状态转换和工作机制
  2. Kitex 熔断配置:掌握熔断器的配置和使用方法
  3. 限流器原理:了解常见限流算法的特点和适用场景
  4. Kitex 限流配置:掌握 QPS 限流和并发限流的使用
  5. 自定义扩展:学会实现自定义限流器和分布式限流
  6. 组合使用:了解熔断与限流的组合策略
  7. 降级处理:掌握熔断触发后的降级方案
  8. 最佳实践:学会合理配置熔断和限流参数

熔断与限流是保障系统稳定性的重要手段,合理配置和使用这些机制对于构建高可用微服务系统至关重要。