Skip to content

项目实战

概述

本章将通过一个完整的电商微服务项目,演示如何使用 CloudWeGo 构建生产级的微服务系统。项目围绕用户服务、商品服务、订单服务三个核心模块展开(支付服务属于同一体系,但本章不包含其实现),覆盖从 IDL 定义、服务实现、网关接入到消息队列与部署的完整业务流程和技术实现。

核心内容

项目架构

整体架构设计

┌─────────────────────────────────────────────────────────┐
│                      API Gateway                         │
│                   (Gin + Middleware)                     │
└─────────────────────┬───────────────────────────────────┘

        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│   User    │ │  Product  │ │   Order   │
│  Service  │ │  Service  │ │  Service  │
└───────────┘ └───────────┘ └───────────┘
        │             │             │
        └─────────────┼─────────────┘

        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│   MySQL   │ │   Redis   │ │  Kafka    │
└───────────┘ └───────────┘ └───────────┘

IDL 定义

thrift
// IDL for the user service. Generated Go code is placed in package "user".
namespace go user

// User is the user record returned to callers. It carries no password field.
struct User {
    1: i64 id
    2: string username
    3: string email
    4: string phone
    // Integer state code; the meaning of each value is not defined in this IDL.
    5: i32 status
    // Creation time carried as a string.
    6: string created_at
}

// CreateUserRequest carries the fields supplied when registering a user.
struct CreateUserRequest {
    1: string username
    2: string password
    3: string email
    4: string phone
}

// GetUserRequest identifies the user to fetch by id.
struct GetUserRequest {
    1: i64 id
}

// GetUserResponse wraps the user returned by GetUser.
struct GetUserResponse {
    1: User user
}

// UserService exposes user lookup and registration.
service UserService {
    // GetUser returns the user identified by req.id.
    GetUserResponse GetUser(1: GetUserRequest req)
    // CreateUser registers a user and returns an i64 (the new user's id in
    // the handler shown in this document).
    i64 CreateUser(1: CreateUserRequest req)
}
thrift
// IDL for the product service. Generated Go code is placed in package "product".
namespace go product

// Product describes a catalogue item together with its current stock.
struct Product {
    1: i64 id
    2: string name
    3: string description
    // NOTE(review): price is a binary floating-point value; confirm this is
    // acceptable for monetary amounts.
    4: double price
    5: i32 stock
    // Integer state code; the meaning of each value is not defined in this IDL.
    6: i32 status
    // Creation time carried as a string.
    7: string created_at
}

// GetProductRequest identifies the product to fetch by id.
struct GetProductRequest {
    1: i64 id
}

// GetProductResponse wraps the product returned by GetProduct.
struct GetProductResponse {
    1: Product product
}

// ListProductsRequest carries paging parameters and a search keyword.
struct ListProductsRequest {
    1: i32 page
    2: i32 page_size
    3: string keyword
}

// ListProductsResponse carries a list of products plus a total count.
struct ListProductsResponse {
    1: list<Product> products
    2: i64 total
}

// ProductService exposes catalogue reads and stock reservation.
service ProductService {
    GetProductResponse GetProduct(1: GetProductRequest req)
    ListProductsResponse ListProducts(1: ListProductsRequest req)
    // ReserveStock takes quantity units of product_id out of stock and reports a bool result.
    bool ReserveStock(1: i64 product_id, 2: i32 quantity)
    // ReleaseStock puts quantity units of product_id back into stock and reports a bool result.
    bool ReleaseStock(1: i64 product_id, 2: i32 quantity)
}
thrift
// IDL for the order service. Generated Go code is placed in package "order".
namespace go order

// Order is a single-product order placed by one user.
struct Order {
    1: i64 id
    2: i64 user_id
    3: i64 product_id
    4: i32 quantity
    // NOTE(review): amount is a binary floating-point value; confirm this is
    // acceptable for monetary amounts.
    5: double amount
    // Integer state code; the values are not defined in this IDL.
    6: i32 status
    // Creation time carried as a string.
    7: string created_at
}

// CreateOrderRequest names the buyer, the product and the quantity to order.
struct CreateOrderRequest {
    1: i64 user_id
    2: i64 product_id
    3: i32 quantity
}

// CreateOrderResponse wraps the order returned by CreateOrder.
struct CreateOrderResponse {
    1: Order order
}

// GetOrderRequest identifies the order to fetch by id.
struct GetOrderRequest {
    1: i64 id
}

// GetOrderResponse wraps the order returned by GetOrder.
struct GetOrderResponse {
    1: Order order
}

// OrderService exposes order creation, lookup, cancellation and status updates.
service OrderService {
    CreateOrderResponse CreateOrder(1: CreateOrderRequest req)
    GetOrderResponse GetOrder(1: GetOrderRequest req)
    bool CancelOrder(1: i64 order_id)
    bool UpdateOrderStatus(1: i64 order_id, 2: i32 status)
}

用户服务实现

项目结构

user-service/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── handler/
│   │   └── user_handler.go
│   ├── service/
│   │   └── user_service.go
│   ├── repository/
│   │   └── user_repo.go
│   └── model/
│       └── user.go
├── pkg/
│   ├── config/
│   │   └── config.go
│   ├── middleware/
│   │   └── auth.go
│   └── utils/
│       └── hash.go
├── idl/
│   └── user.thrift
├── kitex_gen/
├── config/
│   └── config.yaml
├── go.mod
└── Makefile

服务启动

go
package main

import (
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    
    "github.com/cloudwego/kitex/pkg/klog"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
    "github.com/cloudwego/kitex/server"
    "github.com/kitex-contrib/registry-nacos/registry"
    
    "user-service/internal/handler"
    "user-service/internal/repository"
    "user-service/internal/service"
    "user-service/pkg/config"
    "user-service/kitex_gen/user/userservice"
)

// main wires the user service together and runs it until SIGINT or SIGTERM.
//
// Startup order: load config/config.yaml, open the database, create the Redis
// client, build repository -> service -> handler, create the Nacos registry,
// then start the Kitex server. Any failure during startup terminates the
// process through log.Fatal.
func main() {
    cfg, err := config.Load("config/config.yaml")
    if err != nil {
        log.Fatal(err)
    }
    
    db, err := repository.NewDB(cfg.Database)
    if err != nil {
        log.Fatal(err)
    }
    
    redis := repository.NewRedis(cfg.Redis)
    
    // Dependencies are injected bottom-up: repository, then service, then handler.
    userRepo := repository.NewUserRepository(db, redis)
    userService := service.NewUserService(userRepo)
    userHandler := handler.NewUserHandler(userService)
    
    r, err := registry.NewDefaultNacosRegistry(cfg.Registry.Address)
    if err != nil {
        log.Fatal(err)
    }
    
    addr := &net.TCPAddr{
        IP:   net.ParseIP(cfg.Server.IP),
        Port: cfg.Server.Port,
    }
    
    svr := userservice.NewServer(
        userHandler,
        server.WithServiceAddr(addr),
        server.WithRegistry(r),
        server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
            ServiceName: "user-service",
        }),
        server.WithMiddleware(handler.LoggingMiddleware),
        server.WithMiddleware(handler.RecoveryMiddleware),
    )
    
    // Run the server in the background so this goroutine can wait for signals.
    // NOTE(review): log.Fatalf exits the process immediately, so svr.Stop below
    // is never reached when Run returns an error.
    // NOTE(review): Kitex's Run is believed to block and handle exit signals on
    // its own — confirm the goroutine plus manual signal handling is intended.
    go func() {
        klog.Infof("user-service starting on %s:%d", cfg.Server.IP, cfg.Server.Port)
        if err := svr.Run(); err != nil {
            log.Fatalf("server stopped: %v", err)
        }
    }()
    
    // Block until the process receives SIGINT or SIGTERM.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh
    
    klog.Info("shutting down server...")
    if err := svr.Stop(); err != nil {
        klog.Errorf("server stop error: %v", err)
    }
    klog.Info("server stopped")
}

Handler 实现

go
package handler

import (
    "context"
    
    "github.com/cloudwego/kitex/pkg/klog"
    
    "user-service/internal/service"
    "user-service/kitex_gen/user"
)

// UserHandler serves the user RPC methods by delegating to service.UserService.
type UserHandler struct {
    userService *service.UserService
}

// NewUserHandler returns a handler that forwards calls to userService.
func NewUserHandler(userService *service.UserService) *UserHandler {
    h := new(UserHandler)
    h.userService = userService
    return h
}

// GetUser looks up the user with id req.Id and converts the model into the
// generated response type. Lookup errors are logged and returned unchanged.
func (h *UserHandler) GetUser(ctx context.Context, req *user.GetUserRequest) (*user.GetUserResponse, error) {
    klog.CtxInfof(ctx, "GetUser called: id=%d", req.Id)

    found, err := h.userService.GetUser(ctx, req.Id)
    if err != nil {
        klog.CtxErrorf(ctx, "get user failed: %v", err)
        return nil, err
    }

    // Copy field by field: the RPC type has no password field.
    dto := &user.User{
        Id:        found.ID,
        Username:  found.Username,
        Email:     found.Email,
        Phone:     found.Phone,
        Status:    found.Status,
        CreatedAt: found.CreatedAt,
    }
    return &user.GetUserResponse{User: dto}, nil
}

// CreateUser registers a user from req and returns the new user's ID.
// On failure the error is logged and returned together with a zero ID.
func (h *UserHandler) CreateUser(ctx context.Context, req *user.CreateUserRequest) (int64, error) {
    klog.CtxInfof(ctx, "CreateUser called: username=%s", req.Username)

    in := service.CreateUserReq{
        Username: req.Username,
        Password: req.Password,
        Email:    req.Email,
        Phone:    req.Phone,
    }

    userID, err := h.userService.CreateUser(ctx, &in)
    if err != nil {
        klog.CtxErrorf(ctx, "create user failed: %v", err)
        return 0, err
    }
    return userID, nil
}

Service 实现

go
package service

import (
    "context"
    "errors"
    "time"
    
    "user-service/internal/model"
    "user-service/internal/repository"
    "user-service/pkg/utils"
)

// CreateUserReq is the service-layer input for registering a user.
// Password is the plain-text password; CreateUser hashes it before storing.
type CreateUserReq struct {
    Username string
    Password string
    Email    string
    Phone    string
}

// UserService implements user use cases on top of a UserRepository.
type UserService struct {
    repo *repository.UserRepository
}

// NewUserService returns a UserService backed by repo.
func NewUserService(repo *repository.UserRepository) *UserService {
    svc := new(UserService)
    svc.repo = repo
    return svc
}

// GetUser returns the user with the given id, reading the cache first.
//
// Any cache error is treated as a miss and the user is loaded through
// FindByID. A freshly loaded user is written back to the cache on a
// best-effort basis.
func (s *UserService) GetUser(ctx context.Context, id int64) (*model.User, error) {
    if hit, cacheErr := s.repo.GetFromCache(ctx, id); cacheErr == nil {
        return hit, nil
    }

    loaded, err := s.repo.FindByID(ctx, id)
    if err != nil {
        return nil, err
    }

    // Deliberately ignored: a failed cache write must not fail the read.
    _ = s.repo.SetToCache(ctx, loaded)

    return loaded, nil
}

// CreateUser registers a new user and returns its ID.
//
// It fails with "username already exists" when the username is taken, and
// otherwise stores the user with a hashed password and Status 1.
//
// NOTE(review): the existence check and the insert are two separate calls, so
// two concurrent requests with the same username can both pass the check —
// presumably a unique constraint on username backs this up; confirm.
func (s *UserService) CreateUser(ctx context.Context, req *CreateUserReq) (int64, error) {
    taken, err := s.repo.ExistsByUsername(ctx, req.Username)
    switch {
    case err != nil:
        return 0, err
    case taken:
        return 0, errors.New("username already exists")
    }

    digest, err := utils.HashPassword(req.Password)
    if err != nil {
        return 0, err
    }

    record := model.User{
        Username:  req.Username,
        Password:  digest,
        Email:     req.Email,
        Phone:     req.Phone,
        Status:    1,
        CreatedAt: time.Now().Format("2006-01-02 15:04:05"),
    }

    if err := s.repo.Create(ctx, &record); err != nil {
        return 0, err
    }

    // The ID is read back from the record after Create has returned.
    return record.ID, nil
}

Repository 实现

go
package repository

import (
    "context"
    "encoding/json"
    "strconv"
    "time"
    
    "github.com/go-redis/redis/v8"
    "gorm.io/gorm"
    
    "user-service/internal/model"
)

// UserRepository gives access to users through a gorm database handle and a
// Redis client.
type UserRepository struct {
    db    *gorm.DB
    redis *redis.Client
}

// NewUserRepository returns a repository that uses db and redis.
func NewUserRepository(db *gorm.DB, redis *redis.Client) *UserRepository {
    repo := new(UserRepository)
    repo.db = db
    repo.redis = redis
    return repo
}

// FindByID loads the user whose primary key is id from the database.
// It returns the query error unchanged when the lookup fails.
func (r *UserRepository) FindByID(ctx context.Context, id int64) (*model.User, error) {
    found := new(model.User)
    if err := r.db.WithContext(ctx).First(found, id).Error; err != nil {
        return nil, err
    }
    return found, nil
}

// ExistsByUsername reports whether at least one user row has the given
// username. The count query's error, if any, is returned alongside the result.
func (r *UserRepository) ExistsByUsername(ctx context.Context, username string) (bool, error) {
    var matches int64
    query := r.db.WithContext(ctx).Model(&model.User{}).Where("username = ?", username)
    err := query.Count(&matches).Error
    return matches > 0, err
}

// Create inserts user into the database and returns the insert error, if any.
func (r *UserRepository) Create(ctx context.Context, user *model.User) error {
    result := r.db.WithContext(ctx).Create(user)
    return result.Error
}

// GetFromCache returns the user stored in Redis under "user:<id>", where <id>
// is the decimal form of id.
//
// Every failure is returned as an error, including a Redis read error (which
// covers a missing key) and a JSON decode error.
func (r *UserRepository) GetFromCache(ctx context.Context, id int64) (*model.User, error) {
    // strconv.FormatInt, not string(id): converting an integer with string()
    // yields the UTF-8 encoding of that code point (and U+FFFD for values that
    // are not valid code points), so distinct large ids would share one key.
    key := "user:" + strconv.FormatInt(id, 10)
    data, err := r.redis.Get(ctx, key).Bytes()
    if err != nil {
        return nil, err
    }
    
    var user model.User
    if err := json.Unmarshal(data, &user); err != nil {
        return nil, err
    }
    
    return &user, nil
}

// SetToCache stores user as JSON in Redis under "user:<id>", where <id> is the
// decimal form of user.ID, with a five-minute expiry.
//
// It returns the JSON encoding error or the Redis write error, if any.
func (r *UserRepository) SetToCache(ctx context.Context, user *model.User) error {
    // strconv.FormatInt, not string(user.ID): converting an integer with
    // string() yields a code point rather than decimal text, which makes
    // distinct large ids share one key.
    key := "user:" + strconv.FormatInt(user.ID, 10)
    data, err := json.Marshal(user)
    if err != nil {
        return err
    }
    
    return r.redis.Set(ctx, key, data, 5*time.Minute).Err()
}

商品服务实现

商品服务核心逻辑

go
package service

import (
    "context"
    "errors"
    "sync"
    "time"
    
    "github.com/cloudwego/kitex/pkg/klog"
    
    "product-service/internal/model"
    "product-service/internal/repository"
)

// ProductService implements product reads and stock changes on top of a
// ProductRepository.
type ProductService struct {
    repo *repository.ProductRepository
    // stockMutex maps a product ID to the *sync.Mutex that serialises stock
    // changes for that product inside this process.
    stockMutex sync.Map
}

// NewProductService returns a ProductService backed by repo.
func NewProductService(repo *repository.ProductRepository) *ProductService {
    svc := new(ProductService)
    svc.repo = repo
    return svc
}

// GetProduct returns the product with the given id straight from the repository.
func (s *ProductService) GetProduct(ctx context.Context, id int64) (*model.Product, error) {
    found, err := s.repo.FindByID(ctx, id)
    return found, err
}

// ListProducts forwards the paging parameters and keyword to the repository
// and returns its product list, total count and error unchanged.
func (s *ProductService) ListProducts(ctx context.Context, page, pageSize int32, keyword string) ([]*model.Product, int64, error) {
    items, total, err := s.repo.List(ctx, page, pageSize, keyword)
    return items, total, err
}

// ReserveStock deducts quantity units from the stock of product productID.
//
// It returns an error when quantity is not positive, when the product cannot
// be loaded, when the stock is smaller than quantity ("insufficient stock"),
// or when the update fails.
//
// The read-check-write sequence runs under a per-product mutex. That mutex
// lives in this process only, so it does not serialise writers running in
// other instances of the service.
func (s *ProductService) ReserveStock(ctx context.Context, productID int64, quantity int32) error {
    // A zero or negative quantity would pass the stock check below and then
    // increase the stock instead of reserving it.
    if quantity <= 0 {
        return errors.New("quantity must be positive")
    }
    
    mutex, _ := s.stockMutex.LoadOrStore(productID, &sync.Mutex{})
    mu := mutex.(*sync.Mutex)
    mu.Lock()
    defer mu.Unlock()
    
    product, err := s.repo.FindByID(ctx, productID)
    if err != nil {
        return err
    }
    
    if product.Stock < quantity {
        return errors.New("insufficient stock")
    }
    
    product.Stock -= quantity
    product.UpdatedAt = time.Now()
    
    if err := s.repo.Update(ctx, product); err != nil {
        klog.CtxErrorf(ctx, "update stock failed: %v", err)
        return err
    }
    
    klog.CtxInfof(ctx, "reserved stock: product_id=%d, quantity=%d, remaining=%d",
        productID, quantity, product.Stock)
    
    return nil
}

// ReleaseStock adds quantity units back to the stock of product productID.
//
// It returns an error when quantity is not positive, when the product cannot
// be loaded, or when the update fails.
//
// The read-modify-write sequence runs under a per-product mutex. That mutex
// lives in this process only, so it does not serialise writers running in
// other instances of the service.
func (s *ProductService) ReleaseStock(ctx context.Context, productID int64, quantity int32) error {
    // A negative quantity would deduct stock here without any sufficiency
    // check, so reject it up front.
    if quantity <= 0 {
        return errors.New("quantity must be positive")
    }
    
    mutex, _ := s.stockMutex.LoadOrStore(productID, &sync.Mutex{})
    mu := mutex.(*sync.Mutex)
    mu.Lock()
    defer mu.Unlock()
    
    product, err := s.repo.FindByID(ctx, productID)
    if err != nil {
        return err
    }
    
    product.Stock += quantity
    product.UpdatedAt = time.Now()
    
    if err := s.repo.Update(ctx, product); err != nil {
        klog.CtxErrorf(ctx, "release stock failed: %v", err)
        return err
    }
    
    klog.CtxInfof(ctx, "released stock: product_id=%d, quantity=%d, remaining=%d",
        productID, quantity, product.Stock)
    
    return nil
}

订单服务实现

订单服务核心逻辑

go
package service

import (
    "context"
    "errors"
    "fmt"
    "time"
    
    "github.com/cloudwego/kitex/pkg/klog"
    
    "order-service/internal/model"
    "order-service/internal/repository"
    "order-service/pkg/client"
)

// Order status codes. The values are 1 through 5 in declaration order, so
// inserting or reordering entries changes the numeric value of every later
// status.
const (
    OrderStatusPending = iota + 1
    OrderStatusPaid
    OrderStatusShipped
    OrderStatusCompleted
    OrderStatusCancelled
)

// OrderService implements the order use cases. It stores orders through repo,
// calls the user and product services through their clients, and publishes
// order events on eventBus.
type OrderService struct {
    repo          *repository.OrderRepository
    userClient    *client.UserClient
    productClient *client.ProductClient
    eventBus      *EventBus
}

// NewOrderService returns an OrderService that uses the given repository,
// clients and event bus.
func NewOrderService(
    repo *repository.OrderRepository,
    userClient *client.UserClient,
    productClient *client.ProductClient,
    eventBus *EventBus,
) *OrderService {
    svc := new(OrderService)
    svc.repo = repo
    svc.userClient = userClient
    svc.productClient = productClient
    svc.eventBus = eventBus
    return svc
}

// CreateOrder places an order for req.Quantity units of req.ProductID on
// behalf of req.UserID and returns the stored order.
//
// Steps: validate the quantity, check that the user has Status 1, load the
// product, reserve stock, persist the order, then publish "order.created".
//
// It returns an error when the quantity is not positive, the user or product
// lookup fails, the user is not active, the stock is insufficient, the
// reservation fails, or the order cannot be stored. When storing fails the
// reserved stock is released again on a best-effort basis. A publish failure
// is logged but does not fail the call, because the order already exists.
func (s *OrderService) CreateOrder(ctx context.Context, req *CreateOrderReq) (*model.Order, error) {
    // Without this check a negative quantity passes the stock comparison and
    // produces an order with a negative amount.
    if req.Quantity <= 0 {
        return nil, errors.New("quantity must be positive")
    }
    
    user, err := s.userClient.GetUser(ctx, req.UserID)
    if err != nil {
        return nil, fmt.Errorf("get user failed: %w", err)
    }
    
    if user.Status != 1 {
        return nil, errors.New("user is not active")
    }
    
    product, err := s.productClient.GetProduct(ctx, req.ProductID)
    if err != nil {
        return nil, fmt.Errorf("get product failed: %w", err)
    }
    
    // Fast-fail only: this value may be stale by the time ReserveStock runs,
    // and the ReserveStock call below is what actually takes the stock.
    if product.Stock < req.Quantity {
        return nil, errors.New("insufficient stock")
    }
    
    if err := s.productClient.ReserveStock(ctx, req.ProductID, req.Quantity); err != nil {
        return nil, fmt.Errorf("reserve stock failed: %w", err)
    }
    
    order := &model.Order{
        ID:        generateID(),
        UserID:    req.UserID,
        ProductID: req.ProductID,
        Quantity:  req.Quantity,
        Amount:    product.Price * float64(req.Quantity),
        Status:    OrderStatusPending,
        CreatedAt: time.Now(),
    }
    
    if err := s.repo.Create(ctx, order); err != nil {
        // Compensate the reservation. A failure here leaves stock reserved
        // with no order, so it must at least be visible in the logs.
        if relErr := s.productClient.ReleaseStock(ctx, req.ProductID, req.Quantity); relErr != nil {
            klog.CtxErrorf(ctx, "release stock after failed order create: product_id=%d, quantity=%d, err=%v",
                req.ProductID, req.Quantity, relErr)
        }
        return nil, fmt.Errorf("create order failed: %w", err)
    }
    
    if err := s.eventBus.Publish("order.created", order); err != nil {
        klog.CtxErrorf(ctx, "publish order.created failed: order_id=%d, err=%v", order.ID, err)
    }
    
    klog.CtxInfof(ctx, "order created: id=%d, user_id=%d, product_id=%d, amount=%.2f",
        order.ID, order.UserID, order.ProductID, order.Amount)
    
    return order, nil
}

// CancelOrder marks the order orderID as cancelled and returns its stock.
//
// It returns an error when the order cannot be loaded, is already cancelled,
// has status OrderStatusShipped or later, or cannot be updated.
//
// The status change is persisted before the stock is released. If the update
// fails, nothing has been released and the call can be retried safely; once it
// succeeds, a retry is rejected with "order already cancelled", so the stock
// cannot be released twice. Stock-release and publish failures are logged but
// do not fail the call.
func (s *OrderService) CancelOrder(ctx context.Context, orderID int64) error {
    order, err := s.repo.FindByID(ctx, orderID)
    if err != nil {
        return err
    }
    
    // Checked first because OrderStatusCancelled also satisfies the
    // ">= OrderStatusShipped" comparison below and would get the wrong message.
    if order.Status == OrderStatusCancelled {
        return errors.New("order already cancelled")
    }
    
    if order.Status >= OrderStatusShipped {
        return errors.New("cannot cancel shipped order")
    }
    
    order.Status = OrderStatusCancelled
    order.UpdatedAt = time.Now()
    
    if err := s.repo.Update(ctx, order); err != nil {
        return err
    }
    
    if err := s.productClient.ReleaseStock(ctx, order.ProductID, order.Quantity); err != nil {
        klog.CtxErrorf(ctx, "release stock failed: %v", err)
    }
    
    if err := s.eventBus.Publish("order.cancelled", order); err != nil {
        klog.CtxErrorf(ctx, "publish order.cancelled failed: order_id=%d, err=%v", orderID, err)
    }
    
    klog.CtxInfof(ctx, "order cancelled: id=%d", orderID)
    
    return nil
}

// UpdateOrderStatus sets the status of order orderID to status, stores the
// order and publishes "order.status_updated".
//
// It returns an error when the order cannot be loaded or updated. A publish
// failure is logged but does not fail the call, because the new status is
// already stored.
//
// NOTE(review): status is written as given; neither its range nor the
// transition from the current status is checked — confirm callers validate it.
func (s *OrderService) UpdateOrderStatus(ctx context.Context, orderID int64, status int32) error {
    order, err := s.repo.FindByID(ctx, orderID)
    if err != nil {
        return err
    }
    
    order.Status = status
    order.UpdatedAt = time.Now()
    
    if err := s.repo.Update(ctx, order); err != nil {
        return err
    }
    
    if err := s.eventBus.Publish("order.status_updated", order); err != nil {
        klog.CtxErrorf(ctx, "publish order.status_updated failed: order_id=%d, err=%v", orderID, err)
    }
    
    klog.CtxInfof(ctx, "order status updated: id=%d, status=%d", orderID, status)
    
    return nil
}

API 网关实现

网关核心代码

go
package main

import (
    "context"
    "log"
    "time"
    
    "github.com/gin-gonic/gin"
    
    "api-gateway/pkg/client"
    "api-gateway/pkg/middleware"
)

// main builds the RPC clients for the user, product and order services,
// registers the HTTP routes under /api/v1 and serves them on :8080.
// A failure to create a client or to run the HTTP server terminates the
// process through log.Fatal.
func main() {
    userClient, err := client.NewUserClient("user-service")
    if err != nil {
        log.Fatal(err)
    }
    
    productClient, err := client.NewProductClient("product-service")
    if err != nil {
        log.Fatal(err)
    }
    
    orderClient, err := client.NewOrderClient("order-service")
    if err != nil {
        log.Fatal(err)
    }
    
    gateway := &APIGateway{
        userClient:    userClient,
        productClient: productClient,
        orderClient:   orderClient,
    }
    
    // NOTE(review): gin.Default is believed to install its own logger and
    // recovery middleware already — confirm middleware.Logging is not redundant.
    r := gin.Default()
    
    // Global middleware, applied to every route in registration order.
    r.Use(middleware.CORS())
    r.Use(middleware.RateLimit(1000))
    r.Use(middleware.RequestID())
    r.Use(middleware.Logging())
    
    api := r.Group("/api/v1")
    {
        api.POST("/users", gateway.CreateUser)
        api.GET("/users/:id", gateway.GetUser)
        
        api.GET("/products", gateway.ListProducts)
        api.GET("/products/:id", gateway.GetProduct)
        
        // Only the order routes run middleware.Auth.
        api.POST("/orders", middleware.Auth(), gateway.CreateOrder)
        api.GET("/orders/:id", middleware.Auth(), gateway.GetOrder)
        api.DELETE("/orders/:id", middleware.Auth(), gateway.CancelOrder)
    }
    
    if err := r.Run(":8080"); err != nil {
        log.Fatal(err)
    }
}

// APIGateway holds the RPC clients that the HTTP handlers use to reach the
// user, product and order services.
type APIGateway struct {
    userClient    *client.UserClient
    productClient *client.ProductClient
    orderClient   *client.OrderClient
}

// CreateUser handles POST /api/v1/users.
//
// It binds the JSON body, calls the user service with a three-second timeout
// and replies 200 with {"user_id": ...}. A body that fails to bind yields 400
// and a failed service call yields 500; both carry the error text.
func (g *APIGateway) CreateUser(c *gin.Context) {
    payload := CreateUserReq{}
    if bindErr := c.ShouldBindJSON(&payload); bindErr != nil {
        c.JSON(400, gin.H{"error": bindErr.Error()})
        return
    }

    // Bound the downstream call; the deadline also ends if the client goes away.
    callCtx, stop := context.WithTimeout(c.Request.Context(), 3*time.Second)
    defer stop()

    newID, callErr := g.userClient.CreateUser(callCtx, &payload)
    if callErr != nil {
        c.JSON(500, gin.H{"error": callErr.Error()})
        return
    }

    c.JSON(200, gin.H{"user_id": newID})
}

// CreateOrder handles POST /api/v1/orders.
//
// It binds the JSON body, reads the caller's user ID from the "user_id"
// context key, calls the order service with a five-second timeout and replies
// 200 with the order's id, amount and status.
//
// Responses: 400 when the body fails to bind, 401 when no user ID is present
// in the context, 500 when the order service call fails.
func (g *APIGateway) CreateOrder(c *gin.Context) {
    var req CreateOrderReq
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }
    
    // GetInt64 yields 0 both when the key is absent and when the stored value
    // is not an int64; without this guard the order would be created for
    // user 0.
    userID := c.GetInt64("user_id")
    if userID == 0 {
        c.JSON(401, gin.H{"error": "unauthorized"})
        return
    }
    
    ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
    defer cancel()
    
    order, err := g.orderClient.CreateOrder(ctx, userID, req.ProductID, req.Quantity)
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    
    c.JSON(200, gin.H{
        "order_id": order.ID,
        "amount":   order.Amount,
        "status":   order.Status,
    })
}

服务间调用

客户端封装

go
package client

import (
    "context"
    "time"
    
    "github.com/cloudwego/kitex/client"
    "github.com/kitex-contrib/registry-nacos/resolver"
    
    "order-service/kitex_gen/product"
    "order-service/kitex_gen/product/productservice"
)

// ProductClient wraps the generated product service client behind a smaller,
// call-site-friendly API.
type ProductClient struct {
    client productservice.Client
}

// NewProductClient builds a client for the service registered as serviceName,
// resolved through Nacos, with a three-second timeout and RetryMiddleware(3).
// It returns an error when the resolver or the client cannot be created.
func NewProductClient(serviceName string) (*ProductClient, error) {
    // NOTE(review): the Nacos address is hard-coded to localhost; presumably it
    // should come from configuration or the environment — confirm.
    // NOTE(review): verify that NewDefaultNacosResolver accepts an address
    // string in the registry-nacos version pinned in go.mod.
    r, err := resolver.NewDefaultNacosResolver("nacos://127.0.0.1:8848")
    if err != nil {
        return nil, err
    }
    
    // NOTE(review): verify that client.WithTimeout exists in the pinned kitex
    // version. RetryMiddleware is not defined in this snippet; it is installed
    // for every method, including ReserveStock and ReleaseStock, which are not
    // obviously safe to retry — confirm.
    c, err := productservice.NewClient(serviceName,
        client.WithResolver(r),
        client.WithTimeout(3*time.Second),
        client.WithMiddleware(RetryMiddleware(3)),
    )
    if err != nil {
        return nil, err
    }
    
    return &ProductClient{client: c}, nil
}

// GetProduct fetches the product with the given id from the product service
// and returns the Product carried in the response.
func (c *ProductClient) GetProduct(ctx context.Context, id int64) (*product.Product, error) {
    in := &product.GetProductRequest{Id: id}
    out, err := c.client.GetProduct(ctx, in)
    if err != nil {
        return nil, err
    }
    return out.Product, nil
}

// stockRejectedError is returned when the product service answers a stock
// call with false and no RPC error. It is a local type because this file
// imports neither errors nor fmt.
type stockRejectedError string

// Error returns the message carried by e.
func (e stockRejectedError) Error() string { return string(e) }

// ReserveStock asks the product service to reserve quantity units of
// productID. It returns the RPC error if the call fails, and a
// stockRejectedError if the service answers false; previously a false answer
// was discarded and reported as success.
func (c *ProductClient) ReserveStock(ctx context.Context, productID int64, quantity int32) error {
    ok, err := c.client.ReserveStock(ctx, productID, quantity)
    if err != nil {
        return err
    }
    if !ok {
        return stockRejectedError("reserve stock rejected by product service")
    }
    return nil
}

// ReleaseStock asks the product service to release quantity units of
// productID. It returns the RPC error if the call fails, and a
// stockRejectedError if the service answers false; previously a false answer
// was discarded and reported as success.
func (c *ProductClient) ReleaseStock(ctx context.Context, productID int64, quantity int32) error {
    ok, err := c.client.ReleaseStock(ctx, productID, quantity)
    if err != nil {
        return err
    }
    if !ok {
        return stockRejectedError("release stock rejected by product service")
    }
    return nil
}

消息队列集成

Kafka 事件总线

go
package eventbus

import (
    "context"
    "encoding/json"
    "log"
    "time"
    
    "github.com/IBM/sarama"
)

// EventBus publishes events to Kafka through a synchronous producer and
// dispatches consumed messages to the handlers registered per topic.
type EventBus struct {
    producer sarama.SyncProducer
    consumer sarama.ConsumerGroup
    // handlers maps a topic name to the handlers subscribed to it.
    handlers map[string][]EventHandler
}

// EventHandler processes one consumed event. A returned error is logged by
// the consumer loop.
type EventHandler func(ctx context.Context, event *Event) error

// Event is the JSON envelope written to and read from Kafka.
type Event struct {
    // Type is set to the topic name when the event is published.
    Type      string
    Payload   interface{}
    // Timestamp is the publish time in Unix seconds.
    Timestamp int64
}

// NewEventBus connects a synchronous producer and a consumer group (group ID
// "order-service") to the given brokers and returns an EventBus with no
// subscriptions.
//
// It returns an error when either the producer or the consumer group cannot
// be created. If the consumer group fails, the producer that was already
// created is closed so its connections do not leak.
func NewEventBus(brokers []string) (*EventBus, error) {
    config := sarama.NewConfig()
    // Return.Successes is enabled because a SyncProducer is created below.
    config.Producer.Return.Successes = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }
    
    consumer, err := sarama.NewConsumerGroup(brokers, "order-service", config)
    if err != nil {
        if closeErr := producer.Close(); closeErr != nil {
            log.Printf("close producer after consumer group failure: %v", closeErr)
        }
        return nil, err
    }
    
    return &EventBus{
        producer: producer,
        consumer: consumer,
        handlers: make(map[string][]EventHandler),
    }, nil
}

// Publish wraps payload in an Event (Type set to topic, Timestamp set to the
// current Unix time), encodes it as JSON and sends it to the Kafka topic of
// the same name. It returns the encoding error or the send error, if any.
func (b *EventBus) Publish(topic string, payload interface{}) error {
    encoded, err := json.Marshal(&Event{
        Type:      topic,
        Payload:   payload,
        Timestamp: time.Now().Unix(),
    })
    if err != nil {
        return err
    }

    // Partition and offset are not needed by callers, only the outcome.
    _, _, err = b.producer.SendMessage(&sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(encoded),
    })
    return err
}

// Subscribe appends handler to the handlers registered for topic.
// The handlers map is modified without locking.
func (b *EventBus) Subscribe(topic string, handler EventHandler) {
    registered := b.handlers[topic]
    b.handlers[topic] = append(registered, handler)
}

// Start begins consuming every topic that has a handler at the time of the
// call and returns immediately; it always returns nil.
//
// Consumption runs in a background goroutine that calls Consume in a loop and
// exits once ctx is done. After a Consume error the loop waits one second
// before retrying, so a persistent failure does not spin the CPU or flood the
// log.
func (b *EventBus) Start(ctx context.Context) error {
    topics := make([]string, 0, len(b.handlers))
    for topic := range b.handlers {
        topics = append(topics, topic)
    }
    
    handler := &consumerGroupHandler{
        handlers: b.handlers,
    }
    
    go func() {
        for {
            err := b.consumer.Consume(ctx, topics, handler)
            if err != nil {
                log.Printf("consumer error: %v", err)
            }
            
            if ctx.Err() != nil {
                return
            }
            
            // A nil error means the session ended normally, so loop straight
            // back into Consume. Only failures are delayed.
            if err != nil {
                select {
                case <-ctx.Done():
                    return
                case <-time.After(time.Second):
                }
            }
        }
    }()
    
    return nil
}

// consumerGroupHandler dispatches consumed messages to the handlers registered
// for the message's topic. Together with ConsumeClaim, the Setup and Cleanup
// methods below make it satisfy sarama.ConsumerGroupHandler, which Consume
// requires.
type consumerGroupHandler struct {
    handlers map[string][]EventHandler
}

// Setup runs at the start of a consumer group session; nothing needs preparing.
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs at the end of a consumer group session; nothing needs releasing.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes the messages of one claim until the claim's message
// channel is closed or the session context is done, and then returns nil.
//
// Each message is decoded into an Event and passed to every handler registered
// for its topic. Handler errors are logged and the message is still marked as
// consumed. A message that cannot be decoded is logged and skipped without
// being marked.
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case msg, ok := <-claim.Messages():
            if !ok {
                // The channel is closed once the claim is finished.
                return nil
            }
            
            var event Event
            if err := json.Unmarshal(msg.Value, &event); err != nil {
                log.Printf("unmarshal event failed: %v", err)
                continue
            }
            
            handlers := h.handlers[msg.Topic]
            for _, handler := range handlers {
                if err := handler(session.Context(), &event); err != nil {
                    log.Printf("handle event failed: %v", err)
                }
            }
            
            session.MarkMessage(msg, "")
            
        case <-session.Context().Done():
            // Return promptly when the session ends (for example on a
            // rebalance) instead of blocking on the next message.
            return nil
        }
    }
}

部署配置

Docker Compose

yaml
# Local development stack: infrastructure (Nacos, MySQL, Redis, Kafka) plus the
# three services and the API gateway.
version: '3.8'

services:
  # Service registry used by the services and the gateway.
  nacos:
    image: nacos/nacos-server:latest
    environment:
      - MODE=standalone
    ports:
      - "8848:8848"
  
  mysql:
    image: mysql:8.0
    environment:
      # NOTE(review): hard-coded root password; presumably for local use only.
      - MYSQL_ROOT_PASSWORD=root123
      - MYSQL_DATABASE=ecommerce
    ports:
      - "3306:3306"
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  
  # Single-node Kafka in KRaft mode (this node is both controller and broker).
  kafka:
    image: bitnami/kafka:latest
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # The CONTROLLER listener on 9093 must be declared here because the
      # quorum voters and controller listener name below refer to it.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    ports:
      - "9092:9092"
  
  # depends_on below only orders container start-up; it does not wait for the
  # dependency to be ready to accept connections.
  user-service:
    build: ./user-service
    ports:
      - "8881:8881"
    depends_on:
      - nacos
      - mysql
      - redis
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
  
  product-service:
    build: ./product-service
    ports:
      - "8882:8882"
    depends_on:
      - nacos
      - mysql
      - redis
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
  
  order-service:
    build: ./order-service
    ports:
      - "8883:8883"
    depends_on:
      - nacos
      - mysql
      - redis
      - kafka
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
      - KAFKA_ADDR=kafka:9092
  
  api-gateway:
    build: ./api-gateway
    ports:
      - "8080:8080"
    depends_on:
      - nacos
      - user-service
      - product-service
      - order-service
    environment:
      - NACOS_ADDR=nacos:8848

小结

本章通过一个完整的电商微服务项目,演示了:

  1. 项目架构:API 网关 + 微服务 + 基础设施
  2. IDL 定义:使用 Thrift 定义服务接口
  3. 用户服务:完整的用户管理功能
  4. 商品服务:商品管理和库存控制
  5. 订单服务:订单创建和状态管理
  6. API 网关:统一入口和路由分发
  7. 服务调用:客户端封装和重试机制
  8. 消息队列:Kafka 事件总线实现
  9. 部署配置:Docker Compose 完整配置

在下一章中,我们将总结 CloudWeGo 的学习路线和最佳实践。