Skip to content

重试与超时

概述

重试与超时是微服务架构中处理网络抖动和临时故障的重要机制。合理的重试策略可以提高请求成功率,而恰当的超时设置可以防止资源长时间占用。在 CloudWeGo 生态中,Kitex 提供了灵活的重试和超时配置能力。

为什么需要重试与超时?

在分布式系统中存在各种不确定性:

  • 网络抖动导致请求失败
  • 服务临时不可用后快速恢复
  • 请求处理时间不可预测
  • 资源竞争导致响应延迟

重试与超时通过合理的容错机制,提升系统的鲁棒性和用户体验。

核心内容

1. 重试机制原理

基本概念

  • 重试(Retry):请求失败后自动重新发起请求
  • 重试策略:定义何时重试、重试几次、重试间隔等
  • 幂等性:多次重试不会产生副作用
  • 退避策略:重试间隔的递增方式

重试场景

可重试场景:
- 网络超时
- 服务临时不可用
- 资源暂时耗尽

不可重试场景:
- 业务逻辑错误
- 参数校验失败
- 权限不足

2. Kitex 重试配置

基本使用

Kitex 通过 retry 扩展点配置重试策略:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    fp := retry.NewFailurePolicy()
    fp.MaxRetryTimes = 3
    fp.MaxDuration = 5 * time.Second
    
    c, err := echo.NewClient(
        "echo",
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        log.Printf("call failed after retries: %v", err)
        return
    }
    log.Println(resp.Message)
}

重试策略配置

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    fp := &retry.FailurePolicy{
        MaxRetryTimes:    3,
        MaxDuration:      10 * time.Second,
        RetrySameNode:    false,
        BackOffPolicy:    retry.NewFixedBackOffPolicy(100 * time.Millisecond),
        RetryTimeout:     true,
        RetrySameNetwork: false,
    }
    
    c, err := echo.NewClient(
        "echo",
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
}

配置说明:

  • MaxRetryTimes:最大重试次数
  • MaxDuration:最大重试时间
  • RetrySameNode:是否重试同一节点
  • BackOffPolicy:退避策略
  • RetryTimeout:超时是否重试

3. 退避策略

固定退避

每次重试间隔固定时间:

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    fp := &retry.FailurePolicy{
        MaxRetryTimes: 3,
        BackOffPolicy: retry.NewFixedBackOffPolicy(100 * time.Millisecond),
    }
    
    c, err := echo.NewClient(
        "echo",
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
}

随机退避

每次重试间隔随机时间:

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    fp := &retry.FailurePolicy{
        MaxRetryTimes: 3,
        BackOffPolicy: retry.NewRandomBackOffPolicy(
            50*time.Millisecond,
            200*time.Millisecond,
        ),
    }
    
    c, err := echo.NewClient(
        "echo",
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
}

指数退避

重试间隔按指数增长:

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    fp := &retry.FailurePolicy{
        MaxRetryTimes: 3,
        BackOffPolicy: retry.NewExponentialBackOffPolicy(
            10*time.Millisecond,
            1*time.Second,
            2.0,
        ),
    }
    
    c, err := echo.NewClient(
        "echo",
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
}

4. 超时机制原理

基本概念

  • 超时(Timeout):请求在指定时间内未完成则取消
  • 连接超时:建立连接的超时时间
  • 读写超时:数据传输的超时时间
  • 处理超时:服务端处理的超时时间

超时层级

客户端超时:
- 连接超时
- 请求超时
- RPC 超时

服务端超时:
- 读超时
- 写超时
- 处理超时

5. Kitex 超时配置

客户端超时

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/client"
)

func main() {
    c, err := echo.NewClient(
        "echo",
        client.WithConnectTimeout(1*time.Second),
        client.WithTimeout(3*time.Second),
    )
    if err != nil {
        panic(err)
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        log.Printf("call failed: %v", err)
        return
    }
    log.Println(resp.Message)
}

服务端超时

go
package main

import (
    "time"
    
    "github.com/cloudwego/kitex/server"
)

func main() {
    svr := echo.NewServer(
        new(EchoImpl),
        server.WithReadWriteTimeout(30*time.Second),
    )
    
    if err := svr.Run(); err != nil {
        log.Println("server stopped with error:", err)
    }
}

6. Context 超时控制

使用 Context 进行超时控制:

go
package main

import (
    "context"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            log.Println("request timeout")
        } else {
            log.Printf("call failed: %v", err)
        }
        return
    }
    log.Println(resp.Message)
}

7. 自定义重试策略

实现自定义重试策略:

go
package myretry

import (
    "github.com/cloudwego/kitex/pkg/retry"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
)

type MyRetryPolicy struct {
    maxRetryTimes int
    backoff       retry.BackOffPolicy
}

func NewMyRetryPolicy(maxRetryTimes int, backoff retry.BackOffPolicy) *MyRetryPolicy {
    return &MyRetryPolicy{
        maxRetryTimes: maxRetryTimes,
        backoff:       backoff,
    }
}

func (p *MyRetryPolicy) AllowRetry(req rpcinfo.RPCInfo, prevErr error, retryTimes int) bool {
    if retryTimes >= p.maxRetryTimes {
        return false
    }
    
    if isBusinessError(prevErr) {
        return false
    }
    
    return true
}

func (p *MyRetryPolicy) BackOff(retryTimes int) time.Duration {
    return p.backoff.BackOff(retryTimes)
}

func isBusinessError(err error) bool {
    return false
}

8. 幂等性设计

重试需要保证幂等性:

go
package main

import (
    "context"
    "crypto/md5"
    "encoding/hex"
)

type IdempotentService struct {
    client *echo.Client
    cache  map[string]*api.Response
}

func (s *IdempotentService) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
    requestID := s.generateRequestID(req)
    
    if resp, ok := s.cache[requestID]; ok {
        return resp, nil
    }
    
    resp, err := s.client.Echo(ctx, req)
    if err != nil {
        return nil, err
    }
    
    s.cache[requestID] = resp
    return resp, nil
}

func (s *IdempotentService) generateRequestID(req *api.Request) string {
    h := md5.New()
    h.Write([]byte(req.Message))
    return hex.EncodeToString(h.Sum(nil))
}

9. 超时传播

在服务链路中传播超时:

go
package main

import (
    "context"
    "time"
)

func (s *ServiceImpl) Process(ctx context.Context, req *api.Request) (*api.Response, error) {
    deadline, ok := ctx.Deadline()
    if !ok {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
        defer cancel()
    } else {
        remaining := time.Until(deadline)
        if remaining < 100*time.Millisecond {
            return nil, errors.New("insufficient time remaining")
        }
    }
    
    return s.callDownstream(ctx, req)
}

func (s *ServiceImpl) callDownstream(ctx context.Context, req *api.Request) (*api.Response, error) {
    return s.client.Echo(ctx, req)
}

10. 重试与熔断配合

重试与熔断机制配合使用:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/circuitbreak"
    "github.com/cloudwego/kitex/pkg/retry"
)

func main() {
    cb := circuitbreak.NewCBSuite(
        circuitbreak.WithServiceRatio(
            "echo",
            circuitbreak.ErrRateConfig{
                Enable:     true,
                Threshold:  0.5,
                MinSamples: 100,
            },
        ),
    )
    
    fp := retry.NewFailurePolicy()
    fp.MaxRetryTimes = 3
    
    c, err := echo.NewClient(
        "echo",
        client.WithMiddleware(cb.CBMiddleware()),
        client.WithFailureRetry(fp),
    )
    if err != nil {
        panic(err)
    }
    
    ctx := context.Background()
    resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
    if err != nil {
        log.Printf("call failed: %v", err)
        return
    }
    log.Println(resp.Message)
}

最佳实践

1. 重试配置建议

参数推荐值说明
MaxRetryTimes2-3避免过多重试
MaxDuration5-10s总重试时间
BackOffPolicy指数退避避免雪崩
RetrySameNodefalse切换节点重试

2. 超时配置建议

go
type TimeoutConfig struct {
    ConnectTimeout   time.Duration
    RequestTimeout   time.Duration
    ReadWriteTimeout time.Duration
}

func NewTimeoutConfig() *TimeoutConfig {
    return &TimeoutConfig{
        ConnectTimeout:   1 * time.Second,
        RequestTimeout:   3 * time.Second,
        ReadWriteTimeout: 30 * time.Second,
    }
}

3. 错误处理

go
func handleError(err error) {
    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("request timeout")
    } else if errors.Is(err, context.Canceled) {
        log.Println("request canceled")
    } else if circuitbreak.IsCircuitBreakerError(err) {
        log.Println("circuit breaker open")
    } else {
        log.Printf("unknown error: %v", err)
    }
}

小结

本章介绍了 CloudWeGo 生态中的重试与超时机制:

  1. 重试机制原理:理解重试的场景和幂等性要求
  2. Kitex 重试配置:掌握重试策略的配置和使用
  3. 退避策略:了解固定、随机、指数退避的特点
  4. 超时机制原理:理解超时的层级和控制方式
  5. Kitex 超时配置:掌握客户端和服务端的超时配置
  6. Context 超时控制:学会使用 Context 进行超时管理
  7. 自定义扩展:学会实现自定义重试策略
  8. 幂等性设计:掌握重试场景下的幂等性保证
  9. 超时传播:了解服务链路中的超时传播机制
  10. 最佳实践:学会合理配置重试和超时参数

重试与超时是提升系统鲁棒性的重要手段,合理配置和使用这些机制对于构建可靠的微服务系统至关重要。