应用场景,消息可靠投递,消息丢失,消息重复消费,消息的幂等性,消息的顺序性,消息队列积压,延迟队列,消息过期失效,消息队列的高可用
使用消息队列有解耦,扩展性,削峰,异步等功能,市面上主流的几款mq,rabbitmq,rocketmq,kafka有各自的应用场景。kafka,有出色的吞吐量,比较强悍的性能,而且集群可以实现高可用,就是会丢数据,所以一般被用于日志分析和大数据采集。rabbitmq,消息可靠性比较高,支持六种工作模式,功能比较全面,但是由于吞吐量比较低,消息累积还会影响性能,加上erlang语言不好定制,所以一般使用于小规模的场景,大多数是中小企业用的比较多。rocketmq,高可用,高性能,高吞吐量,支持多种消息类型,比如同步,异步,顺序,广播,延迟,批量,过滤,事务等等消息,功能比较全面,只不过开源版本比不上商业版本的,加上开发这个中间件的大佬写的文档不多,文档不太全,这也是它的一个缺点,不过这个中间件可以作用于几乎全场景。
引入消息中间件也会带来很多问题,先说说消息丢失,生产者往消息队列发送消息,消息队列往消费者发送消息,会有丢消息的可能,消息队列也有可能丢消息,通常MQ存盘时都会先写入操作系统的缓存页中,然后再由操作系统异步的将消息写入硬盘,这个中间有个时间差,就可能会造成消息丢失,如果服务挂了,缓存中还没有来得及写入硬盘的消息就会发生消息丢失。不同的消息中间件对于消息丢失也有不同的解决方案,先说说最容易丢失消息的kafka吧。生产者发消息给Kafka Broker:消息写入Leader后,Follower是主动与Leader进行同步,然后发ack告诉生产者收到消息了,这个过程kafka提供了一个参数,request.required.acks属性来确认消息的生产,0表示不进行消息接收是否成功的确认,发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了。1表示当Leader接收成功时确认,只要Leader存活就可以保证不丢失,保证了吞吐量,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了。-1或者all表示Leader和Follower都接收成功时确认,可以最大限度保证消息不丢失,但是吞吐量低,降低了kafka的性能。一般在不涉及金额的情况下,均衡考虑可以使用1,保证消息的发送和性能的一个平衡。Kafka Broker 消息同步和持久化:Kafka通过多分区多副本机制,可以最大限度保证数据不会丢失,如果数据已经写入系统缓存中,但是还没来得及刷入磁盘,这个时候机器宕机,或者没电了,那就丢消息了,当然这种情况很极端。Kafka Broker 将消息传递给消费者:如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时消费者直接宕机了,未处理完的数据丢失了,下次也消费不到了。所以为了避免这种情况,需要将配置改为,先消费处理数据,然后手动提交,这样消息处理失败,也不会提交成功,没有丢消息。
rabbitmq整个消息投递的路径是producer—>rabbitmq broker—>exchange—>queue—>consumer。
生产者将消息投递到Broker时产生confirm状态,会出现二种情况,ack:表示已经被Broker签收。nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常等。生产者将消息投递到Broker,被Broker签收,但是没有对应的队列进行投递,将消息回退给生产者会产生return状态。这二种状态是rabbitmq提供的消息可靠投递机制,生产者开启确认模式和退回模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失。消费者如果在消费端没有出现异常,则调用channel.basicAck方法确认签收消息。消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。通过一系列的操作,可以保证消息的可靠投递以及防止消息丢失的情况。
然后说一下rocketmq,生产者使用事务消息机制保证消息零丢失,第一步就是确保Producer发送消息到了Broker这个过程不会丢消息。发送half消息给rocketmq,这个half消息是在生产者操作前发送的,对下游服务的消费者是不可见的。这个消息主要是确认RocketMQ的服务是否正常,通知RocketMQ,马上要发一个消息了,做好准备。half消息如果写入失败就认为MQ的服务是有问题的,这个时候就不能通知下游服务了,给生产者的操作加上一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。然后执行本地事务,比如说下了个订单,把下单数据写入到mysql,返回本地事务状态给rocketmq,在这个过程中,如果写入数据库失败,可能是数据库崩了,需要等一段时间才能恢复,这个时候把订单一直标记为"新下单"的状态,订单的消息先缓存起来,比如Redis、文本或者其他方式,然后给RocketMQ返回一个未知状态,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,RocketMQ过一段时间来回查事务状态,在回查事务状态的时候,再尝试把数据写入数据库,如果数据库这时候已经恢复了,继续后面的业务。而且即便这个时候half消息写入成功后RocketMQ挂了,只要存储的消息没有丢失,等RocketMQ恢复后,RocketMQ就会再次继续状态回查的流程。第二步就是确保Broker接收到的消息不会丢失,因为RocketMQ为了减少磁盘的IO,会先将消息写入到os缓存中,不是直接写入到磁盘里面,消费者从os缓存中获取消息,类似于从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失。所以第二步,消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的。把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘,一旦同步刷盘返回成功,可以保证接收到的消息一定存储在本地的内存中。采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失。但是这里还会有一个问题,主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。所以在RocketMQ4.5以后的版本支持Dledge,DLedger是基于Raft协议选举Leader Broker的,当master节点挂了后,Dledger会接管Broker的CommitLog消息存储 ,在Raft协议中进行多台机器的Leader选举,发起一轮一轮的投票,通过多台机器互相投票选出来一个Leader,完成master节点往slave节点的消息同步。数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。第三步,Cunmser确保拉取到的消息被成功消费,就需要消费者不要使用异步消费,有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。用同步消费方式,消费者端先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,不再往其他消费者推送消息,在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。
然后再说说消息重复消费的问题,第一种情况是发送时消息重复,当一条消息已被成功发送到服务端并完成持久化,此时出现了网络抖动或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。第二种情况是投递时消息重复,消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,tMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。第三种情况是负载均衡时消息重复,比如网络抖动、Broker 重启以及订阅方应用重启,当MQ的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。那么怎么解决消息重复消费的问题呢?就是对消息进行幂等性处理。在MQ中,是无法保证每个消息只被投递一次的,因为网络抖动或者客户端宕机等其他因素,基本都会配置重试机制,所以要在消费者端的业务上做消费幂等处理,MQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,业务上可以用这个MessageId加上业务的唯一标识来作为判断幂等的关键依据,例如订单ID。而这个业务标识可以使用Message的Key来进行传递。消费者获取到消息后先根据id去查询redis/db是否存在该消息,如果不存在,则正常消费,消费完后写入redis/db。如果存在,则证明消息被消费过,直接丢弃。
接着说说消息顺序的问题,如果发送端配置了重试机制,mq不会等之前那条消息完全发送成功,才去发送下一条消息,这样可能会出现发送了1,2,3条消息,但是第1条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。RocketMQ消息有序要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的分区队列,而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。而Broker中一个队列内的消息是可以保证有序的。在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据,默认不超过32条。因此也无法保证消息有序。RocketMQ 在默认情况下不保证顺序,要保证全局顺序,需要把 Topic 的读写队列数设置为 1,然后生产者和消费者的并发设置也是 1,不能使用多线程。所以这样的话高并发,高吞吐量的功能完全用不上。全局有序就是无论发的是不是同一个分区,我都可以按照你生产的顺序来消费。分区有序就只针对发到同一个分区的消息可以顺序消费。kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。RabbitMq没有属性设置消息的顺序性,不过我们可以通过拆分为多个queue,每个queue由一个consumer消费。或者一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理,保证消息的顺序性。
然后再说说消息积压,线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。解决方案可以修改消费端程序,让其将收到的消息快速转发到其他主题,可以设置很多分区,然后再启动多个消费者同时消费新主题的不同分区。可以将这些消费不成功的消息转发到其它队列里去,类似死信队列,后面再慢慢分析死信队列里的消息处理问题。另外在RocketMQ官网中,还分析了一个特殊情况,如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。
然后说说延迟队列,消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。例如10分钟,内完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第10个消息时把订单回收,就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。这个就是延迟对列的应用场景。rabbittmq,rocketmq都可以通过设置ttl来设置延迟时间,kafka则是可以在发送延时消息的时候,先把消息按照不同的延迟时间段发送到指定的队列中,比如topic_1s,topic_5s,topic_10s,topic_2h,然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
mq设置过期时间,就会有消息失效的情况,如果消息在队列里积压超过指定的过期时间,就会被mq给清理掉,这个时候数据就没了。解决方案也有手动写程序,将丢失的那批数据,一点点地查出来,然后重新插入到 mq 里面去。
最后再聊聊消息队列高可用问题,对于RocketMQ来说可以使用Dledger主从架构来保证消息队列的高可用,这个在上面也有提到过。然后在说说rabbitmq,它提供了一种叫镜像集群模式,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,可以在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。只不过消息需要同步到所有机器上,导致网络带宽压力和消耗很重。最后再说说kafka,它是天然的分布式消息队列,在Kafka 0.8 以后,提供了副本机制,一个 topic要求指定partition数量,每个 partition的数据都会同步到其它机器上,形成自己的多个 replica 副本,所有 replica 会选举一个 leader 出来,其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去。如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来。
整个图片,歇歇眼,文章大多不换行,排版基本都是一块的,七千四百字,口速快的话,一个小时差不多可以讲完,这篇博文主要是针对面试口述的,备战面试。