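Observability
Overview
Observability is the ability to understand a system's behavior from the signals it emits, and it is essential in a microservice architecture. It rests on three pillars: logs, metrics, and distributed traces. In the CloudWeGo ecosystem, Kitex and Hertz ship with observability support that helps developers locate problems quickly and tune performance.
Why observability?
Distributed systems pose several challenges:
- Call chains between services are complex, so problems are hard to locate
- The runtime state of the system is hard to follow in real time
- Performance bottlenecks are hard to find and fix
- The blast radius of a failure is hard to assess
Logs, metrics, and traces together give a full view of the running system, which makes observability the foundation of a reliable microservice system.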
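Core topics
1. The three pillars of observability
Basic concepts
- Logging: records discrete events, used to diagnose problems
- Metrics: aggregated numeric data, used for monitoring and alerting
- Tracing: the complete call chain of a request, used for performance analysis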
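To make the difference between the three signals concrete, the sketch below models each one as a plain Go type and emits all three for a single simulated request. Every name here (LogRecord, Counter, Span, handle) is hypothetical and exists only for illustration; none of it is a Kitex or OpenTelemetry API.

```go
package main

import (
	"fmt"
	"time"
)

// LogRecord is a discrete event with its own detail.
type LogRecord struct {
	Time    time.Time
	Level   string
	Message string
	TraceID string // lets a log line be joined to a trace
}

// Counter is an aggregated number; the individual events are not kept.
type Counter struct{ value uint64 }

func (c *Counter) Inc()          { c.value++ }
func (c *Counter) Value() uint64 { return c.value }

// Span is one timed step of a request; spans sharing a TraceID form a trace.
type Span struct {
	TraceID  string
	Name     string
	Duration time.Duration
}

// handle simulates one request and produces all three signals for it.
func handle(traceID string, requests *Counter) (LogRecord, Span) {
	start := time.Now()
	requests.Inc() // metric: only the total survives
	rec := LogRecord{Time: start, Level: "INFO", Message: "echo handled", TraceID: traceID}
	span := Span{TraceID: traceID, Name: "Echo", Duration: time.Since(start)}
	return rec, span
}

func main() {
	var requests Counter
	rec, span := handle("trace-1", &requests)
	fmt.Println(rec.Level, rec.Message, rec.TraceID)
	fmt.Println(span.Name, span.TraceID)
	fmt.Println("requests:", requests.Value())
}
```

The shared TraceID is what lets a log line and a span be tied back to the same request, while the counter deliberately forgets which request incremented it.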
How they relate
Logs → event details
↓
Metrics → aggregated statistics
↓
Traces → request correlation
2. Logging
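Basic usage
Kitex supports integration with several logging frameworks. Its default logger is exposed through the klog package:
```go
package main

import (
	"github.com/cloudwego/kitex/pkg/klog"
	// echo (and api in later examples) are the Kitex-generated packages.
	// Import them from your own kitex_gen path; the exact path depends on
	// your module name and IDL.
)

func main() {
	// Emit everything from Debug upward; raise the level in production.
	klog.SetLevel(klog.LevelDebug)

	klog.Debug("debug message")
	klog.Info("info message")
	klog.Warn("warn message")
	klog.Error("error message")

	// EchoImpl is your handler type for the generated service interface.
	svr := echo.NewServer(
		new(EchoImpl),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
Structured logging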
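As a framework-neutral picture of what "structured" means, the sketch below uses log/slog from the Go standard library (Go 1.21 or later). Using slog here is an assumption made for illustration, not something the Kitex text prescribes, and the helper names are hypothetical. Each event is written as one JSON object, and the capture helper decodes it again to show that the fields are machine-readable.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
)

// newLogger returns a logger that writes one JSON object per event to w.
func newLogger(w io.Writer) *slog.Logger {
	return slog.New(slog.NewJSONHandler(w, nil))
}

// logLogin emits a login event with typed key/value fields.
func logLogin(l *slog.Logger, userID int, userName string) {
	l.Info("user_login", "user_id", userID, "user_name", userName)
}

// captureLogin logs into memory and decodes the line back into fields.
func captureLogin(userID int, userName string) (map[string]any, error) {
	var buf bytes.Buffer
	logLogin(newLogger(&buf), userID, userName)
	var fields map[string]any
	err := json.Unmarshal(buf.Bytes(), &fields)
	return fields, err
}

func main() {
	logLogin(newLogger(os.Stdout), 1, "Alice")

	fields, err := captureLogin(1, "Alice")
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["msg"], fields["user_name"])
}
```

Because every field has a name, a log backend can filter on user_id directly instead of parsing free-form text.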
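Structured logs are easier to query and analyze than free-form text:
```go
package main

import (
	"encoding/json"
	"time"

	"github.com/cloudwego/kitex/pkg/klog"
)

type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	user := &User{ID: 1, Name: "Alice"}

	// Free-form text: readable, but hard to query by field.
	klog.Infof("user login: %+v", user)

	// klog has no dedicated JSON helper, so encode the fields first and
	// log the result as a single line that a log pipeline can parse.
	event, err := json.Marshal(map[string]interface{}{
		"event":     "user_login",
		"user_id":   user.ID,
		"user_name": user.Name,
		"timestamp": time.Now().Unix(),
	})
	if err != nil {
		klog.Errorf("encode log event: %v", err)
		return
	}
	klog.Info(string(event))
}
```
Logging middleware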
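A middleware is a function that takes an endpoint and returns a wrapped endpoint, so behavior can be added before and after the call. The stdlib-only sketch below shows that decorator pattern and the resulting call order. The Endpoint and Middleware types mimic the general shape of such APIs, while Chain, Record, and runChain are hypothetical helpers written for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Endpoint has the shape of an RPC handler: request in, response out.
type Endpoint func(ctx context.Context, req, resp any) error

// Middleware wraps an Endpoint with extra behavior.
type Middleware func(Endpoint) Endpoint

// Chain applies middlewares so that the first one listed is the outermost.
func Chain(next Endpoint, mws ...Middleware) Endpoint {
	for i := len(mws) - 1; i >= 0; i-- {
		next = mws[i](next)
	}
	return next
}

// Record returns a middleware that notes when it runs around the call.
func Record(name string, events *[]string) Middleware {
	return func(next Endpoint) Endpoint {
		return func(ctx context.Context, req, resp any) error {
			*events = append(*events, name+":before")
			err := next(ctx, req, resp)
			*events = append(*events, name+":after")
			return err
		}
	}
}

// runChain sends one request through two middlewares and a failing handler.
func runChain() ([]string, error) {
	var events []string
	handler := func(ctx context.Context, req, resp any) error {
		events = append(events, "handler")
		return errors.New("boom")
	}
	ep := Chain(handler, Record("log", &events), Record("metrics", &events))
	err := ep(context.Background(), "ping", nil)
	return events, err
}

func main() {
	events, err := runChain()
	fmt.Println(events, err)
}
```

The outermost middleware sees the request first and the result last, which is why a logging middleware can measure the full duration and observe the final error.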
Add logging to every RPC call:
```go
package main

import (
	"context"
	"time"

	"github.com/cloudwego/kitex/pkg/endpoint"
	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	"github.com/cloudwego/kitex/server"
)

// LoggingMiddleware logs the method, outcome, and duration of each call.
// Kitex middleware types live in the endpoint package.
func LoggingMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
	return func(ctx context.Context, req, resp interface{}) (err error) {
		start := time.Now()
		ri := rpcinfo.GetRPCInfo(ctx)
		methodName := ri.To().Method()

		// Logging the full request can be verbose and may expose
		// sensitive fields; trim it for production use.
		klog.Infof("[%s] request: %+v", methodName, req)

		err = next(ctx, req, resp)

		duration := time.Since(start)
		if err != nil {
			klog.Errorf("[%s] error: %v, duration: %v", methodName, err, duration)
		} else {
			klog.Infof("[%s] success, duration: %v", methodName, duration)
		}
		return err
	}
}

func main() {
	svr := echo.NewServer(
		new(EchoImpl),
		server.WithMiddleware(LoggingMiddleware),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
3. Metrics
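Prometheus integration
Kitex supports Prometheus metrics collection:
```go
package main

import (
	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/server"
	// The package name is prometheus, so alias it explicitly.
	prometheus "github.com/kitex-contrib/monitor-prometheus"
)

func main() {
	svr := echo.NewServer(
		new(EchoImpl),
		// Expose Kitex RPC metrics at http://<host>:9091/metrics.
		server.WithTracer(prometheus.NewServerTracer(":9091", "/metrics")),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
Custom metrics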
Define and record custom metrics:
```go
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// promauto registers each metric with the default registry on creation.
var (
	// Counter: total requests, split by method and outcome.
	requestCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "echo_request_total",
			Help: "Total number of echo requests",
		},
		[]string{"method", "status"},
	)
	// Histogram: request latency distribution in seconds.
	requestDuration = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "echo_request_duration_seconds",
			Help:    "Duration of echo requests",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method"},
	)
	// Gauge: goes up and down. As used below it tracks requests that are
	// currently in flight.
	activeConnections = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "echo_active_connections",
			Help: "Number of active connections",
		},
	)
)

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
	start := time.Now()
	activeConnections.Inc()
	defer activeConnections.Dec()

	resp, err := s.processRequest(ctx, req)

	duration := time.Since(start).Seconds()
	requestDuration.WithLabelValues("Echo").Observe(duration)
	if err != nil {
		requestCounter.WithLabelValues("Echo", "error").Inc()
	} else {
		requestCounter.WithLabelValues("Echo", "success").Inc()
	}
	return resp, err
}
```
Metrics middleware
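A latency histogram does not store individual durations. It counts how many observations fall at or below each bucket bound, which is also how buckets appear in the Prometheus exposition format (cumulative, keyed by an upper bound). The sketch below is a simplified stdlib-only model of that idea; the Histogram type and its methods are hypothetical and are not the client_golang implementation.

```go
package main

import "fmt"

// Histogram counts observations into cumulative buckets: Counts[i] is the
// number of observations less than or equal to Bounds[i].
type Histogram struct {
	Bounds []float64 // upper bounds, ascending
	Counts []uint64  // cumulative count per bound
	Count  uint64    // all observations (the implicit +Inf bucket)
	Sum    float64   // sum of all observed values
}

// NewHistogram creates a histogram with the given ascending upper bounds.
func NewHistogram(bounds ...float64) *Histogram {
	return &Histogram{Bounds: bounds, Counts: make([]uint64, len(bounds))}
}

// Observe records one value in every bucket whose bound it does not exceed.
func (h *Histogram) Observe(v float64) {
	for i, bound := range h.Bounds {
		if v <= bound {
			h.Counts[i]++
		}
	}
	h.Count++
	h.Sum += v
}

func main() {
	h := NewHistogram(0.01, 0.1, 1) // seconds
	for _, seconds := range []float64{0.004, 0.03, 0.25, 2.5} {
		h.Observe(seconds)
	}
	fmt.Println(h.Counts, h.Count) // prints "[1 2 3] 4"
}
```

Quantile estimates such as a 95th percentile are later interpolated from these bucket counts, so bucket bounds should bracket the latencies that matter. The middleware below feeds RPC durations into a real Prometheus histogram.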
```go
package main

import (
	"context"
	"time"

	"github.com/cloudwego/kitex/pkg/endpoint"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// promauto registers the histogram with the default registry; a metric
	// created with prometheus.NewHistogramVec alone is never exported.
	rpcDuration = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "rpc_duration_seconds",
			Help:    "RPC duration in seconds",
			Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
		},
		[]string{"service", "method"},
	)
)

// MetricsMiddleware observes the duration of every RPC, labeled by the
// target service and method.
func MetricsMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
	return func(ctx context.Context, req, resp interface{}) (err error) {
		start := time.Now()
		ri := rpcinfo.GetRPCInfo(ctx)
		service := ri.To().ServiceName()
		method := ri.To().Method()

		err = next(ctx, req, resp)

		duration := time.Since(start).Seconds()
		rpcDuration.WithLabelValues(service, method).Observe(duration)
		return err
	}
}
```
4. Distributed tracing
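OpenTelemetry integration
Kitex supports OpenTelemetry tracing through the kitex-contrib/obs-opentelemetry extension:
```go
package main

import (
	"context"

	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/server"
	"github.com/kitex-contrib/obs-opentelemetry/tracing"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// initTracer installs a global TracerProvider that exports spans to Jaeger
// and returns the provider's shutdown function.
func initTracer() (func(context.Context) error, error) {
	// The Jaeger exporter is deprecated upstream in favor of OTLP; it is
	// kept here to match the original setup.
	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
	if err != nil {
		return nil, err
	}
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String("echo-service"),
		)),
	)
	otel.SetTracerProvider(tp)
	// Propagate trace context across service boundaries.
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return tp.Shutdown, nil
}

func main() {
	shutdown, err := initTracer()
	if err != nil {
		klog.Fatalf("failed to init tracer: %v", err)
	}
	// Flush buffered spans before the process exits.
	defer shutdown(context.Background())

	svr := echo.NewServer(
		new(EchoImpl),
		server.WithSuite(tracing.NewServerSuite()),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
Client-side tracing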
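For a trace to span client and server, the client has to send its trace and span IDs along with the request, and the server has to read them back. The sketch below illustrates that inject/extract step using the W3C Trace Context traceparent format (version, trace ID, parent span ID, flags). It is a simplified stdlib-only illustration: the SpanContext type and the Inject and Extract helpers are hypothetical, and how Kitex actually carries this data depends on the transport and the configured propagator.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// SpanContext is the part of a span that travels with the request.
type SpanContext struct {
	TraceID string // 32 hex characters
	SpanID  string // 16 hex characters
	Sampled bool
}

// Inject renders the context as a traceparent value for an outgoing request.
func Inject(sc SpanContext) string {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	return "00-" + sc.TraceID + "-" + sc.SpanID + "-" + flags
}

// Extract parses a traceparent value received with an incoming request.
func Extract(header string) (SpanContext, error) {
	parts := strings.Split(header, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return SpanContext{}, errors.New("unsupported traceparent")
	}
	traceID, spanID := parts[1], parts[2]
	if len(traceID) != 32 || len(spanID) != 16 {
		return SpanContext{}, errors.New("bad trace or span ID length")
	}
	if _, err := hex.DecodeString(traceID + spanID); err != nil {
		return SpanContext{}, err
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return SpanContext{}, err
	}
	// Bit 0 of the flags field is the sampled flag.
	return SpanContext{TraceID: traceID, SpanID: spanID, Sampled: flags&1 == 1}, nil
}

func main() {
	sc := SpanContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	}
	header := Inject(sc)
	got, err := Extract(header)
	fmt.Println(header)
	fmt.Println(got == sc, err)
}
```

In the client example below, the tracing suite takes care of this step, so application code never builds the header by hand.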
```go
package main

import (
	"context"
	"log"

	"github.com/cloudwego/kitex/client"
	"github.com/kitex-contrib/obs-opentelemetry/tracing"
)

func main() {
	// A real client also needs a target, for example a resolver option or
	// client.WithHostPorts with the server address.
	c, err := echo.NewClient(
		"echo",
		client.WithSuite(tracing.NewClientSuite()),
	)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	resp, err := c.Echo(ctx, &api.Request{Message: "Hello"})
	if err != nil {
		panic(err)
	}
	log.Println(resp.Message)
}
```
Custom spans
Add custom spans inside business logic:
```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
)

// tracer is obtained from the global TracerProvider.
var tracer = otel.Tracer("echo-service")

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
	// Start a span and pass the returned ctx down so that child spans
	// attach to it.
	ctx, span := tracer.Start(ctx, "Echo")
	defer span.End()

	resp, err := s.processRequest(ctx, req)
	if err != nil {
		// RecordError adds an error event; SetStatus marks the span failed.
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
	return resp, err
}

func (s *EchoImpl) processRequest(ctx context.Context, req *api.Request) (*api.Response, error) {
	_, span := tracer.Start(ctx, "processRequest")
	defer span.End()

	span.SetAttributes(
		attribute.String("request.message", req.Message),
	)
	return &api.Response{
		Message: "Echo: " + req.Message,
	}, nil
}
```
5. Health checks
Health check endpoint
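A health endpoint is most useful when its status code reflects whether the service can actually take traffic. The sketch below is a stdlib-only illustration of that idea: a hypothetical ready flag decides between 503 and 200, and net/http/httptest probes the handler in-process. The flag, the handler, and the probe helper are assumptions made for this example, not Kitex features.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// ready flips to true once the service can take traffic (hypothetical flag;
// atomic.Bool requires Go 1.19 or later).
var ready atomic.Bool

// healthHandler answers 503 while starting up and 200 once ready.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	if !ready.Load() {
		http.Error(w, "starting", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
}

// probe calls the handler in-process and returns the HTTP status code.
func probe() int {
	rec := httptest.NewRecorder()
	healthHandler(rec, httptest.NewRequest(http.MethodGet, "/health", nil))
	return rec.Code
}

func main() {
	fmt.Println(probe()) // 503 before the service is marked ready
	ready.Store(true)
	fmt.Println(probe()) // 200 afterwards
}
```

The Kitex example that follows serves a simpler endpoint that always answers OK, on a separate HTTP port next to the RPC server.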
```go
package main

import (
	"net/http"

	"github.com/cloudwego/kitex/pkg/klog"
)

// startHealthServer serves /health on :8080 in the background, separate
// from the Kitex RPC port.
func startHealthServer() {
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			klog.Fatalf("health server error: %v", err)
		}
	}()
}

func main() {
	startHealthServer()

	svr := echo.NewServer(
		new(EchoImpl),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
6. Service metadata
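Passing metadata
Metadata travels with each call between services. On the server side, rpcinfo exposes the caller and callee information for the current request:
```go
package main

import (
	"context"

	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
)

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (*api.Response, error) {
	// RPCInfo describes both ends of the current call.
	ri := rpcinfo.GetRPCInfo(ctx)
	fromService := ri.From().ServiceName() // caller
	toService := ri.To().ServiceName()     // callee (this service)
	method := ri.To().Method()

	klog.Infof("call from %s to %s.%s", fromService, toService, method)
	return &api.Response{
		Message: "Echo: " + req.Message,
	}, nil
}
```
7. Dashboards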
Grafana configuration
Provision a Grafana dashboard:
```yaml
apiVersion: 1
providers:
  - name: 'Kitex Dashboard'
    folder: 'Kitex'
    type: file
    options:
      path: /var/lib/grafana/dashboards
```
Alert rules
Configure Prometheus alert rules:
```yaml
groups:
  - name: kitex_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(echo_request_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: High error rate detected
          description: Error rate is {{ $value }} per second
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(echo_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: High latency detected
          description: 95th percentile latency is {{ $value }} seconds
```
8. Log aggregation
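ELK integration
Send logs to Elasticsearch with a logrus hook:
```go
package main

import (
	"context"

	// Assumption: the fluent Index().BodyJson().Do() calls below match the
	// olivere/elastic client; adjust the import to the client and version
	// you actually use.
	elastic "github.com/olivere/elastic/v7"
	"github.com/sirupsen/logrus"
)

// ElasticsearchHook is a logrus hook that indexes every log entry.
type ElasticsearchHook struct {
	client *elastic.Client
	index  string
}

// Fire is called by logrus for each entry at a level returned by Levels.
func (h *ElasticsearchHook) Fire(entry *logrus.Entry) error {
	doc := map[string]interface{}{
		"timestamp": entry.Time,
		"level":     entry.Level.String(),
		"message":   entry.Message,
		"fields":    entry.Data,
	}
	// This indexes synchronously on the logging path; consider batching
	// for high-volume services.
	_, err := h.client.Index().
		Index(h.index).
		BodyJson(doc).
		Do(context.Background())
	return err
}

// Levels makes the hook fire for every log level.
func (h *ElasticsearchHook) Levels() []logrus.Level {
	return logrus.AllLevels
}
```
9. End-to-end monitoring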
Integration example
A complete observability setup:
```go
package main

import (
	"context"
	"net/http"

	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/server"
	prometheus "github.com/kitex-contrib/monitor-prometheus"
	"github.com/kitex-contrib/obs-opentelemetry/tracing"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// initTracer, LoggingMiddleware, and MetricsMiddleware are the functions
// defined in the earlier sections.
func main() {
	shutdown, err := initTracer()
	if err != nil {
		klog.Fatalf("failed to init tracer: %v", err)
	}
	defer shutdown(context.Background())

	// :8080 serves the custom metrics from the default registry and the
	// health check endpoint.
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
			w.Write([]byte("OK"))
		})
		if err := http.ListenAndServe(":8080", nil); err != nil {
			klog.Errorf("metrics/health server error: %v", err)
		}
	}()

	svr := echo.NewServer(
		new(EchoImpl),
		server.WithSuite(tracing.NewServerSuite()),
		// :9091 serves the Kitex RPC metrics.
		server.WithTracer(prometheus.NewServerTracer(":9091", "/metrics")),
		server.WithMiddleware(LoggingMiddleware),
		server.WithMiddleware(MetricsMiddleware),
	)
	if err := svr.Run(); err != nil {
		klog.Fatalf("server stopped with error: %v", err)
	}
}
```
Best practices
1. Logging conventions
| Level | When to use |
|---|---|
| DEBUG | Development and debugging details |
| INFO | Key business flow events |
| WARN | Potential problems |
| ERROR | Errors |
| FATAL | Fatal errors |
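The levels in the table are ordered by severity, and a logger emits a message only if its level is at or above the configured threshold. The sketch below shows that filtering rule with a hypothetical Level type; it is a stdlib-only illustration rather than the klog implementation, and using INFO as a production threshold is an assumption.

```go
package main

import "fmt"

// Level orders log severities from least to most severe.
type Level int

const (
	Debug Level = iota
	Info
	Warn
	Error
	Fatal
)

var levelNames = [...]string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}

func (l Level) String() string { return levelNames[l] }

// Enabled reports whether a message at level l passes the threshold.
func Enabled(threshold, l Level) bool { return l >= threshold }

// Emitted lists the levels that a logger with this threshold would output.
func Emitted(threshold Level) []string {
	var out []string
	for l := Debug; l <= Fatal; l++ {
		if Enabled(threshold, l) {
			out = append(out, l.String())
		}
	}
	return out
}

func main() {
	// Assumed production threshold: INFO, so DEBUG output is dropped.
	fmt.Println(Emitted(Info)) // prints "[INFO WARN ERROR FATAL]"
}
```

Lowering the threshold to DEBUG during an investigation and raising it afterwards keeps log volume manageable without code changes.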
2. Choosing metrics
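QPS and error rate are not measured directly; they are derived from counters that only ever increase, by comparing two snapshots. The sketch below works through that arithmetic with hypothetical names (Sample, QPS, ErrorRatio) and made-up numbers chosen for illustration.

```go
package main

import "fmt"

// Sample is a snapshot of two monotonically increasing counters.
type Sample struct {
	Seconds  float64 // time of the snapshot, in seconds
	Requests float64 // total requests so far
	Errors   float64 // total failed requests so far
}

// QPS is the average number of requests per second between two snapshots.
func QPS(a, b Sample) float64 {
	elapsed := b.Seconds - a.Seconds
	if elapsed <= 0 {
		return 0
	}
	return (b.Requests - a.Requests) / elapsed
}

// ErrorRatio is the fraction of requests in the window that failed.
func ErrorRatio(a, b Sample) float64 {
	total := b.Requests - a.Requests
	if total <= 0 {
		return 0
	}
	return (b.Errors - a.Errors) / total
}

func main() {
	// Illustrative numbers: 600 requests and 30 errors within 60 seconds.
	a := Sample{Seconds: 0, Requests: 1000, Errors: 10}
	b := Sample{Seconds: 60, Requests: 1600, Errors: 40}
	fmt.Println(QPS(a, b), ErrorRatio(a, b))
}
```

With these numbers the window holds 600 requests over 60 seconds, so QPS is 10, and 30 of the 600 requests failed, an error ratio of 5%. The configuration below toggles which of these metric families are collected.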
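```go
// MetricsConfig selects which metric families to collect.
type MetricsConfig struct {
	EnableQPS        bool // request rate
	EnableLatency    bool // request duration
	EnableError      bool // error count and rate
	EnableThroughput bool // data volume processed
}

// NewMetricsConfig returns a config with every metric family enabled.
func NewMetricsConfig() *MetricsConfig {
	return &MetricsConfig{
		EnableQPS:        true,
		EnableLatency:    true,
		EnableError:      true,
		EnableThroughput: true,
	}
}
```
3. Trace sampling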
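Ratio-based sampling keeps only a fraction of traces, and it has to be deterministic: every service that sees the same trace ID must reach the same decision, or traces end up with missing spans. The sketch below illustrates one way to do that by comparing part of the trace ID against a bound derived from the ratio. It is a simplified illustration with hypothetical function names; the OpenTelemetry SDK's own algorithm may differ in detail.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// shouldSample decides from the trace ID alone, so the decision is the same
// wherever the trace is seen.
func shouldSample(traceID [16]byte, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	// Map the ratio onto the 63-bit range and compare the low 8 bytes of
	// the trace ID (shifted into that range) against it.
	bound := uint64(ratio * (1 << 63))
	x := binary.BigEndian.Uint64(traceID[8:]) >> 1
	return x < bound
}

// sampledFraction reports the share of n synthetic trace IDs that are kept.
func sampledFraction(n int, ratio float64) float64 {
	kept := 0
	step := ^uint64(0) / uint64(n)
	for i := 0; i < n; i++ {
		var id [16]byte
		// Spread synthetic IDs evenly over the ID space; real IDs are random.
		binary.BigEndian.PutUint64(id[8:], uint64(i)*step)
		if shouldSample(id, ratio) {
			kept++
		}
	}
	return float64(kept) / float64(n)
}

func main() {
	fmt.Println(sampledFraction(1000, 0.1)) // close to 0.1
}
```

The SDK sampler below combines a 10% ratio like this with the parent's decision, so child spans follow whatever the root span decided.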
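```go
import (
	"go.opentelemetry.io/otel/sdk/trace"
)

// newSampler follows the parent span's sampling decision when there is one
// and samples 10% of new root traces otherwise.
func newSampler() trace.Sampler {
	return trace.ParentBased(
		trace.TraceIDRatioBased(0.1),
	)
}
```
Summary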
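This chapter covered the observability mechanisms in the CloudWeGo ecosystem:
- Three pillars: what logs, metrics, and traces are for and how they relate
- Logging: structured logging and logging middleware
- Metrics: collecting and exposing metrics with Prometheus
- Tracing: OpenTelemetry integration and custom spans
- Health checks: implementing a health check endpoint
- Service metadata: reading the metadata carried between services
- Dashboards: configuring Grafana and alert rules
- Log aggregation: integrating with ELK
- End-to-end monitoring: putting the full observability setup together
- Best practices: logging conventions, metric selection, and trace sampling
Observability is the foundation of a reliable microservice system. A complete observability setup helps locate problems quickly and tune performance, which makes it an indispensable part of a microservice architecture.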