Kitex 客户端开发
概述
本章将深入介绍 Kitex 客户端开发,包括 Client 创建、调用方式、配置选项、连接池管理等内容,帮助你高效地发起 RPC 调用。
核心内容
创建 Client
基本创建
go
package main
import (
"context"
"log"
"example/kitex_gen/hello/helloservice"
)
func main() {
// 创建客户端
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
)
// 发起调用
resp, err := cli.SayHello(context.Background(), &hello.Request{
Name: "World",
})
if err != nil {
log.Fatal(err)
}
log.Println(resp.Message)
}错误处理
go
// 使用 NewClient 处理错误
cli, err := helloservice.NewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
)
if err != nil {
log.Fatalf("create client failed: %v", err)
}服务发现
直连模式
go
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888", "127.0.0.1:8889"),
)DNS 解析
go
import "github.com/cloudwego/kitex/pkg/resolver/dns"
cli := helloservice.MustNewClient("hello",
client.WithResolver(dns.NewDNSResolver()),
client.WithTag("dns", "hello-service.example.com"),
)Nacos 注册中心
go
import (
"github.com/kitex-contrib/registry-nacos/resolver"
)
r, err := resolver.NewDefaultNacosResolver("nacos://127.0.0.1:8848")
if err != nil {
log.Fatal(err)
}
cli := helloservice.MustNewClient("hello",
client.WithResolver(r),
)Client 配置选项
1. 超时配置
go
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithTimeout(3*time.Second), // 请求超时
client.WithConnectTimeout(1*time.Second), // 连接超时
)2. 连接池配置
go
import "github.com/cloudwego/kitex/pkg/pool"
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
MaxIdlePerAddress: 100, // 每个地址最大空闲连接
MaxIdleGlobal: 1000, // 全局最大空闲连接
MaxIdleTimeout: 30 * time.Second, // 空闲连接超时
MaxConnIdleTimeout: 30 * time.Second, // 连接空闲超时
MinIdlePerAddress: 10, // 每个地址最小空闲连接
MinIdleGlobal: 50, // 全局最小空闲连接
DialTimeout: 2 * time.Second, // 拨号超时
KeepAlive: true, // 启用 keepalive
}),
)连接池类型:
- LongConnectionPool:长连接池,适合高频调用场景
- ShortConnectionPool:短连接池,适合低频调用场景
- NoConnectionPool:无连接池,每次调用创建新连接
连接池最佳实践:
- 根据服务调用频率选择合适的连接池类型
- 合理设置最大空闲连接数,避免资源浪费
- 设置适当的超时时间,避免连接泄漏
- 启用 keepalive,保持连接活跃
- 监控连接池状态,及时调整配置
连接池监控:
go
// 监控连接池状态
metrics := pool.GetMetrics()
log.Printf("Connection pool metrics: %+v", metrics)3. 中间件配置
go
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithMiddleware(LoggingMiddleware),
client.WithMiddleware(MetricsMiddleware),
)4. 负载均衡
go
import "github.com/cloudwego/kitex/pkg/loadbalance"
cli := helloservice.MustNewClient("hello",
client.WithResolver(resolver),
client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
)5. 重试策略
go
import "github.com/cloudwego/kitex/pkg/retry"
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithRetryPolicy(
retry.NewFailurePolicy(
retry.WithMaxRetryTimes(3),
retry.WithRetryBreaker(0.5),
retry.WithDDLStop(0.9),
),
),
)6. 熔断器
go
import "github.com/cloudwego/kitex/pkg/circuitbreak"
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
"hello",
circuitbreak.WithErrorRateThreshold(0.5),
circuitbreak.WithMinSample(100),
)),
)调用方式
1. 同步调用
go
resp, err := cli.SayHello(ctx, &hello.Request{
Name: "World",
})
if err != nil {
return err
}2. 异步调用
go
// 使用 goroutine
go func() {
resp, err := cli.SayHello(ctx, req)
if err != nil {
log.Printf("async call failed: %v", err)
return
}
log.Printf("async response: %v", resp)
}()3. 并发调用
go
func batchCall(ctx context.Context, cli helloservice.Client, names []string) ([]*hello.Response, error) {
var wg sync.WaitGroup
results := make([]*hello.Response, len(names))
errors := make([]error, len(names))
for i, name := range names {
wg.Add(1)
go func(idx int, n string) {
defer wg.Done()
resp, err := cli.SayHello(ctx, &hello.Request{Name: n})
results[idx] = resp
errors[idx] = err
}(i, name)
}
wg.Wait()
// 检查错误
for _, err := range errors {
if err != nil {
return nil, err
}
}
return results, nil
}元信息传递
go
import "github.com/cloudwego/kitex/pkg/transmeta"
// 客户端传递元信息
ctx := context.Background()
ctx = transmeta.SetMeta(ctx, "trace-id", "123456")
ctx = transmeta.SetMeta(ctx, "user-id", "user001")
resp, err := cli.SayHello(ctx, req)客户端代理模式
go
// 创建带缓存的客户端代理
type CachedClient struct {
cli helloservice.Client
cache *cache.RedisCache
}
func NewCachedClient(cli helloservice.Client, cache *cache.RedisCache) *CachedClient {
return &CachedClient{cli: cli, cache: cache}
}
func (c *CachedClient) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
// 检查缓存
key := "hello:" + req.Name
if msg, err := c.cache.Get(ctx, key); err == nil {
return &hello.Response{Message: msg}, nil
}
// 调用服务
resp, err := c.cli.SayHello(ctx, req)
if err != nil {
return nil, err
}
// 写入缓存
c.cache.Set(ctx, key, resp.Message, 5*time.Minute)
return resp, nil
}故障处理策略
1. 智能重试策略
go
import (
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)
cli := helloservice.MustNewClient("hello",
client.WithRetryPolicy(
retry.NewFailurePolicy(
// 最大重试次数
retry.WithMaxRetryTimes(3),
// 重试断路器(失败率超过50%时停止重试)
retry.WithRetryBreaker(0.5),
// 重试停止时间点(超时前90%)
retry.WithDDLStop(0.9),
// 可重试的错误码
retry.WithRetryableCodes(
rpcinfo.ErrRPCInfoUnavailable,
rpcinfo.ErrInternalServer,
),
// 退避策略
retry.WithBackoff(retry.NewExponentialBackoff(
100*time.Millisecond, // 初始退避时间
1*time.Second, // 最大退避时间
2.0, // 退避因子
)),
),
),
)2. 高级熔断策略
go
import "github.com/cloudwego/kitex/pkg/circuitbreak"
cli := helloservice.MustNewClient("hello",
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
"hello",
// 错误率阈值(50%)
circuitbreak.WithErrorRateThreshold(0.5),
// 最小采样数(100个请求)
circuitbreak.WithMinSample(100),
// 半开状态尝试次数(5个请求)
circuitbreak.WithHalfOpenRatio(0.1),
// 熔断持续时间(30秒)
circuitbreak.WithBreakDuration(30*time.Second),
// 恢复持续时间(60秒)
circuitbreak.WithRecoveryDuration(60*time.Second),
)),
)3. 超时控制策略
go
cli := helloservice.MustNewClient("hello",
// 请求超时
client.WithTimeout(3*time.Second),
// 连接超时
client.WithConnectTimeout(1*time.Second),
// 读写超时
client.WithReadWriteTimeout(2*time.Second),
// 重试超时(累积)
client.WithRPCTimeout(5*time.Second),
)4. 错误处理最佳实践
go
func callWithRetry(ctx context.Context, cli helloservice.Client, req *hello.Request) (*hello.Response, error) {
// 重试封装
var lastErr error
for i := 0; i < 3; i++ {
resp, err := cli.SayHello(ctx, req)
if err == nil {
return resp, nil
}
lastErr = err
// 检查是否是可重试的错误
if !isRetryableError(err) {
break
}
// 退避
time.Sleep(time.Duration(i*100) * time.Millisecond)
}
return nil, lastErr
}
func isRetryableError(err error) bool {
// 检查错误类型
switch err.(type) {
case *kerrors.DeadlineExceededError:
return false // 超时错误不重试
case *kerrors.ServiceDiscoveryError:
return true // 服务发现错误可重试
case *kerrors.NetworkError:
return true // 网络错误可重试
default:
return false
}
}健康检查
服务健康检查
go
import "github.com/cloudwego/kitex/pkg/retry"
// 健康检查函数
func healthCheck(ctx context.Context, addr string) error {
// 尝试连接服务
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
if err != nil {
return err
}
defer conn.Close()
return nil
}
// 创建带健康检查的客户端
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888", "127.0.0.1:8889"),
client.WithHealthChecker(healthCheck),
client.WithHealthCheckInterval(30*time.Second),
)性能优化
1. 批量调用
go
func batchCall(ctx context.Context, cli helloservice.Client, requests []*hello.Request) ([]*hello.Response, error) {
type result struct {
resp *hello.Response
err error
idx int
}
results := make([]*hello.Response, len(requests))
errCh := make(chan error, 1)
resultCh := make(chan result, len(requests))
// 并发调用
for i, req := range requests {
go func(idx int, r *hello.Request) {
resp, err := cli.SayHello(ctx, r)
resultCh <- result{resp: resp, err: err, idx: idx}
}(i, req)
}
// 收集结果
for i := 0; i < len(requests); i++ {
select {
case res := <-resultCh:
if res.err != nil {
select {
case errCh <- res.err:
default:
}
}
results[res.idx] = res.resp
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
return results, nil
}2. 缓存优化
go
// 带缓存的客户端
type CachedClient struct {
cli helloservice.Client
cache *sync.Map // 简单内存缓存
ttl time.Duration
}
func NewCachedClient(cli helloservice.Client, ttl time.Duration) *CachedClient {
return &CachedClient{
cli: cli,
cache: &sync.Map{},
ttl: ttl,
}
}
func (c *CachedClient) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
key := "hello:" + req.Name
// 检查缓存
if val, ok := c.cache.Load(key); ok {
if item, ok := val.(*cacheItem); ok {
if time.Since(item.timestamp) < c.ttl {
return item.resp, nil
}
}
}
// 调用服务
resp, err := c.cli.SayHello(ctx, req)
if err != nil {
return nil, err
}
// 写入缓存
c.cache.Store(key, &cacheItem{
resp: resp,
timestamp: time.Now(),
})
return resp, nil
}
type cacheItem struct {
resp *hello.Response
timestamp time.Time
}完整示例
go
package main
import (
"context"
"log"
"time"
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/loadbalance"
"github.com/cloudwego/kitex/pkg/pool"
"github.com/cloudwego/kitex/pkg/retry"
"example/kitex_gen/hello"
"example/kitex_gen/hello/helloservice"
)
func main() {
// 创建客户端
cli := helloservice.MustNewClient("hello",
// 服务地址
client.WithHostPorts("127.0.0.1:8888"),
// 超时配置
client.WithTimeout(3*time.Second),
client.WithConnectTimeout(1*time.Second),
// 连接池配置
client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
MaxIdlePerAddress: 100,
MaxIdleGlobal: 1000,
MaxIdleTimeout: 30 * time.Second,
MinIdlePerAddress: 10,
KeepAlive: true,
}),
// 负载均衡
client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
// 重试策略
client.WithRetryPolicy(
retry.NewFailurePolicy(
retry.WithMaxRetryTimes(3),
retry.WithRetryBreaker(0.5),
retry.WithDDLStop(0.9),
),
),
// 熔断策略
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
"hello",
circuitbreak.WithErrorRateThreshold(0.5),
circuitbreak.WithMinSample(100),
)),
)
// 发起调用
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := cli.SayHello(ctx, &hello.Request{
Name: "World",
})
if err != nil {
log.Fatalf("call failed: %v", err)
}
log.Printf("response: %s", resp.Message)
}小结
本章介绍了 Kitex 客户端开发的核心内容:
- Client 创建:基本创建、错误处理、服务发现
- 配置选项:超时配置、连接池配置、中间件配置、负载均衡
- 故障处理:
- 智能重试策略:退避、断路器、可重试错误码
- 高级熔断策略:错误率阈值、半开状态、恢复机制
- 超时控制策略:请求超时、连接超时、读写超时
- 错误处理最佳实践:错误分类、重试判断
- 连接池管理:
- 连接池类型:长连接池、短连接池、无连接池
- 连接池配置:最大/最小空闲连接、超时设置、keepalive
- 连接池监控:状态监控、指标收集
- 健康检查:服务健康检查、检查间隔配置
- 性能优化:
- 批量调用:并发执行、结果收集
- 缓存优化:内存缓存、TTL管理
- 调用方式:同步调用、异步调用、并发调用
- 元信息传递:上下文信息传递、透传机制
- 代理模式:带缓存的客户端代理、功能扩展
通过本章的学习,你应该掌握了如何构建一个高性能、可靠的 Kitex 客户端,包括连接池管理、故障处理、健康检查和性能优化等关键技术。在下一章中,我们将学习 Kitex 中间件机制,进一步扩展客户端和服务端的功能。