RabbitMQ
RabbitMQ
安装地址 Docker安装RabbitMQ详细步骤 - 逊老头 - 博客园 (cnblogs.com)
学习地址 MQ的引言和不同MQ的特点_哔哩哔哩_bilibili
完美解决 RabbitMQ可视化界面Overview不显示折线图和队列不显示Messages
基础部分
simple简单模式

- 消息产生者将消息放入队列
- 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
生产消息
//生产者
@Test
void send() throws IOException, TimeoutException {
// 创建MQ的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接mq主机
connectionFactory.setHost("127.0.0.1");
// 端口号
connectionFactory.setPort(5672);
// 设置连接哪一个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接中的通道
Channel channel = connection.createChannel();
// 通道绑定对应的消息队列
// s:队列名称 b:队列是否持久化 b1:是否独占队列(其他的连接不可用)
// b2: 是否消费结束后自动删除 map:
channel.queueDeclare("hello",false,false,false,null);
// 发布消息
// s: 交换机名称 s1:通道名称 b: MessageProperties.MINIMAL_PERSISTENT_BASIC( 重启消息也在 )
// 最后:内容
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
channel.close();
connection.close();
}
消费消息
// 消费者
@Test
void getMessage() throws IOException, TimeoutException {
// 创建MQ的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接mq主机
connectionFactory.setHost("127.0.0.1");
// 端口号
connectionFactory.setPort(5672);
// 设置连接哪一个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接中的通道
Channel channel = connection.createChannel();
// 通道绑定对应的消息队列
// s:队列名称 b:队列是否持久化 b1:是否独占队列(其他的连接不可用)
// b2: 是否消费结束后自动删除 map:
channel.queueDeclare("hello",false,false,false,null);
// 消费消息
// s: 交换机名称 s1:开始消息的自动确认机制 最后:消费时的回调接口
channel.basicConsume("hello",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String (body) = " + new String (body));
}
});
//不关闭 为了一直消费信息
// channel.close();
// connection.close();
}
实现工具类
public class RabbitMQConnection {
private static ConnectionFactory connectionFactory;
//静态代码块 只执行一次
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1"); // 设置连接mq主机
connectionFactory.setPort(5672);// 端口号
connectionFactory.setVirtualHost("/ems");// 设置连接哪一个虚拟主机
connectionFactory.setUsername("ems");// 设置用户名和密码
connectionFactory.setPassword("ems");
}
// 获取 连接对象方法
public static Connection getConnection() throws IOException, TimeoutException {
// 获取连接对象
return connectionFactory.newConnection();
}
// 关闭通道和连接
public void closeChannelAndConnection(Connection con, Channel channel){
try {
if(con!=null)con.close();
if(channel!=null)channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
任务模型

两个消费者 消费同一个队列 默认会平均分配内容 叫做循环
目前要求能者多劳!!!
- 设置通道一次只能消费一个消息
- 关闭消息的自动确认,开始手动确认消息
Consumer
public class consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("word",false,false,false,null);
channel.basicQos(1);// 每次消费一个
// false 关闭自动确定
channel.basicConsume("word",false, new DefaultConsumer(channel){
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Thread.sleep(1000);//模拟执行的漫
System.out.println("new String (body) = " + new String (body));
/**
* 参数
* - 手动确认的表示
* - false 是否开启多个消息同时确认
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
广播
流程如下:
- 有多个消费者
- 每个消费者有自己的队列
- 每个队列都绑定了交换机
- 生产者只能向交换机发送消息,再有交换机发给队列,生产者无权决定
- 交换机吧消息发给和自己绑定的队列
- 队列中的消费者都可以拿到消息
生产者
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接对象
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 将通道声明到交换机 (交换机名称,类型)
channel.exchangeDeclare("one","fanout");
channel.basicPublish("one","",null,"hello world".getBytes());
channel.close();
connection.close();
}
}
消费者
public class consumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("one","fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 交换机和队列绑定
channel.queueBind(queue,"one","");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
}
}
路由模型
直连

在fanout的模式下,一条消息,会被订阅的队列消费。但有时我们希望不同的消息被不同的队列消费,
此时就用到了 Direct类型的Exchange
- 队列与交换机绑定,不再是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息 的发送方在向exchang发送消息时,也必须要指定一个
RoutingKey
- Exchange在发送消息时,会根据指定的RoutingKey进行判断,只有队列的
RoutingKey
与消息的RoutingKey
完全一致,才会收到消息
生产者
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("name_direct","direct");
String RoutingKey = "warn";
channel.basicPublish("name_direct",
RoutingKey,null,
"这是direct发布于的".getBytes());
channel.close();
connection.close();
}
}
消费者
public class consumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("name_direct","direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 交换机和队列绑定和 RoutingKey
channel.queueBind(queue,"name_direct","error");
channel.queueBind(queue,"name_direct","info");
channel.queueBind(queue,"name_direct","warn");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("error " + new String(body));
}
});
}
}
public class consumerTwo {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("name_direct","direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 交换机和队列绑定和 RoutingKey
channel.queueBind(queue,"name_direct","warn");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("info " + new String(body));
}
});
}
}
动态

Topic类型的
Exchange
与Direct
相比,都是可以根据Routinghey
把消息路由到不同的队列。只不过Topic
类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以". "
分割,例如:item.insert
*
匹配不多不少恰好一个单词admin.*
-->admin.hh.abc
|admin.hh
#
匹配一个或多个单词admin.#
-->admin.hh
代码
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("name_topic","topic");
String RoutingKey = "warn.js";
channel.basicPublish("name_direct",
RoutingKey,null,
"这是direct发布于的".getBytes());
channel.close();
connection.close();
}
}
public class consumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("name_topic","topic");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 交换机和队列绑定和 RoutingKey
channel.queueBind(queue,"name_topic","error.*"); //error.xxx | error.aaa
channel.queueBind(queue,"name_topic","info.#");//info.gag.agsdga | info.gas.gas.g.asg.
channel.queueBind(queue,"name_topic","warn");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("error " + new String(body));
}
});
}
}
spring boot 整合
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: ems
password: ems
virtual-host: /ems
Test
@SpringBootTest
public class rabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// hello
@Test
void test(){
rabbitTemplate.convertAndSend("hello","hello world");
}
// work
@Test
void work(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello work");
}
}
//fanout
@Test
void fanout(){
rabbitTemplate.convertAndSend("fanout","","hello fanout");
}
// route
@Test
void direct(){
rabbitTemplate.convertAndSend("direct","warn","hello direct");
}
// 动态路由
@Test
void Topic(){
rabbitTemplate.convertAndSend("topic","user.hh.gash","hello topic");
}
}
简单
@Component //默认队列持久化 非独占 不是autodelete
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class RabbitMQ {
// 第一种模式
@RabbitHandler
public void recrive(String message){
System.out.println("message = " + message);
}
}
任务
@Component
public class workConsumer {
//公平消费
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work2(String message){
System.out.println("message2 = " + message);
}
}
广播
@Component
public class fanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "fanout",type = "fanout")
)
})
public void fanout1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "fanout",type = "fanout")
)
})
public void fanout2(String message){
System.out.println("message2 = " + message);
}
}
静态路由
@Component
public class routeConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info","warn"}
)
})
public void route1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"warn","message"}
)
})
public void route2(String message){
System.out.println("message2 = " + message);
}
}
动态路由
@Component
public class topicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic",type = "topic"),
key = {"user.*","user.#"}
)
})
public void topic1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic",type = "topic"),
key = {"user.*","message"}
)
})
public void topic2(String message){
System.out.println("message2 = " + message);
}
}
高级
来自:https://www.bilibili.com/video/BV1S142197x7?p=101
必须尽可能确保MQ消息的可靠性,即:消息应该至少被消费者处理1次
那么问题来了:
- 我们该如何确保MQ消息的可靠性?
- 如果真的发送失败,有没有其它的兜底方案?
发送者的可靠性
首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
- 发送消息时丢失:
- 生产者发送消息时连接MQ失败
- 生产者发送消息到达MQ后未找到
Exchange
- 生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
- 消息到达MQ后,处理消息的进程发生异常
- MQ导致消息丢失:
- 消息到达MQ,保存到队列后,尚未消费就突然宕机
- 消费者处理消息时:
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消费者一定要处理消息
这一章我们先来看如何确保生产者一定能把消息发送到MQ。
- 生产者重试机制
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate
与MQ连接超时后,多次重试。
修改publisher
模块的application.yaml
文件,添加下面的内容:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
我们利用命令停掉RabbitMQ服务:
docker stop mq
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!
注意
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
- 生产者确认机制
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
- MQ内部处理消息的进程发生了异常
- 生产者发送消息到达MQ后未找到
Exchange
- 生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
具体如图所示:

总结如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
MQ的可靠性
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
数据持久化

注意
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy
参数也可设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
当然,我们也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
消费者的可靠性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我>们需要将消息处理的代码通过
try catch
机制捕获,消息处理成功时返回ack,处理失败时返>回nack.由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过>配置文件设置ACK处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建>议使用manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入>侵,但更灵活auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常>执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回
nack
;- 如果是消息处理或校验异常,自动返回
reject
;
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码如下:
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
业务幂等性
提示
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))
,例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
- 唯一消息ID
- 业务状态判断
- 唯一消息ID
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
- 业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
以支付修改订单的业务为例,我们需要修改OrderServiceImpl
中的markOrderPaySuccess
方法:
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 订单不存在或者订单状态不是1,放弃处理
return;
}
// 3.尝试更新订单
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}
上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。
我们可以合并上述操作为这样:
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
注意看,上述代码等同于这样的SQL语句:
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。
兜底方案
虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有其它兜底方案,能够确保订单的支付状态一致呢?
其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
流程如下:
图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。
不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。
那么问题来了,我们到底该在什么时间主动查询支付状态呢?
这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
定时任务大家之前学习过,具体的实现这里就不再赘述了。
至此,消息可靠性的问题已经解决了。
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
- 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
- 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
延迟队列
安装插件以及使用
config
@Configuration
public class TestDelayQueueConfig {
public static final String DEAD_EXCHANGE = "delay_exchange";
public static final String DEAD_QUEUE = "delay_queue";
public static final String DEAD_ROUTING = "delay_key";
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DEAD_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 延迟消息队列
* @return
*/
@Bean
public Queue delayQueue() {
return new Queue(DEAD_QUEUE, true);
}
@Bean
public Binding deplyBinding() {
return BindingBuilder
.bind(delayQueue())
.to(delayExchange())
.with(DEAD_ROUTING)
.noargs();
}
}
生产者
@Component
public class DeplyProducer {
@Resource
RabbitTemplate rabbitTemplate;
public void send(String msg, Integer delayTime){
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setDelay(delayTime);
// Message message = new Message(msg.getBytes(), messageProperties);
// rabbitTemplate.convertAndSend(DEAD_EXCHANGE, DEAD_ROUTING, message);
rabbitTemplate.convertAndSend(DEAD_EXCHANGE, DEAD_ROUTING, msg,
message1 -> {
message1.getMessageProperties().setDelay(delayTime);
return message1;}
);
}
}
消费者
@Component
public class DeplyConsumer {
@RabbitListener(queues = DEAD_QUEUE)
public void onMessage(String msg) {
System.out.println("收到信息了 msg = " + msg);
}
}