Skip to content

Kitex 基础

概述

Kitex 是 CloudWeGo 的核心 RPC 框架,是一个高性能、可扩展的 Go 语言 RPC 框架。Kitex 支持多种协议(Thrift、Protobuf、gRPC),内置丰富的服务治理能力,经过字节跳动超大规模流量验证。

最新特性

Kitex 2.0+ 核心特性

  • 性能提升:进一步优化网络库和序列化性能,单机 QPS 提升 15-20%
  • AI 原生支持:增强流式处理能力,支持大模型推理场景
  • 服务网格集成:支持与 Istio 等服务网格的集成
  • 自动代码生成:增强代码生成工具,支持更多自定义选项
  • 安全增强:支持 mTLS、JWT 等安全认证方式
  • 可观测性提升:原生支持 OpenTelemetry、Prometheus 等监控系统

新特性详解

1. 流式处理增强

go
// 双向流式处理
func (h *HelloHandler) BidirectionalStream(stream hello.HelloService_BidirectionalStreamServer) error {
    for {
        req, err := stream.Recv()
        if err != nil {
            return err
        }
        
        // 处理请求
        resp := &hello.Response{
            Message: "Hello, " + req.Name + "!",
        }
        
        // 发送响应
        if err := stream.Send(resp); err != nil {
            return err
        }
    }
}

2. 服务网格集成

go
// 启用服务网格集成
svr := server.NewServer(
    server.WithServiceMesh(),
    server.WithTracer(otel.NewTracer()),
)

3. 自动重试优化

go
// 智能重试策略
cli := client.MustNewClient("hello",
    client.WithRetryPolicy(
        retry.NewFailurePolicy(
            retry.WithMaxRetryTimes(3),
            retry.WithRetryableCodes(errors.New("ECONNREFUSED")),
            retry.WithBackoff(retry.NewExponentialBackoff()),
        ),
    ),
)

核心内容

Kitex 架构

┌─────────────────────────────────────────────────────────────┐
│                        Application                           │
│                    (User Handler/Business Logic)             │
├─────────────────────────────────────────────────────────────┤
│                        Kitex Core                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │  Client  │  │  Server  │  │ Middleware│  │  Codec   │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
├─────────────────────────────────────────────────────────────┤
│                     Service Governance                       │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │Discovery │  │LoadBalance│  │CircuitBreak│  │  Retry  │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
├─────────────────────────────────────────────────────────────┤
│                        Netpoll                               │
│                    (High Performance Network)                │
└─────────────────────────────────────────────────────────────┘

核心概念

1. Server

服务端,接收并处理 RPC 请求:

go
package main

import (
    "github.com/cloudwego/kitex/server"
    "example/kitex_gen/hello/helloservice"
)

func main() {
    // 创建服务端
    svr := helloservice.NewServer(&HelloHandler{},
        server.WithServiceAddr(&net.TCPAddr{
            Port: 8888,
        }),
        server.WithLimit(&server.Limit{
            MaxConnections: 10000,
            MaxQPS:         1000,
        }),
    )
    
    // 启动服务
    if err := svr.Run(); err != nil {
        log.Fatal(err)
    }
}

2. Client

客户端,发起 RPC 调用:

go
package main

import (
    "github.com/cloudwego/kitex/client"
    "example/kitex_gen/hello/helloservice"
)

func main() {
    // 创建客户端
    cli := helloservice.MustNewClient("hello",
        client.WithHostPorts("127.0.0.1:8888"),
    )
    
    // 发起调用
    resp, err := cli.SayHello(context.Background(), &hello.Request{
        Name: "World",
    })
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Println(resp.Message)
}

3. Handler

处理器,实现业务逻辑:

go
type HelloHandler struct{}

func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    return &hello.Response{
        Message: "Hello, " + req.Name + "!",
    }, nil
}

4. Middleware

中间件,处理通用逻辑:

go
func LoggingMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        start := time.Now()
        
        // 调用下一个中间件或处理器
        err = next(ctx, req, resp)
        
        // 记录日志
        klog.CtxInfof(ctx, "request: %v, response: %v, cost: %v, err: %v",
            req, resp, time.Since(start), err)
        
        return err
    }
}

协议支持

Thrift 协议

Kitex 默认使用 Thrift 协议:

thrift
// hello.thrift
namespace go hello

struct Request {
    1: string name
}

struct Response {
    1: string message
}

service HelloService {
    Response SayHello(1: Request req)
}

生成代码:

bash
kitex -module example -service hello hello.thrift

Protobuf 协议

支持 Protobuf 协议:

protobuf
// hello.proto
syntax = "proto3";

package hello;

message Request {
    string name = 1;
}

message Response {
    string message = 1;
}

service HelloService {
    rpc SayHello(Request) returns (Response);
}

生成代码:

bash
kitex -module example -service hello -type protobuf hello.proto

gRPC 协议

支持 gRPC 协议:

go
// 创建 gRPC 服务端
svr := server.NewServer(
    server.WithServiceAddr(&net.TCPAddr{Port: 8888}),
    server.WithTransportProtocol(transport.GRPC),
)

// 创建 gRPC 客户端
cli := client.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithTransportProtocol(transport.GRPC),
)

服务治理能力

1. 超时控制

go
// 服务端超时
svr := server.NewServer(
    server.WithTimeout(3 * time.Second),
)

// 客户端超时
cli := client.MustNewClient("hello",
    client.WithTimeout(3 * time.Second),
    client.WithConnectTimeout(1 * time.Second),
)

2. 重试策略

go
cli := client.MustNewClient("hello",
    client.WithRetryPolicy(
        retry.NewFailurePolicy(
            retry.WithMaxRetryTimes(3),
            retry.WithRetryBreaker(0.5),
            retry.WithDDLStop(0.9),
        ),
    ),
)

3. 熔断器

go
import "github.com/cloudwego/kitex/pkg/circuitbreak"

cli := client.MustNewClient("hello",
    client.WithCircuitBreaker(circuitbreak.NewCBSuite(
        "hello",
        circuitbreak.WithErrorRateThreshold(0.5),
        circuitbreak.WithMinSample(100),
    )),
)

4. 负载均衡

go
import "github.com/cloudwego/kitex/pkg/loadbalance"

cli := client.MustNewClient("hello",
    client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
)

代码生成

kitex 工具

bash
# 生成服务端代码
kitex -module example -service hello hello.thrift

# 生成客户端代码
kitex -module example hello.thrift

# 指定生成路径
kitex -module example -service hello -gen-path gen hello.thrift

# 使用自定义模板
kitex -module example -service hello -template-dir template hello.thrift

目录结构

hello-service/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── handler/
│   │   └── hello_handler.go
│   ├── service/
│   │   └── hello_service.go
│   └── repository/
│       └── user_repo.go
├── kitex_gen/              # 生成的代码
│   └── hello/
├── idl/
│   └── hello.thrift
├── go.mod
└── Makefile

性能优化策略

1. 网络优化

  • 连接池配置

    go
    // 服务端连接池
    svr := server.NewServer(
        server.WithConnectionNumber(10000),
        server.WithKeepAlive(true),
    )
    
    // 客户端连接池
    cli := client.MustNewClient("hello",
        client.WithPoolSize(100),
        client.WithMaxIdlePerAddress(20),
    )
  • 网络参数调优

    go
    // 设置 TCP 参数
    svr := server.NewServer(
        server.WithTCPKeepAliveInterval(30 * time.Second),
        server.WithTCPKeepAliveTimeout(120 * time.Second),
    )

2. 序列化优化

  • 选择合适的协议

    • Thrift:性能最佳,适合内部服务
    • Protobuf:跨语言友好
    • gRPC:标准协议,生态丰富
  • 使用压缩

    go
    // 启用压缩
    svr := server.NewServer(
        server.WithCompressType(transport.Gzip),
    )
    
    cli := client.MustNewClient("hello",
        client.WithCompressType(transport.Gzip),
    )

3. 内存优化

  • 对象池

    go
    // 使用对象池减少 GC
    import "sync"
    
    var responsePool = sync.Pool{
        New: func() interface{} {
            return &hello.Response{}
        },
    }
    
    func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
        resp := responsePool.Get().(*hello.Response)
        resp.Message = "Hello, " + req.Name + "!"
        
        // 使用完成后放回池中
        defer responsePool.Put(resp)
        return resp, nil
    }
  • 避免内存分配

    • 使用 sync.Pool 复用对象
    • 减少字符串拼接
    • 预分配切片容量

4. 并发优化

  • 合理设置 goroutine 池

    go
    // 设置服务端 goroutine 池
    svr := server.NewServer(
        server.WithConcurrency(1000),
    )
  • 使用工作池模式

    go
    // 工作池处理并发任务
    type WorkerPool struct {
        tasks chan func()
        wg    sync.WaitGroup
    }
    
    func NewWorkerPool(size int) *WorkerPool {
        pool := &WorkerPool{
            tasks: make(chan func(), 1000),
        }
        
        for i := 0; i < size; i++ {
            pool.wg.Add(1)
            go func() {
                defer pool.wg.Done()
                for task := range pool.tasks {
                    task()
                }
            }()
        }
        
        return pool
    }

5. 监控与调优

  • 启用性能监控

    go
    // 启用 Prometheus 监控
    svr := server.NewServer(
        server.WithTracer(prometheus.NewTracer()),
    )
  • 性能分析

    • 使用 pprof 进行 CPU 和内存分析
    • 监控关键指标:QPS、延迟、错误率
    • 定期进行负载测试

6. 生产环境最佳实践

  • 配置调优

    • 根据机器配置调整并发数
    • 合理设置超时时间
    • 启用熔断和限流
  • 部署策略

    • 使用容器化部署
    • 配置健康检查
    • 实现优雅启停
  • 安全配置

    • 启用 TLS 加密
    • 配置认证授权
    • 防范 DDoS 攻击

小结

本章介绍了 Kitex 的基础概念和最新特性:

  1. 架构设计:Server、Client、Handler、Middleware
  2. 协议支持:Thrift、Protobuf、gRPC
  3. 服务治理:超时控制、重试策略、熔断器、负载均衡
  4. 代码生成:kitex 工具的使用
  5. 最新特性:流式处理增强、服务网格集成、智能重试
  6. 性能优化:网络优化、序列化优化、内存优化、并发优化
  7. 生产实践:监控调优、部署策略、安全配置

Kitex 作为 CloudWeGo 的核心 RPC 框架,提供了高性能、可扩展的微服务通信解决方案。通过本章的学习,你应该对 Kitex 的基本原理和使用方法有了全面的了解。

在下一章中,我们将深入学习 Kitex 服务端开发,包括高级配置、安全设置和性能调优。