Spring Cloud 分布式事务
1. 分布式事务概述
1.1 什么是分布式事务
分布式事务是指事务参与者、资源服务器、事务管理器分布在不同的节点上,需要保证数据一致性的事务。
1.2 分布式事务解决方案
| 方案 | 说明 | 适用场景 |
|---|---|---|
| 2PC/XA | 两阶段提交 | 强一致性要求 |
| TCC | 补偿事务 | 业务可控场景 |
| Saga | 长事务 | 复杂业务流程 |
| 本地消息表 | 最终一致性 | 简单场景 |
| Seata | 阿里开源框架 | 通用场景 |
2. Seata
2.1 Seata 架构
Seata 核心组件:
├── TC (Transaction Coordinator) - 事务协调者
├── TM (Transaction Manager) - 事务管理器
└── RM (Resource Manager) - 资源管理器2.2 添加依赖
xml
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2023.0.1.0</version>
</dependency>2.3 Seata 配置
yaml
seata:
enabled: true
application-id: order-service
tx-service-group: my_tx_group
service:
vgroup-mapping:
my_tx_group: default
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP
registry:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP3. AT 模式
3.1 AT 模式原理
AT 模式流程:
1. 解析 SQL,记录数据快照
2. 执行业务 SQL
3. 记录后镜像
4. 注册分支事务
5. 提交本地事务
6. 全局提交/回滚3.2 数据库配置
sql
-- 每个数据库创建 undo_log 表
CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB;3.3 使用 AT 模式
java
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final ProductClient productClient;
private final AccountClient accountClient;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public Order createOrder(OrderRequest request) {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.PENDING);
Order savedOrder = orderRepository.save(order);
// 扣减库存
productClient.deductStock(request.getProductId(), request.getQuantity());
// 扣减余额
accountClient.deductBalance(request.getUserId(), request.getAmount());
// 更新订单状态
savedOrder.setStatus(OrderStatus.PAID);
return orderRepository.save(savedOrder);
}
}3.4 AT 模式配置
yaml
seata:
data-source-proxy-mode: AT
at:
datasource-proxy-mode: AT4. TCC 模式
4.1 TCC 模式原理
TCC 模式三个阶段:
1. Try:预留资源
2. Confirm:确认提交
3. Cancel:取消预留4.2 TCC 接口定义
java
public interface StockTccService {
@TwoPhaseBusinessAction(
name = "prepareDeductStock",
commitMethod = "commit",
rollbackMethod = "rollback"
)
boolean prepareDeductStock(
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "quantity") Integer quantity
);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
@Service
public class StockTccServiceImpl implements StockTccService {
private final StockRepository stockRepository;
private final StockFreezeRepository freezeRepository;
@Override
@Transactional
public boolean prepareDeductStock(Long productId, Integer quantity) {
// 检查库存
Stock stock = stockRepository.findByProductId(productId);
if (stock == null || stock.getAvailable() < quantity) {
throw new RuntimeException("库存不足");
}
// 冻结库存
stock.setAvailable(stock.getAvailable() - quantity);
stockRepository.save(stock);
// 记录冻结记录
StockFreeze freeze = new StockFreeze();
freeze.setProductId(productId);
freeze.setQuantity(quantity);
freeze.setStatus(FreezeStatus.FREEZED);
freezeRepository.save(freeze);
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext context) {
Long productId = context.getActionContext("productId", Long.class);
Integer quantity = context.getActionContext("quantity", Integer.class);
// 删除冻结记录
freezeRepository.deleteByProductIdAndQuantity(productId, quantity);
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext context) {
Long productId = context.getActionContext("productId", Long.class);
Integer quantity = context.getActionContext("quantity", Integer.class);
// 恢复库存
Stock stock = stockRepository.findByProductId(productId);
stock.setAvailable(stock.getAvailable() + quantity);
stockRepository.save(stock);
// 删除冻结记录
freezeRepository.deleteByProductIdAndQuantity(productId, quantity);
return true;
}
}4.3 使用 TCC 模式
java
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final StockTccService stockTccService;
private final AccountTccService accountTccService;
@GlobalTransactional(name = "create-order-tcc")
public Order createOrder(OrderRequest request) {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
Order savedOrder = orderRepository.save(order);
// TCC 扣减库存
stockTccService.prepareDeductStock(request.getProductId(), request.getQuantity());
// TCC 扣减余额
accountTccService.prepareDeductBalance(request.getUserId(), request.getAmount());
return savedOrder;
}
}5. Saga 模式
5.1 Saga 模式原理
Saga 模式:
1. 将长事务拆分为多个本地事务
2. 每个事务有对应的补偿操作
3. 按顺序执行事务
4. 失败时执行补偿操作5.2 状态机定义
json
{
"name": "createOrderSaga",
"comment": "创建订单 Saga",
"startState": "createOrder",
"states": {
"createOrder": {
"type": "ServiceTask",
"serviceName": "orderService",
"serviceMethod": "createOrder",
"compensateState": "cancelOrder",
"next": "deductStock",
"input": ["$.input.orderRequest"],
"output": {
"order": "$.order"
}
},
"deductStock": {
"type": "ServiceTask",
"serviceName": "stockService",
"serviceMethod": "deductStock",
"compensateState": "restoreStock",
"next": "deductBalance",
"input": ["$.input.orderRequest.productId", "$.input.orderRequest.quantity"]
},
"deductBalance": {
"type": "ServiceTask",
"serviceName": "accountService",
"serviceMethod": "deductBalance",
"compensateState": "restoreBalance",
"next": "succeed",
"input": ["$.input.orderRequest.userId", "$.input.orderRequest.amount"]
},
"cancelOrder": {
"type": "ServiceTask",
"serviceName": "orderService",
"serviceMethod": "cancelOrder",
"input": ["$.order.id"]
},
"restoreStock": {
"type": "ServiceTask",
"serviceName": "stockService",
"serviceMethod": "restoreStock",
"input": ["$.input.orderRequest.productId", "$.input.orderRequest.quantity"]
},
"restoreBalance": {
"type": "ServiceTask",
"serviceName": "accountService",
"serviceMethod": "restoreBalance",
"input": ["$.input.orderRequest.userId", "$.input.orderRequest.amount"]
},
"succeed": {
"type": "Succeed"
},
"fail": {
"type": "Fail"
}
}
}5.3 使用 Saga 模式
java
@Configuration
public class SagaConfig {
@Bean
public StateMachineEngine stateMachineEngine(DataSource dataSource) {
DbStateMachineConfig config = new DbStateMachineConfig(dataSource);
config.setApplicationId("order-service");
config.setTxServiceGroup("my_tx_group");
return new ProcessCtrlStateMachineEngine(config);
}
}
@Service
public class OrderSagaService {
private final StateMachineEngine stateMachineEngine;
public void createOrder(OrderRequest request) {
Map<String, Object> input = new HashMap<>();
input.put("orderRequest", request);
StateMachineInstance instance = stateMachineEngine.start(
"createOrderSaga",
null,
input
);
if (ExecutionStatus.SU.equals(instance.getStatus())) {
log.info("Saga 执行成功");
} else {
log.error("Saga 执行失败: {}", instance.getException());
}
}
}6. 本地消息表
6.1 实现方案
sql
CREATE TABLE `local_message` (
`id` bigint NOT NULL AUTO_INCREMENT,
`message_id` varchar(64) NOT NULL,
`message_type` varchar(32) NOT NULL,
`content` text NOT NULL,
`status` varchar(16) NOT NULL,
`retry_count` int DEFAULT 0,
`created_at` datetime NOT NULL,
`updated_at` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`)
);6.2 消息发送
java
@Service
public class LocalMessageService {
private final LocalMessageRepository messageRepository;
private final MessageProducer messageProducer;
@Transactional
public void sendMessageWithTransaction(String type, Object content) {
// 保存消息到本地表
LocalMessage message = new LocalMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setMessageType(type);
message.setContent(JsonUtils.toJson(content));
message.setStatus(MessageStatus.PENDING);
message.setCreatedAt(LocalDateTime.now());
messageRepository.save(message);
// 执行业务操作
doBusiness(content);
}
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages = messageRepository
.findByStatusAndRetryCountLessThan(
MessageStatus.PENDING,
5
);
for (LocalMessage message : messages) {
try {
messageProducer.send(message.getMessageType(), message.getContent());
message.setStatus(MessageStatus.SENT);
} catch (Exception e) {
message.setRetryCount(message.getRetryCount() + 1);
}
messageRepository.save(message);
}
}
}7. 小结
本章学习了 Spring Cloud 分布式事务的核心内容:
| 内容 | 要点 |
|---|---|
| 分布式事务概念 | CAP、BASE 理论 |
| Seata | AT、TCC、Saga 模式 |
| AT 模式 | 自动补偿、无侵入 |
| TCC 模式 | Try-Confirm-Cancel |
| Saga 模式 | 状态机、补偿事务 |
| 本地消息表 | 最终一致性 |
下一章将学习 Spring Cloud 链路追踪。