微服务架构设计
概述
微服务架构是一种将单一应用程序拆分为一组小型服务的架构风格，每个服务运行在独立的进程中，服务间通过轻量级通信机制协作。本章将深入探讨基于 CloudWeGo 的微服务架构设计原则、模式和最佳实践。
核心内容
微服务设计原则
单一职责原则
每个微服务应该只负责一个业务功能：
go
// OrderService bundles the two collaborators used by CreateOrder: a
// repository that stores orders and an event bus that is notified afterwards.
type OrderService struct {
// orderRepo is the storage backend; CreateOrder calls its Create method.
orderRepo repository.OrderRepository
// eventBus receives the "order.created" event published by CreateOrder.
eventBus *EventBus
}
// CreateOrder builds a new order from req with status OrderStatusPending,
// stores it through the order repository and then publishes an
// "order.created" event carrying the stored order.
//
// If the repository rejects the order its error is returned unchanged and no
// event is published.
func (s *OrderService) CreateOrder(ctx context.Context, req *CreateOrderReq) (*Order, error) {
	created := new(Order)
	created.ID = generateID()
	created.UserID = req.UserID
	created.ProductID = req.ProductID
	created.Status = OrderStatusPending
	created.CreatedAt = time.Now()

	err := s.orderRepo.Create(ctx, created)
	if err != nil {
		return nil, err
	}

	// The event is emitted only after the order has been stored.
	s.eventBus.Publish("order.created", created)
	return created, nil
}服务自治
每个服务独立开发、部署和扩展：
yaml
# Kubernetes Deployment manifest for the order-service workload.
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-service
spec:
# Number of pod replicas requested for this Deployment.
replicas: 3
# The selector label below is the same label that the pod template sets.
selector:
matchLabels:
app: order-service
template:
metadata:
labels:
app: order-service
spec:
containers:
- name: order-service
image: order-service:v1.0.0
ports:
- containerPort: 8888
# CPU and memory requests and limits for the container.
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "512Mi"服务拆分策略
按业务能力拆分
go
// UserService is the service for the user business capability; its only
// dependency is a UserRepository.
type UserService struct {
repo repository.UserRepository
}
// OrderService is the service for the order business capability; its only
// dependency is an OrderRepository.
type OrderService struct {
repo repository.OrderRepository
}
// PaymentService is the service for the payment business capability; its
// only dependency is a PaymentRepository.
type PaymentService struct {
repo repository.PaymentRepository
}
// InventoryService is the service for the inventory business capability; its
// only dependency is an InventoryRepository.
type InventoryService struct {
repo repository.InventoryRepository
}按子域拆分(DDD)
go
// UserDomain groups the services that belong to the user subdomain: user,
// authentication and profile services.
type UserDomain struct {
userService *service.UserService
authService *service.AuthService
profileService *service.ProfileService
}
// OrderDomain groups the services that belong to the order subdomain: order
// and cart services.
type OrderDomain struct {
orderService *service.OrderService
cartService *service.CartService
}
// PaymentDomain groups the services that belong to the payment subdomain:
// payment and refund services.
type PaymentDomain struct {
paymentService *service.PaymentService
refundService *service.RefundService
}服务通信模式
同步通信
go
// OrderClient wraps the RPC client used for synchronous calls to the order
// service.
type OrderClient struct {
// client is the underlying RPC client; GetOrder invokes GetOrder on it.
// NOTE(review): NewOrderClient stores the value returned by
// orderservice.NewClient here — confirm *kitex.Client is the right type.
client *kitex.Client
}
// GetOrder fetches the order identified by orderID through the wrapped RPC
// client and returns the Order field of the response.
//
// A call failure is wrapped with the prefix "get order failed" so the
// underlying error stays reachable via errors.Is / errors.As.
func (c *OrderClient) GetOrder(ctx context.Context, orderID int64) (*Order, error) {
	resp, err := c.client.GetOrder(ctx, &order.GetOrderReq{OrderId: orderID})
	if err == nil {
		return resp.Order, nil
	}
	return nil, fmt.Errorf("get order failed: %w", err)
}
// NewOrderClient creates an OrderClient for the service named
// "order-service".
//
// It builds a Consul resolver for the address 127.0.0.1:8500 and passes it,
// together with a 3 second timeout option, to orderservice.NewClient. An
// error from either constructor is returned unchanged.
func NewOrderClient() (*OrderClient, error) {
	resolver, err := consul.NewConsulResolver("127.0.0.1:8500")
	if err != nil {
		return nil, err
	}

	// The local is named cli so it does not shadow the client package.
	cli, err := orderservice.NewClient(
		"order-service",
		client.WithResolver(resolver),
		client.WithTimeout(3*time.Second),
	)
	if err != nil {
		return nil, err
	}

	wrapped := &OrderClient{client: cli}
	return wrapped, nil
}异步通信(消息队列)
go
// MessageQueue pairs a sarama producer and consumer group for asynchronous
// messaging.
type MessageQueue struct {
// producer is used by Publish to send messages.
producer sarama.SyncProducer
// consumer is used by Subscribe to consume topics.
consumer sarama.ConsumerGroup
}
// Publish sends message to topic through the producer and returns the
// producer's error, if any. The partition and offset reported by the
// producer are discarded.
//
// ctx is accepted but not used by this method.
func (mq *MessageQueue) Publish(ctx context.Context, topic string, message []byte) error {
	_, _, err := mq.producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	})
	return err
}
// Subscribe consumes the single given topic with the consumer group,
// dispatching through a consumerHandler that wraps handler. It returns
// whatever the consumer group's Consume call returns.
//
// Consumption runs under context.Background(), so it carries no caller
// deadline or cancellation.
func (mq *MessageQueue) Subscribe(topic string, handler MessageHandler) error {
	topics := []string{topic}
	groupHandler := &consumerHandler{handler: handler}
	return mq.consumer.Consume(context.Background(), topics, groupHandler)
}
// OrderEventHandler reacts to events from other services by updating orders
// through the order service.
type OrderEventHandler struct {
orderService *service.OrderService
}
// Handle dispatches event by its Type:
//
//   - "payment.completed"  -> orderService.MarkAsPaid
//   - "inventory.reserved" -> orderService.MarkAsReserved
//
// Events of any other type are ignored and nil is returned.
func (h *OrderEventHandler) Handle(ctx context.Context, event *OrderEvent) error {
	if event.Type == "payment.completed" {
		return h.orderService.MarkAsPaid(ctx, event.OrderID)
	}
	if event.Type == "inventory.reserved" {
		return h.orderService.MarkAsReserved(ctx, event.OrderID)
	}
	return nil
}API 网关设计
网关架构
go
// APIGateway is the single entry point that fans requests out to the user,
// order and product service clients.
type APIGateway struct {
userClient *UserClient
orderClient *OrderClient
productClient *ProductClient
}
// GetUserInfo forwards the lookup for userID to the user client and returns
// its result and error unchanged.
func (g *APIGateway) GetUserInfo(ctx context.Context, userID int64) (*UserInfo, error) {
	info, err := g.userClient.GetUser(ctx, userID)
	return info, err
}
// CreateOrder aggregates three downstream calls: it loads the user and the
// product named in req, then asks the order client to create an order using
// the user's ID, the product's ID and the product's Price as the amount.
//
// The first failing call aborts the flow and its error is returned as-is.
func (g *APIGateway) CreateOrder(ctx context.Context, req *CreateOrderReq) (*Order, error) {
	buyer, err := g.userClient.GetUser(ctx, req.UserID)
	if err != nil {
		return nil, err
	}

	item, err := g.productClient.GetProduct(ctx, req.ProductID)
	if err != nil {
		return nil, err
	}

	// The downstream request is rebuilt from the fetched records rather than
	// forwarding req directly.
	downstream := &CreateOrderReq{
		UserID:    buyer.ID,
		ProductID: item.ID,
		Amount:    item.Price,
	}
	placed, err := g.orderClient.CreateOrder(ctx, downstream)
	if err != nil {
		return nil, err
	}
	return placed, nil
}路由配置
go
// SetupRouter builds the gin engine for the gateway.
//
// The auth, rate-limit and circuit-breaker middleware are installed in that
// order, and four routes are registered under the /api/v1 prefix.
//
// NOTE(review): the gateway methods visible in this file take
// (ctx, ...) arguments rather than *gin.Context — confirm adapter handlers
// exist before wiring them as routes.
func SetupRouter(gateway *APIGateway) *gin.Engine {
	engine := gin.Default()
	engine.Use(AuthMiddleware(), RateLimitMiddleware(), CircuitBreakerMiddleware())

	v1 := engine.Group("/api/v1")
	v1.GET("/users/:id", gateway.GetUserInfo)
	v1.POST("/orders", gateway.CreateOrder)
	v1.GET("/orders/:id", gateway.GetOrder)
	v1.GET("/products/:id", gateway.GetProduct)

	return engine
}服务治理
服务发现
go
// ServiceRegistry abstracts a service-discovery backend.
type ServiceRegistry interface {
// Register announces the given service.
Register(ctx context.Context, service *ServiceInfo) error
// Deregister removes the service identified by serviceID.
Deregister(ctx context.Context, serviceID string) error
// Discover returns the instances known for serviceName.
Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
}
// NacosRegistry is a registry backed by a Nacos naming client.
// NOTE(review): only Register and Discover are shown for this type; confirm
// Deregister exists elsewhere if it is meant to satisfy ServiceRegistry.
type NacosRegistry struct {
client naming_client.INamingClient
}
// Register copies the name, IP, port, weight and metadata of service into a
// Nacos instance and registers it with the naming client. Only the error of
// that call is returned; its other result is discarded.
//
// ctx is accepted but not used by this method.
func (r *NacosRegistry) Register(ctx context.Context, service *ServiceInfo) error {
	_, err := r.client.RegisterInstance(&naming_client.Instance{
		ServiceName: service.Name,
		Ip:          service.IP,
		Port:        service.Port,
		Weight:      service.Weight,
		Metadata:    service.Metadata,
	})
	return err
}
// Discover asks the naming client for the instances of serviceName in
// "DEFAULT_GROUP" (passing true as the third argument) and converts each
// one into a ServiceInstance carrying its ID, address, port and weight.
//
// The result is a nil slice when no instance is returned. ctx is accepted
// but not used by this method.
func (r *NacosRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
	found, err := r.client.SelectInstances(serviceName, "DEFAULT_GROUP", true)
	if err != nil {
		return nil, err
	}

	var converted []*ServiceInstance
	for _, src := range found {
		item := &ServiceInstance{
			ID:      src.InstanceId,
			Address: src.Ip,
			Port:    src.Port,
			Weight:  src.Weight,
		}
		converted = append(converted, item)
	}
	return converted, nil
}负载均衡
go
// LoadBalancer chooses one instance out of a candidate list.
type LoadBalancer interface {
// Select returns the chosen instance from instances.
Select(instances []*ServiceInstance) *ServiceInstance
}
// RoundRobinLB rotates through the instances it is given.
type RoundRobinLB struct {
// counter is advanced atomically by Select on every non-empty call.
counter uint64
}
// Select returns the next instance in rotation, or nil when instances is
// empty. The shared counter is advanced with an atomic add and reduced
// modulo the number of instances to form the index.
func (lb *RoundRobinLB) Select(instances []*ServiceInstance) *ServiceInstance {
	n := uint64(len(instances))
	if n == 0 {
		return nil
	}
	next := atomic.AddUint64(&lb.counter, 1)
	return instances[next%n]
}
// WeightedRoundRobinLB distributes selections in proportion to instance
// weights.
type WeightedRoundRobinLB struct {
// instances and weights are not read by the Select method shown in this
// file. NOTE(review): confirm whether they are still needed.
instances []*ServiceInstance
weights []int
// counter is the rotating position that Select advances on every call.
counter int
}
// Select returns an instance chosen in proportion to its Weight.
//
// The counter is advanced by one modulo the sum of all weights, and the
// first instance whose cumulative weight exceeds the counter is returned.
//
// Edge cases:
//   - an empty instances slice yields nil (previously a divide-by-zero panic);
//   - when the weights do not sum to a positive number, selection falls back
//     to plain rotation over instances (previously a divide-by-zero panic).
//
// The counter is updated without synchronization.
func (lb *WeightedRoundRobinLB) Select(instances []*ServiceInstance) *ServiceInstance {
	if len(instances) == 0 {
		return nil
	}

	total := 0
	for _, ins := range instances {
		total += ins.Weight
	}
	if total <= 0 {
		// No usable weights: treat every instance as equally weighted.
		lb.counter = (lb.counter + 1) % len(instances)
		return instances[lb.counter]
	}

	lb.counter = (lb.counter + 1) % total
	cumulative := 0
	for _, ins := range instances {
		cumulative += ins.Weight
		if lb.counter < cumulative {
			return ins
		}
	}
	return instances[0]
}熔断降级
go
// CircuitBreaker tracks failures of a guarded call and rejects calls while
// it is open.
// NOTE(review): no lock guards these fields; confirm callers serialize
// access before sharing one breaker across goroutines.
type CircuitBreaker struct {
// maxFailures is the failure count at which Call switches to StateOpen.
maxFailures int
// timeout is how long after the last failure an open breaker keeps
// rejecting calls.
timeout time.Duration
// state is the current mode of the breaker.
state State
// failures counts failed calls; Call resets it to zero on success.
failures int
// lastFailTime is recorded when Call switches the breaker to StateOpen.
lastFailTime time.Time
}
// State is the mode a CircuitBreaker is in.
type State int
const (
// StateClosed is the zero value; Call sets it after a successful invocation.
StateClosed State = iota
// StateOpen makes Call return ErrCircuitBreakerOpen until the timeout has
// elapsed since the last failure.
StateOpen
// StateHalfOpen is entered by Call once that timeout has elapsed.
StateHalfOpen
)
// Call runs fn under the protection of the breaker.
//
// While the breaker is open and the timeout since the last failure has not
// elapsed, fn is not invoked and ErrCircuitBreakerOpen is returned. Once the
// timeout has elapsed the breaker moves to half-open and fn is tried.
//
// A successful fn resets the failure counter and closes the breaker. A
// failing fn increments the counter, opens the breaker when the counter has
// reached maxFailures, and has its error returned unchanged.
func (cb *CircuitBreaker) Call(fn func() error) error {
	if cb.state == StateOpen {
		if time.Since(cb.lastFailTime) <= cb.timeout {
			return ErrCircuitBreakerOpen
		}
		cb.state = StateHalfOpen
	}

	err := fn()
	if err == nil {
		cb.failures = 0
		cb.state = StateClosed
		return nil
	}

	cb.failures++
	if cb.failures >= cb.maxFailures {
		cb.state = StateOpen
		cb.lastFailTime = time.Now()
	}
	return err
}
// CircuitBreakerMiddleware wraps an endpoint so that every invocation goes
// through cb.Call. The response produced by the wrapped endpoint is captured
// from inside the guarded function and returned together with the error
// reported by cb.Call.
func CircuitBreakerMiddleware(cb *CircuitBreaker) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		guarded := func(ctx context.Context, req interface{}) (interface{}, error) {
			var out interface{}
			callErr := cb.Call(func() error {
				result, err := next(ctx, req)
				out = result
				return err
			})
			return out, callErr
		}
		return guarded
	}
}分布式事务
Saga 模式
go
// SagaStep is one unit of a saga: a forward action and the compensation that
// undoes it.
type SagaStep struct {
// Name identifies the step in the error returned by Saga.Execute.
Name string
// Action performs the step.
Action func(ctx context.Context) error
// Compensate is invoked by Saga.Execute when a later step fails.
Compensate func(ctx context.Context) error
}
// Saga is an ordered list of steps that Execute runs in sequence.
type Saga struct {
steps []SagaStep
}
// Execute runs the saga's steps in order.
//
// When a step's Action fails, the Compensate functions of all previously
// completed steps are invoked in reverse order and an error of the form
// "saga failed at step <name>: <cause>" is returned; the cause remains
// reachable through errors.Is / errors.As.
//
// Compensation failures are no longer dropped: each one is appended to the
// returned error as "; compensation of step <name> failed: <err>" so the
// caller can tell that the rollback is incomplete. Steps with a nil
// Compensate are skipped instead of causing a panic.
//
// Execute returns nil when every Action succeeds.
func (s *Saga) Execute(ctx context.Context) error {
	for i, step := range s.steps {
		err := step.Action(ctx)
		if err == nil {
			continue
		}

		failure := fmt.Errorf("saga failed at step %s: %w", step.Name, err)

		// Steps 0..i-1 completed; undo them newest-first.
		for j := i - 1; j >= 0; j-- {
			undo := s.steps[j].Compensate
			if undo == nil {
				continue
			}
			if cerr := undo(ctx); cerr != nil {
				failure = fmt.Errorf("%w; compensation of step %s failed: %v",
					failure, s.steps[j].Name, cerr)
			}
		}
		return failure
	}
	return nil
}
// CreateOrderSaga assembles the three-step order saga:
//
//  1. "create_order"      — CreateOrder, compensated by CancelOrder
//  2. "reserve_inventory" — Reserve, compensated by Release
//  3. "process_payment"   — Process, compensated by Refund
//
// Each step delegates to the corresponding service passed in.
func CreateOrderSaga(orderService *OrderService,
	inventoryService *InventoryService,
	paymentService *PaymentService) *Saga {

	createOrder := SagaStep{
		Name:       "create_order",
		Action:     func(ctx context.Context) error { return orderService.CreateOrder(ctx) },
		Compensate: func(ctx context.Context) error { return orderService.CancelOrder(ctx) },
	}
	reserveInventory := SagaStep{
		Name:       "reserve_inventory",
		Action:     func(ctx context.Context) error { return inventoryService.Reserve(ctx) },
		Compensate: func(ctx context.Context) error { return inventoryService.Release(ctx) },
	}
	processPayment := SagaStep{
		Name:       "process_payment",
		Action:     func(ctx context.Context) error { return paymentService.Process(ctx) },
		Compensate: func(ctx context.Context) error { return paymentService.Refund(ctx) },
	}

	return &Saga{steps: []SagaStep{createOrder, reserveInventory, processPayment}}
}TCC 模式
go
// TCCService is a participant in a Try-Confirm-Cancel transaction.
type TCCService interface {
// Try is the first phase; TCCCoordinator.Execute calls it on every service.
Try(ctx context.Context, req interface{}) error
// Confirm is called by the coordinator after every Try has succeeded.
Confirm(ctx context.Context, req interface{}) error
// Cancel is called by the coordinator on already-tried services when a
// later Try fails.
Cancel(ctx context.Context, req interface{}) error
}
// OrderTCC implements the Try, Confirm and Cancel phases for orders by
// delegating to an OrderService.
type OrderTCC struct {
orderService *OrderService
}
// Try runs the first TCC phase by delegating to OrderService.CreatePending.
//
// req must hold a *CreateOrderReq. Any other dynamic type now produces an
// error instead of the panic raised by an unchecked type assertion.
func (t *OrderTCC) Try(ctx context.Context, req interface{}) error {
	orderReq, ok := req.(*CreateOrderReq)
	if !ok {
		return fmt.Errorf("order tcc try: unexpected request type %T", req)
	}
	return t.orderService.CreatePending(ctx, orderReq)
}
// Confirm runs the second TCC phase by delegating to OrderService.Confirm
// with the request's OrderID.
//
// req must hold a non-nil *CreateOrderReq. A wrong dynamic type or a nil
// pointer now produces an error instead of a panic.
func (t *OrderTCC) Confirm(ctx context.Context, req interface{}) error {
	orderReq, ok := req.(*CreateOrderReq)
	if !ok || orderReq == nil {
		return fmt.Errorf("order tcc confirm: expected non-nil *CreateOrderReq, got %T", req)
	}
	return t.orderService.Confirm(ctx, orderReq.OrderID)
}
// Cancel rolls back the Try phase by delegating to OrderService.Cancel with
// the request's OrderID.
//
// req must hold a non-nil *CreateOrderReq. A wrong dynamic type or a nil
// pointer now produces an error instead of a panic.
func (t *OrderTCC) Cancel(ctx context.Context, req interface{}) error {
	orderReq, ok := req.(*CreateOrderReq)
	if !ok || orderReq == nil {
		return fmt.Errorf("order tcc cancel: expected non-nil *CreateOrderReq, got %T", req)
	}
	return t.orderService.Cancel(ctx, orderReq.OrderID)
}
// TCCCoordinator drives a Try-Confirm-Cancel transaction across its
// services, in slice order.
type TCCCoordinator struct {
services []TCCService
}
// Execute runs a Try-Confirm-Cancel transaction over all services.
//
// Try phase: every service's Try is called in order. On the first failure
// the services that were already tried are cancelled in reverse order and
// the Try error is returned unchanged. Cancel failures are logged rather
// than silently discarded.
//
// Confirm phase: every service's Confirm is called even if an earlier one
// fails, and each failure is logged. If any Confirm failed, the first such
// error is returned wrapped, so callers no longer receive nil for a
// transaction that was only partly confirmed.
func (c *TCCCoordinator) Execute(ctx context.Context, req interface{}) error {
	for i, svc := range c.services {
		if err := svc.Try(ctx, req); err != nil {
			// Services 0..i-1 were tried successfully; cancel newest-first.
			for j := i - 1; j >= 0; j-- {
				if cerr := c.services[j].Cancel(ctx, req); cerr != nil {
					log.Printf("cancel failed: %v", cerr)
				}
			}
			return err
		}
	}

	var firstConfirmErr error
	for _, svc := range c.services {
		if err := svc.Confirm(ctx, req); err != nil {
			log.Printf("confirm failed: %v", err)
			if firstConfirmErr == nil {
				firstConfirmErr = err
			}
		}
	}
	if firstConfirmErr != nil {
		return fmt.Errorf("tcc confirm phase incomplete: %w", firstConfirmErr)
	}
	return nil
}数据一致性
最终一致性
go
// EventStore appends events to, and reads events from, the events table of
// a SQL database.
type EventStore struct {
db *sql.DB
}
// Append stores event for aggregateID in the events table.
//
// The event is serialized with encoding/json and written together with its
// type name and the current time. The type name is taken from the event's
// underlying struct type, so a pointer such as *OrderCreated is recorded as
// "OrderCreated" rather than as an empty string.
//
// A nil event is rejected with an error; previously it caused a panic when
// the type name was looked up.
func (es *EventStore) Append(ctx context.Context, aggregateID string, event interface{}) error {
	if event == nil {
		return fmt.Errorf("append event for aggregate %q: event is nil", aggregateID)
	}

	data, err := json.Marshal(event)
	if err != nil {
		return err
	}

	_, err = es.db.ExecContext(ctx, `
INSERT INTO events (aggregate_id, event_type, event_data, created_at)
VALUES (?, ?, ?, ?)
`, aggregateID, eventTypeName(event), data, time.Now())
	return err
}

// eventTypeName returns the name of event's type, looking through any number
// of pointer indirections. event must not be nil.
func eventTypeName(event interface{}) string {
	t := reflect.TypeOf(event)
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}
// GetEvents returns the events recorded for aggregateID, ordered by their
// created_at column.
//
// Each row's type, raw data and creation time are copied into an Event. The
// result is a nil slice when no rows match. Besides query and scan errors,
// an error hit while iterating (reported by rows.Err) is now returned too,
// instead of being mistaken for the end of the result set.
func (es *EventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	rows, err := es.db.QueryContext(ctx, `
SELECT event_type, event_data, created_at
FROM events
WHERE aggregate_id = ?
ORDER BY created_at
`, aggregateID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var events []Event
	for rows.Next() {
		var (
			eventType string
			eventData []byte
			createdAt time.Time
		)
		if err := rows.Scan(&eventType, &eventData, &createdAt); err != nil {
			return nil, err
		}
		events = append(events, Event{
			Type:      eventType,
			Data:      eventData,
			CreatedAt: createdAt,
		})
	}
	// rows.Next returns false on both exhaustion and failure; tell them apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return events, nil
}CQRS 模式
go
// CommandHandler is the write side of CQRS: it processes a command and
// reports only success or failure.
type CommandHandler interface {
Handle(ctx context.Context, cmd interface{}) error
}
// QueryHandler is the read side of CQRS: it answers a query with a result
// value.
type QueryHandler interface {
Handle(ctx context.Context, query interface{}) (interface{}, error)
}
// CreateUserCommand carries the data needed to create a user.
type CreateUserCommand struct {
Name string
Email string
}
// UserCommandHandler handles user commands by writing through the user
// repository and publishing events.
type UserCommandHandler struct {
// userRepo stores the users created by Handle.
userRepo repository.UserRepository
// eventBus receives the "user.created" event published by Handle.
eventBus *EventBus
}
// Handle processes a *CreateUserCommand: it creates a user with a generated
// ID and the command's name and email, stores it through the repository and
// then publishes a "user.created" event carrying the user.
//
// cmd must hold a non-nil *CreateUserCommand. A wrong dynamic type or a nil
// pointer now produces an error instead of a panic. A repository error is
// returned unchanged and no event is published.
func (h *UserCommandHandler) Handle(ctx context.Context, cmd interface{}) error {
	createCmd, ok := cmd.(*CreateUserCommand)
	if !ok || createCmd == nil {
		return fmt.Errorf("user command handler: expected non-nil *CreateUserCommand, got %T", cmd)
	}

	user := &model.User{
		ID:    generateID(),
		Name:  createCmd.Name,
		Email: createCmd.Email,
	}
	if err := h.userRepo.Create(ctx, user); err != nil {
		return err
	}

	h.eventBus.Publish("user.created", user)
	return nil
}
// GetUserQuery asks for the user with the given ID.
type GetUserQuery struct {
ID int64
}
// UserQueryHandler answers user queries directly from the database; its
// Handle method reads from the users_view table.
type UserQueryHandler struct {
db *sql.DB
}
// Handle answers a *GetUserQuery by reading id, name, email and created_at
// of the matching row from users_view and returning them as a *model.User.
//
// query must hold a non-nil *GetUserQuery. A wrong dynamic type or a nil
// pointer now produces an error instead of a panic. Errors from the query
// or scan are returned unchanged.
func (h *UserQueryHandler) Handle(ctx context.Context, query interface{}) (interface{}, error) {
	getQuery, ok := query.(*GetUserQuery)
	if !ok || getQuery == nil {
		return nil, fmt.Errorf("user query handler: expected non-nil *GetUserQuery, got %T", query)
	}

	var user model.User
	err := h.db.QueryRowContext(ctx, `
SELECT id, name, email, created_at
FROM users_view
WHERE id = ?
`, getQuery.ID).Scan(&user.ID, &user.Name, &user.Email, &user.CreatedAt)
	if err != nil {
		return nil, err
	}
	return &user, nil
}可观测性
分布式追踪
go
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// TracingMiddleware wraps an endpoint in a span named "rpc_request".
//
// The span is started from the incoming context, the wrapped endpoint is
// called with the span's context, and the span is ended when the call
// returns. An error from the endpoint is recorded on the span before being
// returned to the caller.
func TracingMiddleware(tracer trace.Tracer) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		traced := func(ctx context.Context, req interface{}) (interface{}, error) {
			spanCtx, span := tracer.Start(ctx, "rpc_request")
			defer span.End()

			resp, err := next(spanCtx, req)
			if err != nil {
				span.RecordError(err)
			}
			return resp, err
		}
		return traced
	}
}
// SetupTracing creates a Jaeger exporter with the default collector
// endpoint option, builds a tracer provider that batches spans to it and
// tags them with serviceName, installs the provider globally via otel, and
// returns a tracer named serviceName.
//
// An error creating the exporter is returned unchanged.
func SetupTracing(serviceName string) (trace.Tracer, error) {
	exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
	if err != nil {
		return nil, err
	}

	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(serviceName),
	)
	provider := tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithResource(res),
	)

	otel.SetTracerProvider(provider)
	return provider.Tracer(serviceName), nil
}日志聚合
go
// StructuredLogger wraps a zap logger; its Log method adds the trace ID and
// a timestamp to every entry.
type StructuredLogger struct {
logger *zap.Logger
}
// Log writes msg at the given level with structured fields.
//
// Every entry carries a "trace_id" taken from the span in ctx and a
// "timestamp" set to the current time, followed by the caller's fields in
// the order given. LevelInfo, LevelError and LevelWarn map to the matching
// zap methods; any other level writes nothing.
func (l *StructuredLogger) Log(ctx context.Context, level LogLevel, msg string, fields ...Field) {
	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()

	entry := make([]zap.Field, 0, len(fields)+2)
	entry = append(entry,
		zap.String("trace_id", traceID.String()),
		zap.Time("timestamp", time.Now()),
	)
	for i := range fields {
		entry = append(entry, zap.Any(fields[i].Key, fields[i].Value))
	}

	switch level {
	case LevelWarn:
		l.logger.Warn(msg, entry...)
	case LevelError:
		l.logger.Error(msg, entry...)
	case LevelInfo:
		l.logger.Info(msg, entry...)
	}
}
// LoggingMiddleware logs one "request completed" entry per call of the
// wrapped endpoint, with the call's duration and its error as fields.
//
// The entry is written at LevelError when the endpoint returned an error and
// at LevelInfo otherwise. Previously failures were also logged at info
// level, which hid them from error-level filtering. The endpoint's response
// and error are returned unchanged.
func LoggingMiddleware(logger *StructuredLogger) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, req interface{}) (interface{}, error) {
			start := time.Now()
			resp, err := next(ctx, req)

			var level LogLevel = LevelInfo
			if err != nil {
				level = LevelError
			}
			logger.Log(ctx, level, "request completed",
				Field{Key: "duration", Value: time.Since(start)},
				Field{Key: "error", Value: err},
			)
			return resp, err
		}
	}
}配置管理
集中式配置
go
// ConfigCenter abstracts a centralized configuration store.
type ConfigCenter interface {
// Get returns the value stored under key.
Get(ctx context.Context, key string) (string, error)
// Watch registers callback to receive new values of key.
Watch(ctx context.Context, key string, callback func(value string))
}
// NacosConfigCenter is a ConfigCenter backed by a Nacos config client.
type NacosConfigCenter struct {
client config_client.IConfigClient
}
// Get reads the configuration whose DataId is key from "DEFAULT_GROUP" and
// returns the client's result unchanged.
//
// ctx is accepted but not used by this method.
func (c *NacosConfigCenter) Get(ctx context.Context, key string) (string, error) {
	param := vo.ConfigParam{
		DataId: key,
		Group:  "DEFAULT_GROUP",
	}
	return c.client.GetConfig(param)
}
// Watch registers a listener for the configuration whose DataId is key in
// "DEFAULT_GROUP". On every change notification, callback is invoked with
// the new data; the namespace, group and dataId arguments are ignored.
//
// The result of ListenConfig is discarded, and ctx is accepted but not used.
func (c *NacosConfigCenter) Watch(ctx context.Context, key string, callback func(value string)) {
	onChange := func(namespace, group, dataId, data string) {
		callback(data)
	}
	c.client.ListenConfig(vo.ConfigParam{
		DataId:   key,
		Group:    "DEFAULT_GROUP",
		OnChange: onChange,
	})
}
// DynamicConfig serves configuration values from a local cache that is kept
// up to date by watching the config center.
type DynamicConfig struct {
// center is the backing store queried on a cache miss.
center ConfigCenter
// cache holds the latest known value per key.
cache sync.Map
}
// GetConfig returns the value for key.
//
// A cached value is returned immediately. On a cache miss the value is
// fetched from the config center, stored in the cache, and a watch is
// registered so that later changes overwrite the cached entry. A fetch
// error is returned with an empty string and nothing is cached.
func (c *DynamicConfig) GetConfig(ctx context.Context, key string) (string, error) {
	if cached, hit := c.cache.Load(key); hit {
		return cached.(string), nil
	}

	fresh, err := c.center.Get(ctx, key)
	if err != nil {
		return "", err
	}
	c.cache.Store(key, fresh)

	// Keep the cached entry in sync with future updates.
	refresh := func(newValue string) {
		c.cache.Store(key, newValue)
	}
	c.center.Watch(ctx, key, refresh)

	return fresh, nil
}小结
本章深入探讨了微服务架构设计的核心要点：
- 设计原则：单一职责、服务自治、独立部署
- 服务拆分：按业务能力、按子域（DDD）拆分策略
- 通信模式：同步 RPC 调用、异步消息队列
- API 网关：统一入口、路由配置、协议转换
- 服务治理：服务发现、负载均衡、熔断降级
- 分布式事务：Saga 模式、TCC 模式
- 数据一致性：最终一致性、CQRS 模式
- 可观测性：分布式追踪、日志聚合、监控告警
- 配置管理：集中式配置、动态更新
在下一章中，我们将通过实际项目案例来应用这些架构设计理念。