RabbitMQ详解
2065字约7分钟
2025-08-16
什么是RabbitMQ
RabbitMQ是一个开源的AMQP(Advanced Message Queuing Protocol)消息中间件,由Erlang语言开发。它提供了可靠的消息传递机制,支持多种消息模式,广泛应用于分布式系统中。
主要特性
- 可靠性:支持消息持久化、确认机制
- 灵活的路由:支持多种交换机类型和路由策略
- 集群支持:支持高可用和负载均衡
- 管理界面:提供友好的Web管理界面
- 多协议支持:支持AMQP、MQTT、STOMP等协议
核心概念
1. 生产者(Producer)
发送消息到RabbitMQ的应用程序。
2. 消费者(Consumer)
从RabbitMQ接收消息的应用程序。
3. 交换机(Exchange)
接收生产者发送的消息,并根据路由规则将消息分发到队列。
4. 队列(Queue)
存储消息的缓冲区,消费者从队列中获取消息。
5. 绑定(Binding)
定义交换机和队列之间的关系,指定消息如何从交换机路由到队列。
6. 路由键(Routing Key)
生产者发送消息时指定的键,交换机根据此键决定消息路由到哪个队列。
安装和配置
Docker安装(推荐)
# 拉取RabbitMQ镜像
docker pull rabbitmq:3-management
# 运行RabbitMQ容器
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3-management访问管理界面
- URL: http://localhost:15672
- 用户名: admin
- 密码: admin123
SpringBoot集成
1. 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2. 配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
# 开启发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
# 开启手动确认
listener:
simple:
acknowledge-mode: manual
prefetch: 13. 配置类
@Configuration
public class RabbitMQConfig {
// 定义队列
@Bean
public Queue simpleQueue() {
return new Queue("simple.queue", true);
}
// 定义交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
// 绑定队列和交换机
@Bean
public Binding binding(Queue simpleQueue, DirectExchange directExchange) {
return BindingBuilder.bind(simpleQueue)
.to(directExchange)
.with("simple.routing.key");
}
}消息模式详解
1. 简单模式(Simple Queue)
最简单的消息模式,一个生产者对应一个消费者。
生产者
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("simple.queue", message);
System.out.println("发送消息: " + message);
}
}消费者
@Component
public class SimpleConsumer {
@RabbitListener(queues = "simple.queue")
public void handleMessage(String message) {
System.out.println("收到消息: " + message);
// 处理业务逻辑
}
}2. 工作队列模式(Work Queue)
一个生产者对应多个消费者,实现负载均衡。
生产者
@Component
public class WorkQueueProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("work.queue", message);
System.out.println("发送消息: " + message);
}
}消费者
@Component
public class WorkQueueConsumer {
@RabbitListener(queues = "work.queue")
public void handleMessage(String message) {
System.out.println("消费者1收到消息: " + message);
// 模拟处理时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@RabbitListener(queues = "work.queue")
public void handleMessage2(String message) {
System.out.println("消费者2收到消息: " + message);
// 模拟处理时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}3. 发布订阅模式(Publish/Subscribe)
一个生产者发送消息到交换机,交换机将消息广播到所有绑定的队列。
配置
@Configuration
public class FanoutConfig {
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2", true);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
@Bean
public Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}生产者
@Component
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
System.out.println("发送消息: " + message);
}
}消费者
@Component
public class FanoutConsumer {
@RabbitListener(queues = "fanout.queue1")
public void handleMessage1(String message) {
System.out.println("队列1收到消息: " + message);
}
@RabbitListener(queues = "fanout.queue2")
public void handleMessage2(String message) {
System.out.println("队列2收到消息: " + message);
}
}4. 路由模式(Routing)
根据路由键将消息发送到指定的队列。
配置
@Configuration
public class RoutingConfig {
@Bean
public Queue routingQueue1() {
return new Queue("routing.queue1", true);
}
@Bean
public Queue routingQueue2() {
return new Queue("routing.queue2", true);
}
@Bean
public DirectExchange routingExchange() {
return new DirectExchange("routing.exchange");
}
@Bean
public Binding binding1(Queue routingQueue1, DirectExchange routingExchange) {
return BindingBuilder.bind(routingQueue1)
.to(routingExchange)
.with("routing.key1");
}
@Bean
public Binding binding2(Queue routingQueue2, DirectExchange routingExchange) {
return BindingBuilder.bind(routingQueue2)
.to(routingExchange)
.with("routing.key2");
}
}生产者
@Component
public class RoutingProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend("routing.exchange", routingKey, message);
System.out.println("发送消息到" + routingKey + ": " + message);
}
}5. 主题模式(Topic)
根据通配符路由键将消息发送到匹配的队列。
配置
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1", true);
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2", true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
@Bean
public Binding binding1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1)
.to(topicExchange)
.with("topic.*.key");
}
@Bean
public Binding binding2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2)
.to(topicExchange)
.with("topic.#");
}
}生产者
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend("topic.exchange", routingKey, message);
System.out.println("发送消息到" + routingKey + ": " + message);
}
}高级特性
1. 消息确认机制
生产者确认
@Component
public class ConfirmProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败: " + cause);
}
});
// 设置返回回调
rabbitTemplate.setReturnsCallback(returned -> {
System.out.println("消息被退回: " + returned);
});
}
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("confirm.queue", message);
}
}消费者确认
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "confirm.queue")
public void handleMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("收到消息: " + message);
// 处理业务逻辑
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// 拒绝消息,不重新入队
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}2. 消息持久化
@Configuration
public class PersistenceConfig {
@Bean
public Queue persistentQueue() {
return QueueBuilder.durable("persistent.queue")
.build();
}
@Bean
public Exchange persistentExchange() {
return ExchangeBuilder.directExchange("persistent.exchange")
.durable(true)
.build();
}
}3. 死信队列
@Configuration
public class DeadLetterConfig {
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead.letter.queue")
.build();
}
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue")
.deadLetterExchange("dead.letter.exchange")
.deadLetterRoutingKey("dead.letter.routing.key")
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange");
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue)
.to(deadLetterExchange)
.with("dead.letter.routing.key");
}
}4. 延迟队列
@Configuration
public class DelayConfig {
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.withArgument("x-message-ttl", 10000) // 10秒过期
.deadLetterExchange("delay.exchange")
.deadLetterRoutingKey("delay.routing.key")
.build();
}
@Bean
public Queue processQueue() {
return QueueBuilder.durable("process.queue")
.build();
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange("delay.exchange");
}
@Bean
public Binding delayBinding(Queue processQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(processQueue)
.to(delayExchange)
.with("delay.routing.key");
}
}最佳实践
1. 消息幂等性
@Component
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = "idempotent.queue")
public void handleMessage(String message,
@Header(AmqpHeaders.MESSAGE_ID) String messageId) {
// 检查消息是否已处理
String key = "message:" + messageId;
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
System.out.println("消息已处理,跳过: " + messageId);
return;
}
// 处理业务逻辑
System.out.println("处理消息: " + message);
// 标记消息已处理
redisTemplate.opsForValue().set(key, "processed", Duration.ofHours(24));
}
}2. 消息重试机制
@Component
public class RetryConsumer {
@RabbitListener(queues = "retry.queue")
public void handleMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Header(AmqpHeaders.REDELIVERED) boolean redelivered) {
try {
System.out.println("处理消息: " + message);
// 模拟业务处理
if (Math.random() < 0.3) {
throw new RuntimeException("模拟处理失败");
}
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
if (redelivered) {
// 重试次数过多,拒绝消息
channel.basicNack(deliveryTag, false, false);
System.out.println("消息重试次数过多,拒绝: " + message);
} else {
// 重新入队重试
channel.basicNack(deliveryTag, false, true);
System.out.println("消息处理失败,重新入队: " + message);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}3. 批量处理
@Component
public class BatchConsumer {
@RabbitListener(queues = "batch.queue", containerFactory = "batchListenerContainerFactory")
public void handleBatchMessage(List<String> messages) {
System.out.println("批量处理消息,数量: " + messages.size());
for (String message : messages) {
System.out.println("处理消息: " + message);
}
}
}
@Configuration
public class BatchConfig {
@Bean
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBatchSize(10);
factory.setConsumerBatchEnabled(true);
return factory;
}
}常见问题
1. 消息丢失
- 开启生产者确认机制
- 开启消息持久化
- 使用事务或publisher-confirm
2. 消息重复
- 实现消息幂等性
- 使用消息ID去重
- 合理设置重试策略
3. 消息积压
- 增加消费者数量
- 优化消息处理逻辑
- 使用死信队列处理异常消息
4. 性能优化
- 合理设置预取数量
- 使用批量处理
- 开启消息压缩
- 合理设置队列长度
监控和管理
1. 启用管理插件
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
# 启用延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange2. 健康检查
@Component
public class RabbitMQHealthIndicator implements HealthIndicator {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public Health health() {
try {
rabbitTemplate.execute(channel -> {
channel.queueDeclarePassive("health.check");
return null;
});
return Health.up().build();
} catch (Exception e) {
return Health.down()
.withException(e)
.build();
}
}
}3. 指标监控
@Component
public class RabbitMQMetrics {
@Autowired
private RabbitTemplate rabbitTemplate;
@EventListener
public void handleMessageEvent(MessageEvent event) {
// 记录消息处理指标
System.out.println("消息事件: " + event);
}
}总结
RabbitMQ是一个功能强大的消息中间件,通过本教程的学习,您应该能够:
- 理解RabbitMQ的核心概念和工作原理
- 在SpringBoot中集成和使用RabbitMQ
- 实现各种消息模式
- 使用高级特性如消息确认、死信队列等
- 遵循最佳实践,避免常见问题
- 进行监控和性能优化
在实际项目中,建议根据业务需求选择合适的消息模式,合理配置参数,并做好监控和异常处理。
