Redis实现消息队列

2年前 (2022) 程序员胖胖胖虎阿
211 0 0

在Redis中提供了三种实现消息队列的方式:

  1. List结构:基于List结构来模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:较完善的消息队列模型

1. List实现消息队列

Redis的List数据结构类型是一个双向链表,而队列要求进,出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。

  1. LPUSH,RPOP
  2. RPUSH,LPOP

但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令

  1. BRPOP
  2. BLPOP

上述两个命令的取出效果是阻塞式的。

List实现消息队列的缺点:

  1. 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
  2. 只能支持单个消费

2. 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  1. SUBSCRIBE channel [channel] :订阅一个或多个频道
  2. PUBLISH channel msg :向一个频道发送消息
  3. PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

这里的PSUBSCRIBE与RabbitMQ的匹配相似。

基于PubSub的消息队列的缺点:

  1. List支持数据持久化,但是PubSub不支持数据持久化

3. 基于Stream的消息队列

Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列

添加命令
Redis实现消息队列

例如

XADD users * name jack age 21

users是队列,*表示消息id ,后面的部分表示消息体

消费命令

Redis实现消息队列
当ID为$时代表读取最新的消息。

例如

XREAD COUNT 1 STREAMS users 0

COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取

注意:Stream的消息队列消费消息后是不会剔除该消息的

缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象

Stream消息队列的优点:

  1. 消息可回溯(消费后不会被剔除)
  2. 消息可以被多个消费者读取
  3. 可以阻塞读取

3.1 消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
  2. 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移

如何创建消费者组?

XGROUP CREATE key groupName ID [MKSTREAM]
  1. key:队列名称
  2. groupName:消费者组名称
  3. ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
  4. MKSTREAM:队列不存在时自动创建

Redis实现消息队列

如何从消费者组读取消息?

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key..] ID [ID..]
  1. group:消费者组名称
  2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  3. count:本次查询最大数量
  4. BLOCK milliseconds:是否阻塞?阻塞的时间
  5. NOACK:消费消息后不响应
  6. STREAMS key:指定队列名称
  7. ID:获取消息的起始ID >表示从下一个未消费的消息开始 。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

那么消费者消费完消息后如何确认消息呢?

XACK key group ID [ID..]
  1. key:队列名称
  2. group:消费者组名称
  3. ID:消息的ID

java手动模拟消费者监听消息的代码

while(true){
            Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
            if (message == null){
                continue;
            }
            try{
                // 处理消息的逻辑 处理完毕后要ACK
                handleMessage(message);
            }catch (Exception e){
                while (true){
                    // 从等待响应的队列里拿消息
                    Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
                    if (unAckMessage == null){
                        continue;
                    }
                    try {
                        handleMessage(unAckMessage);
                    }catch (Exception e1){
                        continue;
                    }
                }
            }
        }

版权声明:程序员胖胖胖虎阿 发表于 2022年9月21日 上午1:08。
转载请注明:Redis实现消息队列 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...