目录
💌 介绍
💒 使用场景
🏳🌈 模拟案例
📕 准备工作
🏴 写法一(死信队列TTL)
RabbitMQ配置文件
生产者
消费者
测试
🏴 写法二 (死信队列TTL)
RabbitMQ配置文件
生产者
消费者
测试
🚩 写法三 (插件版本-推荐)
插件安装
RabbitMQ配置文件
生产者
消费者
测试
👍 延迟队列方法推荐
💌 介绍
顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
💒 使用场景
- 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
- 用户注册成功后,如果3天没有登录则进行短信提醒
- 优惠券过期前发送短信进行提醒
- ....
以上场景都可以用延时队列来完成
🏳🌈 模拟案例
需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费
📕 准备工作
导入RabbitMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ连接相关信息
#MySQL
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xxxx
password: xxx
server:
port: 8087
🏴 写法一(死信队列TTL)
生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)
RabbitMQ配置文件
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author 小影
* @create: 2022/8/18 10:26
* @describe:mq配置 如示例图配置:2交换机、4队列、4路由key
*/
@Configuration
public class RabbitMQConfiguration {
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";
public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";
// 延迟队列路由key
public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";
public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";
// 死信交换机
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
// 死信队列
public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";
public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";
// 私信队列路由key
public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";
public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";
// 声明延迟交换机
@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机
@Bean("delayQueueA")
public Queue delayQueueA() {
HashMap<String, Object> args = new HashMap<>();
// 声明队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// 声明队列的属性路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);
// 声明队列的消息TTL存活时间
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();
}
// 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机
@Bean("delayQueueB")
public Queue delayQueueB() {
HashMap<String, Object> args = new HashMap<>();
// 声明队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// 声明队列的属性路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);
// 声明队列的消息TTL存活时间
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();
}
// 声明死信队列A,用于接收延迟10S的消息
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUE_NAME_A);
}
// 声明死信队列B,用于接收延迟60S的消息
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUE_NAME_B);
}
// 设置延迟队列A的绑定关系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);
}
// 设置延迟队列B的绑定关系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);
}
// 设置死信队列A的绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);
}
// 设置死信队列B的绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);
}
}
此配置文件的代码关系图如下
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;
/**
* @author 小影
* @create: 2022/8/18 11:13
* @describe:延迟消息生产者
*/
@Component
public class DelayMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String message,int type) {
switch (type){
case 1: // 10s的消息
// param:队列名称、路由key、消息
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);
break;
case 2:// 60s的消息
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);
break;
}
}
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;
/**
* @author 小影
* @create: 2022/8/18 11:19
* @describe:死信消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
* 监听私信队列A
* @param message
* @param channel 作手动回执、确认
*/
@RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)
public void receiveA(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);
}
/**
* 监听私信队列B
* @param message
* @param channel 作手动回执、确认
*/
@RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)
public void receiveB(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);
}
}
测试
@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
@Resource
private DelayMessageProducer producer;
@GetMapping("send")
public void send(String message, Integer type) {
log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));
producer.send(message, type);
}
}
分别请求:
http://localhost:8089/rabbitmq/send?message=我是10秒&type=1
http://localhost:8089/rabbitmq/send?message=我是60秒&type=2
如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay.exchange' in vhost '/': received ''x-delayed-message'' but current is 'direct', class-id=40, method-id=10
可能是mq已经存在交换机了先去删掉
弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦
🏴 写法二 (死信队列TTL)
生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)
RabbitMQ配置文件
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author 小影
* @create: 2022/8/18 10:26
* @describe:mq配置 如示例图配置:2交换机、2队列、2路由key
*/
@Configuration
public class RabbitMQConfiguration {
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_QUEUE_NAME = "delay.queue";
// 延迟队列路由key
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
// 死信交换机
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
// 死信队列
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
// 私信队列路由key
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";
// 声明延迟交换机
@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机
@Bean("delayQueue")
public Queue delayQueue() {
HashMap<String, Object> args = new HashMap<>();
// 声明队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// 声明队列的属性路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
}
// 声明死信队列
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE_NAME);
}
// 设置延迟队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
}
// 设置死信队列的绑定关系
@Bean
public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
}
}
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
/**
* @author 小影
* @create: 2022/8/18 11:13
* @describe:延迟消息生产者
*/
@Component
public class DelayMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
* @param message 消息
* @param delayTime 存活时间
*/
public void send(String message,String delayTime) {
// param:延迟交换机,路由KEY,存活时间
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setExpiration(delayTime);
return msg;
});
}
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;
/**
* @author 小影
* @create: 2022/8/18 11:19
* @describe:死信消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
* 监听私信队列A
* @param message
* @param channel 作手动回执、确认
*/
@RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)
public void receiveA(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);
}
}
测试
@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
@Resource
private DelayMessageProducer producer;
@GetMapping("send")
public void send(String message, String delayTime) {
log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
producer.send(message, delayTime);
}
}
分别请求
http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000
http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000
弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来
🚩 写法三 (插件版本-推荐)
安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。
插件安装
1.进入mq官网社区插件:Community Plugins — RabbitMQ
2.找到rabbitmq_delayed_message_exchange
选择对应版本的ez文件下载
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
注:我的MQ是通过yum安装的
1.在系统中查看安装的rabbitmq
rpm -qa |grep rabbitmq
2.查询mq的安装的相关文件目录
rpm -ql rabbitmq-server-3.10.7-1.el8.noarch
翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好
然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面
3.重启RabbitMQ服务
systemctl restart rabbitmq-server.service
4.重启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ配置文件
/**
* @author 小影
* @create: 2022/8/18 10:26
* @describe:mq配置 如示例图配置:1交换机、1队列、1路由key
*/
@Configuration
public class RabbitMQConfiguration {
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_QUEUE_NAME = "delay.queue";
// 延迟队列路由key
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
// 声明延迟交换机
@Bean("delayExchange")
public CustomExchange delayExchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
// 声明延迟队列
@Bean("delayQueue")
public Queue delayQueue() {
return new Queue(DELAY_QUEUE_NAME);
}
// 设置延迟队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
}
}
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
/**
* @author 小影
* @create: 2022/8/18 11:13
* @describe:延迟消息生产者
*/
@Component
public class DelayMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
* @param message 消息
* @param delayTime 存活时间
*/
public void send(String message,Integer delayTime) {
// param:延迟交换机,路由KEY,存活时间
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
}
消费者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;
/**
* @author 小影
* @create: 2022/8/18 11:19
* @describe:消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/*
* 监听私信队列
* @param message
* @param channel 作手动回执、确认
*/
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receiveA(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);
}
}
测试
@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
@Resource
private DelayMessageProducer producer;
@GetMapping("send")
public void send(String message, Integer delayTime) {
log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
producer.send(message, delayTime);
}
}
启动项目查看rabbitmq的可视化界面
如下图此时生成的交换机是x-delayed-message类型的
分别发送:
http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000
http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000
结局并不是60秒先被消费,完成了我们的意愿。
原理:
- 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
- 交换机里面的插件就会一直监听这个时间
- 时间到了把对应数据取出来,放入队列,让消费者进行消费
👍 延迟队列方法推荐
这是小编在开发学习使用和总结, 这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!