Skip to content

Kitex 服务端开发

概述

本章将深入介绍 Kitex 服务端开发,包括 Server 创建、配置选项、生命周期管理、优雅关闭等内容,帮助你构建高性能、高可用的 RPC 服务。

核心内容

创建 Server

基本创建

go
package main

import (
    "log"
    "net"
    
    "github.com/cloudwego/kitex/server"
    "example/kitex_gen/hello/helloservice"
)

func main() {
    // 创建服务端
    svr := helloservice.NewServer(&HelloHandler{})
    
    // 启动服务
    if err := svr.Run(); err != nil {
        log.Fatal(err)
    }
}

指定监听地址

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithServiceAddr(&net.TCPAddr{
        IP:   net.ParseIP("0.0.0.0"),
        Port: 8888,
    }),
)

使用已有 Listener

go
listener, err := net.Listen("tcp", ":8888")
if err != nil {
    log.Fatal(err)
}

svr := helloservice.NewServer(&HelloHandler{},
    server.WithListener(listener),
)

Server 配置选项

1. 超时配置

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithTimeout(3 * time.Second),           // 请求超时
    server.WithReadWriteTimeout(5 * time.Second),  // 读写超时
)

2. 连接限制

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithLimit(&server.Limit{
        MaxConnections: 10000,  // 最大连接数
        MaxQPS:         1000,   // 最大 QPS
        MaxConnIdleTime: 30 * time.Second, // 连接最大空闲时间
    }),
)

3. 并发配置

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithConcurrency(1000),  // 最大并发请求数
)

4. 中间件配置

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithMiddleware(LoggingMiddleware),
    server.WithMiddleware(RecoveryMiddleware),
)

5. 服务注册

go
import (
    "github.com/kitex-contrib/registry-nacos/registry"
)

r, err := registry.NewDefaultNacosRegistry("nacos://127.0.0.1:8848")
if err != nil {
    log.Fatal(err)
}

svr := helloservice.NewServer(&HelloHandler{},
    server.WithRegistry(r),
)

6. TLS 配置

go
import "crypto/tls"

cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
    log.Fatal(err)
}

svr := helloservice.NewServer(&HelloHandler{},
    server.WithTLSConfig(&tls.Config{
        Certificates: []tls.Certificate{cert},
    }),
)

服务生命周期

生命周期钩子

go
svr := helloservice.NewServer(&HelloHandler{},
    server.WithOnAccept(func(conn net.Conn) context.Context {
        // 新连接建立
        klog.Infof("new connection: %s", conn.RemoteAddr())
        return context.Background()
    }),
    server.WithOnConnect(func(ctx context.Context, conn net.Conn) context.Context {
        // 连接成功
        klog.CtxInfof(ctx, "connection established")
        return ctx
    }),
    server.WithOnDisconnect(func(ctx context.Context) {
        // 连接断开
        klog.CtxInfof(ctx, "connection closed")
    }),
)

优雅关闭

go
func main() {
    svr := helloservice.NewServer(&HelloHandler{})
    
    // 启动服务(非阻塞)
    go func() {
        if err := svr.Run(); err != nil {
            log.Printf("server stopped: %v", err)
        }
    }()
    
    // 监听信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh
    
    // 优雅关闭
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := svr.Stop(); err != nil {
        log.Printf("server stop error: %v", err)
    }
    
    log.Println("server stopped gracefully")
}

Handler 实现

基本实现

go
type HelloHandler struct{}

func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    return &hello.Response{
        Message: "Hello, " + req.Name + "!",
    }, nil
}

依赖注入

go
type HelloHandler struct {
    userService *service.UserService
    cache       *cache.RedisCache
}

func NewHelloHandler(userService *service.UserService, cache *cache.RedisCache) *HelloHandler {
    return &HelloHandler{
        userService: userService,
        cache:       cache,
    }
}

func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    // 检查缓存
    if msg, err := h.cache.Get(ctx, "hello:"+req.Name); err == nil {
        return &hello.Response{Message: msg}, nil
    }
    
    // 调用服务
    user, err := h.userService.GetByName(ctx, req.Name)
    if err != nil {
        return nil, err
    }
    
    msg := "Hello, " + user.Name + "!"
    
    // 写入缓存
    h.cache.Set(ctx, "hello:"+req.Name, msg, 5*time.Minute)
    
    return &hello.Response{Message: msg}, nil
}

多服务支持

go
func main() {
    // 创建多个服务
    helloSvr := helloservice.NewServer(&HelloHandler{},
        server.WithServiceAddr(&net.TCPAddr{Port: 8888}),
    )
    
    userSvr := userservice.NewServer(&UserHandler{},
        server.WithServiceAddr(&net.TCPAddr{Port: 8889}),
    )
    
    // 启动多个服务
    errCh := make(chan error, 2)
    
    go func() {
        errCh <- helloSvr.Run()
    }()
    
    go func() {
        errCh <- userSvr.Run()
    }()
    
    // 等待任一服务停止
    if err := <-errCh; err != nil {
        log.Fatal(err)
    }
}

高级配置

1. 网络配置

go
import "github.com/cloudwego/kitex/pkg/transport"

svr := helloservice.NewServer(&HelloHandler{},
    // TCP 配置
    server.WithTransportProtocol(transport.TTHeader),
    server.WithReadWriteTimeout(10*time.Second),
    server.WithTCPKeepAlive(true),
    server.WithTCPKeepAliveInterval(30*time.Second),
    server.WithTCPKeepAliveTimeout(120*time.Second),
    server.WithMaxConnectionAge(10*time.Minute),
    server.WithMaxConnectionAgeGrace(2*time.Minute),
)

2. 内存配置

go
svr := helloservice.NewServer(&HelloHandler{},
    // 内存池配置
    server.WithReadBufferSize(8192),
    server.WithWriteBufferSize(8192),
    server.WithMaxHeaderSize(4096),
    server.WithMaxRequestBodySize(1024*1024), // 1MB
)

3. 线程模型配置

go
svr := helloservice.NewServer(&HelloHandler{},
    // 线程池配置
    server.WithConcurrency(2000),
    server.WithTaskQueueLength(10000),
    server.WithWorkerPoolSize(4), // 工作线程数
)

安全设置

1. TLS 高级配置

go
import (
    "crypto/tls"
    "crypto/x509"
    "os"
)

// 加载证书
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
    log.Fatal(err)
}

// 加载 CA 证书
caCert, err := os.ReadFile("ca.crt")
if err != nil {
    log.Fatal(err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

svr := helloservice.NewServer(&HelloHandler{},
    server.WithTLSConfig(&tls.Config{
        Certificates: []tls.Certificate{cert},
        ClientCAs:    caCertPool,
        ClientAuth:   tls.RequireAndVerifyClientCert, // 要求客户端证书
        MinVersion:   tls.VersionTLS12,
        CipherSuites: []uint16{
            tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
            tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
        },
    }),
)

2. 认证授权

go
// 自定义认证中间件
func AuthMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        // 从上下文获取认证信息
        authInfo := ctx.Value("auth")
        if authInfo == nil {
            return errors.New("unauthorized")
        }
        
        // 验证认证信息
        if !validateAuth(authInfo) {
            return errors.New("invalid auth")
        }
        
        return next(ctx, req, resp)
    }
}

svr := helloservice.NewServer(&HelloHandler{},
    server.WithMiddleware(AuthMiddleware),
)

3. 安全头部

go
// 安全头部中间件
func SecurityHeaderMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        // 设置安全头部
        ctx = context.WithValue(ctx, "X-Content-Type-Options", "nosniff")
        ctx = context.WithValue(ctx, "X-Frame-Options", "DENY")
        ctx = context.WithValue(ctx, "X-XSS-Protection", "1; mode=block")
        return next(ctx, req, resp)
    }
}

svr := helloservice.NewServer(&HelloHandler{},
    server.WithMiddleware(SecurityHeaderMiddleware),
)

监控与可观测性

1. Prometheus 监控

go
import "github.com/kitex-contrib/monitor-prometheus"

// 创建监控器
monitor := prometheus.NewServerMonitor(
    prometheus.WithNamespace("kitex"),
    prometheus.WithSubsystem("server"),
    prometheus.WithLabels(map[string]string{
        "service": "hello-service",
        "version": "v1.0.0",
    }),
)

svr := helloservice.NewServer(&HelloHandler{},
    server.WithTracer(monitor),
)

2. OpenTelemetry 追踪

go
import "github.com/kitex-contrib/tracer-opentelemetry"

// 初始化 OpenTelemetry
tp, err := tracer.InitDefault(
    tracer.WithServiceName("hello-service"),
    tracer.WithExportEndpoint("http://localhost:4317"),
    tracer.WithSampler(trace.AlwaysSample()),
)
if err != nil {
    log.Fatal(err)
}
defer tp.Shutdown(context.Background())

svr := helloservice.NewServer(&HelloHandler{},
    server.WithTracer(tracer.NewServerTracer()),
)

3. 日志配置

go
import "github.com/cloudwego/kitex/pkg/klog"

// 配置日志
klog.SetOutput(os.Stdout)
klog.SetLevel(klog.LevelInfo)
klog.SetFormatter(&klog.TextFormatter{
    FullTimestamp: true,
    TimestampFormat: "2006-01-02 15:04:05",
})

svr := helloservice.NewServer(&HelloHandler{},
    server.WithLogger(klog.DefaultLogger()),
)

服务元信息

go
import "github.com/cloudwego/kitex/pkg/serviceinfo"

svr := helloservice.NewServer(&HelloHandler{},
    server.WithServiceInfo(&serviceinfo.Info{
        ServiceName: "hello-service",
        Methods:     map[string]serviceinfo.MethodInfo{},
    }),
)

完整示例

go
package main

import (
    "context"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/server"
    "example/kitex_gen/hello"
    "example/kitex_gen/hello/helloservice"
)

type HelloHandler struct{}

func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
    klog.CtxInfof(ctx, "SayHello called with name: %s", req.Name)
    return &hello.Response{
        Message: "Hello, " + req.Name + "!",
    }, nil
}

func main() {
    // 创建服务端
    svr := helloservice.NewServer(&HelloHandler{},
        server.WithServiceAddr(&net.TCPAddr{
            IP:   net.ParseIP("0.0.0.0"),
            Port: 8888,
        }),
        server.WithTimeout(3*time.Second),
        server.WithLimit(&server.Limit{
            MaxConnections: 10000,
            MaxQPS:         1000,
        }),
    )
    
    // 启动服务
    go func() {
        log.Println("server starting on :8888")
        if err := svr.Run(); err != nil {
            log.Fatalf("server stopped: %v", err)
        }
    }()
    
    // 优雅关闭
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh
    
    log.Println("shutting down server...")
    if err := svr.Stop(); err != nil {
        log.Printf("server stop error: %v", err)
    }
    log.Println("server stopped")
}

小结

本章介绍了 Kitex 服务端开发的核心内容:

  1. Server 创建:基本创建、监听地址、Listener 配置
  2. 配置选项:超时、连接限制、并发、中间件、服务注册、TLS
  3. 生命周期管理:钩子函数、优雅关闭
  4. Handler 实现:基本实现、依赖注入
  5. 多服务支持:同时运行多个服务
  6. 高级配置:网络配置、内存配置、线程模型配置
  7. 安全设置:TLS 高级配置、认证授权、安全头部
  8. 监控与可观测性:Prometheus 监控、OpenTelemetry 追踪、日志配置

通过本章的学习,你应该掌握了如何构建一个高性能、安全、可靠的 Kitex 服务端。在下一章中,我们将学习 Kitex 客户端开发,包括客户端配置、连接池管理和故障处理等内容。