RabbitMQ如何保证消息不丢失
消息传输中的三个过程
生产端到RabbitMQ、RabbitMQ到消费端、消费端消费掉消息
在这三个过程中,任意一过程都将可能导致消息传输处理失败
生产端到RabbitMQ
- 事务消息机制:会严重降低性能
- confirm消息确认机制:生产端投递消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端
开启消息确认的方式:
channel.confirmSelect();// 开启发送方确认模式
异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到消息");
//做一些其他处理
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});
此时,无论RabbitMQ是否确认消息,生产端都可以感知到消息是否处理成功。
问题:但是如果RabbitMQ接收到消息,并进行处理,但还未来的及发送确认消息,或者生产端由于网络故障没有接收到消息,,生产端就不知道消息有没有发送成功?
- 消息持久化
消息持久化解决的是RabbitMQ突然挂掉了导致内存中消息丢失的情况。为了让重启后的RabbitMQ 重启后,能够恢复硬盘中的数据,我们需要将exchange、queue和message都进行持久化
//exchange持久化
//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
//queue持久化
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//message 持久化
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
问题:RabbitMQ接收到消息,还未来得及持久化到硬盘的时候,RabbitMQ挂了,消息丢失
* 消息入库
为了保证消息传输过程中,消息不被丢失,将发送消息前,先将消息保存到数据库,增加status状态字段、超时时间以及重发次数。其中状态字段表示当时消息传输是否传输成功。超时时间表示当超过一定时间后,重新发送消息。(可以通过定时任务,定时检测status为0,且时间大于超时时间的消息),重发次数表示最大的重发次数,如果发送一直失败,超过一定次数,就对做单独处理
问题:当目前为止,还存在一个问题,如果消息到了RabbitMQ,但是确认消息接收失败,这种,如果通过重复发送消息的方式可能导致消息的冗余,目前可想到的方式但不一定是最优的方案是,在消息体中,增加一个唯一ID,确定消息的唯一性。
消费端消息不丢失
消费端的话,可以认为有两种情况,消息传输过程中和消息处理过程中,导致消息处理失败。
如果消费端和生产端一样,通过ack机制确认,那么这种问题可以避免。默认RabbitMQ在消息发出后就立即删除这条消息,导致无论消费端是否接受到消息,是否处理完,消息都会丢失
所以,需要将自动ack机制改为手动ack机制
消费端手动确认消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收到消息,做处理
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息
}
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
autoAck设置为FALSE,对rabbitMQ而言,如果一直没有接收到消费端的确认信号,并且消费此消息的消费端已经断开或者宕机,RabbitMQ 会自动将该消息重新放入消息队列的头部,等地投递给下一个消息的消费者。
同样,消费端消息和生产的消息都需要保证幂等性。
参考资料:
引入RabbitMQ后,你如何保证全链路数据100%不丢失?