项目实战
概述
本章将通过一个完整的电商微服务项目,演示如何使用 CloudWeGo 构建生产级的微服务系统。项目涵盖用户服务、商品服务、订单服务、支付服务等核心模块,包含完整的业务流程和技术实现。
核心内容
项目架构
整体架构设计
┌─────────────────────────────────────────────────────────┐
│                       API Gateway                       │
│                   (Gin + Middleware)                    │
└─────────────────────┬───────────────────────────────────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
  ┌───────────┐ ┌───────────┐ ┌───────────┐
  │   User    │ │  Product  │ │   Order   │
  │  Service  │ │  Service  │ │  Service  │
  └───────────┘ └───────────┘ └───────────┘
        │             │             │
        └─────────────┼─────────────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
  ┌───────────┐ ┌───────────┐ ┌───────────┐
  │   MySQL   │ │   Redis   │ │   Kafka   │
  └───────────┘ └───────────┘ └───────────┘
IDL 定义
thrift
namespace go user
struct User {
1: i64 id
2: string username
3: string email
4: string phone
5: i32 status
6: string created_at
}
struct CreateUserRequest {
1: string username
2: string password
3: string email
4: string phone
}
struct GetUserRequest {
1: i64 id
}
struct GetUserResponse {
1: User user
}
service UserService {
GetUserResponse GetUser(1: GetUserRequest req)
i64 CreateUser(1: CreateUserRequest req)
}
thrift
namespace go product
struct Product {
1: i64 id
2: string name
3: string description
4: double price
5: i32 stock
6: i32 status
7: string created_at
}
struct GetProductRequest {
1: i64 id
}
struct GetProductResponse {
1: Product product
}
struct ListProductsRequest {
1: i32 page
2: i32 page_size
3: string keyword
}
struct ListProductsResponse {
1: list<Product> products
2: i64 total
}
service ProductService {
GetProductResponse GetProduct(1: GetProductRequest req)
ListProductsResponse ListProducts(1: ListProductsRequest req)
bool ReserveStock(1: i64 product_id, 2: i32 quantity)
bool ReleaseStock(1: i64 product_id, 2: i32 quantity)
}
thrift
namespace go order
struct Order {
1: i64 id
2: i64 user_id
3: i64 product_id
4: i32 quantity
5: double amount
6: i32 status
7: string created_at
}
struct CreateOrderRequest {
1: i64 user_id
2: i64 product_id
3: i32 quantity
}
struct CreateOrderResponse {
1: Order order
}
struct GetOrderRequest {
1: i64 id
}
struct GetOrderResponse {
1: Order order
}
service OrderService {
CreateOrderResponse CreateOrder(1: CreateOrderRequest req)
GetOrderResponse GetOrder(1: GetOrderRequest req)
bool CancelOrder(1: i64 order_id)
bool UpdateOrderStatus(1: i64 order_id, 2: i32 status)
}
用户服务实现
项目结构
user-service/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── handler/
│   │   └── user_handler.go
│   ├── service/
│   │   └── user_service.go
│   ├── repository/
│   │   └── user_repo.go
│   └── model/
│       └── user.go
├── pkg/
│   ├── config/
│   │   └── config.go
│   ├── middleware/
│   │   └── auth.go
│   └── utils/
│       └── hash.go
├── idl/
│   └── user.thrift
├── kitex_gen/
├── config/
│   └── config.yaml
├── go.mod
└── Makefile
服务启动
go
package main
import (
"log"
"net"
"os"
"os/signal"
"syscall"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
"github.com/kitex-contrib/registry-nacos/registry"
"user-service/internal/handler"
"user-service/internal/repository"
"user-service/internal/service"
"user-service/pkg/config"
"user-service/kitex_gen/user/userservice"
)
func main() {
cfg, err := config.Load("config/config.yaml")
if err != nil {
log.Fatal(err)
}
db, err := repository.NewDB(cfg.Database)
if err != nil {
log.Fatal(err)
}
redis := repository.NewRedis(cfg.Redis)
userRepo := repository.NewUserRepository(db, redis)
userService := service.NewUserService(userRepo)
userHandler := handler.NewUserHandler(userService)
r, err := registry.NewDefaultNacosRegistry(cfg.Registry.Address)
if err != nil {
log.Fatal(err)
}
addr := &net.TCPAddr{
IP: net.ParseIP(cfg.Server.IP),
Port: cfg.Server.Port,
}
svr := userservice.NewServer(
userHandler,
server.WithServiceAddr(addr),
server.WithRegistry(r),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "user-service",
}),
server.WithMiddleware(handler.LoggingMiddleware),
server.WithMiddleware(handler.RecoveryMiddleware),
)
go func() {
klog.Infof("user-service starting on %s:%d", cfg.Server.IP, cfg.Server.Port)
if err := svr.Run(); err != nil {
log.Fatalf("server stopped: %v", err)
}
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
klog.Info("shutting down server...")
if err := svr.Stop(); err != nil {
klog.Errorf("server stop error: %v", err)
}
klog.Info("server stopped")
}Handler 实现
go
package handler
import (
"context"
"github.com/cloudwego/kitex/pkg/klog"
"user-service/internal/service"
"user-service/kitex_gen/user"
)
// UserHandler implements the user RPC methods by delegating to a
// service.UserService.
type UserHandler struct {
	userService *service.UserService
}

// NewUserHandler returns a UserHandler backed by userService.
func NewUserHandler(userService *service.UserService) *UserHandler {
	h := new(UserHandler)
	h.userService = userService
	return h
}
// GetUser looks up the user identified by req.Id and converts the model
// into the generated RPC response type. Service errors are logged and
// returned unchanged.
func (h *UserHandler) GetUser(ctx context.Context, req *user.GetUserRequest) (*user.GetUserResponse, error) {
	klog.CtxInfof(ctx, "GetUser called: id=%d", req.Id)

	found, err := h.userService.GetUser(ctx, req.Id)
	if err != nil {
		klog.CtxErrorf(ctx, "get user failed: %v", err)
		return nil, err
	}

	// Copy the model field by field into the wire type.
	dto := &user.User{
		Id:        found.ID,
		Username:  found.Username,
		Email:     found.Email,
		Phone:     found.Phone,
		Status:    found.Status,
		CreatedAt: found.CreatedAt,
	}
	resp := &user.GetUserResponse{User: dto}
	return resp, nil
}
func (h *UserHandler) CreateUser(ctx context.Context, req *user.CreateUserRequest) (int64, error) {
klog.CtxInfof(ctx, "CreateUser called: username=%s", req.Username)
userID, err := h.userService.CreateUser(ctx, &service.CreateUserReq{
Username: req.Username,
Password: req.Password,
Email: req.Email,
Phone: req.Phone,
})
if err != nil {
klog.CtxErrorf(ctx, "create user failed: %v", err)
return 0, err
}
return userID, nil
}Service 实现
go
package service
import (
"context"
"errors"
"time"
"user-service/internal/model"
"user-service/internal/repository"
"user-service/pkg/utils"
)
// CreateUserReq carries the fields UserService.CreateUser needs to
// register an account. Password is the plain-text value; CreateUser
// hashes it before the user is stored.
type CreateUserReq struct {
Username string
Password string
Email string
Phone string
}
// UserService holds the user business logic on top of a UserRepository.
type UserService struct {
	repo *repository.UserRepository
}

// NewUserService returns a UserService that reads and writes through repo.
func NewUserService(repo *repository.UserRepository) *UserService {
	svc := new(UserService)
	svc.repo = repo
	return svc
}
// GetUser returns the user with the given id, trying the cache first.
//
// Any cache error (miss or otherwise) falls through to the database. A
// successful database read is written back to the cache on a best-effort
// basis: a failed write-back does not fail the call.
func (s *UserService) GetUser(ctx context.Context, id int64) (*model.User, error) {
	if hit, cacheErr := s.repo.GetFromCache(ctx, id); cacheErr == nil {
		return hit, nil
	}

	fromDB, err := s.repo.FindByID(ctx, id)
	if err != nil {
		return nil, err
	}

	// Deliberately ignored: the lookup already succeeded.
	_ = s.repo.SetToCache(ctx, fromDB)
	return fromDB, nil
}
func (s *UserService) CreateUser(ctx context.Context, req *CreateUserReq) (int64, error) {
exists, err := s.repo.ExistsByUsername(ctx, req.Username)
if err != nil {
return 0, err
}
if exists {
return 0, errors.New("username already exists")
}
hashedPassword, err := utils.HashPassword(req.Password)
if err != nil {
return 0, err
}
user := &model.User{
Username: req.Username,
Password: hashedPassword,
Email: req.Email,
Phone: req.Phone,
Status: 1,
CreatedAt: time.Now().Format("2006-01-02 15:04:05"),
}
if err := s.repo.Create(ctx, user); err != nil {
return 0, err
}
return user.ID, nil
}Repository 实现
go
package repository
import (
	"context"
	"encoding/json"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"gorm.io/gorm"

	"user-service/internal/model"
)
// UserRepository persists users through GORM and caches them in Redis.
type UserRepository struct {
	db    *gorm.DB
	redis *redis.Client
}

// NewUserRepository returns a UserRepository using db for storage and
// redis for the cache.
func NewUserRepository(db *gorm.DB, redis *redis.Client) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	repo.redis = redis
	return repo
}
// FindByID loads the user whose primary key equals id. The error reported
// by GORM's First is returned unchanged.
func (r *UserRepository) FindByID(ctx context.Context, id int64) (*model.User, error) {
	found := new(model.User)
	if err := r.db.WithContext(ctx).First(found, id).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// ExistsByUsername reports whether at least one user row has the given
// username. The count and the query error are returned together.
func (r *UserRepository) ExistsByUsername(ctx context.Context, username string) (bool, error) {
	var matches int64
	query := r.db.WithContext(ctx).Model(&model.User{}).Where("username = ?", username)
	err := query.Count(&matches).Error
	return matches > 0, err
}
// Create inserts user through GORM and returns the resulting error.
func (r *UserRepository) Create(ctx context.Context, user *model.User) error {
	result := r.db.WithContext(ctx).Create(user)
	return result.Error
}
// GetFromCache reads the JSON-encoded user stored under "user:<id>" in
// Redis and decodes it. It returns the Redis error (including a missing
// key) or the JSON decoding error unchanged.
func (r *UserRepository) GetFromCache(ctx context.Context, id int64) (*model.User, error) {
	// string(id) would turn the integer into a single rune, not into its
	// decimal text, so the ID is formatted explicitly.
	key := "user:" + strconv.FormatInt(id, 10)

	data, err := r.redis.Get(ctx, key).Bytes()
	if err != nil {
		return nil, err
	}

	var user model.User
	if err := json.Unmarshal(data, &user); err != nil {
		return nil, err
	}
	return &user, nil
}
func (r *UserRepository) SetToCache(ctx context.Context, user *model.User) error {
key := "user:" + string(user.ID)
data, err := json.Marshal(user)
if err != nil {
return err
}
return r.redis.Set(ctx, key, data, 5*time.Minute).Err()
}商品服务实现
商品服务核心逻辑
go
package service
import (
"context"
"errors"
"sync"
"time"
"github.com/cloudwego/kitex/pkg/klog"
"product-service/internal/model"
"product-service/internal/repository"
)
// ProductService implements product queries and stock changes.
//
// stockMutex maps a product ID to the *sync.Mutex that ReserveStock and
// ReleaseStock hold while they read and update that product's stock.
type ProductService struct {
	repo       *repository.ProductRepository
	stockMutex sync.Map
}

// NewProductService returns a ProductService backed by repo.
func NewProductService(repo *repository.ProductRepository) *ProductService {
	svc := new(ProductService)
	svc.repo = repo
	return svc
}
// GetProduct returns the product with the given id straight from the
// repository.
func (s *ProductService) GetProduct(ctx context.Context, id int64) (*model.Product, error) {
	found, err := s.repo.FindByID(ctx, id)
	return found, err
}
// ListProducts passes page, pageSize and keyword to the repository and
// returns its products, total and error unchanged.
func (s *ProductService) ListProducts(ctx context.Context, page, pageSize int32, keyword string) ([]*model.Product, int64, error) {
	items, total, err := s.repo.List(ctx, page, pageSize, keyword)
	return items, total, err
}
// ReserveStock subtracts quantity from the product's stock.
//
// quantity must be positive: a zero or negative value would pass the
// "enough stock" check and increase the stock instead. An error is also
// returned when the product cannot be loaded, when the stock is lower
// than quantity, or when the update fails.
//
// The per-product mutex serializes callers inside this process only.
// NOTE(review): with several service instances the read-modify-write
// below can still race — confirm whether the repository guards the
// update.
func (s *ProductService) ReserveStock(ctx context.Context, productID int64, quantity int32) error {
	if quantity <= 0 {
		return errors.New("quantity must be positive")
	}

	mutex, _ := s.stockMutex.LoadOrStore(productID, &sync.Mutex{})
	mu := mutex.(*sync.Mutex)
	mu.Lock()
	defer mu.Unlock()

	product, err := s.repo.FindByID(ctx, productID)
	if err != nil {
		return err
	}
	if product.Stock < quantity {
		return errors.New("insufficient stock")
	}

	product.Stock -= quantity
	product.UpdatedAt = time.Now()
	if err := s.repo.Update(ctx, product); err != nil {
		klog.CtxErrorf(ctx, "update stock failed: %v", err)
		return err
	}

	klog.CtxInfof(ctx, "reserved stock: product_id=%d, quantity=%d, remaining=%d",
		productID, quantity, product.Stock)
	return nil
}
func (s *ProductService) ReleaseStock(ctx context.Context, productID int64, quantity int32) error {
mutex, _ := s.stockMutex.LoadOrStore(productID, &sync.Mutex{})
mu := mutex.(*sync.Mutex)
mu.Lock()
defer mu.Unlock()
product, err := s.repo.FindByID(ctx, productID)
if err != nil {
return err
}
product.Stock += quantity
product.UpdatedAt = time.Now()
if err := s.repo.Update(ctx, product); err != nil {
klog.CtxErrorf(ctx, "release stock failed: %v", err)
return err
}
klog.CtxInfof(ctx, "released stock: product_id=%d, quantity=%d, remaining=%d",
productID, quantity, product.Stock)
return nil
}订单服务实现
订单服务核心逻辑
go
package service
import (
"context"
"errors"
"fmt"
"time"
"github.com/cloudwego/kitex/pkg/klog"
"order-service/internal/model"
"order-service/internal/repository"
"order-service/pkg/client"
)
// Order status codes stored in Order.Status. CreateOrder assigns
// OrderStatusPending, CancelOrder assigns OrderStatusCancelled, and
// CancelOrder refuses orders whose status is OrderStatusShipped or higher.
const (
OrderStatusPending = 1
OrderStatusPaid = 2
OrderStatusShipped = 3
OrderStatusCompleted = 4
OrderStatusCancelled = 5
)
// OrderService coordinates order persistence with the user and product
// RPC clients and publishes order events on the event bus.
type OrderService struct {
	repo          *repository.OrderRepository
	userClient    *client.UserClient
	productClient *client.ProductClient
	eventBus      *EventBus
}

// NewOrderService returns an OrderService wired to the given repository,
// RPC clients and event bus.
func NewOrderService(
	repo *repository.OrderRepository,
	userClient *client.UserClient,
	productClient *client.ProductClient,
	eventBus *EventBus,
) *OrderService {
	svc := new(OrderService)
	svc.repo = repo
	svc.userClient = userClient
	svc.productClient = productClient
	svc.eventBus = eventBus
	return svc
}
// CreateOrder validates the buyer and the product, reserves stock, stores
// a pending order and publishes an "order.created" event.
//
// It returns an error when the quantity is not positive, the user cannot
// be fetched or has a status other than 1, the product cannot be fetched,
// stock is insufficient, the reservation fails, or the order cannot be
// stored. When storing fails, the reserved stock is released again; a
// failure of that compensation is logged because the original error is
// the one returned. A failed event publish is logged and does not fail
// the call, since the order is already stored.
func (s *OrderService) CreateOrder(ctx context.Context, req *CreateOrderReq) (*model.Order, error) {
	// A non-positive quantity would pass the stock check below and yield
	// a zero or negative amount.
	if req.Quantity <= 0 {
		return nil, errors.New("quantity must be positive")
	}

	user, err := s.userClient.GetUser(ctx, req.UserID)
	if err != nil {
		return nil, fmt.Errorf("get user failed: %w", err)
	}
	if user.Status != 1 {
		return nil, errors.New("user is not active")
	}

	product, err := s.productClient.GetProduct(ctx, req.ProductID)
	if err != nil {
		return nil, fmt.Errorf("get product failed: %w", err)
	}
	// Early rejection only; ReserveStock performs the actual deduction.
	if product.Stock < req.Quantity {
		return nil, errors.New("insufficient stock")
	}

	if err := s.productClient.ReserveStock(ctx, req.ProductID, req.Quantity); err != nil {
		return nil, fmt.Errorf("reserve stock failed: %w", err)
	}

	order := &model.Order{
		ID:        generateID(),
		UserID:    req.UserID,
		ProductID: req.ProductID,
		Quantity:  req.Quantity,
		Amount:    product.Price * float64(req.Quantity),
		Status:    OrderStatusPending,
		CreatedAt: time.Now(),
	}
	if err := s.repo.Create(ctx, order); err != nil {
		// Compensate the reservation made above.
		if relErr := s.productClient.ReleaseStock(ctx, req.ProductID, req.Quantity); relErr != nil {
			klog.CtxErrorf(ctx, "release stock after failed order creation: product_id=%d, quantity=%d, err=%v",
				req.ProductID, req.Quantity, relErr)
		}
		return nil, fmt.Errorf("create order failed: %w", err)
	}

	if err := s.eventBus.Publish("order.created", order); err != nil {
		klog.CtxErrorf(ctx, "publish order.created failed: id=%d, err=%v", order.ID, err)
	}

	klog.CtxInfof(ctx, "order created: id=%d, user_id=%d, product_id=%d, amount=%.2f",
		order.ID, order.UserID, order.ProductID, order.Amount)
	return order, nil
}
// CancelOrder marks an order as cancelled and gives its stock back.
//
// Orders that are already cancelled, or whose status is
// OrderStatusShipped or higher, are rejected. A failed stock release is
// logged and the cancellation continues. A failed "order.cancelled"
// publish is logged and does not fail the call, since the status change
// is already stored.
func (s *OrderService) CancelOrder(ctx context.Context, orderID int64) error {
	order, err := s.repo.FindByID(ctx, orderID)
	if err != nil {
		return err
	}

	// The cancelled check must come first: OrderStatusCancelled is also
	// >= OrderStatusShipped and would otherwise get the wrong message.
	if order.Status == OrderStatusCancelled {
		return errors.New("order already cancelled")
	}
	if order.Status >= OrderStatusShipped {
		return errors.New("cannot cancel shipped order")
	}

	if err := s.productClient.ReleaseStock(ctx, order.ProductID, order.Quantity); err != nil {
		klog.CtxErrorf(ctx, "release stock failed: %v", err)
	}

	order.Status = OrderStatusCancelled
	order.UpdatedAt = time.Now()
	if err := s.repo.Update(ctx, order); err != nil {
		return err
	}

	if err := s.eventBus.Publish("order.cancelled", order); err != nil {
		klog.CtxErrorf(ctx, "publish order.cancelled failed: id=%d, err=%v", orderID, err)
	}

	klog.CtxInfof(ctx, "order cancelled: id=%d", orderID)
	return nil
}
func (s *OrderService) UpdateOrderStatus(ctx context.Context, orderID int64, status int32) error {
order, err := s.repo.FindByID(ctx, orderID)
if err != nil {
return err
}
order.Status = status
order.UpdatedAt = time.Now()
if err := s.repo.Update(ctx, order); err != nil {
return err
}
s.eventBus.Publish("order.status_updated", order)
klog.CtxInfof(ctx, "order status updated: id=%d, status=%d", orderID, status)
return nil
}API 网关实现
网关核心代码
go
package main
import (
"context"
"log"
"time"
"github.com/gin-gonic/gin"
"api-gateway/pkg/client"
"api-gateway/pkg/middleware"
)
// main builds the three RPC clients, registers the global middleware and
// the /api/v1 routes on a gin engine, and serves HTTP on :8080.
func main() {
	users, err := client.NewUserClient("user-service")
	if err != nil {
		log.Fatal(err)
	}
	products, err := client.NewProductClient("product-service")
	if err != nil {
		log.Fatal(err)
	}
	orders, err := client.NewOrderClient("order-service")
	if err != nil {
		log.Fatal(err)
	}

	gateway := &APIGateway{userClient: users, productClient: products, orderClient: orders}

	engine := gin.Default()
	// Middleware runs in registration order.
	engine.Use(
		middleware.CORS(),
		middleware.RateLimit(1000),
		middleware.RequestID(),
		middleware.Logging(),
	)

	v1 := engine.Group("/api/v1")

	// User and product routes carry no Auth middleware.
	v1.POST("/users", gateway.CreateUser)
	v1.GET("/users/:id", gateway.GetUser)
	v1.GET("/products", gateway.ListProducts)
	v1.GET("/products/:id", gateway.GetProduct)

	// Order routes are wrapped with the Auth middleware.
	v1.POST("/orders", middleware.Auth(), gateway.CreateOrder)
	v1.GET("/orders/:id", middleware.Auth(), gateway.GetOrder)
	v1.DELETE("/orders/:id", middleware.Auth(), gateway.CancelOrder)

	if runErr := engine.Run(":8080"); runErr != nil {
		log.Fatal(runErr)
	}
}
// APIGateway holds the RPC clients the HTTP handlers use to reach the
// user, product and order services.
type APIGateway struct {
userClient *client.UserClient
productClient *client.ProductClient
orderClient *client.OrderClient
}
// CreateUser handles POST /api/v1/users. It binds the JSON body, calls
// the user service with a three-second timeout and replies with the new
// user ID. Binding errors produce 400, RPC errors produce 500.
func (g *APIGateway) CreateUser(c *gin.Context) {
	var body CreateUserReq
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		c.JSON(400, gin.H{"error": bindErr.Error()})
		return
	}

	ctx, cancel := context.WithTimeout(c.Request.Context(), 3*time.Second)
	defer cancel()

	switch newID, err := g.userClient.CreateUser(ctx, &body); {
	case err != nil:
		c.JSON(500, gin.H{"error": err.Error()})
	default:
		c.JSON(200, gin.H{"user_id": newID})
	}
}
func (g *APIGateway) CreateOrder(c *gin.Context) {
var req CreateOrderReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
userID := c.GetInt64("user_id")
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
defer cancel()
order, err := g.orderClient.CreateOrder(ctx, userID, req.ProductID, req.Quantity)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, gin.H{
"order_id": order.ID,
"amount": order.Amount,
"status": order.Status,
})
}服务间调用
客户端封装
go
package client
import (
"context"
"time"
"github.com/cloudwego/kitex/client"
"github.com/kitex-contrib/registry-nacos/resolver"
"order-service/kitex_gen/product"
"order-service/kitex_gen/product/productservice"
)
// ProductClient wraps the generated product-service Kitex client.
type ProductClient struct {
	client productservice.Client
}

// NewProductClient creates a ProductClient for serviceName, resolved
// through Nacos, with a three-second RPC timeout and the retry middleware
// configured with 3.
//
// NOTE(review): the Nacos address is hard-coded to 127.0.0.1:8848 —
// confirm this matches the deployment, and that this version of
// registry-nacos accepts an address argument here.
func NewProductClient(serviceName string) (*ProductClient, error) {
	nacos, err := resolver.NewDefaultNacosResolver("nacos://127.0.0.1:8848")
	if err != nil {
		return nil, err
	}

	opts := []client.Option{
		client.WithResolver(nacos),
		client.WithTimeout(3 * time.Second),
		client.WithMiddleware(RetryMiddleware(3)),
	}
	rpc, err := productservice.NewClient(serviceName, opts...)
	if err != nil {
		return nil, err
	}

	wrapped := new(ProductClient)
	wrapped.client = rpc
	return wrapped, nil
}
// GetProduct calls the remote GetProduct method and returns the Product
// field of the response.
func (c *ProductClient) GetProduct(ctx context.Context, id int64) (*product.Product, error) {
	request := &product.GetProductRequest{Id: id}
	resp, err := c.client.GetProduct(ctx, request)
	if err == nil {
		return resp.Product, nil
	}
	return nil, err
}
// ReserveStock calls the remote ReserveStock method and returns only its
// error.
//
// NOTE(review): the RPC's bool result is discarded, so a false result
// with a nil error is reported as success — confirm the server never
// returns that combination.
func (c *ProductClient) ReserveStock(ctx context.Context, productID int64, quantity int32) error {
	if _, err := c.client.ReserveStock(ctx, productID, quantity); err != nil {
		return err
	}
	return nil
}
func (c *ProductClient) ReleaseStock(ctx context.Context, productID int64, quantity int32) error {
_, err := c.client.ReleaseStock(ctx, productID, quantity)
return err
}消息队列集成
Kafka 事件总线
go
package eventbus
import (
"context"
"encoding/json"
"log"
"github.com/IBM/sarama"
)
// EventBus publishes events through a sarama sync producer and dispatches
// consumed messages to the handlers registered per topic.
//
// NOTE(review): handlers is a plain map written by Subscribe and read by
// the consumer started in Start, with no locking — presumably every
// Subscribe call is expected to happen before Start; confirm.
type EventBus struct {
producer sarama.SyncProducer
consumer sarama.ConsumerGroup
handlers map[string][]EventHandler
}
// EventHandler processes one consumed event. A returned error is logged
// by the consumer; the message is still marked as consumed.
type EventHandler func(ctx context.Context, event *Event) error
// Event is the JSON envelope written to Kafka by Publish: Type is the
// topic name, Payload the caller's value and Timestamp the Unix time in
// seconds at publish. The fields have no JSON tags, so the Go field names
// are the JSON keys.
type Event struct {
Type string
Payload interface{}
Timestamp int64
}
// NewEventBus connects a sync producer and a consumer group (group ID
// "order-service") to the given brokers and returns an EventBus with no
// subscriptions.
//
// If the consumer group cannot be created, the producer that was already
// opened is closed before the error is returned, so it does not leak.
func NewEventBus(brokers []string) (*EventBus, error) {
	config := sarama.NewConfig()
	// SyncProducer requires successes to be returned.
	config.Producer.Return.Successes = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}

	consumer, err := sarama.NewConsumerGroup(brokers, "order-service", config)
	if err != nil {
		if closeErr := producer.Close(); closeErr != nil {
			log.Printf("close producer failed: %v", closeErr)
		}
		return nil, err
	}

	return &EventBus{
		producer: producer,
		consumer: consumer,
		handlers: make(map[string][]EventHandler),
	}, nil
}
// Publish wraps payload in an Event (Type set to topic, Timestamp set to
// the current Unix time), encodes it as JSON and sends it synchronously
// to the Kafka topic of the same name. It returns the encoding or send
// error.
//
// NOTE(review): this function uses the "time" package, which is not among
// the imports listed for this file — confirm it is imported.
func (b *EventBus) Publish(topic string, payload interface{}) error {
	envelope := Event{Type: topic, Payload: payload, Timestamp: time.Now().Unix()}

	encoded, err := json.Marshal(&envelope)
	if err != nil {
		return err
	}

	_, _, err = b.producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(encoded),
	})
	return err
}
// Subscribe appends handler to the list of handlers registered for topic.
func (b *EventBus) Subscribe(topic string, handler EventHandler) {
	registered := b.handlers[topic]
	registered = append(registered, handler)
	b.handlers[topic] = registered
}
// Start launches a background goroutine that consumes every topic that
// has at least one registered handler. Consume is called again each time
// it returns, until ctx is done; consume errors are only logged. Start
// itself always returns nil.
func (b *EventBus) Start(ctx context.Context) error {
	subscribed := make([]string, 0, len(b.handlers))
	for name := range b.handlers {
		subscribed = append(subscribed, name)
	}

	dispatcher := &consumerGroupHandler{handlers: b.handlers}

	consumeLoop := func() {
		for {
			err := b.consumer.Consume(ctx, subscribed, dispatcher)
			if err != nil {
				log.Printf("consumer error: %v", err)
			}
			// Checked after each Consume call, so Consume runs at least once.
			if ctx.Err() != nil {
				break
			}
		}
	}
	go consumeLoop()

	return nil
}
// consumerGroupHandler dispatches consumed messages to the handlers
// registered for each message's topic. Together with ConsumeClaim, the
// Setup and Cleanup methods below make it satisfy
// sarama.ConsumerGroupHandler, which Consume requires.
type consumerGroupHandler struct {
	handlers map[string][]EventHandler
}

// Setup runs at the start of a new consumer-group session. Nothing needs
// to be prepared, so it is a no-op.
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup runs at the end of a session, after all ConsumeClaim goroutines
// have exited. Nothing needs to be released, so it is a no-op.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
var event Event
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("unmarshal event failed: %v", err)
continue
}
handlers := h.handlers[msg.Topic]
for _, handler := range handlers {
if err := handler(session.Context(), &event); err != nil {
log.Printf("handle event failed: %v", err)
}
}
session.MarkMessage(msg, "")
}
return nil
}部署配置
Docker Compose
yaml
version: '3.8'
services:
  nacos:
    image: nacos/nacos-server:latest
    environment:
      - MODE=standalone
    ports:
      - "8848:8848"
  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=root123
      - MYSQL_DATABASE=ecommerce
    ports:
      - "3306:3306"
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  kafka:
    image: bitnami/kafka:latest
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # The quorum voter below uses port 9093 and the controller listener
      # is named CONTROLLER, so that listener must be declared here and
      # mapped to a security protocol next to the PLAINTEXT client listener.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    ports:
      - "9092:9092"
  user-service:
    build: ./user-service
    ports:
      - "8881:8881"
    depends_on:
      - nacos
      - mysql
      - redis
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
  product-service:
    build: ./product-service
    ports:
      - "8882:8882"
    depends_on:
      - nacos
      - mysql
      - redis
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
  order-service:
    build: ./order-service
    ports:
      - "8883:8883"
    depends_on:
      - nacos
      - mysql
      - redis
      - kafka
    environment:
      - NACOS_ADDR=nacos:8848
      - MYSQL_ADDR=mysql:3306
      - REDIS_ADDR=redis:6379
      - KAFKA_ADDR=kafka:9092
  api-gateway:
    build: ./api-gateway
    ports:
      - "8080:8080"
    depends_on:
      - nacos
      - user-service
      - product-service
      - order-service
    environment:
      - NACOS_ADDR=nacos:8848
小结
本章通过一个完整的电商微服务项目,演示了:
- 项目架构:API 网关 + 微服务 + 基础设施
- IDL 定义:使用 Thrift 定义服务接口
- 用户服务:完整的用户管理功能
- 商品服务:商品管理和库存控制
- 订单服务:订单创建和状态管理
- API 网关:统一入口和路由分发
- 服务调用:客户端封装和重试机制
- 消息队列:Kafka 事件总线实现
- 部署配置:Docker Compose 完整配置
在下一章中,我们将总结 CloudWeGo 的学习路线和最佳实践。