重试与超时
概述
重试与超时是微服务架构中处理网络抖动和临时故障的重要机制。合理的重试策略可以提高请求成功率,而恰当的超时设置可以防止资源长时间占用。在 CloudWeGo 生态中,Kitex 提供了灵活的重试和超时配置能力。
为什么需要重试与超时?
在分布式系统中存在各种不确定性:
- 网络抖动导致请求失败
- 服务临时不可用后快速恢复
- 请求处理时间不可预测
- 资源竞争导致响应延迟
重试与超时通过合理的容错机制,提升系统的鲁棒性和用户体验。
核心内容
1. 重试机制原理
基本概念
- 重试(Retry):请求失败后自动重新发起请求
- 重试策略:定义何时重试、重试几次、重试间隔等
- 幂等性:多次重试不会产生副作用
- 退避策略:重试间隔的递增方式
重试场景
可重试场景:
- 网络超时
- 服务临时不可用
- 资源暂时耗尽
不可重试场景:
- 业务逻辑错误
- 参数校验失败
- 权限不足2. Kitex 重试配置
基本使用
Kitex 通过 retry 扩展点配置重试策略:
go
package main
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
fp := retry.NewFailurePolicy()
fp.MaxRetryTimes = 3
fp.MaxDuration = 5 * time.Second
c, err := echo.NewClient(
"echo",
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
ctx := context.Background()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call failed after retries: %v", err)
return
}
log.Println(resp.Message)
}重试策略配置
go
package main
import (
"time"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
fp := &retry.FailurePolicy{
MaxRetryTimes: 3,
MaxDuration: 10 * time.Second,
RetrySameNode: false,
BackOffPolicy: retry.NewFixedBackOffPolicy(100 * time.Millisecond),
RetryTimeout: true,
RetrySameNetwork: false,
}
c, err := echo.NewClient(
"echo",
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
}配置说明:
MaxRetryTimes:最大重试次数MaxDuration:最大重试时间RetrySameNode:是否重试同一节点BackOffPolicy:退避策略RetryTimeout:超时是否重试
3. 退避策略
固定退避
每次重试间隔固定时间:
go
package main
import (
"time"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
fp := &retry.FailurePolicy{
MaxRetryTimes: 3,
BackOffPolicy: retry.NewFixedBackOffPolicy(100 * time.Millisecond),
}
c, err := echo.NewClient(
"echo",
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
}随机退避
每次重试间隔随机时间:
go
package main
import (
"time"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
fp := &retry.FailurePolicy{
MaxRetryTimes: 3,
BackOffPolicy: retry.NewRandomBackOffPolicy(
50*time.Millisecond,
200*time.Millisecond,
),
}
c, err := echo.NewClient(
"echo",
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
}指数退避
重试间隔按指数增长:
go
package main
import (
"time"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
fp := &retry.FailurePolicy{
MaxRetryTimes: 3,
BackOffPolicy: retry.NewExponentialBackOffPolicy(
10*time.Millisecond,
1*time.Second,
2.0,
),
}
c, err := echo.NewClient(
"echo",
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
}4. 超时机制原理
基本概念
- 超时(Timeout):请求在指定时间内未完成则取消
- 连接超时:建立连接的超时时间
- 读写超时:数据传输的超时时间
- 处理超时:服务端处理的超时时间
超时层级
客户端超时:
- 连接超时
- 请求超时
- RPC 超时
服务端超时:
- 读超时
- 写超时
- 处理超时5. Kitex 超时配置
客户端超时
go
package main
import (
"time"
"github.com/cloudwego/kitex/client"
)
func main() {
c, err := echo.NewClient(
"echo",
client.WithConnectTimeout(1*time.Second),
client.WithTimeout(3*time.Second),
)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call failed: %v", err)
return
}
log.Println(resp.Message)
}服务端超时
go
package main
import (
"time"
"github.com/cloudwego/kitex/server"
)
func main() {
svr := echo.NewServer(
new(EchoImpl),
server.WithReadWriteTimeout(30*time.Second),
)
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
}
}6. Context 超时控制
使用 Context 进行超时控制:
go
package main
import (
"context"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
log.Println("request timeout")
} else {
log.Printf("call failed: %v", err)
}
return
}
log.Println(resp.Message)
}7. 自定义重试策略
实现自定义重试策略:
go
package myretry
import (
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)
type MyRetryPolicy struct {
maxRetryTimes int
backoff retry.BackOffPolicy
}
func NewMyRetryPolicy(maxRetryTimes int, backoff retry.BackOffPolicy) *MyRetryPolicy {
return &MyRetryPolicy{
maxRetryTimes: maxRetryTimes,
backoff: backoff,
}
}
func (p *MyRetryPolicy) AllowRetry(req rpcinfo.RPCInfo, prevErr error, retryTimes int) bool {
if retryTimes >= p.maxRetryTimes {
return false
}
if isBusinessError(prevErr) {
return false
}
return true
}
func (p *MyRetryPolicy) BackOff(retryTimes int) time.Duration {
return p.backoff.BackOff(retryTimes)
}
func isBusinessError(err error) bool {
return false
}8. 幂等性设计
重试需要保证幂等性:
go
package main
import (
"context"
"crypto/md5"
"encoding/hex"
)
type IdempotentService struct {
client *echo.Client
cache map[string]*api.Response
}
func (s *IdempotentService) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
requestID := s.generateRequestID(req)
if resp, ok := s.cache[requestID]; ok {
return resp, nil
}
resp, err := s.client.Echo(ctx, req)
if err != nil {
return nil, err
}
s.cache[requestID] = resp
return resp, nil
}
func (s *IdempotentService) generateRequestID(req *api.Request) string {
h := md5.New()
h.Write([]byte(req.Message))
return hex.EncodeToString(h.Sum(nil))
}9. 超时传播
在服务链路中传播超时:
go
package main
import (
"context"
"time"
)
func (s *ServiceImpl) Process(ctx context.Context, req *api.Request) (*api.Response, error) {
deadline, ok := ctx.Deadline()
if !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
} else {
remaining := time.Until(deadline)
if remaining < 100*time.Millisecond {
return nil, errors.New("insufficient time remaining")
}
}
return s.callDownstream(ctx, req)
}
func (s *ServiceImpl) callDownstream(ctx context.Context, req *api.Request) (*api.Response, error) {
return s.client.Echo(ctx, req)
}10. 重试与熔断配合
重试与熔断机制配合使用:
go
package main
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/retry"
)
func main() {
cb := circuitbreak.NewCBSuite(
circuitbreak.WithServiceRatio(
"echo",
circuitbreak.ErrRateConfig{
Enable: true,
Threshold: 0.5,
MinSamples: 100,
},
),
)
fp := retry.NewFailurePolicy()
fp.MaxRetryTimes = 3
c, err := echo.NewClient(
"echo",
client.WithMiddleware(cb.CBMiddleware()),
client.WithFailureRetry(fp),
)
if err != nil {
panic(err)
}
ctx := context.Background()
resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
if err != nil {
log.Printf("call failed: %v", err)
return
}
log.Println(resp.Message)
}最佳实践
1. 重试配置建议
| 参数 | 推荐值 | 说明 |
|---|---|---|
| MaxRetryTimes | 2-3 | 避免过多重试 |
| MaxDuration | 5-10s | 总重试时间 |
| BackOffPolicy | 指数退避 | 避免雪崩 |
| RetrySameNode | false | 切换节点重试 |
2. 超时配置建议
go
type TimeoutConfig struct {
ConnectTimeout time.Duration
RequestTimeout time.Duration
ReadWriteTimeout time.Duration
}
func NewTimeoutConfig() *TimeoutConfig {
return &TimeoutConfig{
ConnectTimeout: 1 * time.Second,
RequestTimeout: 3 * time.Second,
ReadWriteTimeout: 30 * time.Second,
}
}3. 错误处理
go
func handleError(err error) {
if errors.Is(err, context.DeadlineExceeded) {
log.Println("request timeout")
} else if errors.Is(err, context.Canceled) {
log.Println("request canceled")
} else if circuitbreak.IsCircuitBreakerError(err) {
log.Println("circuit breaker open")
} else {
log.Printf("unknown error: %v", err)
}
}小结
本章介绍了 CloudWeGo 生态中的重试与超时机制:
- 重试机制原理:理解重试的场景和幂等性要求
- Kitex 重试配置:掌握重试策略的配置和使用
- 退避策略:了解固定、随机、指数退避的特点
- 超时机制原理:理解超时的层级和控制方式
- Kitex 超时配置:掌握客户端和服务端的超时配置
- Context 超时控制:学会使用 Context 进行超时管理
- 自定义扩展:学会实现自定义重试策略
- 幂等性设计:掌握重试场景下的幂等性保证
- 超时传播:了解服务链路中的超时传播机制
- 最佳实践:学会合理配置重试和超时参数
重试与超时是提升系统鲁棒性的重要手段,合理配置和使用这些机制对于构建可靠的微服务系统至关重要。