Kitex 服务端开发
概述
本章将深入介绍 Kitex 服务端开发,包括 Server 创建、配置选项、生命周期管理、优雅关闭等内容,帮助你构建高性能、高可用的 RPC 服务。
核心内容
创建 Server
基本创建
go
package main
import (
"log"
"net"
"github.com/cloudwego/kitex/server"
"example/kitex_gen/hello/helloservice"
)
func main() {
// 创建服务端
svr := helloservice.NewServer(&HelloHandler{})
// 启动服务
if err := svr.Run(); err != nil {
log.Fatal(err)
}
}指定监听地址
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithServiceAddr(&net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 8888,
}),
)使用已有 Listener
go
listener, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal(err)
}
svr := helloservice.NewServer(&HelloHandler{},
server.WithListener(listener),
)Server 配置选项
1. 超时配置
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithTimeout(3 * time.Second), // 请求超时
server.WithReadWriteTimeout(5 * time.Second), // 读写超时
)2. 连接限制
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithLimit(&server.Limit{
MaxConnections: 10000, // 最大连接数
MaxQPS: 1000, // 最大 QPS
MaxConnIdleTime: 30 * time.Second, // 连接最大空闲时间
}),
)3. 并发配置
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithConcurrency(1000), // 最大并发请求数
)4. 中间件配置
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithMiddleware(LoggingMiddleware),
server.WithMiddleware(RecoveryMiddleware),
)5. 服务注册
go
import (
"github.com/kitex-contrib/registry-nacos/registry"
)
r, err := registry.NewDefaultNacosRegistry("nacos://127.0.0.1:8848")
if err != nil {
log.Fatal(err)
}
svr := helloservice.NewServer(&HelloHandler{},
server.WithRegistry(r),
)6. TLS 配置
go
import "crypto/tls"
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
svr := helloservice.NewServer(&HelloHandler{},
server.WithTLSConfig(&tls.Config{
Certificates: []tls.Certificate{cert},
}),
)服务生命周期
生命周期钩子
go
svr := helloservice.NewServer(&HelloHandler{},
server.WithOnAccept(func(conn net.Conn) context.Context {
// 新连接建立
klog.Infof("new connection: %s", conn.RemoteAddr())
return context.Background()
}),
server.WithOnConnect(func(ctx context.Context, conn net.Conn) context.Context {
// 连接成功
klog.CtxInfof(ctx, "connection established")
return ctx
}),
server.WithOnDisconnect(func(ctx context.Context) {
// 连接断开
klog.CtxInfof(ctx, "connection closed")
}),
)优雅关闭
go
func main() {
svr := helloservice.NewServer(&HelloHandler{})
// 启动服务(非阻塞)
go func() {
if err := svr.Run(); err != nil {
log.Printf("server stopped: %v", err)
}
}()
// 监听信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
// 优雅关闭
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := svr.Stop(); err != nil {
log.Printf("server stop error: %v", err)
}
log.Println("server stopped gracefully")
}Handler 实现
基本实现
go
type HelloHandler struct{}
func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
return &hello.Response{
Message: "Hello, " + req.Name + "!",
}, nil
}依赖注入
go
type HelloHandler struct {
userService *service.UserService
cache *cache.RedisCache
}
func NewHelloHandler(userService *service.UserService, cache *cache.RedisCache) *HelloHandler {
return &HelloHandler{
userService: userService,
cache: cache,
}
}
func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
// 检查缓存
if msg, err := h.cache.Get(ctx, "hello:"+req.Name); err == nil {
return &hello.Response{Message: msg}, nil
}
// 调用服务
user, err := h.userService.GetByName(ctx, req.Name)
if err != nil {
return nil, err
}
msg := "Hello, " + user.Name + "!"
// 写入缓存
h.cache.Set(ctx, "hello:"+req.Name, msg, 5*time.Minute)
return &hello.Response{Message: msg}, nil
}多服务支持
go
func main() {
// 创建多个服务
helloSvr := helloservice.NewServer(&HelloHandler{},
server.WithServiceAddr(&net.TCPAddr{Port: 8888}),
)
userSvr := userservice.NewServer(&UserHandler{},
server.WithServiceAddr(&net.TCPAddr{Port: 8889}),
)
// 启动多个服务
errCh := make(chan error, 2)
go func() {
errCh <- helloSvr.Run()
}()
go func() {
errCh <- userSvr.Run()
}()
// 等待任一服务停止
if err := <-errCh; err != nil {
log.Fatal(err)
}
}高级配置
1. 网络配置
go
import "github.com/cloudwego/kitex/pkg/transport"
svr := helloservice.NewServer(&HelloHandler{},
// TCP 配置
server.WithTransportProtocol(transport.TTHeader),
server.WithReadWriteTimeout(10*time.Second),
server.WithTCPKeepAlive(true),
server.WithTCPKeepAliveInterval(30*time.Second),
server.WithTCPKeepAliveTimeout(120*time.Second),
server.WithMaxConnectionAge(10*time.Minute),
server.WithMaxConnectionAgeGrace(2*time.Minute),
)2. 内存配置
go
svr := helloservice.NewServer(&HelloHandler{},
// 内存池配置
server.WithReadBufferSize(8192),
server.WithWriteBufferSize(8192),
server.WithMaxHeaderSize(4096),
server.WithMaxRequestBodySize(1024*1024), // 1MB
)3. 线程模型配置
go
svr := helloservice.NewServer(&HelloHandler{},
// 线程池配置
server.WithConcurrency(2000),
server.WithTaskQueueLength(10000),
server.WithWorkerPoolSize(4), // 工作线程数
)安全设置
1. TLS 高级配置
go
import (
"crypto/tls"
"crypto/x509"
"os"
)
// 加载证书
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
// 加载 CA 证书
caCert, err := os.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
svr := helloservice.NewServer(&HelloHandler{},
server.WithTLSConfig(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert, // 要求客户端证书
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
},
}),
)2. 认证授权
go
// 自定义认证中间件
func AuthMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// 从上下文获取认证信息
authInfo := ctx.Value("auth")
if authInfo == nil {
return errors.New("unauthorized")
}
// 验证认证信息
if !validateAuth(authInfo) {
return errors.New("invalid auth")
}
return next(ctx, req, resp)
}
}
svr := helloservice.NewServer(&HelloHandler{},
server.WithMiddleware(AuthMiddleware),
)3. 安全头部
go
// 安全头部中间件
func SecurityHeaderMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// 设置安全头部
ctx = context.WithValue(ctx, "X-Content-Type-Options", "nosniff")
ctx = context.WithValue(ctx, "X-Frame-Options", "DENY")
ctx = context.WithValue(ctx, "X-XSS-Protection", "1; mode=block")
return next(ctx, req, resp)
}
}
svr := helloservice.NewServer(&HelloHandler{},
server.WithMiddleware(SecurityHeaderMiddleware),
)监控与可观测性
1. Prometheus 监控
go
import "github.com/kitex-contrib/monitor-prometheus"
// 创建监控器
monitor := prometheus.NewServerMonitor(
prometheus.WithNamespace("kitex"),
prometheus.WithSubsystem("server"),
prometheus.WithLabels(map[string]string{
"service": "hello-service",
"version": "v1.0.0",
}),
)
svr := helloservice.NewServer(&HelloHandler{},
server.WithTracer(monitor),
)2. OpenTelemetry 追踪
go
import "github.com/kitex-contrib/tracer-opentelemetry"
// 初始化 OpenTelemetry
tp, err := tracer.InitDefault(
tracer.WithServiceName("hello-service"),
tracer.WithExportEndpoint("http://localhost:4317"),
tracer.WithSampler(trace.AlwaysSample()),
)
if err != nil {
log.Fatal(err)
}
defer tp.Shutdown(context.Background())
svr := helloservice.NewServer(&HelloHandler{},
server.WithTracer(tracer.NewServerTracer()),
)3. 日志配置
go
import "github.com/cloudwego/kitex/pkg/klog"
// 配置日志
klog.SetOutput(os.Stdout)
klog.SetLevel(klog.LevelInfo)
klog.SetFormatter(&klog.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
})
svr := helloservice.NewServer(&HelloHandler{},
server.WithLogger(klog.DefaultLogger()),
)服务元信息
go
import "github.com/cloudwego/kitex/pkg/serviceinfo"
svr := helloservice.NewServer(&HelloHandler{},
server.WithServiceInfo(&serviceinfo.Info{
ServiceName: "hello-service",
Methods: map[string]serviceinfo.MethodInfo{},
}),
)完整示例
go
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/server"
"example/kitex_gen/hello"
"example/kitex_gen/hello/helloservice"
)
type HelloHandler struct{}
func (h *HelloHandler) SayHello(ctx context.Context, req *hello.Request) (*hello.Response, error) {
klog.CtxInfof(ctx, "SayHello called with name: %s", req.Name)
return &hello.Response{
Message: "Hello, " + req.Name + "!",
}, nil
}
func main() {
// 创建服务端
svr := helloservice.NewServer(&HelloHandler{},
server.WithServiceAddr(&net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 8888,
}),
server.WithTimeout(3*time.Second),
server.WithLimit(&server.Limit{
MaxConnections: 10000,
MaxQPS: 1000,
}),
)
// 启动服务
go func() {
log.Println("server starting on :8888")
if err := svr.Run(); err != nil {
log.Fatalf("server stopped: %v", err)
}
}()
// 优雅关闭
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("shutting down server...")
if err := svr.Stop(); err != nil {
log.Printf("server stop error: %v", err)
}
log.Println("server stopped")
}小结
本章介绍了 Kitex 服务端开发的核心内容:
- Server 创建:基本创建、监听地址、Listener 配置
- 配置选项:超时、连接限制、并发、中间件、服务注册、TLS
- 生命周期管理:钩子函数、优雅关闭
- Handler 实现:基本实现、依赖注入
- 多服务支持:同时运行多个服务
- 高级配置:网络配置、内存配置、线程模型配置
- 安全设置:TLS 高级配置、认证授权、安全头部
- 监控与可观测性:Prometheus 监控、OpenTelemetry 追踪、日志配置
通过本章的学习,你应该掌握了如何构建一个高性能、安全、可靠的 Kitex 服务端。在下一章中,我们将学习 Kitex 客户端开发,包括客户端配置、连接池管理和故障处理等内容。