Spring Boot 消息队列
1. 消息队列概述
1.1 什么是消息队列
消息队列(Message Queue)是一种异步通信机制,用于在分布式系统中传递消息。
1.2 消息队列优势
| 优势 | 说明 |
|---|---|
| 解耦 | 生产者和消费者无需直接交互 |
| 异步 | 提高系统响应速度 |
| 削峰 | 平滑流量高峰 |
| 可靠 | 消息持久化,保证不丢失 |
1.3 常见消息队列
| 消息队列 | 特点 |
|---|---|
| RabbitMQ | 可靠性高、功能丰富 |
| Kafka | 高吞吐、分布式 |
| RocketMQ | 阿里开源、事务消息 |
| ActiveMQ | 老牌、支持多种协议 |
2. RabbitMQ
2.1 添加依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.2 配置 RabbitMQ
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
multiplier: 22.3 队列和交换机配置
java
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "order.queue";
public static final String EXCHANGE_NAME = "order.exchange";
public static final String ROUTING_KEY = "order.created";
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-message-ttl", 60000)
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.build();
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(EXCHANGE_NAME, true, false);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(ROUTING_KEY);
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dlx.queue", true);
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx.routing.key");
}
}2.4 消息生产者
java
@Service
public class OrderMessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
RabbitMQConfig.ROUTING_KEY,
order,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setContentType("application/json");
return message;
}
);
}
public void sendOrderWithConfirm(Order order) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
RabbitMQConfig.ROUTING_KEY,
order,
correlationData
);
}
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送成功: {}", correlationData);
} else {
log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
}
};
}
}2.5 消息消费者
java
@Component
public class OrderMessageConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void handleOrder(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("收到订单消息: {}", order);
processOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("处理订单失败", e);
channel.basicNack(tag, false, false);
}
}
@RabbitListener(queues = "dlx.queue")
public void handleDeadLetter(Order order) {
log.warn("收到死信消息: {}", order);
}
private void processOrder(Order order) {
// 处理订单逻辑
}
}2.6 延迟队列
java
@Configuration
public class DelayQueueConfig {
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.withArgument("x-message-ttl", 30000)
.withArgument("x-dead-letter-exchange", "order.exchange")
.withArgument("x-dead-letter-routing-key", "order.created")
.build();
}
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
}
}
@Service
public class DelayMessageService {
private final RabbitTemplate rabbitTemplate;
public void sendDelayMessage(Order order, int delayMillis) {
rabbitTemplate.convertAndSend(
"delay.exchange",
"delay.routing.key",
order,
message -> {
message.getMessageProperties().setDelay(delayMillis);
return message;
}
);
}
}3. Kafka
3.1 添加依赖
xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>3.2 配置 Kafka
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
batch-size: 16384
buffer-memory: 33554432
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 100
listener:
ack-mode: manual
concurrency: 33.3 消息生产者
java
@Service
public class KafkaMessageProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, Object message) {
kafkaTemplate.send(topic, message);
}
public void send(String topic, String key, Object message) {
kafkaTemplate.send(topic, key, message);
}
public CompletableFuture<SendResult<String, Object>> sendAsync(String topic, Object message) {
return kafkaTemplate.send(topic, message);
}
public void sendWithCallback(String topic, Object message) {
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("发送成功: topic={}, partition={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} else {
log.error("发送失败", ex);
}
});
}
}3.4 消息消费者
java
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeOrder(Order order,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
try {
log.info("收到消息: topic={}, partition={}, offset={}, message={}",
topic, partition, offset, order);
processOrder(order);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("处理消息失败", e);
}
}
@KafkaListener(topics = "order-topic", groupId = "order-group-batch")
public void consumeBatch(List<Order> orders, Acknowledgment acknowledgment) {
try {
log.info("批量收到 {} 条消息", orders.size());
for (Order order : orders) {
processOrder(order);
}
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("批量处理失败", e);
}
}
private void processOrder(Order order) {
// 处理订单逻辑
}
}3.5 事务消息
java
@Configuration
public class KafkaTransactionConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
}
@Service
public class TransactionalMessageService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public void sendTransactionalMessage(Order order) {
kafkaTemplate.executeInTransaction(template -> {
template.send("order-topic", order);
template.send("notification-topic", new Notification(order));
return true;
});
}
}4. 消息可靠性
4.1 生产者确认
java
@Configuration
public class ProducerConfirmConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息已确认: {}", correlationData);
} else {
log.error("消息未确认: {}, 原因: {}", correlationData, cause);
// 重试或记录日志
}
});
template.setReturnsCallback(returned -> {
log.error("消息被退回: {}, 响应码: {}, 响应信息: {}",
returned.getMessage(),
returned.getReplyCode(),
returned.getReplyText());
});
return template;
}
}4.2 消费者确认
java
@Component
public class ReliableConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
processOrder(order);
channel.basicAck(tag, false);
} catch (BusinessException e) {
log.warn("业务异常,拒绝消息: {}", e.getMessage());
channel.basicReject(tag, false);
} catch (Exception e) {
log.error("系统异常,重新入队", e);
channel.basicNack(tag, false, true);
}
}
}4.3 消息幂等性
java
@Component
public class IdempotentConsumer {
private final RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String messageId = order.getMessageId();
String key = "order:processed:" + messageId;
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(isNew)) {
log.warn("消息已处理: {}", messageId);
channel.basicAck(tag, false);
return;
}
try {
processOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
redisTemplate.delete(key);
channel.basicNack(tag, false, true);
}
}
}5. 消息监控
5.1 RabbitMQ 监控
java
@Configuration
public class RabbitMQMetricsConfig {
@Bean
public MeterBinder rabbitMQMetrics(ConnectionFactory connectionFactory) {
return new RabbitMQMetrics(connectionFactory, "myapp.rabbitmq");
}
}5.2 Kafka 监控
yaml
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
tags:
application: ${spring.application.name}6. 小结
本章学习了 Spring Boot 消息队列的核心内容:
| 内容 | 要点 |
|---|---|
| RabbitMQ | 队列、交换机、路由键 |
| Kafka | Topic、Partition、Consumer Group |
| 消息生产 | 发送、确认、事务 |
| 消息消费 | 监听、确认、重试 |
| 延迟队列 | TTL、死信队列 |
| 消息可靠性 | 确认机制、幂等性 |
| 消息监控 | Micrometer 指标 |
下一章将学习 Spring Boot 测试。