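Observability

Overview

Observability is a key capability for understanding how a microservice system behaves. It rests on three pillars: logs, metrics, and traces. In the CloudWeGo ecosystem, both Kitex (RPC) and Hertz (HTTP) provide observability integrations that help developers locate problems and tune performance quickly. The examples in this chapter use Kitex.

Why observability?

Distributed systems bring several challenges:

  • Call chains span many services, so problems are hard to locate
  • The runtime state of the system is hard to see in real time
  • Performance bottlenecks are hard to find and fix
  • The blast radius of a failure is hard to assess

Logs, metrics, and traces together give a complete view of the running system. That makes observability a foundation for building reliable microservices.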

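Core content

1. The three pillars of observability

Basic concepts

  • Logging: records discrete events; used to diagnose problems
  • Metrics: aggregated numeric data; used for monitoring and alerting
  • Tracing: the full call path of a request; used for performance analysis

How they relate

Logs → event details

Metrics → aggregated statistics

Traces → correlation of a request across services

In practice they point to one another. A metric alert narrows down the time window, and traces from that window show which call was slow or failed. The trace ID then leads to the matching log lines.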

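2. Logging

Basic usage

Kitex ships its own logging package, klog, which prints through a built-in default logger. Other logging frameworks can be plugged in with klog.SetLogger.

```go
package main

import (
    "github.com/cloudwego/kitex/pkg/klog"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

func main() {
    // Messages below this level are dropped.
    klog.SetLevel(klog.LevelDebug)

    klog.Debug("debug message")
    klog.Info("info message")
    klog.Warn("warn message")
    klog.Error("error message")

    // EchoImpl is your implementation of the generated service interface.
    svr := echo.NewServer(new(EchoImpl))

    if err := svr.Run(); err != nil {
        // Fatalf logs the message and then exits the process.
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```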

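Structured logging

Structured logs use fixed field names instead of free-form text, which makes them easier to query and analyze. klog has no JSON helper, and its default logger prints plain text. The example therefore keeps a stable key=value layout. For JSON output, plug in a structured logger through klog.SetLogger.

```go
package main

import (
    "time"

    "github.com/cloudwego/kitex/pkg/klog"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

func main() {
    user := &User{ID: 1, Name: "Alice"}

    // %+v prints field names, but the layout follows the struct definition.
    klog.Infof("user login: %+v", user)

    // A fixed key=value layout keeps the line machine-parsable.
    klog.Infof("event=user_login user_id=%d user_name=%q timestamp=%d",
        user.ID, user.Name, time.Now().Unix())
}
```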

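Logging middleware

A middleware can log every RPC call with its method name, result, and latency.

```go
package main

import (
    "context"
    "time"

    "github.com/cloudwego/kitex/pkg/endpoint"
    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/cloudwego/kitex/server"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

// LoggingMiddleware has the endpoint.Middleware signature that server.WithMiddleware expects.
func LoggingMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        start := time.Now()

        ri := rpcinfo.GetRPCInfo(ctx)
        methodName := ri.To().Method()

        // Full request bodies can be large or sensitive; trim them in production.
        klog.Infof("[%s] request: %+v", methodName, req)

        // Call the next middleware or, at the end of the chain, the handler.
        err = next(ctx, req, resp)
        duration := time.Since(start)

        if err != nil {
            klog.Errorf("[%s] error: %v, duration: %v", methodName, err, duration)
        } else {
            klog.Infof("[%s] success, duration: %v", methodName, duration)
        }
        return err
    }
}

func main() {
    svr := echo.NewServer(
        new(EchoImpl),
        server.WithMiddleware(LoggingMiddleware),
    )

    if err := svr.Run(); err != nil {
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```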
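
When several middlewares are registered, each one wraps the next. The sketch below is a stdlib-only model. Its local `Endpoint` and `Middleware` types mirror the shape of Kitex's `endpoint.Endpoint` and `endpoint.Middleware`. The `chain` function imitates the idea of `endpoint.Chain` but is not the Kitex code. The first middleware in the list is the outermost, so its code runs first on the way in and last on the way out.

```go
package main

import (
    "context"
    "fmt"
)

// Endpoint and Middleware mirror the function shapes used by Kitex middlewares.
type Endpoint func(ctx context.Context, req, resp interface{}) error
type Middleware func(Endpoint) Endpoint

// chain composes middlewares so that mws[0] ends up as the outermost wrapper.
func chain(mws ...Middleware) Middleware {
    return func(next Endpoint) Endpoint {
        for i := len(mws) - 1; i >= 0; i-- {
            next = mws[i](next)
        }
        return next
    }
}

// recorder returns a middleware that appends "<name>:before" and "<name>:after" to *events.
func recorder(name string, events *[]string) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, req, resp interface{}) error {
            *events = append(*events, name+":before")
            err := next(ctx, req, resp)
            *events = append(*events, name+":after")
            return err
        }
    }
}

// runChain wraps a handler with a logging and a metrics recorder and returns the call order.
func runChain() []string {
    var events []string
    handler := func(ctx context.Context, req, resp interface{}) error {
        events = append(events, "handler")
        return nil
    }
    ep := chain(recorder("logging", &events), recorder("metrics", &events))(handler)
    _ = ep(context.Background(), nil, nil)
    return events
}

func main() {
    fmt.Println(runChain()) // prints: [logging:before metrics:before handler metrics:after logging:after]
}
```

Because the logging middleware wraps the metrics middleware, the logged duration also includes the time spent recording metrics.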

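3. Metrics

Prometheus integration

The kitex-contrib/monitor-prometheus extension collects Kitex RPC metrics through a server tracer. It exposes them over HTTP for Prometheus to scrape.

```go
package main

import (
    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/server"
    // The package name of monitor-prometheus is prometheus.
    prometheus "github.com/kitex-contrib/monitor-prometheus"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

func main() {
    svr := echo.NewServer(
        new(EchoImpl),
        // Starts an HTTP listener on :9091 that serves the RPC metrics at /metrics.
        server.WithTracer(prometheus.NewServerTracer(":9091", "/metrics")),
    )

    if err := svr.Run(); err != nil {
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```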

Custom metrics

Business-specific metrics can be defined and recorded with the Prometheus Go client.

```go
package main

import (
    "context"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"

    // Kitex-generated types; the module path is a placeholder.
    "example/kitex_gen/api"
)

// promauto registers these collectors with the default registry; expose it with
// promhttp.Handler(), as in the full integration example later in this chapter.
var (
    requestCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "echo_request_total",
            Help: "Total number of echo requests",
        },
        []string{"method", "status"},
    )

    requestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "echo_request_duration_seconds",
            Help:    "Duration of echo requests",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method"},
    )

    // Counts requests currently being processed (not network connections).
    inflightRequests = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "echo_inflight_requests",
            Help: "Number of echo requests currently being processed",
        },
    )
)

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
    start := time.Now()
    inflightRequests.Inc()
    defer inflightRequests.Dec()

    resp, err := s.processRequest(ctx, req)

    requestDuration.WithLabelValues("Echo").Observe(time.Since(start).Seconds())

    if err != nil {
        requestCounter.WithLabelValues("Echo", "error").Inc()
    } else {
        requestCounter.WithLabelValues("Echo", "success").Inc()
    }
    return resp, err
}
```

Metrics middleware

A middleware can time every method without touching handler code.

```go
package main

import (
    "context"
    "time"

    "github.com/cloudwego/kitex/pkg/endpoint"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/prometheus/client_golang/prometheus"
)

var rpcDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "rpc_duration_seconds",
        Help:    "RPC duration in seconds",
        Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
    },
    []string{"service", "method"},
)

// A collector created with prometheus.NewHistogramVec is not exported until it is registered.
func init() {
    prometheus.MustRegister(rpcDuration)
}

func MetricsMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        start := time.Now()

        ri := rpcinfo.GetRPCInfo(ctx)
        service := ri.To().ServiceName()
        method := ri.To().Method()

        err = next(ctx, req, resp)

        // Observe the latency whether or not the call failed.
        rpcDuration.WithLabelValues(service, method).Observe(time.Since(start).Seconds())
        return err
    }
}
```

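4. Distributed tracing

OpenTelemetry integration

Kitex supports OpenTelemetry tracing through the kitex-contrib/obs-opentelemetry extension. The example sets up a global tracer provider that exports spans over OTLP, then installs the tracing suite on the server. Jaeger accepts OTLP directly, so the deprecated Jaeger-specific exporter is not needed.

```go
package main

import (
    "context"

    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/cloudwego/kitex/server"
    "github.com/kitex-contrib/obs-opentelemetry/tracing"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/sdk/resource"
    tracesdk "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

func initTracer() (func(context.Context) error, error) {
    // Assumes an OTLP gRPC endpoint (for example a Jaeger collector) on localhost:4317.
    exp, err := otlptracegrpc.New(context.Background(),
        otlptracegrpc.WithEndpoint("localhost:4317"),
        otlptracegrpc.WithInsecure(),
    )
    if err != nil {
        return nil, err
    }

    tp := tracesdk.NewTracerProvider(
        // Export spans asynchronously in batches.
        tracesdk.WithBatcher(exp),
        tracesdk.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("echo-service"),
        )),
    )
    // The tracing suite reads the global provider by default.
    otel.SetTracerProvider(tp)

    return tp.Shutdown, nil
}

func main() {
    shutdown, err := initTracer()
    if err != nil {
        klog.Fatalf("failed to init tracer: %v", err)
    }
    defer shutdown(context.Background())

    svr := echo.NewServer(
        new(EchoImpl),
        server.WithSuite(tracing.NewServerSuite()),
        // Names the server in its spans; keep it consistent with the resource above.
        server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo-service"}),
    )

    if err := svr.Run(); err != nil {
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```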

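Client-side tracing

The client installs the matching client suite. It also needs a tracer provider, for example by calling the same initTracer first.

```go
package main

import (
    "context"
    "log"

    "github.com/cloudwego/kitex/client"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/kitex-contrib/obs-opentelemetry/tracing"

    // Kitex-generated packages; the module path is a placeholder.
    "example/kitex_gen/api"
    "example/kitex_gen/api/echo"
)

func main() {
    // initTracer as in the server example (ideally with a client-specific service name).
    shutdown, err := initTracer()
    if err != nil {
        panic(err)
    }
    defer shutdown(context.Background())

    c, err := echo.NewClient(
        "echo",
        // Direct address for local testing; use service discovery in production.
        client.WithHostPorts("127.0.0.1:8888"),
        client.WithSuite(tracing.NewClientSuite()),
        client.WithClientBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo-client"}),
    )
    if err != nil {
        panic(err)
    }

    resp, err := c.Echo(context.Background(), &api.Request{Message: "Hello"})
    if err != nil {
        panic(err)
    }
    log.Println(resp.Message)
}
```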

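Custom spans

Business logic can add its own spans. A span started from ctx becomes a child of whatever span ctx already carries, for example the server span created by the tracing suite.

```go
package main

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"

    // Kitex-generated types; the module path is a placeholder.
    "example/kitex_gen/api"
)

// Tracer from the global provider installed by initTracer.
var tracer = otel.Tracer("echo-service")

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
    ctx, span := tracer.Start(ctx, "Echo")
    defer span.End()

    resp, err := s.processRequest(ctx, req)
    if err != nil {
        // RecordError adds an error event; SetStatus marks the span itself as failed.
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
    }
    return resp, err
}

func (s *EchoImpl) processRequest(ctx context.Context, req *api.Request) (*api.Response, error) {
    _, span := tracer.Start(ctx, "processRequest")
    defer span.End()

    span.SetAttributes(attribute.String("request.message", req.Message))

    return &api.Response{
        Message: "Echo: " + req.Message,
    }, nil
}
```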

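5. Health checks

Health check endpoint

A lightweight HTTP endpoint lets load balancers or orchestrators such as Kubernetes probe whether the process is alive.

```go
package main

import (
    "net/http"

    "github.com/cloudwego/kitex/pkg/klog"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

// startHealthServer serves /health on :8080 in the background.
func startHealthServer() {
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte("OK"))
    })

    go func() {
        if err := http.ListenAndServe(":8080", nil); err != nil {
            klog.Fatalf("health server error: %v", err)
        }
    }()
}

func main() {
    startHealthServer()

    svr := echo.NewServer(new(EchoImpl))

    if err := svr.Run(); err != nil {
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```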
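
A single always-OK endpoint cannot tell a load balancer that the service is still starting or is draining. A common refinement separates liveness from readiness. The sketch uses only the standard library; the `/ready` path and the `ready` flag are assumptions of this example. Both endpoints are easy to test in memory with `net/http/httptest`.

```go
package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "sync/atomic"
)

// ready flips to true once the RPC server can accept traffic.
var ready atomic.Bool

// newHealthMux serves /health (process is alive) and /ready (accepting traffic).
func newHealthMux() *http.ServeMux {
    mux := http.NewServeMux()
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte("OK"))
    })
    mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
        if !ready.Load() {
            http.Error(w, "not ready", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte("READY"))
    })
    return mux
}

// status sends an in-memory GET request to h and returns the HTTP status code.
func status(h http.Handler, path string) int {
    rec := httptest.NewRecorder()
    h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
    return rec.Code
}

func main() {
    mux := newHealthMux()
    fmt.Println(status(mux, "/health"), status(mux, "/ready")) // prints: 200 503
    ready.Store(true)
    fmt.Println(status(mux, "/ready")) // prints: 200
}
```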

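6. Service metadata

Reading call metadata

The RPCInfo attached to each call records the caller, the callee, and the method. A handler can read it from ctx.

```go
package main

import (
    "context"

    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/pkg/rpcinfo"

    // Kitex-generated types; the module path is a placeholder.
    "example/kitex_gen/api"
)

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
    ri := rpcinfo.GetRPCInfo(ctx)

    // On the server, From() is the caller; its service name may be empty
    // unless the transport carries it (for example TTHeader).
    fromService := ri.From().ServiceName()
    toService := ri.To().ServiceName()
    method := ri.To().Method()

    klog.Infof("call from %s to %s.%s", fromService, toService, method)

    return &api.Response{
        Message: "Echo: " + req.Message,
    }, nil
}
```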

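7. Dashboards

Grafana configuration

Grafana can load dashboard JSON files through a dashboard provisioning file placed in its provisioning/dashboards directory:

```yaml
apiVersion: 1
providers:
  - name: 'Kitex Dashboard'
    folder: 'Kitex'
    type: file
    options:
      # Directory that holds the dashboard JSON files.
      path: /var/lib/grafana/dashboards
```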

Alerting rules

Configure Prometheus alerting rules on the custom metrics defined earlier:

```yaml
groups:
  - name: kitex_alerts
    rules:
      - alert: HighErrorRate
        # Failed Echo requests per second, averaged over 5 minutes.
        expr: rate(echo_request_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} per second"

      - alert: HighLatency
        # Aggregate the buckets by le before estimating the 95th percentile.
        expr: histogram_quantile(0.95, sum by (le) (rate(echo_request_duration_seconds_bucket[5m]))) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }} seconds"
```

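8. Log aggregation

ELK integration

A logrus hook can send log entries to Elasticsearch. To route Kitex's own logs through logrus, install an adapter that implements klog.FullLogger with klog.SetLogger.

```go
package main

import (
    "context"

    "github.com/olivere/elastic/v7"
    "github.com/sirupsen/logrus"
)

// ElasticsearchHook indexes every logrus entry as a document in index.
type ElasticsearchHook struct {
    client *elastic.Client
    index  string
}

// Fire runs synchronously on each log call, so every entry costs a round trip to Elasticsearch.
func (h *ElasticsearchHook) Fire(entry *logrus.Entry) error {
    doc := map[string]interface{}{
        "timestamp": entry.Time,
        "level":     entry.Level.String(),
        "message":   entry.Message,
        "fields":    entry.Data,
    }

    _, err := h.client.Index().
        Index(h.index).
        BodyJson(doc).
        Do(context.Background())
    return err
}

// Levels makes the hook fire for every log level.
func (h *ElasticsearchHook) Levels() []logrus.Level {
    return logrus.AllLevels
}
```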

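9. End-to-end monitoring

Integration example

A server that wires logging, metrics, tracing, and health checks together:

```go
package main

import (
    "context"
    "net/http"

    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/cloudwego/kitex/server"
    kitexprom "github.com/kitex-contrib/monitor-prometheus"
    "github.com/kitex-contrib/obs-opentelemetry/tracing"
    "github.com/prometheus/client_golang/prometheus/promhttp"

    // Kitex-generated service package; the module path is a placeholder.
    "example/kitex_gen/api/echo"
)

// initTracer, LoggingMiddleware and MetricsMiddleware come from the earlier examples.
func main() {
    shutdown, err := initTracer()
    if err != nil {
        klog.Fatalf("failed to init tracer: %v", err)
    }
    defer shutdown(context.Background())

    // A dedicated mux keeps these routes apart from anything other libraries
    // register on http.DefaultServeMux.
    mux := http.NewServeMux()
    // Custom metrics from the default Prometheus registry.
    mux.Handle("/metrics", promhttp.Handler())
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte("OK"))
    })
    go func() {
        if err := http.ListenAndServe(":8080", mux); err != nil {
            klog.Errorf("ops HTTP server error: %v", err)
        }
    }()

    svr := echo.NewServer(
        new(EchoImpl),
        server.WithSuite(tracing.NewServerSuite()),
        server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo-service"}),
        // Built-in RPC metrics served on :9091/metrics.
        server.WithTracer(kitexprom.NewServerTracer(":9091", "/metrics")),
        // The first registered middleware is the outermost one.
        server.WithMiddleware(LoggingMiddleware),
        server.WithMiddleware(MetricsMiddleware),
    )

    if err := svr.Run(); err != nil {
        klog.Fatalf("server stopped with error: %v", err)
    }
}
```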

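Best practices

1. Logging conventions

| Level | When to use |
|-------|-------------|
| DEBUG | Debugging details during development |
| INFO  | Key business flow events |
| WARN  | Potential problems |
| ERROR | Errors |
| FATAL | Fatal errors; with klog's default logger the process exits after the message is logged |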

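2. Choosing metrics

Cover at least request rate (QPS), latency, errors, and throughput. The configuration struct below is illustrative and not a Kitex API:

```go
// MetricsConfig is a hypothetical set of switches for the metric groups a service exports.
type MetricsConfig struct {
    EnableQPS        bool // request rate
    EnableLatency    bool // request duration distribution
    EnableError      bool // error counts
    EnableThroughput bool // volume processed, such as bytes or messages
}

// NewMetricsConfig enables every metric group by default.
func NewMetricsConfig() *MetricsConfig {
    return &MetricsConfig{
        EnableQPS:        true,
        EnableLatency:    true,
        EnableError:      true,
        EnableThroughput: true,
    }
}
```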

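3. Trace sampling

Sampling limits tracing overhead under heavy traffic. The sampler below keeps about 10% of new traces and lets child spans follow their parent's decision, so sampled traces stay complete. Pass it to the provider with trace.WithSampler(newSampler()).

```go
import (
    "go.opentelemetry.io/otel/sdk/trace"
)

// newSampler samples 10% of root spans by trace ID; spans with a parent
// inherit the parent's sampling decision.
func newSampler() trace.Sampler {
    return trace.ParentBased(
        trace.TraceIDRatioBased(0.1),
    )
}
```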

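Summary

This chapter covered the observability mechanisms in the CloudWeGo ecosystem:

  1. Three pillars: the roles of logs, metrics, and traces and how they relate
  2. Logging: structured logs and logging middleware
  3. Metrics: collecting and displaying metrics with Prometheus
  4. Tracing: OpenTelemetry integration and custom spans
  5. Health checks: implementing a health check endpoint
  6. Service metadata: reading per-call metadata from RPCInfo
  7. Dashboards: configuring Grafana and alerting rules
  8. Log aggregation: shipping logs to ELK
  9. End-to-end monitoring: integrating all of the above in one service
  10. Best practices: logging conventions, metric selection, and trace sampling

Observability is a foundation of reliable microservices. A well-built observability stack helps you locate problems and tune performance quickly, which makes it an indispensable part of a microservice architecture.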