Skip to content

Spring Boot 消息队列

1. 消息队列概述

1.1 什么是消息队列

消息队列(Message Queue)是一种异步通信机制,用于在分布式系统中传递消息。

1.2 消息队列优势

优势说明
解耦生产者和消费者无需直接交互
异步提高系统响应速度
削峰平滑流量高峰
可靠消息持久化,保证不丢失

1.3 常见消息队列

消息队列特点
RabbitMQ可靠性高、功能丰富
Kafka高吞吐、分布式
RocketMQ阿里开源、事务消息
ActiveMQ老牌、支持多种协议

2. RabbitMQ

2.1 添加依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置 RabbitMQ

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          multiplier: 2

2.3 队列和交换机配置

java
@Configuration
public class RabbitMQConfig {
    
    public static final String QUEUE_NAME = "order.queue";
    public static final String EXCHANGE_NAME = "order.exchange";
    public static final String ROUTING_KEY = "order.created";
    
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-message-ttl", 60000)
                .withArgument("x-dead-letter-exchange", "dlx.exchange")
                .build();
    }
    
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }
    
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(ROUTING_KEY);
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dlx.queue", true);
    }
    
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }
    
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dlx.routing.key");
    }
}

2.4 消息生产者

java
@Service
public class OrderMessageProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME,
            RabbitMQConfig.ROUTING_KEY,
            order,
            message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setContentType("application/json");
                return message;
            }
        );
    }
    
    public void sendOrderWithConfirm(Order order) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME,
            RabbitMQConfig.ROUTING_KEY,
            order,
            correlationData
        );
    }
    
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送成功: {}", correlationData);
            } else {
                log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
            }
        };
    }
}

2.5 消息消费者

java
@Component
public class OrderMessageConsumer {
    
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void handleOrder(Order order, Channel channel, 
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("收到订单消息: {}", order);
            processOrder(order);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("处理订单失败", e);
            channel.basicNack(tag, false, false);
        }
    }
    
    @RabbitListener(queues = "dlx.queue")
    public void handleDeadLetter(Order order) {
        log.warn("收到死信消息: {}", order);
    }
    
    private void processOrder(Order order) {
        // 处理订单逻辑
    }
}

2.6 延迟队列

java
@Configuration
public class DelayQueueConfig {
    
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                .withArgument("x-message-ttl", 30000)
                .withArgument("x-dead-letter-exchange", "order.exchange")
                .withArgument("x-dead-letter-routing-key", "order.created")
                .build();
    }
    
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
    }
}

@Service
public class DelayMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public void sendDelayMessage(Order order, int delayMillis) {
        rabbitTemplate.convertAndSend(
            "delay.exchange",
            "delay.routing.key",
            order,
            message -> {
                message.getMessageProperties().setDelay(delayMillis);
                return message;
            }
        );
    }
}

3. Kafka

3.1 添加依赖

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2 配置 Kafka

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 100
    listener:
      ack-mode: manual
      concurrency: 3

3.3 消息生产者

java
@Service
public class KafkaMessageProducer {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public void send(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
    
    public void send(String topic, String key, Object message) {
        kafkaTemplate.send(topic, key, message);
    }
    
    public CompletableFuture<SendResult<String, Object>> sendAsync(String topic, Object message) {
        return kafkaTemplate.send(topic, message);
    }
    
    public void sendWithCallback(String topic, Object message) {
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
        
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("发送成功: topic={}, partition={}, offset={}",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            } else {
                log.error("发送失败", ex);
            }
        });
    }
}

3.4 消息消费者

java
@Component
public class KafkaMessageConsumer {
    
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(Order order, 
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                            @Header(KafkaHeaders.OFFSET) long offset,
                            Acknowledgment acknowledgment) {
        try {
            log.info("收到消息: topic={}, partition={}, offset={}, message={}",
                topic, partition, offset, order);
            
            processOrder(order);
            
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("处理消息失败", e);
        }
    }
    
    @KafkaListener(topics = "order-topic", groupId = "order-group-batch")
    public void consumeBatch(List<Order> orders, Acknowledgment acknowledgment) {
        try {
            log.info("批量收到 {} 条消息", orders.size());
            
            for (Order order : orders) {
                processOrder(order);
            }
            
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("批量处理失败", e);
        }
    }
    
    private void processOrder(Order order) {
        // 处理订单逻辑
    }
}

3.5 事务消息

java
@Configuration
public class KafkaTransactionConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction");
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}

@Service
public class TransactionalMessageService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @Transactional
    public void sendTransactionalMessage(Order order) {
        kafkaTemplate.executeInTransaction(template -> {
            template.send("order-topic", order);
            template.send("notification-topic", new Notification(order));
            return true;
        });
    }
}

4. 消息可靠性

4.1 生产者确认

java
@Configuration
public class ProducerConfirmConfig {
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息已确认: {}", correlationData);
            } else {
                log.error("消息未确认: {}, 原因: {}", correlationData, cause);
                // 重试或记录日志
            }
        });
        
        template.setReturnsCallback(returned -> {
            log.error("消息被退回: {}, 响应码: {}, 响应信息: {}",
                returned.getMessage(),
                returned.getReplyCode(),
                returned.getReplyText());
        });
        
        return template;
    }
}

4.2 消费者确认

java
@Component
public class ReliableConsumer {
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            processOrder(order);
            channel.basicAck(tag, false);
        } catch (BusinessException e) {
            log.warn("业务异常,拒绝消息: {}", e.getMessage());
            channel.basicReject(tag, false);
        } catch (Exception e) {
            log.error("系统异常,重新入队", e);
            channel.basicNack(tag, false, true);
        }
    }
}

4.3 消息幂等性

java
@Component
public class IdempotentConsumer {
    
    private final RedisTemplate<String, String> redisTemplate;
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String messageId = order.getMessageId();
        String key = "order:processed:" + messageId;
        
        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofHours(24));
        
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("消息已处理: {}", messageId);
            channel.basicAck(tag, false);
            return;
        }
        
        try {
            processOrder(order);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            redisTemplate.delete(key);
            channel.basicNack(tag, false, true);
        }
    }
}

5. 消息监控

5.1 RabbitMQ 监控

java
@Configuration
public class RabbitMQMetricsConfig {
    
    @Bean
    public MeterBinder rabbitMQMetrics(ConnectionFactory connectionFactory) {
        return new RabbitMQMetrics(connectionFactory, "myapp.rabbitmq");
    }
}

5.2 Kafka 监控

yaml
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: ${spring.application.name}

6. 小结

本章学习了 Spring Boot 消息队列的核心内容:

内容要点
RabbitMQ队列、交换机、路由键
KafkaTopic、Partition、Consumer Group
消息生产发送、确认、事务
消息消费监听、确认、重试
延迟队列TTL、死信队列
消息可靠性确认机制、幂等性
消息监控Micrometer 指标

下一章将学习 Spring Boot 测试。