这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

1年前 (2023) 程序员胖胖胖虎阿
148 0 0

一、前言

前一段时间在业务上遇到了一个MQ重复消费的问题,排查发现一个老哥在代码里写了个线程睡眠n分钟(n为客户控制)的逻辑(设计方案真是一言难尽…),导致必现消息重复消费的问题;于是接盘在业务上修改了设计方案、做了消息幂等处理。本着知其然知其所以然的原则,今天深入分析一下消息消费超时/失败是怎么重试的?

PS:RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:RocketMQ:release-4.8.0。

二、源码分析

在RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?这篇文章我们介绍了Consumer如何从Broker拉取消息的、Consumer如何处理拉取到的消息;

其中在从Broker拉取消息成功之后,会进入到PullCallback#onSuccess()方法,当拉取到消息时,首先将消息全部放入到处理队列ProcessQueue中,然后通知消费消息服务consumeMessageService开始干活

1、入口

接着上面的来看,ConsumeMessageService#submitConsumeRequest()为Consumer开始真正开始消费消息的入口;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

ConsumeMessageService是一个接口,它有两个实现:ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService,分别表示并发消费模式、顺序消费模式;因此下面我们从并发消费和顺序消费两部分分别研究消息消费超时/失败的重试机制
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

2、并发消费模式核心逻辑

我们接着ConsumeMessageConcurrentlyService#submitConsumeRequest()来看,并发消费模式下是如何处理消息消费请求的?

ConsumeMessageConcurrentlyService采用线程池的机制对消息进行分批并发消费,默认一个消息是一批;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

1> Consumer端线程执行异常导致的消费异常重试

从上图我们可以看出,在线程池中执行线程任务时,如果失败,Consumer端会自己延时5s之后重试当前消息消费任务,见ConsumeMessageConcurrentlyService#submitConsumeRequestLater()方法;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

ConsumeRequest

ConsumeRequest是ConsumeMessageConcurrentlyService的内部类,它作为一个线程任务,内部封装了消息消息请求的具体执行逻辑;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
ConsumeRequest#run()方法主要做四个操作:

  1. 注册业务系统自定义的消费监听器,负责具体的消息消费;并设置消息的重试Topic;
  2. 执行消费监听器的consumeMessage()方法,进行真正的消费消息操作;
  3. 统计消息消费数据;
  4. 判断消息消息是否超时、出现异常,处理消息消费结果;
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    // todo 1、这里是执行消费前的钩子函数,也就是我们业务系统定义的消费监听器,负责具体消息的消费
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    // 设置消息的重试topic
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    // 如果消费者注册了消息消费者hook
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        // consumer消费前的钩子函数,类似于Spring中的BeanPostProcessor#postProcessBeforeInitialization()方法
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                // 设置每条消息被消费的时间
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // 2、 开始消费消息
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    // 消息消费总耗时
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    // 根据是否出现异常等,判断处理结果
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
        // 消费超时,默认15分钟
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    // 在钩子函数中放入消费结果
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    // 执行后置的钩子函数
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        // 类似于Spring中的BeanPostProcessor#postProcessAfterInitialization()方法
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    // 3、统计消息消费数据
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        // 4、处理消费结果
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

下面我开始本文的正题,在调用消息消费监听器后是如何处理消息消费超时 / 异常的?

2> 执行业务上自定义消费监听器导致的消费异常重试

如果在自定义消费监听器中执行业务逻辑出现异常,会将hasException属性设置为true,供后置的钩子函数ConsumeMessageHook使用;并且此时status字段为null,在后面的逻辑中如果发现status字段为null,则会将其设置为RECONSUME_LATER(稍后重新消费);最终在processConsumeResult()中再根据这个status处理消费结果;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
processConsumeResult()方法中,会维护一个变量ackIndex表示当前消费请求中 第一个未ACK的消息 在msgs集合中的下标;如果消息全部消费成功,则ackIndex为msgsSize + 1;如果消息消费失败,则ackIndex为-1,表示该批消息需要全部重新消费,即将消息发送回Broker;如果发送消息回Broker失败,Consumer则延时5s后,重新执行当前消费请求。

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
发送回Broker的消息延时级别默认0;

消息延时级别从哪里获取的?

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

我们往上追,最终确定ConsumeConcurrentlyContext的来源为ConsumeRequest#run()方法的开头处,并且后续未对其delayLevelWhenNextConsume属性做任何修改;

这个我们从ConsumeConcurrentlyContext类中也可以看到,其setDelayLevelWhenNextConsume()方法未被使用;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

Broker端SendMessageProcessor#asyncConsumerSendMsgBack()方法中会处理backMsg。虽然Consumer没有传延时级别,但Broker会默认将其延时级别设置为3,然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。

Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

Broker端如何处理的?见文章:深度剖析RocketMQ延时消息机制原理/源码;

3> 消费超时重试

从ConsumeRequest#run()方法中,我们看不到任何关于消费超时重试的处理,这里只会统计一个消费超时的状态;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
可以发现消费超时阈值的获取方式:defaultMQPushConsumer.getConsumeTimeout(),我们看看这个方法还在哪里被用到了;点进去发现在ProcessQueue的cleanExpiredMsg()方法中有调用它,作为判断消息是否过期的阈值(也是并发消费模式下消息过期的阈值)。

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
ProcessQueue#cleanExpiredMsg()方法中如果判断消息已经过期,会将消息在本地缓存msgTreeMap中清除、并以延时消息(延时级别为3)的方式发送回Broker。
Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是开始的那个topic

Broker端如何处理的?见文章:深度剖析RocketMQ延时消息机制原理/源码;

对于消费超时的消息,首先会以延时消息的机制将其发送到延时队列(SCHEDULE_TOPIC_XXXX)中,待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。这里我们可以注意到延时消息的方式会将消息又被存到CommitLog中 2 * n(重试次数)遍
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

从这里我们可以看到,延时级别是3的情况下,理应10s后消息才会从延时队列投递到重试队列中,然而现象确实经过了10ms就投递了;由于消息到期的时间是从ConsumeQueue中每条记录的后64位取的,下一篇我们就研究一下CommitLog中的数据如何同步到ConsumeQueue中的?

3、顺序消费模式核心逻辑

顺序消费模式和并发消费模式一样都存在消费异常重试的场景,但是由于顺序消费模式不会清理过期消息,所以不存在消费超时重试的场景。

1> 消费异常重试

与并发消费不同的是顺序消费的ConsumeRequest只针对ProcessQueue和MessageQueue,而不是针对消息。其获取消息的逻辑是直接从ProcessQueue中取,一次取consumeMessageBatchMaxSize个(默认一个)。

另外:run()方法中消息消费的逻辑与非顺序消费差不多,但其关键点在于消息的消费/获取的顺序性,所以就不可避免的引入锁机制(加锁范围是针对ProcessQueue,或者说是MessageQueue,所以说RocketMQ无法做到多MessageQueue的全局顺序消费)

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
出现异常的具体逻辑也基本和并发消费模式一样:

如果调用自定义消费监听器消费消息异常,则status状态为null,在后面的逻辑中如果status == null,则将SUSPEND_CURRENT_QUEUE_A_MOMENT赋值给status,进而在processConsumeResult()中处理消费结果。

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
在processConsumeResult()方法中,我们主要看非自动提交ACK的情况;

首先会检查是否达到最大重试消费次数(默认是Integer.MAX_VALUE);如果没有达到最大重试次数,默认延时1s后再次开始尝试消费消息。

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

1)checkReconsumeTimes()

看一下checkReconsumeTimes()方法是如何判断重试次数的?
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

1)submitConsumeRequestLater()

延时1s后,再次开启当前消费消息任务;
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

2> 顺序消费不存在超时消费机制

但是其不存在超时消费重试的概念,因为没有清理过期消息这个操作:
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析
在Consumer启动的时候会启动消息消费服务consumeMessageService,对于并发消费模式而言是定期清理过期消息;而对于顺序消费而言则是定时向Broker申请加锁,以确保消息的顺序消费。
这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

三、总结

1、并发消费模式

消费异常重试机制:

  • 出现异常的两种场景:执行消费请求异常出错、执行指定以消费监听器出错;
  • 出现异常之后会发送延时级别为0的消息到Broker,Broker端的SendMessageProcessor#asyncConsumerSendMsgBack()方法中遇到延时级别为0的消息会将其延时级别设置为(3 + 消费重试次数);
  • 然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。
  • 待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。**
  • Consumer端此时再次接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic

消费超时重试机制:

  • 主要体现在并发消费模式会周期性清理过期的消息,然后将其发送回Broker,后面的步骤和消费异常重试机制一样;
  • 最终当前消费者能再次消费到重试队列(%RETRY%+consumerGroup)中的消息。

2、顺序消费模式

顺序消费模式不存在消费超时重试的机制,对于消费异常重试的逻辑基本和并发消费一样,区别在于,顺序消费模式遇到异常,延时1s后重试(再次消费),重试次数默认为Integer.MAX_VALUE;

这里我们可以发现一个问题,如果一个消息处理的特别慢 或者说消费出现异常在一直重试,那么它后面的消息就会被阻塞;进而导致消息堆积的现象。

相关文章

暂无评论

暂无评论...