Spring Cloud Distributed Transactions

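1. Overview of Distributed Transactions

1.1 What Is a Distributed Transaction

A distributed transaction is one whose participants, resource servers, and transaction manager run on different nodes, yet whose data must still be kept consistent across all of them.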

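1.2 Distributed Transaction Solutions

| Solution | Description | Typical use case |
| --- | --- | --- |
| 2PC/XA | Two-phase commit | Strong consistency is required |
| TCC | Compensating transaction (Try-Confirm-Cancel) | The business logic can control its own resources |
| Saga | Long-running transaction | Complex business processes |
| Local message table | Eventual consistency | Simple scenarios |
| Seata | Open-source framework from Alibaba | General purpose |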

2. Seata

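2.1 Seata Architecture

Seata core components:

├── TC (Transaction Coordinator): maintains the state of global and branch transactions and drives the global commit or rollback
├── TM (Transaction Manager): defines the scope of a global transaction; begins it, then commits or rolls it back
└── RM (Resource Manager): manages the resources of branch transactions; registers branches with the TC and reports their status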
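
To make the division of labour between the three roles concrete, here is a minimal in-memory sketch. It is not Seata's API: `SeataRolesSketch`, `Coordinator`, `Branch`, and `runGlobalTransaction` are hypothetical names, and the real TC is a separate server reached over the network. The sketch only shows who calls whom: the TM opens and ends the global transaction, each RM enlists a branch under the XID, and the TC tells every branch how to finish.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy model of the three Seata roles; every name here is illustrative, not Seata's API. */
final class SeataRolesSketch {

    /** Work owned by an RM that can be finished either way in phase two. */
    interface Branch {
        void commit();
        void rollback();
    }

    /** TC: keeps track of global transactions and the branches enlisted in them. */
    static final class Coordinator {
        private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

        /** Called by the TM to open a global transaction; returns its XID. */
        String begin() {
            String xid = UUID.randomUUID().toString();
            branchesByXid.put(xid, new ArrayList<>());
            return xid;
        }

        /** Called by an RM to enlist its local work under an existing XID. */
        void registerBranch(String xid, Branch branch) {
            List<Branch> branches = branchesByXid.get(xid);
            if (branches == null) {
                throw new IllegalStateException("Unknown XID: " + xid);
            }
            branches.add(branch);
        }

        /** Called by the TM on success: every branch commits. */
        void commit(String xid) {
            for (Branch branch : branchesByXid.remove(xid)) {
                branch.commit();
            }
        }

        /** Called by the TM on failure: branches roll back, newest first. */
        void rollback(String xid) {
            List<Branch> branches = branchesByXid.remove(xid);
            for (int i = branches.size() - 1; i >= 0; i--) {
                branches.get(i).rollback();
            }
        }
    }

    /** TM role: demarcates the global transaction around the business logic. */
    static List<String> runGlobalTransaction(boolean failBusiness) {
        Coordinator tc = new Coordinator();
        List<String> log = new ArrayList<>();
        String xid = tc.begin();
        try {
            tc.registerBranch(xid, recordingBranch("order", log));
            tc.registerBranch(xid, recordingBranch("stock", log));
            if (failBusiness) {
                throw new IllegalStateException("Insufficient balance");
            }
        } catch (RuntimeException e) {
            tc.rollback(xid);
            return log;
        }
        tc.commit(xid);
        return log;
    }

    /** RM-side branch that only records what phase two asked it to do. */
    private static Branch recordingBranch(String name, List<String> log) {
        return new Branch() {
            @Override public void commit() { log.add(name + ":commit"); }
            @Override public void rollback() { log.add(name + ":rollback"); }
        };
    }

    public static void main(String[] args) {
        System.out.println(runGlobalTransaction(false)); // [order:commit, stock:commit]
        System.out.println(runGlobalTransaction(true));  // [stock:rollback, order:rollback]
    }
}
```

In a Seata application the TM role is played by the method annotated with `@GlobalTransactional`, and the RM role by the proxied data source or TCC bean in each service.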

2.2 Adding Dependencies

xml
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.1.0</version>
</dependency>

2.3 Seata Configuration

yaml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_tx_group
  service:
    vgroup-mapping:
      my_tx_group: default
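    # grouplist is only read when registry.type is file; with the nacos
    # registry configured below, the TC address is discovered through Nacos
    grouplist:
      default: 127.0.0.1:8091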
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
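
The `tx-service-group`, `vgroup-mapping`, and `grouplist` settings form a two-step lookup: the transaction service group maps to a TC cluster name, and the cluster name maps to an address. The sketch below models that lookup with plain maps. It assumes the file-style `grouplist` resolution; `VgroupLookupSketch` and `resolveTcAddress` are hypothetical names, not Seata classes.

```java
import java.util.Map;

/** Illustrates the group -> cluster -> address lookup; not Seata's own code. */
final class VgroupLookupSketch {

    /** Resolves a transaction service group to the address of its TC cluster. */
    static String resolveTcAddress(String txServiceGroup,
                                   Map<String, String> vgroupMapping,
                                   Map<String, String> grouplist) {
        // Step 1: tx-service-group -> cluster name (seata.service.vgroup-mapping)
        String cluster = vgroupMapping.get(txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("No vgroup-mapping for " + txServiceGroup);
        }
        // Step 2: cluster name -> TC address (seata.service.grouplist)
        String address = grouplist.get(cluster);
        if (address == null) {
            throw new IllegalStateException("No grouplist entry for cluster " + cluster);
        }
        return address;
    }

    public static void main(String[] args) {
        String address = resolveTcAddress(
            "my_tx_group",
            Map.of("my_tx_group", "default"),
            Map.of("default", "127.0.0.1:8091"));
        System.out.println(address); // 127.0.0.1:8091
    }
}
```

A missing `vgroup-mapping` entry for the configured group fails at step 1, so that is the first thing to check when a client cannot find its TC.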

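3. AT Mode

3.1 How AT Mode Works

AT mode flow:

1. Parse the SQL and record a snapshot of the affected rows (the before image)
2. Execute the business SQL
3. Record the after image
4. Register the branch transaction with the TC
5. Commit the local transaction, writing the business data and the undo log together
6. Global commit or rollback: on commit the undo log is deleted; on rollback the before image is used to restore the data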
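
The before and after images in the steps above can be sketched in memory as follows. This is an illustration under simplifying assumptions, not Seata's implementation: a `Map` stands in for the table, `AtModeSketch` and `UndoLog` are hypothetical names, and branch registration and global locks are left out. The rollback restores the before image only if the row still matches the after image, so a change made by someone else in the meantime is not silently overwritten.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory illustration of before/after images; not Seata's implementation. */
final class AtModeSketch {

    /** Undo record for one row: its key plus the value before and after the update. */
    static final class UndoLog {
        final long productId;
        final int beforeImage;
        final int afterImage;

        UndoLog(long productId, int beforeImage, int afterImage) {
            this.productId = productId;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    /** Stand-in for a stock table: product ID to available quantity. */
    private final Map<Long, Integer> stockTable = new HashMap<>();

    void setStock(long productId, int quantity) {
        stockTable.put(productId, quantity);
    }

    int getStock(long productId) {
        return stockTable.get(productId);
    }

    /** Phase one: capture the before image, apply the update, capture the after image. */
    UndoLog deduct(long productId, int quantity) {
        int before = getStock(productId);             // step 1: before image
        stockTable.put(productId, before - quantity); // step 2: business update
        int after = getStock(productId);              // step 3: after image
        return new UndoLog(productId, before, after); // stored with the local commit
    }

    /** Phase two rollback: restore the before image unless the row has changed since. */
    boolean rollback(UndoLog undo) {
        if (getStock(undo.productId) != undo.afterImage) {
            return false; // row no longer matches the after image; do not overwrite it
        }
        stockTable.put(undo.productId, undo.beforeImage);
        return true;
    }

    public static void main(String[] args) {
        AtModeSketch db = new AtModeSketch();
        db.setStock(1L, 10);
        UndoLog undo = db.deduct(1L, 3);
        System.out.println(db.getStock(1L));   // 7
        System.out.println(db.rollback(undo)); // true
        System.out.println(db.getStock(1L));   // 10
    }
}
```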

3.2 Database Setup

sql
-- Create an undo_log table in every database that takes part in AT mode
CREATE TABLE `undo_log` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `branch_id` bigint NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB;

3.3 Using AT Mode

java
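import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    private final OrderRepository orderRepository;
    private final ProductClient productClient;
    private final AccountClient accountClient;

    // Constructor injection: the final fields must be assigned here
    public OrderService(OrderRepository orderRepository,
                        ProductClient productClient,
                        AccountClient accountClient) {
        this.orderRepository = orderRepository;
        this.productClient = productClient;
        this.accountClient = accountClient;
    }

    // Opens a global transaction; any exception rolls back every branch
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public Order createOrder(OrderRequest request) {
        // Create the order
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);

        Order savedOrder = orderRepository.save(order);

        // Deduct stock (remote call, runs as its own branch transaction)
        productClient.deductStock(request.getProductId(), request.getQuantity());

        // Deduct the account balance (remote call, runs as its own branch transaction)
        accountClient.deductBalance(request.getUserId(), request.getAmount());

        // Update the order status
        savedOrder.setStatus(OrderStatus.PAID);
        return orderRepository.save(savedOrder);
    }
}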

3.4 AT Mode Configuration

yaml
seata:
  data-source-proxy-mode: AT
  # Wraps the application's DataSource in Seata's proxy automatically (true is the default)
  enable-auto-data-source-proxy: true

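4. TCC Mode

4.1 How TCC Mode Works

TCC mode defines three operations:

1. Try: check and reserve resources
2. Confirm: commit, using the reserved resources
3. Cancel: release the reserved resources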
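
The three operations can be sketched in memory as below. This is a minimal illustration, not Seata code: `TccSketch` and its methods are hypothetical, and reservations are assumed to be keyed by the global transaction ID (`xid`). Keying by `xid` is what lets Confirm and Cancel be repeated safely, lets Cancel tolerate a Try that never ran, and lets a late Try be refused after its Cancel.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory Try-Confirm-Cancel over a stock counter; names are illustrative only. */
final class TccSketch {

    private int available;
    private final Map<String, Integer> frozenByXid = new HashMap<>();
    private final Set<String> cancelledXids = new HashSet<>();

    TccSketch(int available) {
        this.available = available;
    }

    /** Try: reserve stock by moving it from available to frozen. */
    boolean tryReserve(String xid, int quantity) {
        if (cancelledXids.contains(xid)) {
            return false; // Try arrived after Cancel: refuse, or the stock would stay frozen
        }
        if (frozenByXid.containsKey(xid)) {
            return true;  // repeated Try: already reserved
        }
        if (available < quantity) {
            return false; // not enough stock to reserve
        }
        available -= quantity;
        frozenByXid.put(xid, quantity);
        return true;
    }

    /** Confirm: consume the reservation; calling it again is a no-op. */
    boolean confirm(String xid) {
        frozenByXid.remove(xid);
        return true;
    }

    /** Cancel: release the reservation; also succeeds if Try never ran. */
    boolean cancel(String xid) {
        cancelledXids.add(xid);
        Integer quantity = frozenByXid.remove(xid);
        if (quantity != null) {
            available += quantity; // null means nothing was reserved or it was already released
        }
        return true;
    }

    int available() {
        return available;
    }

    int frozen() {
        return frozenByXid.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        TccSketch stock = new TccSketch(10);
        stock.tryReserve("tx1", 4);
        stock.confirm("tx1");
        System.out.println(stock.available()); // 6
        stock.tryReserve("tx2", 5);
        stock.cancel("tx2");
        System.out.println(stock.available()); // 6
    }
}
```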

4.2 Defining the TCC Interface

java
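import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// @LocalTCC marks a TCC bean that is called inside the same JVM
@LocalTCC
public interface StockTccService {

    // Try method; commitMethod and rollbackMethod name the phase-two callbacks
    @TwoPhaseBusinessAction(
        name = "prepareDeductStock",
        commitMethod = "commit",
        rollbackMethod = "rollback"
    )
    boolean prepareDeductStock(
        @BusinessActionContextParameter(paramName = "productId") Long productId,
        @BusinessActionContextParameter(paramName = "quantity") Integer quantity
    );

    boolean commit(BusinessActionContext context);

    boolean rollback(BusinessActionContext context);
}

@Service
public class StockTccServiceImpl implements StockTccService {

    private final StockRepository stockRepository;
    private final StockFreezeRepository freezeRepository;

    public StockTccServiceImpl(StockRepository stockRepository,
                               StockFreezeRepository freezeRepository) {
        this.stockRepository = stockRepository;
        this.freezeRepository = freezeRepository;
    }

    @Override
    @Transactional
    public boolean prepareDeductStock(Long productId, Integer quantity) {
        // Check stock
        Stock stock = stockRepository.findByProductId(productId);
        if (stock == null || stock.getAvailable() < quantity) {
            throw new RuntimeException("Insufficient stock");
        }

        // Freeze stock by taking it out of the available quantity
        stock.setAvailable(stock.getAvailable() - quantity);
        stockRepository.save(stock);

        // Record the freeze
        StockFreeze freeze = new StockFreeze();
        freeze.setProductId(productId);
        freeze.setQuantity(quantity);
        freeze.setStatus(FreezeStatus.FREEZED);
        freezeRepository.save(freeze);

        return true;
    }

    @Override
    @Transactional
    public boolean commit(BusinessActionContext context) {
        Long productId = context.getActionContext("productId", Long.class);
        Integer quantity = context.getActionContext("quantity", Integer.class);

        // Delete the freeze record; the stock was already deducted in Try
        freezeRepository.deleteByProductIdAndQuantity(productId, quantity);

        return true;
    }

    // NOTE: freeze records are matched by product and quantity only. Storing
    // context.getXid() on each record would let this method detect a Try that
    // never ran (or failed) and avoid restoring stock that was never frozen.
    @Override
    @Transactional
    public boolean rollback(BusinessActionContext context) {
        Long productId = context.getActionContext("productId", Long.class);
        Integer quantity = context.getActionContext("quantity", Integer.class);

        // Restore stock; nothing to restore if the product does not exist
        Stock stock = stockRepository.findByProductId(productId);
        if (stock == null) {
            return true;
        }
        stock.setAvailable(stock.getAvailable() + quantity);
        stockRepository.save(stock);

        // Delete the freeze record
        freezeRepository.deleteByProductIdAndQuantity(productId, quantity);

        return true;
    }
}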

4.3 Using TCC Mode

java
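import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    private final OrderRepository orderRepository;
    private final StockTccService stockTccService;
    private final AccountTccService accountTccService;

    // Constructor injection: the final fields must be assigned here
    public OrderService(OrderRepository orderRepository,
                        StockTccService stockTccService,
                        AccountTccService accountTccService) {
        this.orderRepository = orderRepository;
        this.stockTccService = stockTccService;
        this.accountTccService = accountTccService;
    }

    // Only the Try methods are called here; Seata invokes Confirm or Cancel in phase two
    @GlobalTransactional(name = "create-order-tcc")
    public Order createOrder(OrderRequest request) {
        // Create the order
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());

        Order savedOrder = orderRepository.save(order);

        // TCC Try: reserve stock
        stockTccService.prepareDeductStock(request.getProductId(), request.getQuantity());

        // TCC Try: reserve balance
        accountTccService.prepareDeductBalance(request.getUserId(), request.getAmount());

        return savedOrder;
    }
}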

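5. Saga Mode

5.1 How Saga Mode Works

Saga mode:

1. Split the long-running transaction into several local transactions
2. Give every local transaction a matching compensating operation
3. Execute the local transactions in order
4. On failure, run the compensating operations for the steps that already completed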
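
The four points above can be sketched without any framework. The code below is a minimal illustration, not Seata's state machine engine: `SagaSketch`, `Step`, and `run` are hypothetical names. It runs the steps in order and, when one fails, compensates the completed steps in reverse order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Framework-free Saga runner; names are illustrative, not Seata's API. */
final class SagaSketch {

    /** One local transaction together with the operation that undoes it. */
    static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs the steps in order; on failure compensates completed steps in reverse. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step); // most recently completed step ends up on top
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    /** Create-order flow; optionally fails at the balance step. */
    static List<String> createOrder(boolean failAtBalance) {
        List<String> log = new ArrayList<>();
        List<Step> steps = List.of(
            new Step(() -> log.add("createOrder"), () -> log.add("cancelOrder")),
            new Step(() -> log.add("deductStock"), () -> log.add("restoreStock")),
            new Step(() -> {
                if (failAtBalance) {
                    throw new IllegalStateException("Insufficient balance");
                }
                log.add("deductBalance");
            }, () -> log.add("restoreBalance")));
        run(steps);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(createOrder(false)); // [createOrder, deductStock, deductBalance]
        System.out.println(createOrder(true));  // [createOrder, deductStock, restoreStock, cancelOrder]
    }
}
```

The failed step itself is not compensated, because its own local transaction never committed; only the steps before it are undone.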

5.2 State Machine Definition

json
{
  "Name": "createOrderSaga",
  "Comment": "Create-order Saga",
  "StartState": "createOrder",
  "Version": "0.0.1",
  "States": {
    "createOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "createOrder",
      "CompensateState": "cancelOrder",
      "Next": "deductStock",
      "Input": ["$.[orderRequest]"],
      "Output": {
        "order": "$.#root"
      },
      "Catch": [
        { "Exceptions": ["java.lang.Throwable"], "Next": "compensationTrigger" }
      ]
    },
    "deductStock": {
      "Type": "ServiceTask",
      "ServiceName": "stockService",
      "ServiceMethod": "deductStock",
      "CompensateState": "restoreStock",
      "Next": "deductBalance",
      "Input": ["$.[orderRequest].productId", "$.[orderRequest].quantity"],
      "Catch": [
        { "Exceptions": ["java.lang.Throwable"], "Next": "compensationTrigger" }
      ]
    },
    "deductBalance": {
      "Type": "ServiceTask",
      "ServiceName": "accountService",
      "ServiceMethod": "deductBalance",
      "CompensateState": "restoreBalance",
      "Next": "succeed",
      "Input": ["$.[orderRequest].userId", "$.[orderRequest].amount"],
      "Catch": [
        { "Exceptions": ["java.lang.Throwable"], "Next": "compensationTrigger" }
      ]
    },
    "cancelOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "cancelOrder",
      "Input": ["$.[order].id"]
    },
    "restoreStock": {
      "Type": "ServiceTask",
      "ServiceName": "stockService",
      "ServiceMethod": "restoreStock",
      "Input": ["$.[orderRequest].productId", "$.[orderRequest].quantity"]
    },
    "restoreBalance": {
      "Type": "ServiceTask",
      "ServiceName": "accountService",
      "ServiceMethod": "restoreBalance",
      "Input": ["$.[orderRequest].userId", "$.[orderRequest].amount"]
    },
    "compensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "fail"
    },
    "succeed": {
      "Type": "Succeed"
    },
    "fail": {
      "Type": "Fail"
    }
  }
}

5.3 Using Saga Mode

java
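import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

@Configuration
public class SagaConfig {

    // Declared as its own bean so that Spring runs the config's initialization callback
    @Bean
    public DbStateMachineConfig dbStateMachineConfig(DataSource dataSource) {
        DbStateMachineConfig config = new DbStateMachineConfig();
        config.setDataSource(dataSource);
        config.setApplicationId("order-service");
        config.setTxServiceGroup("my_tx_group");
        // The state machine JSON files must also be registered on this config (not shown)
        return config;
    }

    @Bean
    public StateMachineEngine stateMachineEngine(DbStateMachineConfig config) {
        ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine();
        engine.setStateMachineConfig(config);
        return engine;
    }
}

@Service
public class OrderSagaService {

    private static final Logger log = LoggerFactory.getLogger(OrderSagaService.class);

    private final StateMachineEngine stateMachineEngine;

    public OrderSagaService(StateMachineEngine stateMachineEngine) {
        this.stateMachineEngine = stateMachineEngine;
    }

    public void createOrder(OrderRequest request) {
        // Start parameters; the state machine reads them by key
        Map<String, Object> input = new HashMap<>();
        input.put("orderRequest", request);

        // Arguments: state machine name, tenant ID (null for the default), start parameters
        StateMachineInstance instance = stateMachineEngine.start(
            "createOrderSaga",
            null,
            input
        );

        if (ExecutionStatus.SU.equals(instance.getStatus())) {
            log.info("Saga succeeded");
        } else {
            // The exception is passed last so that SLF4J logs its stack trace
            log.error("Saga failed, status={}", instance.getStatus(), instance.getException());
        }
    }
}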

6. Local Message Table

6.1 Implementation

sql
CREATE TABLE `local_message` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `message_id` varchar(64) NOT NULL,
  `message_type` varchar(32) NOT NULL,
  `content` text NOT NULL,
  `status` varchar(16) NOT NULL,
  `retry_count` int DEFAULT 0,
  `created_at` datetime NOT NULL,
  `updated_at` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`)
);

6.2 Sending Messages

java
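import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class LocalMessageService {

    private final LocalMessageRepository messageRepository;
    private final MessageProducer messageProducer;

    public LocalMessageService(LocalMessageRepository messageRepository,
                               MessageProducer messageProducer) {
        this.messageRepository = messageRepository;
        this.messageProducer = messageProducer;
    }

    // The message row and the business change commit or roll back together
    @Transactional
    public void sendMessageWithTransaction(String type, Object content) {
        // Save the message to the local table
        LocalDateTime now = LocalDateTime.now();
        LocalMessage message = new LocalMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType(type);
        message.setContent(JsonUtils.toJson(content));
        message.setStatus(MessageStatus.PENDING);
        message.setCreatedAt(now);
        message.setUpdatedAt(now); // updated_at is NOT NULL in the table

        messageRepository.save(message);

        // Perform the business operation (application-specific, not shown here)
        doBusiness(content);
    }

    // Every 5 seconds, publish pending messages that have failed fewer than 5 times.
    // Requires @EnableScheduling on a configuration class.
    @Scheduled(fixedRate = 5000)
    public void sendPendingMessages() {
        List<LocalMessage> messages = messageRepository
            .findByStatusAndRetryCountLessThan(
                MessageStatus.PENDING,
                5
            );

        for (LocalMessage message : messages) {
            try {
                messageProducer.send(message.getMessageType(), message.getContent());
                message.setStatus(MessageStatus.SENT);
            } catch (Exception e) {
                // Leave the message PENDING so that the next run retries it
                message.setRetryCount(message.getRetryCount() + 1);
            }
            message.setUpdatedAt(LocalDateTime.now());
            messageRepository.save(message);
        }
    }
}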
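
Because the scheduled sender marks a message as SENT only after publishing it, a crash between the two steps means the same message is published again on the next run. The consumer therefore has to tolerate duplicates. The sketch below shows one way to do that by remembering processed message IDs; it is an assumption about the consuming side, which the section above does not show, and `IdempotentConsumerSketch` and `onDeductBalance` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Consumer that applies each message at most once, keyed by message ID. */
final class IdempotentConsumerSketch {

    // Stand-in for a table with a unique key on message_id; in a real service the
    // ID would be recorded in the same local transaction as the business update.
    private final Set<String> processedMessageIds = new HashSet<>();
    private int balance;

    IdempotentConsumerSketch(int balance) {
        this.balance = balance;
    }

    /** Applies the deduction once; a redelivery with the same ID is ignored. */
    boolean onDeductBalance(String messageId, int amount) {
        if (!processedMessageIds.add(messageId)) {
            return false; // duplicate delivery, already applied
        }
        balance -= amount;
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch account = new IdempotentConsumerSketch(100);
        account.onDeductBalance("msg-1", 30);
        account.onDeductBalance("msg-1", 30); // redelivered by the sender
        System.out.println(account.balance()); // 70
    }
}
```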

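7. Summary

This chapter covered the core topics of distributed transactions in Spring Cloud:

| Topic | Key points |
| --- | --- |
| Distributed transaction concepts | CAP and BASE theory |
| Seata | AT, TCC, and Saga modes |
| AT mode | Automatic compensation, non-intrusive to business code |
| TCC mode | Try-Confirm-Cancel |
| Saga mode | State machine, compensating transactions |
| Local message table | Eventual consistency |

The next chapter covers distributed tracing in Spring Cloud.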