Spring Cloud 消息驱动
1. 消息驱动概述
1.1 什么是 Spring Cloud Stream
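Spring Cloud Stream 是一个构建消息驱动微服务的框架,通过统一的编程模型简化消息中间件的使用。
注意:本章示例基于注解式编程模型(@EnableBinding、@StreamListener、Source/Sink),该模型自 Spring Cloud Stream 3.1 起被标记为废弃,并在 4.0 中移除;新版本请改用函数式编程模型(基于 java.util.function 的 Supplier / Function / Consumer)。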
1.2 核心概念
| 概念 | 说明 |
|---|---|
| Binder | 消息中间件绑定器 |
| Binding | 消息通道绑定 |
| Message | 消息载体 |
| Source | 消息生产者 |
| Sink | 消息消费者 |
2. 快速入门
2.1 添加依赖
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.2 基本配置
yaml
spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # Binding names must equal the channel names: Source exposes "output" and Sink exposes "input".
        output:
          destination: order.topic
          content-type: application/json
          binder: defaultRabbit
        input:
          destination: order.topic
          content-type: application/json
          group: order-group
          binder: defaultRabbit

2.3 消息生产者
java
/**
 * Publishes {@code Order} payloads on the output channel exposed by {@code Source}.
 */
@EnableBinding(Source.class)
@Service
public class OrderMessageProducer {

    private final MessageChannel output;

    /**
     * @param source binding interface whose {@code output()} channel is used for sending
     */
    public OrderMessageProducer(Source source) {
        this.output = source.output();
    }

    /**
     * Sends the order as a message payload without custom headers.
     *
     * @param order payload to send
     * @return the result of {@code MessageChannel.send}
     */
    public boolean sendOrder(Order order) {
        return output.send(MessageBuilder.withPayload(order).build());
    }

    /**
     * Sends the order together with the {@code orderId}, {@code source} and {@code sentAt} headers.
     *
     * @param order payload to send; its id is copied into the {@code orderId} header
     * @return the result of {@code MessageChannel.send}
     */
    public boolean sendOrderWithHeaders(Order order) {
        Message<Order> message = MessageBuilder
                .withPayload(order)
                .setHeader("orderId", order.getId())
                .setHeader("source", "order-service")
                // "timestamp" is a reserved, read-only message header (MessageHeaders.TIMESTAMP);
                // setting it explicitly is rejected, so the send time travels under its own name.
                .setHeader("sentAt", System.currentTimeMillis())
                .build();
        return output.send(message);
    }
}

2.4 消息消费者
java
/**
 * Consumes {@code Order} messages from the {@code Sink} input channel.
 *
 * <p>Exactly one unconditional {@code @StreamListener} is bound to {@code Sink.INPUT}.
 * When several listeners without a {@code condition} share a channel, every message is
 * dispatched to all of them, which would run {@code processOrder} more than once per order.
 */
@EnableBinding(Sink.class)
@Service
public class OrderMessageConsumer {

    /**
     * Payload-only variant. Not bound as a listener (see class comment); to receive the
     * converted payload without headers, move the {@code @StreamListener} annotation here
     * and remove it from {@link #handleOrderWithHeaders(Message)}.
     *
     * @param order the received order
     */
    public void handleOrder(Order order) {
        log.info("收到订单消息: {}", order);
        processOrder(order);
    }

    /**
     * Bound listener: receives the full message so that headers are available as well.
     *
     * @param message incoming message carrying the order payload
     */
    @StreamListener(Sink.INPUT)
    public void handleOrderWithHeaders(Message<Order> message) {
        MessageHeaders headers = message.getHeaders();
        Order order = message.getPayload();
        log.info("收到订单: {}, Headers: {}", order, headers);
        processOrder(order);
    }

    private void processOrder(Order order) {
        // Order processing logic goes here.
    }
}

3. 自定义通道
3.1 定义通道接口
java
/**
 * Declares the bindable input and output channels for order and notification messages.
 * Each constant is the channel name used by the matching annotated method.
 */
public interface OrderChannels {

    /** Name of the outbound order channel. */
    String ORDER_OUTPUT = "orderOutput";

    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();

    /** Name of the inbound order channel. */
    String ORDER_INPUT = "orderInput";

    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();

    /** Name of the outbound notification channel. */
    String NOTIFICATION_OUTPUT = "notificationOutput";

    @Output(NOTIFICATION_OUTPUT)
    MessageChannel notificationOutput();

    /** Name of the inbound notification channel. */
    String NOTIFICATION_INPUT = "notificationInput";

    @Input(NOTIFICATION_INPUT)
    SubscribableChannel notificationInput();
}

3.2 配置绑定
yaml
spring:
  cloud:
    stream:
      bindings:
        orderOutput:
          destination: order.topic
        orderInput:
          destination: order.topic
          group: order-group
        notificationOutput:
          destination: notification.topic
        notificationInput:
          destination: notification.topic
          group: notification-group

3.3 使用自定义通道
java
/**
 * Saves orders and publishes the saved order and a notification on the
 * output channels obtained from {@code OrderChannels}.
 */
@EnableBinding(OrderChannels.class)
@Service
public class OrderService {

    private final MessageChannel orderOutput;
    private final MessageChannel notificationOutput;

    /**
     * @param channels binding interface providing the order and notification output channels
     */
    public OrderService(OrderChannels channels) {
        this.orderOutput = channels.orderOutput();
        this.notificationOutput = channels.notificationOutput();
    }

    /**
     * Saves the order, then sends the saved order followed by a notification message.
     *
     * @param order the order to create
     */
    public void createOrder(Order order) {
        // NOTE(review): orderRepository is not declared in this snippet; presumably an injected field - confirm.
        Order saved = orderRepository.save(order);

        // Publish the saved order first.
        Message<Order> orderMessage = MessageBuilder.withPayload(saved).build();
        orderOutput.send(orderMessage);

        // Then publish the notification that refers to it.
        Notification notification = new Notification(saved.getUserId(), "订单创建成功", saved.getId());
        Message<Notification> notificationMessage = MessageBuilder.withPayload(notification).build();
        notificationOutput.send(notificationMessage);
    }
}

4. 消费者组
4.1 消费者组配置
yaml
spring:
  cloud:
    stream:
      bindings:
        orderInput:
          destination: order.topic
          group: order-group-1
        orderInput2:
          destination: order.topic
          group: order-group-2

4.2 消费者组作用
同一消费者组内,每条消息只会被其中一个实例消费(组内实例竞争消费,从而实现负载均衡);不同的消费者组则各自都会收到每条消息:
order.topic
│
├── order-group-1
│   ├── 实例1 ── 处理该条消息
│   └── 实例2 ── 不处理该条消息(同组内已由实例1处理)
│
└── order-group-2
    ├── 实例1 ── 处理该条消息
    └── 实例2 ── 不处理该条消息(同组内已由实例1处理)

5. 消息分区
5.1 分区配置
yaml
spring:
  cloud:
    stream:
      bindings:
        orderOutput:
          destination: order.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 3
        orderInput:
          destination: order.topic
          group: order-group
          consumer:
            partitioned: true
            instance-count: 3
            instance-index: 0

5.2 发送分区消息
java
/**
 * Sends orders with a {@code partitionKey} header taken from the order's user id.
 */
@Service
public class PartitionMessageService {

    private final MessageChannel orderOutput;

    /**
     * Obtains the order output channel from the binding interface; the final field
     * must be assigned here for the class to compile.
     *
     * @param channels binding interface that exposes the order output channel
     */
    public PartitionMessageService(OrderChannels channels) {
        this.orderOutput = channels.orderOutput();
    }

    /**
     * Sends the order with its user id in the {@code partitionKey} header.
     *
     * @param order the order to send
     */
    public void sendOrder(Order order) {
        Message<Order> message = MessageBuilder
                .withPayload(order)
                .setHeader("partitionKey", order.getUserId())
                .build();
        orderOutput.send(message);
    }
}

6. 错误处理
6.1 全局错误通道
java
/**
 * Receives messages from the {@code errorChannel}, logs them and records them
 * through {@code errorLogService}.
 */
@Service
public class ErrorMessageHandler {

    /**
     * Logs the error message, then records its original message and cause.
     *
     * @param errorMessage the message delivered on the error channel
     */
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("消息处理失败: {}", errorMessage);
        // Record the failure.
        // NOTE(review): getOriginalMessage() may return null when no original message was captured - confirm.
        errorLogService.log(errorMessage.getOriginalMessage(), errorMessage.getPayload());
    }
}

6.2 死信队列
yaml
spring:
  cloud:
    stream:
      bindings:
        orderInput:
          destination: order.topic
          group: order-group
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
            back-off-max-interval: 10000
      rabbit:
        bindings:
          orderInput:
            consumer:
              auto-bind-dlq: true
              dead-letter-queue-name: order.dlq
              dead-letter-exchange: order.dlx
              dead-letter-routing-key: order.dlq

6.3 自定义错误处理
java
/**
 * Handles errors delivered on the {@code order.topic.order-group.errors} channel.
 */
@Service
public class OrderErrorHandler {

    /**
     * Logs the failed message with its cause and passes both to {@code deadLetterService}.
     *
     * @param errorMessage the message delivered on the error channel
     */
    @ServiceActivator(inputChannel = "order.topic.order-group.errors")
    public void handleError(ErrorMessage errorMessage) {
        final Message<?> failed = errorMessage.getOriginalMessage();
        final Throwable error = errorMessage.getPayload();

        log.error("订单消息处理失败: {}", failed, error);

        // Forward to the dead-letter service.
        deadLetterService.send(failed, error);
    }
}

7. 消息过滤
7.1 条件消费
java
/**
 * Dispatches messages from {@code Sink.INPUT} to different handlers based on the
 * {@code type} message header.
 */
@Service
public class OrderMessageConsumer {

    /** SpEL condition selecting messages whose {@code type} header is {@code CREATE}. */
    private static final String TYPE_IS_CREATE = "headers['type'] == 'CREATE'";

    /** SpEL condition selecting messages whose {@code type} header is {@code CANCEL}. */
    private static final String TYPE_IS_CANCEL = "headers['type'] == 'CANCEL'";

    @StreamListener(value = Sink.INPUT, condition = TYPE_IS_CREATE)
    public void handleCreateOrder(Order order) {
        log.info("处理创建订单: {}", order);
    }

    @StreamListener(value = Sink.INPUT, condition = TYPE_IS_CANCEL)
    public void handleCancelOrder(Order order) {
        log.info("处理取消订单: {}", order);
    }
}

7.2 SpEL 表达式过滤
java
/**
 * Handles high-value orders.
 *
 * <p>The condition reads a message header rather than the payload: listener conditions
 * are evaluated before the payload is converted from its wire format, so an expression
 * such as {@code payload.amount} cannot see the {@code Order} fields. The producer is
 * expected to copy the order amount into a numeric {@code amount} header.
 *
 * @param order the received order
 */
@StreamListener(
    value = Sink.INPUT,
    condition = "headers['amount'] > 1000"
)
public void handleHighValueOrder(Order order) {
    log.info("处理高价值订单: {}", order);
}
/**
 * Handles orders that are still pending.
 *
 * <p>The condition reads a message header rather than the payload: listener conditions
 * are evaluated before the payload is converted from its wire format, so an expression
 * such as {@code payload.status} cannot see the {@code Order} fields. The producer is
 * expected to copy the order status name into a {@code status} header.
 *
 * @param order the received order
 */
@StreamListener(
    value = Sink.INPUT,
    condition = "headers['status'] == 'PENDING'"
)
public void handlePendingOrder(Order order) {
    log.info("处理待处理订单: {}", order);
}

8. Kafka 集成
8.1 添加依赖
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

8.2 Kafka 配置
yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          auto-add-partitions: true
        bindings:
          orderInput:
            consumer:
              auto-commit-offset: false
              enable-dlq: true
              dlq-name: order.dlq
          orderOutput:
            producer:
              sync: true
      bindings:
        orderOutput:
          destination: order-topic
          producer:
            # partition-count is a core producer property, so it is set on the generic binding
            # rather than in the Kafka-specific producer section.
            partition-count: 3
        orderInput:
          destination: order-topic
          group: order-group

9. 小结
本章学习了 Spring Cloud 消息驱动的核心内容:
| 内容 | 要点 |
|---|---|
| 基本概念 | Binder、Binding、Message |
| 消息生产 | Source、MessageChannel |
| 消息消费 | Sink、@StreamListener |
| 自定义通道 | 定义接口、配置绑定 |
| 消费者组 | 负载均衡、消息分发 |
| 消息分区 | 分区策略、分区配置 |
| 错误处理 | 死信队列、错误通道 |
| 消息过滤 | 条件消费、SpEL 表达式 |
下一章将学习 Spring Cloud 分布式事务。