Skip to content

Spring Cloud 消息驱动

1. 消息驱动概述

1.1 什么是 Spring Cloud Stream

Spring Cloud Stream 是一个构建消息驱动微服务的框架,通过统一的编程模型简化消息中间件的使用。

1.2 核心概念

概念说明
Binder消息中间件绑定器
Binding消息通道绑定
Message消息载体
Source消息生产者
Sink消息消费者

2. 快速入门

2.1 添加依赖

xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.2 基本配置

yaml
spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        orderOutput:
          destination: order.topic
          content-type: application/json
          binder: defaultRabbit
        orderInput:
          destination: order.topic
          content-type: application/json
          group: order-group
          binder: defaultRabbit

2.3 消息生产者

java
@EnableBinding(Source.class)
@Service
public class OrderMessageProducer {
    
    private final MessageChannel output;
    
    public OrderMessageProducer(Source source) {
        this.output = source.output();
    }
    
    public boolean sendOrder(Order order) {
        return output.send(MessageBuilder.withPayload(order).build());
    }
    
    public boolean sendOrderWithHeaders(Order order) {
        Message<Order> message = MessageBuilder
            .withPayload(order)
            .setHeader("orderId", order.getId())
            .setHeader("source", "order-service")
            .setHeader("timestamp", System.currentTimeMillis())
            .build();
        
        return output.send(message);
    }
}

2.4 消息消费者

java
@EnableBinding(Sink.class)
@Service
public class OrderMessageConsumer {
    
    @StreamListener(Sink.INPUT)
    public void handleOrder(Order order) {
        log.info("收到订单消息: {}", order);
        processOrder(order);
    }
    
    @StreamListener(Sink.INPUT)
    public void handleOrderWithHeaders(Message<Order> message) {
        MessageHeaders headers = message.getHeaders();
        Order order = message.getPayload();
        
        log.info("收到订单: {}, Headers: {}", order, headers);
        processOrder(order);
    }
    
    private void processOrder(Order order) {
        // 处理订单逻辑
    }
}

3. 自定义通道

3.1 定义通道接口

java
public interface OrderChannels {
    
    String ORDER_OUTPUT = "orderOutput";
    String ORDER_INPUT = "orderInput";
    String NOTIFICATION_OUTPUT = "notificationOutput";
    String NOTIFICATION_INPUT = "notificationInput";
    
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();
    
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();
    
    @Output(NOTIFICATION_OUTPUT)
    MessageChannel notificationOutput();
    
    @Input(NOTIFICATION_INPUT)
    SubscribableChannel notificationInput();
}

3.2 配置绑定

yaml
spring:
  cloud:
    stream:
      bindings:
        orderOutput:
          destination: order.topic
        orderInput:
          destination: order.topic
          group: order-group
        notificationOutput:
          destination: notification.topic
        notificationInput:
          destination: notification.topic
          group: notification-group

3.3 使用自定义通道

java
@EnableBinding(OrderChannels.class)
@Service
public class OrderService {
    
    private final MessageChannel orderOutput;
    private final MessageChannel notificationOutput;
    
    public OrderService(OrderChannels channels) {
        this.orderOutput = channels.orderOutput();
        this.notificationOutput = channels.notificationOutput();
    }
    
    public void createOrder(Order order) {
        // 创建订单
        Order saved = orderRepository.save(order);
        
        // 发送订单消息
        orderOutput.send(MessageBuilder.withPayload(saved).build());
        
        // 发送通知消息
        Notification notification = new Notification(
            saved.getUserId(), 
            "订单创建成功", 
            saved.getId()
        );
        notificationOutput.send(MessageBuilder.withPayload(notification).build());
    }
}

4. 消费者组

4.1 消费者组配置

yaml
spring:
  cloud:
    stream:
      bindings:
        orderInput:
          destination: order.topic
          group: order-group-1
        orderInput2:
          destination: order.topic
          group: order-group-2

4.2 消费者组作用

同一消费者组内只有一个消费者处理消息:

order.topic

    ├── order-group-1 (实例1) ── 处理消息

    └── order-group-1 (实例2) ── 不处理(同一组)
    

    ├── order-group-2 (实例1) ── 处理消息

    └── order-group-2 (实例2) ── 不处理(同一组)

5. 消息分区

5.1 分区配置

yaml
spring:
  cloud:
    stream:
      bindings:
        orderOutput:
          destination: order.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 3
        orderInput:
          destination: order.topic
          group: order-group
          consumer:
            partitioned: true
          instance-count: 3
          instance-index: 0

5.2 发送分区消息

java
@Service
public class PartitionMessageService {
    
    private final MessageChannel orderOutput;
    
    public void sendOrder(Order order) {
        Message<Order> message = MessageBuilder
            .withPayload(order)
            .setHeader("partitionKey", order.getUserId())
            .build();
        
        orderOutput.send(message);
    }
}

6. 错误处理

6.1 全局错误通道

java
@Service
public class ErrorMessageHandler {
    
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("消息处理失败: {}", errorMessage);
        
        Message<?> failedMessage = errorMessage.getOriginalMessage();
        Throwable cause = errorMessage.getPayload();
        
        // 记录错误日志
        errorLogService.log(failedMessage, cause);
    }
}

6.2 死信队列

yaml
spring:
  cloud:
    stream:
      bindings:
        orderInput:
          destination: order.topic
          group: order-group
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
            back-off-max-interval: 10000
      rabbit:
        bindings:
          orderInput:
            consumer:
              auto-bind-dlq: true
              dead-letter-queue-name: order.dlq
              dead-letter-exchange: order.dlx
              dead-letter-routing-key: order.dlq

6.3 自定义错误处理

java
@Service
public class OrderErrorHandler {
    
    @ServiceActivator(inputChannel = "order.topic.order-group.errors")
    public void handleError(ErrorMessage errorMessage) {
        Message<?> message = errorMessage.getOriginalMessage();
        Throwable cause = errorMessage.getPayload();
        
        log.error("订单消息处理失败: {}", message, cause);
        
        // 发送到死信队列
        deadLetterService.send(message, cause);
    }
}

7. 消息过滤

7.1 条件消费

java
@Service
public class OrderMessageConsumer {
    
    @StreamListener(
        value = Sink.INPUT,
        condition = "headers['type'] == 'CREATE'"
    )
    public void handleCreateOrder(Order order) {
        log.info("处理创建订单: {}", order);
    }
    
    @StreamListener(
        value = Sink.INPUT,
        condition = "headers['type'] == 'CANCEL'"
    )
    public void handleCancelOrder(Order order) {
        log.info("处理取消订单: {}", order);
    }
}

7.2 SpEL 表达式过滤

java
@StreamListener(
    value = Sink.INPUT,
    condition = "payload.amount > 1000"
)
public void handleHighValueOrder(Order order) {
    log.info("处理高价值订单: {}", order);
}

@StreamListener(
    value = Sink.INPUT,
    condition = "payload.status == T(com.example.OrderStatus).PENDING"
)
public void handlePendingOrder(Order order) {
    log.info("处理待处理订单: {}", order);
}

8. Kafka 集成

8.1 添加依赖

xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

8.2 Kafka 配置

yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          auto-add-partitions: true
        bindings:
          orderInput:
            consumer:
              auto-commit-offset: false
              enable-dlq: true
              dlq-name: order.dlq
          orderOutput:
            producer:
              sync: true
              partition-count: 3
      bindings:
        orderOutput:
          destination: order-topic
        orderInput:
          destination: order-topic
          group: order-group

9. 小结

本章学习了 Spring Cloud 消息驱动的核心内容:

内容要点
基本概念Binder、Binding、Message
消息生产Source、MessageChannel
消息消费Sink、@StreamListener
自定义通道定义接口、配置绑定
消费者组负载均衡、消息分发
消息分区分区策略、分区配置
错误处理死信队列、错误通道
消息过滤条件消费、SpEL 表达式

下一章将学习 Spring Cloud 分布式事务。