在Redis中提供了三种实现消息队列的方式:
- List结构:基于List结构来模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:较完善的消息队列模型
1. List实现消息队列
Redis的List数据结构类型是一个双向链表,而队列要求进,出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。
- LPUSH,RPOP
- RPUSH,LPOP
但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令
- BRPOP
- BLPOP
上述两个命令的取出效果是阻塞式的。
List实现消息队列的缺点:
- 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
- 只能支持单个消费
2. 基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
这里的PSUBSCRIBE与RabbitMQ的匹配相似。
基于PubSub的消息队列的缺点:
- List支持数据持久化,但是PubSub不支持数据持久化
3. 基于Stream的消息队列
Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列
添加命令
例如
XADD users * name jack age 21
users是队列,*表示消息id ,后面的部分表示消息体
消费命令
当ID为$时代表读取最新的消息。
例如
XREAD COUNT 1 STREAMS users 0
COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取
注意:Stream的消息队列消费消息后是不会剔除该消息的
缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象
Stream消息队列的优点:
- 消息可回溯(消费后不会被剔除)
- 消息可以被多个消费者读取
- 可以阻塞读取
3.1 消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
- 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移
如何创建消费者组?
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建
如何从消费者组读取消息?
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key..] ID [ID..]
- group:消费者组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询最大数量
- BLOCK milliseconds:是否阻塞?阻塞的时间
- NOACK:消费消息后不响应
- STREAMS key:指定队列名称
- ID:获取消息的起始ID >表示从下一个未消费的消息开始 。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
那么消费者消费完消息后如何确认消息呢?
XACK key group ID [ID..]
- key:队列名称
- group:消费者组名称
- ID:消息的ID
java手动模拟消费者监听消息的代码
while(true){
Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
if (message == null){
continue;
}
try{
// 处理消息的逻辑 处理完毕后要ACK
handleMessage(message);
}catch (Exception e){
while (true){
// 从等待响应的队列里拿消息
Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
if (unAckMessage == null){
continue;
}
try {
handleMessage(unAckMessage);
}catch (Exception e1){
continue;
}
}
}
}
相关文章
暂无评论...