Skip to content

Kitex 客户端开发

概述

本章将深入介绍 Kitex 客户端开发,包括 Client 创建、调用方式、配置选项、连接池管理等内容,帮助你高效地发起 RPC 调用。

核心内容

创建 Client

基本创建

go
package main

import (
    "context"
    "log"
    
    "example/kitex_gen/hello/helloservice"
)

func main() {
    // 创建客户端
    cli := helloservice.MustNewClient("hello",
        client.WithHostPorts("127.0.0.1:8888"),
    )
    
    // 发起调用
    resp, err := cli.SayHello(context.Background(), &hello.Request{
        Name: "World",
    })
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println(resp.Message)
}

错误处理

go
// 使用 NewClient 处理错误
cli, err := helloservice.NewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
)
if err != nil {
    log.Fatalf("create client failed: %v", err)
}

服务发现

直连模式

go
cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888", "127.0.0.1:8889"),
)

DNS 解析

go
import "github.com/cloudwego/kitex/pkg/resolver/dns"

cli := helloservice.MustNewClient("hello",
    client.WithResolver(dns.NewDNSResolver()),
    client.WithTag("dns", "hello-service.example.com"),
)

Nacos 注册中心

go
import (
    "github.com/kitex-contrib/registry-nacos/resolver"
)

r, err := resolver.NewDefaultNacosResolver("nacos://127.0.0.1:8848")
if err != nil {
    log.Fatal(err)
}

cli := helloservice.MustNewClient("hello",
    client.WithResolver(r),
)

Client 配置选项

1. 超时配置

go
cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithTimeout(3*time.Second),           // 请求超时
    client.WithConnectTimeout(1*time.Second),    // 连接超时
)

2. 连接池配置

go
import "github.com/cloudwego/kitex/pkg/pool"

cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
        MaxIdlePerAddress:     100,  // 每个地址最大空闲连接
        MaxIdleGlobal:         1000, // 全局最大空闲连接
        MaxIdleTimeout:        30 * time.Second, // 空闲连接超时
        MaxConnIdleTimeout:    30 * time.Second, // 连接空闲超时
        MinIdlePerAddress:     10,   // 每个地址最小空闲连接
        MinIdleGlobal:         50,   // 全局最小空闲连接
        DialTimeout:           2 * time.Second, // 拨号超时
        KeepAlive:             true,  // 启用 keepalive
    }),
)

连接池类型

  • LongConnectionPool:长连接池,适合高频调用场景
  • ShortConnectionPool:短连接池,适合低频调用场景
  • NoConnectionPool:无连接池,每次调用创建新连接

连接池最佳实践

  • 根据服务调用频率选择合适的连接池类型
  • 合理设置最大空闲连接数,避免资源浪费
  • 设置适当的超时时间,避免连接泄漏
  • 启用 keepalive,保持连接活跃
  • 监控连接池状态,及时调整配置

连接池监控

go
// 监控连接池状态
metrics := pool.GetMetrics()
log.Printf("Connection pool metrics: %+v", metrics)

3. 中间件配置

go
cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithMiddleware(LoggingMiddleware),
    client.WithMiddleware(MetricsMiddleware),
)

4. 负载均衡

go
import "github.com/cloudwego/kitex/pkg/loadbalance"

cli := helloservice.MustNewClient("hello",
    client.WithResolver(resolver),
    client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
)

5. 重试策略

go
import "github.com/cloudwego/kitex/pkg/retry"

cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithRetryPolicy(
        retry.NewFailurePolicy(
            retry.WithMaxRetryTimes(3),
            retry.WithRetryBreaker(0.5),
            retry.WithDDLStop(0.9),
        ),
    ),
)

6. 熔断器

go
import "github.com/cloudwego/kitex/pkg/circuitbreak"

cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithCircuitBreaker(circuitbreak.NewCBSuite(
        "hello",
        circuitbreak.WithErrorRateThreshold(0.5),
        circuitbreak.WithMinSample(100),
    )),
)

调用方式

1. 同步调用

go
resp, err := cli.SayHello(ctx, &hello.Request{
    Name: "World",
})
if err != nil {
    return err
}

2. 异步调用

go
// 使用 goroutine
go func() {
    resp, err := cli.SayHello(ctx, req)
    if err != nil {
        log.Printf("async call failed: %v", err)
        return
    }
    log.Printf("async response: %v", resp)
}()

3. 并发调用

go
func batchCall(ctx context.Context, cli helloservice.Client, names []string) ([]*hello.Response, error) {
    var wg sync.WaitGroup
    results := make([]*hello.Response, len(names))
    errors := make([]error, len(names))
    
    for i, name := range names {
        wg.Add(1)
        go func(idx int, n string) {
            defer wg.Done()
            resp, err := cli.SayHello(ctx, &hello.Request{Name: n})
            results[idx] = resp
            errors[idx] = err
        }(i, name)
    }
    
    wg.Wait()
    
    // 检查错误
    for _, err := range errors {
        if err != nil {
            return nil, err
        }
    }
    
    return results, nil
}

元信息传递

go
import "github.com/cloudwego/kitex/pkg/transmeta"

// 客户端传递元信息
ctx := context.Background()
ctx = transmeta.SetMeta(ctx, "trace-id", "123456")
ctx = transmeta.SetMeta(ctx, "user-id", "user001")

resp, err := cli.SayHello(ctx, req)

客户端代理模式

go
// 创建带缓存的客户端代理
type CachedClient struct {
    cli   helloservice.Client
    cache *cache.RedisCache
}

func NewCachedClient(cli helloservice.Client, cache *cache.RedisCache) *CachedClient {
    return &CachedClient{cli: cli, cache: cache}
}

func (c *CachedClient) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    // 检查缓存
    key := "hello:" + req.Name
    if msg, err := c.cache.Get(ctx, key); err == nil {
        return &hello.Response{Message: msg}, nil
    }
    
    // 调用服务
    resp, err := c.cli.SayHello(ctx, req)
    if err != nil {
        return nil, err
    }
    
    // 写入缓存
    c.cache.Set(ctx, key, resp.Message, 5*time.Minute)
    
    return resp, nil
}

故障处理策略

1. 智能重试策略

go
import (
    "github.com/cloudwego/kitex/pkg/retry"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
)

cli := helloservice.MustNewClient("hello",
    client.WithRetryPolicy(
        retry.NewFailurePolicy(
            // 最大重试次数
            retry.WithMaxRetryTimes(3),
            // 重试断路器(失败率超过50%时停止重试)
            retry.WithRetryBreaker(0.5),
            // 重试停止时间点(超时前90%)
            retry.WithDDLStop(0.9),
            // 可重试的错误码
            retry.WithRetryableCodes(
                rpcinfo.ErrRPCInfoUnavailable,
                rpcinfo.ErrInternalServer,
            ),
            // 退避策略
            retry.WithBackoff(retry.NewExponentialBackoff(
                100*time.Millisecond,  // 初始退避时间
                1*time.Second,        // 最大退避时间
                2.0,                 // 退避因子
            )),
        ),
    ),
)

2. 高级熔断策略

go
import "github.com/cloudwego/kitex/pkg/circuitbreak"

cli := helloservice.MustNewClient("hello",
    client.WithCircuitBreaker(circuitbreak.NewCBSuite(
        "hello",
        // 错误率阈值(50%)
        circuitbreak.WithErrorRateThreshold(0.5),
        // 最小采样数(100个请求)
        circuitbreak.WithMinSample(100),
        // 半开状态尝试次数(5个请求)
        circuitbreak.WithHalfOpenRatio(0.1),
        // 熔断持续时间(30秒)
        circuitbreak.WithBreakDuration(30*time.Second),
        // 恢复持续时间(60秒)
        circuitbreak.WithRecoveryDuration(60*time.Second),
    )),
)

3. 超时控制策略

go
cli := helloservice.MustNewClient("hello",
    // 请求超时
    client.WithTimeout(3*time.Second),
    // 连接超时
    client.WithConnectTimeout(1*time.Second),
    // 读写超时
    client.WithReadWriteTimeout(2*time.Second),
    // 重试超时(累积)
    client.WithRPCTimeout(5*time.Second),
)

4. 错误处理最佳实践

go
func callWithRetry(ctx context.Context, cli helloservice.Client, req *hello.Request) (*hello.Response, error) {
    // 重试封装
    var lastErr error
    for i := 0; i < 3; i++ {
        resp, err := cli.SayHello(ctx, req)
        if err == nil {
            return resp, nil
        }
        
        lastErr = err
        
        // 检查是否是可重试的错误
        if !isRetryableError(err) {
            break
        }
        
        // 退避
        time.Sleep(time.Duration(i*100) * time.Millisecond)
    }
    
    return nil, lastErr
}

func isRetryableError(err error) bool {
    // 检查错误类型
    switch err.(type) {
    case *kerrors.DeadlineExceededError:
        return false // 超时错误不重试
    case *kerrors.ServiceDiscoveryError:
        return true  // 服务发现错误可重试
    case *kerrors.NetworkError:
        return true  // 网络错误可重试
    default:
        return false
    }
}

健康检查

服务健康检查

go
import "github.com/cloudwego/kitex/pkg/retry"

// 健康检查函数
func healthCheck(ctx context.Context, addr string) error {
    // 尝试连接服务
    conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
    if err != nil {
        return err
    }
    defer conn.Close()
    return nil
}

// 创建带健康检查的客户端
cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888", "127.0.0.1:8889"),
    client.WithHealthChecker(healthCheck),
    client.WithHealthCheckInterval(30*time.Second),
)

性能优化

1. 批量调用

go
func batchCall(ctx context.Context, cli helloservice.Client, requests []*hello.Request) ([]*hello.Response, error) {
    type result struct {
        resp *hello.Response
        err  error
        idx  int
    }
    
    results := make([]*hello.Response, len(requests))
    errCh := make(chan error, 1)
    resultCh := make(chan result, len(requests))
    
    // 并发调用
    for i, req := range requests {
        go func(idx int, r *hello.Request) {
            resp, err := cli.SayHello(ctx, r)
            resultCh <- result{resp: resp, err: err, idx: idx}
        }(i, req)
    }
    
    // 收集结果
    for i := 0; i < len(requests); i++ {
        select {
        case res := <-resultCh:
            if res.err != nil {
                select {
                case errCh <- res.err:
                default:
                }
            }
            results[res.idx] = res.resp
        case err := <-errCh:
            return nil, err
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
    
    return results, nil
}

2. 缓存优化

go
// 带缓存的客户端
 type CachedClient struct {
    cli    helloservice.Client
    cache  *sync.Map // 简单内存缓存
    ttl    time.Duration
}

func NewCachedClient(cli helloservice.Client, ttl time.Duration) *CachedClient {
    return &CachedClient{
        cli: cli,
        cache: &sync.Map{},
        ttl: ttl,
    }
}

func (c *CachedClient) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    key := "hello:" + req.Name
    
    // 检查缓存
    if val, ok := c.cache.Load(key); ok {
        if item, ok := val.(*cacheItem); ok {
            if time.Since(item.timestamp) < c.ttl {
                return item.resp, nil
            }
        }
    }
    
    // 调用服务
    resp, err := c.cli.SayHello(ctx, req)
    if err != nil {
        return nil, err
    }
    
    // 写入缓存
    c.cache.Store(key, &cacheItem{
        resp:      resp,
        timestamp: time.Now(),
    })
    
    return resp, nil
}

type cacheItem struct {
    resp      *hello.Response
    timestamp time.Time
}

完整示例

go
package main

import (
    "context"
    "log"
    "time"
    
    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/circuitbreak"
    "github.com/cloudwego/kitex/pkg/loadbalance"
    "github.com/cloudwego/kitex/pkg/pool"
    "github.com/cloudwego/kitex/pkg/retry"
    "example/kitex_gen/hello"
    "example/kitex_gen/hello/helloservice"
)

func main() {
    // 创建客户端
    cli := helloservice.MustNewClient("hello",
        // 服务地址
        client.WithHostPorts("127.0.0.1:8888"),
        
        // 超时配置
        client.WithTimeout(3*time.Second),
        client.WithConnectTimeout(1*time.Second),
        
        // 连接池配置
        client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
            MaxIdlePerAddress: 100,
            MaxIdleGlobal:     1000,
            MaxIdleTimeout:    30 * time.Second,
            MinIdlePerAddress: 10,
            KeepAlive:         true,
        }),
        
        // 负载均衡
        client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
        
        // 重试策略
        client.WithRetryPolicy(
            retry.NewFailurePolicy(
                retry.WithMaxRetryTimes(3),
                retry.WithRetryBreaker(0.5),
                retry.WithDDLStop(0.9),
            ),
        ),
        
        // 熔断策略
        client.WithCircuitBreaker(circuitbreak.NewCBSuite(
            "hello",
            circuitbreak.WithErrorRateThreshold(0.5),
            circuitbreak.WithMinSample(100),
        )),
    )
    
    // 发起调用
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    resp, err := cli.SayHello(ctx, &hello.Request{
        Name: "World",
    })
    if err != nil {
        log.Fatalf("call failed: %v", err)
    }
    
    log.Printf("response: %s", resp.Message)
}

小结

本章介绍了 Kitex 客户端开发的核心内容:

  1. Client 创建:基本创建、错误处理、服务发现
  2. 配置选项:超时配置、连接池配置、中间件配置、负载均衡
  3. 故障处理
    • 智能重试策略:退避、断路器、可重试错误码
    • 高级熔断策略:错误率阈值、半开状态、恢复机制
    • 超时控制策略:请求超时、连接超时、读写超时
    • 错误处理最佳实践:错误分类、重试判断
  4. 连接池管理
    • 连接池类型:长连接池、短连接池、无连接池
    • 连接池配置:最大/最小空闲连接、超时设置、keepalive
    • 连接池监控:状态监控、指标收集
  5. 健康检查:服务健康检查、检查间隔配置
  6. 性能优化
    • 批量调用:并发执行、结果收集
    • 缓存优化:内存缓存、TTL管理
  7. 调用方式:同步调用、异步调用、并发调用
  8. 元信息传递:上下文信息传递、透传机制
  9. 代理模式:带缓存的客户端代理、功能扩展

通过本章的学习,你应该掌握了如何构建一个高性能、可靠的 Kitex 客户端,包括连接池管理、故障处理、健康检查和性能优化等关键技术。在下一章中,我们将学习 Kitex 中间件机制,进一步扩展客户端和服务端的功能。