Skip to content

微服务架构设计

概述

微服务架构是一种将单一应用程序拆分为一组小型服务的架构风格:每个服务运行在独立的进程中,服务之间通过轻量级通信机制(如 RPC 调用、消息队列)协作。本章以 CloudWeGo Kitex 为核心,并结合 Gin、Kafka(sarama)、Nacos、Consul、OpenTelemetry 等常用组件,深入探讨微服务架构的设计原则、模式和最佳实践。

核心内容

微服务设计原则

单一职责原则

每个微服务应只负责一项业务功能。下例中,订单服务只读写自己的订单仓储,并通过事件总线发布 order.created 事件通知其他服务,而不是直接调用它们:

go
// OrderService owns the order aggregate. It persists orders through orderRepo
// and announces changes on eventBus; it does not call other services directly.
type OrderService struct {
    orderRepo repository.OrderRepository
    eventBus  *EventBus
}

// CreateOrder builds a pending order for req's user and product, stores it,
// and then publishes an "order.created" event carrying the stored order.
//
// If the repository write fails, its error is returned unchanged and nothing
// is published.
//
// NOTE(review): the write and the publish are two independent calls, so a
// crash between them leaves a stored order with no event — consider an
// outbox table if downstream services must never miss one.
func (s *OrderService) CreateOrder(ctx context.Context, req *CreateOrderReq) (*Order, error) {
    created := new(Order)
    created.ID = generateID()
    created.UserID = req.UserID
    created.ProductID = req.ProductID
    created.Status = OrderStatusPending
    created.CreatedAt = time.Now()

    if err := s.orderRepo.Create(ctx, created); err != nil {
        return nil, err
    }

    s.eventBus.Publish("order.created", created)
    return created, nil
}

服务自治

每个服务独立开发、部署和扩展。例如,订单服务拥有自己的 Kubernetes Deployment,可以单独调整副本数、镜像版本和资源配额,而不影响其他服务:

yaml
# Kubernetes Deployment for order-service alone: this service is built,
# released and scaled independently of every other service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
spec:
  # Replica count for this service only; scaling it does not touch other services.
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above so the Deployment owns these pods.
        app: order-service
    spec:
      containers:
      - name: order-service
        # Image tag pinned per service, so each service ships on its own release cycle.
        image: order-service:v1.0.0
        ports:
        # Port the service listens on in the pod (8888 is Kitex's default server port — confirm).
        - containerPort: 8888
        resources:
          # requests: what the scheduler reserves for the pod.
          requests:
            cpu: "100m"
            memory: "128Mi"
          # limits: the most the container may use.
          limits:
            cpu: "500m"
            memory: "512Mi"

服务拆分策略

按业务能力拆分

go
// Services split by business capability: each service covers exactly one
// capability and talks only to that capability's repository.
type (
    // UserService is the user capability, backed by a UserRepository.
    UserService struct {
        repo repository.UserRepository
    }

    // OrderService is the order capability, backed by an OrderRepository.
    OrderService struct {
        repo repository.OrderRepository
    }

    // PaymentService is the payment capability, backed by a PaymentRepository.
    PaymentService struct {
        repo repository.PaymentRepository
    }

    // InventoryService is the inventory capability, backed by an InventoryRepository.
    InventoryService struct {
        repo repository.InventoryRepository
    }
)

按子域拆分(DDD)

go
// Services grouped by DDD subdomain: each domain type collects the
// application services that belong to that subdomain.
type (
    // UserDomain groups the user, authentication and profile services.
    UserDomain struct {
        userService    *service.UserService
        authService    *service.AuthService
        profileService *service.ProfileService
    }

    // OrderDomain groups the order and shopping-cart services.
    OrderDomain struct {
        orderService *service.OrderService
        cartService  *service.CartService
    }

    // PaymentDomain groups the payment and refund services.
    PaymentDomain struct {
        paymentService *service.PaymentService
        refundService  *service.RefundService
    }
)

服务通信模式

同步通信

go
type OrderClient struct {
    client *kitex.Client
}

func (c *OrderClient) GetOrder(ctx context.Context, orderID int64) (*Order, error) {
    req := &order.GetOrderReq{OrderId: orderID}
    resp, err := c.client.GetOrder(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("get order failed: %w", err)
    }
    return resp.Order, nil
}

func NewOrderClient() (*OrderClient, error) {
    r, err := consul.NewConsulResolver("127.0.0.1:8500")
    if err != nil {
        return nil, err
    }
    
    client, err := orderservice.NewClient("order-service",
        client.WithResolver(r),
        client.WithTimeout(3*time.Second),
    )
    if err != nil {
        return nil, err
    }
    
    return &OrderClient{client: client}, nil
}

异步通信(消息队列)

go
// MessageQueue bundles a Kafka (sarama) synchronous producer with a consumer
// group for asynchronous service-to-service messaging.
type MessageQueue struct {
    producer sarama.SyncProducer
    consumer sarama.ConsumerGroup
}

// Publish sends message to topic through the synchronous producer and returns
// the producer's error, if any.
//
// ctx is accepted for interface symmetry but is not used:
// SyncProducer.SendMessage takes no context.
func (mq *MessageQueue) Publish(ctx context.Context, topic string, message []byte) error {
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(message),
    }

    _, _, err := mq.producer.SendMessage(msg)
    return err
}

// Subscribe consumes topic with handler and blocks the calling goroutine.
//
// ConsumerGroup.Consume returns whenever the group rebalances and must be
// called again to join the new session. A single call would silently stop
// consumption after the first rebalance, so Consume is re-invoked in a loop.
// Subscribe returns only when Consume reports an error, for example
// sarama.ErrClosedConsumerGroup after the group has been closed.
func (mq *MessageQueue) Subscribe(topic string, handler MessageHandler) error {
    topics := []string{topic}
    for {
        err := mq.consumer.Consume(context.Background(), topics, &consumerHandler{
            handler: handler,
        })
        if err != nil {
            return err
        }
    }
}

// OrderEventHandler reacts to events published by other services by moving
// the referenced order to its next state.
type OrderEventHandler struct {
    orderService *service.OrderService
}

// Handle routes event by its Type:
//   - "payment.completed"  -> orderService.MarkAsPaid(event.OrderID)
//   - "inventory.reserved" -> orderService.MarkAsReserved(event.OrderID)
//
// Any other event type is ignored and nil is returned.
func (h *OrderEventHandler) Handle(ctx context.Context, event *OrderEvent) error {
    kind := event.Type
    if kind == "payment.completed" {
        return h.orderService.MarkAsPaid(ctx, event.OrderID)
    }
    if kind == "inventory.reserved" {
        return h.orderService.MarkAsReserved(ctx, event.OrderID)
    }
    return nil
}

API 网关设计

网关架构

go
// APIGateway is the single entry point for external callers. It forwards or
// aggregates requests across the user, order and product service clients.
type APIGateway struct {
    userClient    *UserClient
    orderClient   *OrderClient
    productClient *ProductClient
}

// GetUserInfo forwards straight to the user service.
func (g *APIGateway) GetUserInfo(ctx context.Context, userID int64) (*UserInfo, error) {
    return g.userClient.GetUser(ctx, userID)
}

// CreateOrder aggregates three backend calls in sequence:
//  1. look up the user,
//  2. look up the product,
//  3. create the order, using the product's price as the amount.
//
// The first failing call's error is returned unchanged; on failure the
// returned order is always nil.
func (g *APIGateway) CreateOrder(ctx context.Context, req *CreateOrderReq) (*Order, error) {
    user, err := g.userClient.GetUser(ctx, req.UserID)
    if err != nil {
        return nil, err
    }

    product, err := g.productClient.GetProduct(ctx, req.ProductID)
    if err != nil {
        return nil, err
    }

    orderReq := &CreateOrderReq{
        UserID:    user.ID,
        ProductID: product.ID,
        Amount:    product.Price,
    }
    created, err := g.orderClient.CreateOrder(ctx, orderReq)
    if err != nil {
        return nil, err
    }
    return created, nil
}

路由配置

go
// SetupRouter builds the gateway's gin engine. It installs global auth,
// rate-limit and circuit-breaker middleware, then registers the /api/v1 routes.
//
// gin routes take a gin.HandlerFunc (func(*gin.Context)), but the APIGateway
// methods are RPC-style: (ctx, args) -> (result, error). Each route therefore
// goes through a small adapter that:
//   - binds the request,
//   - calls the gateway with the HTTP request's context,
//   - renders the result as JSON.
func SetupRouter(gateway *APIGateway) *gin.Engine {
    r := gin.Default()

    r.Use(AuthMiddleware())
    r.Use(RateLimitMiddleware())
    // NOTE(review): r.Use needs a gin.HandlerFunc. The CircuitBreakerMiddleware
    // in the circuit-breaker section takes a *CircuitBreaker and returns a
    // Kitex endpoint.Middleware, so it cannot be used here as-is — confirm
    // which gin-side breaker middleware is intended.
    r.Use(CircuitBreakerMiddleware())

    api := r.Group("/api/v1")
    {
        api.GET("/users/:id", func(c *gin.Context) {
            id, ok := bindIDParam(c)
            if !ok {
                return
            }
            info, err := gateway.GetUserInfo(c.Request.Context(), id)
            writeResult(c, info, err)
        })
        api.POST("/orders", func(c *gin.Context) {
            var req CreateOrderReq
            if err := c.ShouldBindJSON(&req); err != nil {
                c.JSON(400, gin.H{"error": err.Error()})
                return
            }
            created, err := gateway.CreateOrder(c.Request.Context(), &req)
            writeResult(c, created, err)
        })
        // NOTE(review): GetOrder and GetProduct are not shown in this chapter;
        // these adapters assume they share GetUserInfo's (ctx, id int64) shape.
        api.GET("/orders/:id", func(c *gin.Context) {
            id, ok := bindIDParam(c)
            if !ok {
                return
            }
            found, err := gateway.GetOrder(c.Request.Context(), id)
            writeResult(c, found, err)
        })
        api.GET("/products/:id", func(c *gin.Context) {
            id, ok := bindIDParam(c)
            if !ok {
                return
            }
            product, err := gateway.GetProduct(c.Request.Context(), id)
            writeResult(c, product, err)
        })
    }

    return r
}

// bindIDParam parses the ":id" path parameter as an int64.
// If it is missing or malformed, it answers 400 and returns ok=false.
func bindIDParam(c *gin.Context) (int64, bool) {
    var uri struct {
        ID int64 `uri:"id" binding:"required"`
    }
    if err := c.ShouldBindUri(&uri); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return 0, false
    }
    return uri.ID, true
}

// writeResult renders an RPC-style (result, error) pair.
//
// On success it answers 200 with result as JSON. On failure it answers 500
// with a generic message. The backend error is attached via c.Error for
// logging middleware rather than echoed to external callers.
func writeResult(c *gin.Context, result interface{}, err error) {
    if err != nil {
        _ = c.Error(err)
        c.JSON(500, gin.H{"error": "internal error"})
        return
    }
    c.JSON(200, result)
}

服务治理

服务发现

go
// ServiceRegistry abstracts a service-discovery backend: services register
// their instances, deregister on shutdown, and look up peers by name.
type ServiceRegistry interface {
    Register(ctx context.Context, service *ServiceInfo) error
    Deregister(ctx context.Context, serviceID string) error
    Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
}

// nacosGroup is used for both registration and discovery, so instances
// registered by NacosRegistry are visible to its Discover.
const nacosGroup = "DEFAULT_GROUP"

// NacosRegistry implements registration and discovery with the nacos-sdk-go
// naming client.
//
// NOTE(review): NacosRegistry defines no Deregister, so it does not satisfy
// ServiceRegistry yet. Nacos deregisters by (service name, ip, port), which a
// bare serviceID does not carry — TODO decide how IDs map to instances.
type NacosRegistry struct {
    client naming_client.INamingClient
}

// Register announces service to Nacos as an enabled, healthy, ephemeral
// instance in nacosGroup.
//
// Enable and Healthy default to false in vo.RegisterInstanceParam. Left
// unset, the instance would never be returned by healthy-only discovery.
// A false result with a nil error (the server declined the registration) is
// reported as an error rather than ignored.
func (r *NacosRegistry) Register(ctx context.Context, service *ServiceInfo) error {
    ok, err := r.client.RegisterInstance(vo.RegisterInstanceParam{
        ServiceName: service.Name,
        GroupName:   nacosGroup,
        Ip:          service.IP,
        Port:        uint64(service.Port),
        Weight:      float64(service.Weight),
        Metadata:    service.Metadata,
        Enable:      true,
        Healthy:     true,
        Ephemeral:   true,
    })
    if err != nil {
        return fmt.Errorf("nacos register %s: %w", service.Name, err)
    }
    if !ok {
        return fmt.Errorf("nacos register %s: registration not accepted", service.Name)
    }
    return nil
}

// Discover returns the healthy instances of serviceName in nacosGroup.
// The result is nil when there are none.
func (r *NacosRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
    instances, err := r.client.SelectInstances(vo.SelectInstancesParam{
        ServiceName: serviceName,
        GroupName:   nacosGroup,
        HealthyOnly: true,
    })
    if err != nil {
        return nil, fmt.Errorf("nacos discover %s: %w", serviceName, err)
    }

    var result []*ServiceInstance
    for _, ins := range instances {
        result = append(result, &ServiceInstance{
            ID:      ins.InstanceId,
            Address: ins.Ip,
            Port:    ins.Port,
            // Nacos weights are float64; ServiceInstance.Weight is summed as an
            // int by the load balancers, so fractional weights truncate.
            Weight: int(ins.Weight),
        })
    }

    return result, nil
}

负载均衡

go
// LoadBalancer picks one instance out of a discovered instance set.
type LoadBalancer interface {
    Select(instances []*ServiceInstance) *ServiceInstance
}

// RoundRobinLB cycles through instances in order. Its position is advanced
// atomically, so one balancer may be shared by concurrent callers.
type RoundRobinLB struct {
    counter uint64
}

// Select returns the next instance in rotation, or nil if instances is empty.
// The counter is incremented before use, so the first call returns
// instances[1 % len(instances)].
func (lb *RoundRobinLB) Select(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    idx := atomic.AddUint64(&lb.counter, 1)
    return instances[idx%uint64(len(instances))]
}

// WeightedRoundRobinLB hands out instances in proportion to their Weight.
// Over any run of total-weight consecutive calls on the same instance set, an
// instance with weight w is returned w times, in a contiguous run.
type WeightedRoundRobinLB struct {
    instances []*ServiceInstance // not used by Select
    weights   []int              // not used by Select
    counter   uint64
}

// Select returns an instance chosen by weight, or nil if instances is empty.
//
// Instances with Weight <= 0 are never chosen while at least one instance has
// a positive weight. If none does, Select falls back to plain round robin
// over all instances instead of taking a modulo by zero.
func (lb *WeightedRoundRobinLB) Select(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    total := 0
    for _, ins := range instances {
        if ins.Weight > 0 {
            total += ins.Weight
        }
    }

    // Advanced atomically, as in RoundRobinLB, since a shared balancer may be
    // called from several goroutines.
    n := atomic.AddUint64(&lb.counter, 1)
    if total == 0 {
        return instances[n%uint64(len(instances))]
    }

    // Walk the cumulative weights until slot falls inside an instance's range.
    slot := int(n % uint64(total))
    for _, ins := range instances {
        if ins.Weight <= 0 {
            continue
        }
        if slot < ins.Weight {
            return ins
        }
        slot -= ins.Weight
    }

    return instances[0] // unreachable: slot < total
}

熔断降级

go
// CircuitBreaker is a consecutive-failure circuit breaker with three states:
//   - Closed: calls pass through. Each failure increments failures; a
//     success resets it. Reaching maxFailures opens the breaker.
//   - Open: calls are rejected with ErrCircuitBreakerOpen until more than
//     timeout has passed since the breaker opened; then it becomes half-open.
//   - Half-open: calls pass through. A success closes the breaker. A failure
//     reopens it immediately, because failures is still >= maxFailures.
type CircuitBreaker struct {
    maxFailures int
    timeout     time.Duration

    // mu guards the mutable state below. One breaker is shared by every
    // request flowing through CircuitBreakerMiddleware, so unguarded access
    // would be a data race. mu is never held while the protected call runs.
    mu           sync.Mutex
    state        State
    failures     int
    lastFailTime time.Time
}

// State is the breaker's current mode.
type State int

const (
    // StateClosed lets calls through and counts consecutive failures.
    StateClosed State = iota
    // StateOpen rejects calls until the timeout cool-down has passed.
    StateOpen
    // StateHalfOpen lets calls through to probe whether the dependency recovered.
    StateHalfOpen
)

// Call runs fn unless the breaker is open. When open, it returns
// ErrCircuitBreakerOpen without calling fn. Otherwise fn's error is returned
// unchanged after updating the breaker's state.
func (cb *CircuitBreaker) Call(fn func() error) error {
    if err := cb.beforeCall(); err != nil {
        return err
    }

    err := fn()
    cb.afterCall(err)
    return err
}

// beforeCall decides whether a call may proceed. It moves Open -> HalfOpen
// once more than timeout has elapsed since the breaker opened.
func (cb *CircuitBreaker) beforeCall() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state == StateOpen {
        if time.Since(cb.lastFailTime) <= cb.timeout {
            return ErrCircuitBreakerOpen
        }
        cb.state = StateHalfOpen
    }
    return nil
}

// afterCall records the outcome of a call that was allowed through.
func (cb *CircuitBreaker) afterCall(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failures++
        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
            cb.lastFailTime = time.Now()
        }
        return
    }

    cb.failures = 0
    cb.state = StateClosed
}

// CircuitBreakerMiddleware wraps every downstream endpoint call in cb.Call.
//
// While cb is open, the endpoint is skipped and (nil, ErrCircuitBreakerOpen)
// is returned. Otherwise the endpoint's response and error pass through
// unchanged, and the error also feeds cb's failure accounting.
func CircuitBreakerMiddleware(cb *CircuitBreaker) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
            err = cb.Call(func() error {
                var callErr error
                resp, callErr = next(ctx, req)
                return callErr
            })
            return resp, err
        }
    }
}

分布式事务

Saga 模式

go
// SagaStep is one local transaction in a saga plus the action that undoes it.
type SagaStep struct {
    Name   string
    Action func(ctx context.Context) error
    // Compensate undoes Action. It may be nil for a step with nothing to
    // undo (e.g. a read-only check); rollback skips such steps.
    Compensate func(ctx context.Context) error
}

// Saga runs a fixed sequence of steps and rolls back completed ones on failure.
type Saga struct {
    steps []SagaStep
}

// Execute runs each step's Action in order.
//
// If step i fails, the Compensate functions of steps i-1 .. 0 (the ones whose
// Action succeeded) are run in reverse order. The failing step itself is not
// compensated. Every compensation is attempted even if an earlier one fails.
//
// The returned error wraps the failing Action's error with %w and names the
// step. If any compensation also failed, those errors are appended to the
// message, because a partially rolled-back saga needs manual attention.
func (s *Saga) Execute(ctx context.Context) error {
    for i, step := range s.steps {
        err := step.Action(ctx)
        if err == nil {
            continue
        }
        if compErrs := s.compensate(ctx, i); len(compErrs) > 0 {
            return fmt.Errorf("saga failed at step %s: %w; compensation errors: %v", step.Name, err, compErrs)
        }
        return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
    }

    return nil
}

// compensate undoes steps [0, failed) in reverse order, continuing past
// failures, and returns every compensation error it saw.
func (s *Saga) compensate(ctx context.Context, failed int) []error {
    var errs []error
    for j := failed - 1; j >= 0; j-- {
        undo := s.steps[j].Compensate
        if undo == nil {
            continue
        }
        if err := undo(ctx); err != nil {
            errs = append(errs, fmt.Errorf("compensate %s: %w", s.steps[j].Name, err))
        }
    }
    return errs
}

// CreateOrderSaga assembles the order-placement saga:
// create_order -> reserve_inventory -> process_payment.
// Each step is paired with the call that undoes it: CancelOrder, Release and
// Refund respectively.
//
// NOTE(review): these service methods take only ctx; the identifiers of the
// order/payment being acted on presumably travel in ctx or service state —
// confirm.
func CreateOrderSaga(orderService *OrderService,
    inventoryService *InventoryService,
    paymentService *PaymentService) *Saga {

    createOrder := SagaStep{
        Name:       "create_order",
        Action:     func(ctx context.Context) error { return orderService.CreateOrder(ctx) },
        Compensate: func(ctx context.Context) error { return orderService.CancelOrder(ctx) },
    }
    reserveInventory := SagaStep{
        Name:       "reserve_inventory",
        Action:     func(ctx context.Context) error { return inventoryService.Reserve(ctx) },
        Compensate: func(ctx context.Context) error { return inventoryService.Release(ctx) },
    }
    processPayment := SagaStep{
        Name:       "process_payment",
        Action:     func(ctx context.Context) error { return paymentService.Process(ctx) },
        Compensate: func(ctx context.Context) error { return paymentService.Refund(ctx) },
    }

    return &Saga{steps: []SagaStep{createOrder, reserveInventory, processPayment}}
}

TCC 模式

go
// TCCService is one participant in a Try-Confirm-Cancel transaction. The
// coordinator passes the same req to every phase of every participant.
type TCCService interface {
    Try(ctx context.Context, req interface{}) error
    Confirm(ctx context.Context, req interface{}) error
    Cancel(ctx context.Context, req interface{}) error
}

// OrderTCC adapts OrderService to TCCService. It expects req to be a
// non-nil *CreateOrderReq in every phase.
type OrderTCC struct {
    orderService *OrderService
}

// Try creates the order in a pending state via OrderService.CreatePending.
func (t *OrderTCC) Try(ctx context.Context, req interface{}) error {
    orderReq, err := asCreateOrderReq(req)
    if err != nil {
        return err
    }
    return t.orderService.CreatePending(ctx, orderReq)
}

// Confirm finalizes the order identified by req's OrderID.
func (t *OrderTCC) Confirm(ctx context.Context, req interface{}) error {
    orderReq, err := asCreateOrderReq(req)
    if err != nil {
        return err
    }
    return t.orderService.Confirm(ctx, orderReq.OrderID)
}

// Cancel cancels the order identified by req's OrderID.
func (t *OrderTCC) Cancel(ctx context.Context, req interface{}) error {
    orderReq, err := asCreateOrderReq(req)
    if err != nil {
        return err
    }
    return t.orderService.Cancel(ctx, orderReq.OrderID)
}

// asCreateOrderReq extracts the *CreateOrderReq that every OrderTCC phase
// needs. A coordinator shares one req across all participants, so a wrong
// type is a wiring bug; it is returned as an error instead of panicking the
// whole transaction.
func asCreateOrderReq(req interface{}) (*CreateOrderReq, error) {
    orderReq, ok := req.(*CreateOrderReq)
    if !ok || orderReq == nil {
        return nil, fmt.Errorf("order tcc: want non-nil *CreateOrderReq, got %T", req)
    }
    return orderReq, nil
}

// TCCCoordinator drives a Try-Confirm-Cancel transaction across its
// participants, in slice order.
type TCCCoordinator struct {
    services []TCCService
}

// Execute runs the two TCC phases, passing the same req to every participant.
//
// Phase 1 (Try) runs in order. If participant i's Try fails, Cancel is called
// on participants i-1 .. 0 in reverse order and the Try error is returned
// as-is. If any Cancel also fails, the Try error is wrapped (%w) and the
// Cancel failures are listed in the message.
//
// Phase 2 (Confirm) runs on every participant once all Tries have succeeded.
// All Confirms are attempted even if some fail. Any failure is returned
// (wrapping the first with %w). A failed Confirm leaves the transaction only
// partially committed; the caller must retry or reconcile, so it is never
// reported as success.
func (c *TCCCoordinator) Execute(ctx context.Context, req interface{}) error {
    for i, svc := range c.services {
        if err := svc.Try(ctx, req); err != nil {
            if cancelErrs := c.cancelTried(ctx, req, i); len(cancelErrs) > 0 {
                return fmt.Errorf("tcc try failed at participant %d: %w; cancel errors: %v", i, err, cancelErrs)
            }
            return err
        }
    }

    var confirmErrs []error
    for i, svc := range c.services {
        if err := svc.Confirm(ctx, req); err != nil {
            confirmErrs = append(confirmErrs, fmt.Errorf("participant %d: %w", i, err))
        }
    }
    if len(confirmErrs) > 0 {
        return fmt.Errorf("tcc confirm failed: %w; all confirm errors: %v", confirmErrs[0], confirmErrs)
    }

    return nil
}

// cancelTried cancels participants [0, failed) in reverse order. It keeps
// going past failures and returns every Cancel error it saw.
func (c *TCCCoordinator) cancelTried(ctx context.Context, req interface{}, failed int) []error {
    var errs []error
    for j := failed - 1; j >= 0; j-- {
        if err := c.services[j].Cancel(ctx, req); err != nil {
            errs = append(errs, fmt.Errorf("participant %d: %w", j, err))
        }
    }
    return errs
}

数据一致性

最终一致性

go
// EventStore is an append-only event log in the SQL table `events`. The
// columns used here are aggregate_id, event_type, event_data and created_at.
type EventStore struct {
    db *sql.DB
}

// Append stores event for aggregateID. The event is serialized as JSON and
// tagged with its Go type name (see eventTypeName) and the current time.
// Marshal and insert errors are returned unchanged.
func (es *EventStore) Append(ctx context.Context, aggregateID string, event interface{}) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    _, err = es.db.ExecContext(ctx, `
        INSERT INTO events (aggregate_id, event_type, event_data, created_at)
        VALUES (?, ?, ?, ?)
    `, aggregateID, eventTypeName(event), data, time.Now())
    
    return err
}

// eventTypeName returns the name of event's type, following pointers to the
// pointed-to type. Without this, reflect's Name() returns "" for a pointer
// event and the row would get a blank event_type. It returns "" for a nil
// event (instead of panicking) or an unnamed type.
func eventTypeName(event interface{}) string {
    t := reflect.TypeOf(event)
    for t != nil && t.Kind() == reflect.Ptr {
        t = t.Elem()
    }
    if t == nil {
        return ""
    }
    return t.Name()
}

// GetEvents returns aggregateID's events ordered by created_at. Event data is
// returned as the raw stored bytes.
//
// Iteration errors reported by rows.Err after the loop are returned; without
// that check, a dropped connection mid-scan would look like a complete but
// truncated history.
//
// NOTE(review): ordering by created_at alone is ambiguous when two events
// share a timestamp. A per-aggregate sequence/version column would make
// replay order deterministic — confirm the schema.
func (es *EventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT event_type, event_data, created_at
        FROM events
        WHERE aggregate_id = ?
        ORDER BY created_at
    `, aggregateID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var events []Event
    for rows.Next() {
        var eventType string
        var eventData []byte
        var createdAt time.Time
        
        if err := rows.Scan(&eventType, &eventData, &createdAt); err != nil {
            return nil, err
        }
        
        events = append(events, Event{
            Type:      eventType,
            Data:      eventData,
            CreatedAt: createdAt,
        })
    }
    if err := rows.Err(); err != nil {
        return nil, err
    }
    
    return events, nil
}

CQRS 模式

go
// CommandHandler handles write-side (command) messages.
type CommandHandler interface {
    Handle(ctx context.Context, cmd interface{}) error
}

// QueryHandler handles read-side (query) messages and returns their result.
type QueryHandler interface {
    Handle(ctx context.Context, query interface{}) (interface{}, error)
}

// CreateUserCommand requests creation of a user with the given name and email.
type CreateUserCommand struct {
    Name  string
    Email string
}

// UserCommandHandler is the write side for users. It persists through
// userRepo and publishes events for read-model projections.
type UserCommandHandler struct {
    userRepo repository.UserRepository
    eventBus *EventBus
}

// Handle executes a *CreateUserCommand:
//  1. build a user with a fresh ID,
//  2. store it,
//  3. publish "user.created".
//
// Any other cmd type, or a nil command, is rejected with an error rather than
// panicking on the type assertion. Repository errors are returned unchanged,
// and nothing is published when the store fails.
func (h *UserCommandHandler) Handle(ctx context.Context, cmd interface{}) error {
    createCmd, ok := cmd.(*CreateUserCommand)
    if !ok || createCmd == nil {
        return fmt.Errorf("user command handler: unsupported command %T", cmd)
    }

    user := &model.User{
        ID:    generateID(),
        Name:  createCmd.Name,
        Email: createCmd.Email,
    }

    if err := h.userRepo.Create(ctx, user); err != nil {
        return err
    }

    h.eventBus.Publish("user.created", user)
    return nil
}

// GetUserQuery asks for a single user by ID.
type GetUserQuery struct {
    ID int64
}

// UserQueryHandler is the read side for users. It reads from the users_view
// read model rather than the write-side tables.
type UserQueryHandler struct {
    db *sql.DB
}

// Handle executes a *GetUserQuery and returns a *model.User.
//
// Any other query type, or a nil query, is rejected with an error rather than
// panicking on the type assertion. Errors from QueryRowContext/Scan
// (including sql.ErrNoRows when the user does not exist) are returned
// unchanged.
func (h *UserQueryHandler) Handle(ctx context.Context, query interface{}) (interface{}, error) {
    getQuery, ok := query.(*GetUserQuery)
    if !ok || getQuery == nil {
        return nil, fmt.Errorf("user query handler: unsupported query %T", query)
    }

    var user model.User
    err := h.db.QueryRowContext(ctx, `
        SELECT id, name, email, created_at
        FROM users_view
        WHERE id = ?
    `, getQuery.ID).Scan(&user.ID, &user.Name, &user.Email, &user.CreatedAt)

    if err != nil {
        return nil, err
    }

    return &user, nil
}

可观测性

分布式追踪

go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
)

// TracingMiddleware returns a Kitex endpoint middleware. It wraps each call
// in a span named "rpc_request" and passes the span-carrying ctx to the next
// endpoint, so downstream spans become its children.
//
// When the call fails, the span records the error as an event and its status
// is set to Error. RecordError alone leaves the status Unset, so tracing
// backends would not count the call as failed.
func TracingMiddleware(tracer trace.Tracer) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, req interface{}) (interface{}, error) {
            ctx, span := tracer.Start(ctx, "rpc_request")
            defer span.End()

            resp, err := next(ctx, req)

            if err != nil {
                span.RecordError(err)
                span.SetStatus(codes.Error, err.Error())
            }

            return resp, err
        }
    }
}

// SetupTracing installs a global OpenTelemetry TracerProvider and returns a
// Tracer named after serviceName. The provider batches spans to a Jaeger
// collector and tags them with serviceName as service.name.
//
// jaeger.WithCollectorEndpoint() is called with no options, so the endpoint
// comes from the exporter's defaults/environment — confirm for your
// deployment. Exporter creation errors are returned unchanged.
//
// NOTE(review): the TracerProvider itself is not returned, so callers cannot
// Shutdown it to flush batched spans on exit. Also, the Jaeger exporter is
// deprecated upstream in favor of OTLP — confirm against your otel version.
func SetupTracing(serviceName string) (trace.Tracer, error) {
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
    if err != nil {
        return nil, err
    }

    res := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceNameKey.String(serviceName),
    )
    provider := tracesdk.NewTracerProvider(
        tracesdk.WithBatcher(exp),
        tracesdk.WithResource(res),
    )
    otel.SetTracerProvider(provider)

    return provider.Tracer(serviceName), nil
}

日志聚合

go
// StructuredLogger wraps a zap.Logger. Every entry gets a timestamp field
// and, when ctx carries a span, that span's trace ID.
type StructuredLogger struct {
    logger *zap.Logger
}

// Log writes msg at level with the given fields, each added via zap.Any.
//
// Only LevelInfo, LevelWarn and LevelError are emitted; any other level is
// dropped by the switch below.
func (l *StructuredLogger) Log(ctx context.Context, level LogLevel, msg string, fields ...Field) {
    zapFields := make([]zap.Field, 0, len(fields)+2)

    // Attach trace_id only when ctx actually carries one. Otherwise TraceID()
    // is the all-zero ID, which would lump every untraced entry under the same
    // bogus trace in the log aggregator.
    if sc := trace.SpanFromContext(ctx).SpanContext(); sc.HasTraceID() {
        zapFields = append(zapFields, zap.String("trace_id", sc.TraceID().String()))
    }
    zapFields = append(zapFields, zap.Time("timestamp", time.Now()))

    for _, f := range fields {
        zapFields = append(zapFields, zap.Any(f.Key, f.Value))
    }

    switch level {
    case LevelInfo:
        l.logger.Info(msg, zapFields...)
    case LevelError:
        l.logger.Error(msg, zapFields...)
    case LevelWarn:
        l.logger.Warn(msg, zapFields...)
    }
}

// LoggingMiddleware returns a Kitex endpoint middleware that logs
// "request completed" with the call's duration after every call.
//
// Failed calls are logged at LevelError and include the error. Successful
// calls are logged at LevelInfo without an always-null error field. The
// response and error are passed through unchanged.
func LoggingMiddleware(logger *StructuredLogger) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, req interface{}) (interface{}, error) {
            start := time.Now()

            resp, err := next(ctx, req)
            elapsed := time.Since(start)

            if err != nil {
                logger.Log(ctx, LevelError, "request completed",
                    Field{Key: "duration", Value: elapsed},
                    Field{Key: "error", Value: err},
                )
                return resp, err
            }

            logger.Log(ctx, LevelInfo, "request completed",
                Field{Key: "duration", Value: elapsed},
            )
            return resp, err
        }
    }
}

配置管理

集中式配置

go
// ConfigCenter is a key/value configuration source that can also push
// updates for a key to a callback.
type ConfigCenter interface {
    Get(ctx context.Context, key string) (string, error)
    Watch(ctx context.Context, key string, callback func(value string))
}

// NacosConfigCenter serves configuration from Nacos. Each key is used as a
// Nacos DataId in the "DEFAULT_GROUP" group.
type NacosConfigCenter struct {
    client config_client.IConfigClient
}

// Get fetches the current content of key. ctx is not used by the Nacos client.
func (c *NacosConfigCenter) Get(ctx context.Context, key string) (string, error) {
    param := vo.ConfigParam{DataId: key, Group: "DEFAULT_GROUP"}
    return c.client.GetConfig(param)
}

// Watch registers callback to receive the new content each time key changes
// in Nacos. ctx is not used by the Nacos client.
//
// NOTE(review): ListenConfig's error is discarded because Watch has no error
// result, so a failed registration goes unnoticed.
func (c *NacosConfigCenter) Watch(ctx context.Context, key string, callback func(value string)) {
    param := vo.ConfigParam{DataId: key, Group: "DEFAULT_GROUP"}
    param.OnChange = func(_, _, _, data string) {
        callback(data)
    }
    c.client.ListenConfig(param)
}

// DynamicConfig is a read-through cache over a ConfigCenter. Each key is
// kept fresh by a ConfigCenter.Watch subscription registered on first use.
type DynamicConfig struct {
    center ConfigCenter
    cache  sync.Map // key -> string: latest known value

    // watched records keys that already have a watch (key -> struct{}), so
    // concurrent cache misses for the same key subscribe only once.
    watched sync.Map
}

// GetConfig returns the value for key, serving from the cache when possible.
//
// On a miss, the watch is registered before the initial Get, so a change
// landing between the two is not lost. If the watch callback has already
// stored a value by the time Get returns, the cached value is kept and
// returned; any later change still arrives through the watch. A failed Get
// caches nothing and returns its error; the watch stays registered, and the
// next call retries Get without subscribing again.
//
// NOTE(review): ctx is forwarded to Watch. If a ConfigCenter ties the watch's
// lifetime to ctx, a request-scoped ctx would end the subscription with the
// request.
func (c *DynamicConfig) GetConfig(ctx context.Context, key string) (string, error) {
    if val, ok := c.cache.Load(key); ok {
        return val.(string), nil
    }

    if _, already := c.watched.LoadOrStore(key, struct{}{}); !already {
        c.center.Watch(ctx, key, func(newValue string) {
            c.cache.Store(key, newValue)
        })
    }

    val, err := c.center.Get(ctx, key)
    if err != nil {
        return "", err
    }

    actual, _ := c.cache.LoadOrStore(key, val)
    return actual.(string), nil
}

小结

本章深入探讨了微服务架构设计的核心要点:

  1. 设计原则:单一职责、服务自治、独立部署
  2. 服务拆分:按业务能力、按子域(DDD)拆分策略
  3. 通信模式:同步 RPC 调用、异步消息队列
  4. API 网关:统一入口、路由配置、协议转换
  5. 服务治理:服务发现、负载均衡、熔断降级
  6. 分布式事务:Saga 模式、TCC 模式
  7. 数据一致性:最终一致性、CQRS 模式
  8. 可观测性:分布式追踪、日志聚合、监控告警
  9. 配置管理:集中式配置、动态更新

在下一章中,我们将通过实际项目案例来应用这些架构设计理念。