【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)

2年前 (2022) 程序员胖胖胖虎阿
418 0 0

【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)

系列文章目录

准备篇 RabbitMQ安装文档
第一章 RabbitMQ快速入门篇
第二章 RabbitMQ的Web管理界面详解
第三章 RabbitMQ进阶篇之死信队列
第四章 RabbitMQ进阶篇之通过插件实现延迟队列


文章目录

  • 系列文章目录
  • 前言
  • 一、什么是延时队列
  • 二、延时队列使用场景
  • 三、RabbitMQ中的TTL
  • 四、安装延时队列插件(rabbitmq_delayed_message_exchange)
  • 五、实现插件版的延时队列的实例
    • 5.1 新增场景
    • 5.2 调整需求
    • 5.3 根据新需求修改代码

前言

恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性插件版
延迟队列的内容🎁通过本文,你将清楚的了解到:什么是延时队列?延时队列使用场景?如何安装安装延时队列插件(rabbitmq_delayed_message_exchange)?😄本文最后,小名将上一篇文章的实例做一些修改来实现新的效果😁

【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)


一、什么是延时队列

什么是延时队列?顾名思义:首先它是一种队列,再给它附加一个延迟消费队列消息的功能,也就是说延时队列中的元素是都是带时间属性的,可以指定队列中的消息在哪个时间点被消费。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延时队列使用场景

我们常见的延时队列应用场景:
1、订单成功后,在30分钟内没有支付,自动取消订单并通知用户
2、用户注册成功后,如果三天内没有登陆则进行短信提醒。
3、电商平台新建商户一个月内还没上传商品信息,将冻结商铺等
4、预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
2.1 分析上述场景
这些场景都是我们常见的,所以我们思考一下:

拿第一个场景来说,系统创建订单之后,需要取消所有超过30分钟没有支付的订单。拿有“重度选择恐惧症”的小名来说吧,也许在加购物车,去支付这些操作都还好好的,突然在付款界面看到价格后停住了,发现这东西我不是那么的需要,就放弃支付了,相信一天之内这样的小伙伴不在少数。比如:小名03:15放弃支付了,小红03:16放弃支付了,小刚03:15放弃支付了,小王04:45放弃支付了……我们如何让系统知道在03:45通知小名和小刚,3:46通知小红,05:15通知小王呢?

再如后面的几个场景:发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生商户一个月内还没上传商品信息事件,则冻结该商户的商铺;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员。

2.2 省时省力的解决方法
这几种场景,你是不是感觉使用定时任务,轮询所有数据,每秒查一次,取出需要被处理的数据,然后运行相应的业务代码处理就可以了?的确如果数据量比较少,这样做即省时又省力。

比如:“用户注册成功后,如果三天内没有登陆则进行短信提醒”这样对于时间不是严格限制的需求,
我们完全可以每天晚上跑个定时任务检查一下所有注册后三天没登陆的用户,这是的确一个可行的方案。

2.3 上述做法的缺点
如果数据量比较大,并且时效性较强的场景“30分钟内没有支付的订单,自动取消订单并通知用户”,在很短时间内,没有支付的订单数据可能多到惊人,如果是活动期间甚至会达到百万甚至千万级别,对于这么庞大的数据量依旧使用上述的轮询方式,显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

2.4 使用延时队列的必要性
如果使用延时消息队列,我们在创建订单的同时将时间推迟30分钟放入消息中间件中,等时间一到再取出消费即可。

三、RabbitMQ中的TTL

上文中小名已经解释过了,这里呢,帮大家简单回忆一下

需求:

模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒

  1. 用户下单
  2. 用户下单后展示等待付款页面
  3. 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
  4. 如果超时,则给用户发送消息通知,询问用户尚未付款,是否还需要?

配置代码:

//订单最多存在10s
args.put("x-message-ttl", 10 * 1000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "ex.go.dlx");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", "go.dlx");

Time To Live(TTL)
RabbitMQ可以针对队列设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时,则消息变为dead letter(死信),简单来说:就是如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃。

四、安装延时队列插件(rabbitmq_delayed_message_exchange)

由于是外网,可能下载速度有些慢,小名在这里帮大家准备好了安装包,大家可以直接下载使用

地址:

https://wwp.lanzouq.com/ifWwA007nwmf

密码:

eamon

第一步: 下载好先将文件解压到本地电脑上(网盘要求,无法上传无法识别的*.ez文件,小名压缩了一下上传的)
第二步: 上传到服务器的RabbitMQ的插件目录(/rabbitmq/plugins)中
第三步: 进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

第四步: 重启RabbitMQ

关闭服务:

rabbitmqctl stop

启动服务:

rabbitmq-server -detached

五、实现插件版的延时队列的实例

5.1 新增场景

假设上篇文章的方式只是在某app内信息推送,后续添加新需求,比如1分钟发邮件,1小时短信提醒等等,我们就需要创建很多的队列用来接收不同的消息,而且我们并不能保证这些订单的是按顺序提醒的( 即:有可能存在队列中”A单“提醒时间长于队列中”B单“的时间戳),这时我们就需要一个更通用的方式来发送此类消息,这里我们用到了上述的延时队列插件rabbitmq_delayed_message_exchange

5.2 调整需求

上一篇文章的需求:
模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒

  1. 用户下单
  2. 用户下单后展示等待付款页面
  3. 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面
  4. 如果超过10s,则给用户发送系统消息通知,询问用户尚未付款,是否还需要?
    上文中小名已经实现了一个超时10s给用户发送消息的功能,接下来,我们对上篇文章的代码做如下

5.3 根据新需求修改代码

  1. 新增队列绑定
@Configuration
public class DelayedConfig {
    public static final String DELAYED_QUEUE_NAME = "q.delay.plugin";
    public static final String DELAYED_EXCHANGE_NAME = "ex.delay.plugin";
    public static final String DELAYED_ROUTING_KEY = "delay.plugin";

    @Bean
    public Queue queueDelay() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange exchangeDelay() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayPlugin(@Qualifier("queueDelay") Queue queue,
                           @Qualifier("exchangeDelay") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}
  1. 监听器做一些修改
    小名先说下需要修改的部分,翻遍大家对比,文末贴出完整版。

1)新增一个消费者

 //插件延迟队列,监听
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg);
    }

2)新增生产者

@RabbitListener(queues = "q.go.dlx")
    public void dlxListener(Message message, Channel channel) throws IOException {
            //省略……
            //未支付,1min后给用户发邮箱信息
            long t = System.currentTimeMillis();
            String delayOneMin  = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin);
            //未支付,1小时后给用户发短信
            String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour);

        }
}


public void sendDelayMsgByPlugin(String msg, String delayTime) {
    System.out.println("延迟时间"+delayTime);
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
        a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别
        return a;
    });
}

【完整版代码】

@Component
@Slf4j
public class MqListener {

    @Autowired
    IPracticeDlxOrderService iPracticeDlxOrderService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "q.go.dlx")
    public void dlxListener(Message message, Channel channel) throws IOException {
        System.out.println("支付超时");
        Long id = Long.valueOf(new String(message.getBody(), "utf-8"));

        PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one();
        Boolean payStatue = order.getPay();
        //判断是否支付
        if (!payStatue) {//未支付,修改未超时
            UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
            dlxOrder.eq("id", id);
            dlxOrder.set("timeout", 1);
            iPracticeDlxOrderService.update(dlxOrder);
            log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), message, new Date().toString());
            //未支付,10后给用户发app信息
            sendDelayMsg(id);
            //未支付,1min后给用户发邮箱信息
            long t = System.currentTimeMillis();
            String delayOneMin  = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"【邮箱消息】", delayOneMin);
            //未支付,1小时后给用户发短信
            String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"【短信消息】", delayOneHour);
        }
    }

    public Date dateRoll(Date date, int i, int d) {
        // 获取Calendar对象并以传进来的时间为准
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        // 将现在的时间滚动固定时长,转换为Date类型赋值
        calendar.add(i, d);
        // 转换为Date类型再赋值
        date = calendar.getTime();
        return date;
    }

    //死信队列监听
    @RabbitListener(queues = "q.delay")
    public void delayListener(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody()));
    }

    //插件延迟队列,监听
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("【※】当前时间:"+new Date().toString()+",延时队列收到消息:"+msg);

    }

    /**
     * 未支付,10s后给用户发信息
     */
    public void sendDelayMsg(Long id){
        rabbitTemplate.setMandatory(true);
        //id + 时间戳 全局唯一
        Date date = DateUtil.getDate(new Date(),1,10);
        CorrelationData correlationData = new CorrelationData(date.toString());

        //发送消息时指定 header 延迟时间
        rabbitTemplate.convertAndSend("ex.delay", "q.delay", "您的订单号:" + id + "尚未付款,是否还需要?",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setDelay(10*1000);
                        return message;
                    }
                }, correlationData);
    }

    /**
     *
     * @param msg
     * @param delayTime
     */
    public void sendDelayMsgByPlugin(String msg, String delayTime) {
        System.out.println("延迟时间"+delayTime);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60*1000和Integer.valueOf(delayTime)的区别
            return a;
        });
    }
}
  1. 运行结果:
支付超时
2022-02-17 17:28:10.650  INFO 18324 --- [ntContainer#2-1] e.mq.dlx.modules.listener.MqListener     : 当前时间:Thu Feb 17 17:28:10 CST 2022,收到请求,msg:(Body:'1494242214543482881' MessageProperties [headers={x-first-death-exchange=ex.go, x-death=[{reason=expired, count=1, exchange=ex.go, time=Thu Feb 17 17:28:08 CST 2022, routing-keys=[go], queue=q.go}], x-first-death-reason=expired, x-first-death-queue=q.go}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex.go.dlx, receivedRoutingKey=go.dlx, deliveryTag=1, consumerTag=amq.ctag-SasPqfbiS6-pt-e54uV5Hw, consumerQueue=q.go.dlx]),delayTime:Thu Feb 17 17:28:10 CST 2022
延迟时间60000
您的订单号:1494242214543482881尚未付款,是否还需要?
延迟时间3616616
2022-02-17 17:28:27.268  WARN 18324 --- [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
【※】当前时间:Thu Feb 17 17:29:27 CST 2022,延时队列收到消息:[B@36c9cd1【邮箱消息】
【※】当前时间:Thu Feb 17 18:28:44 CST 2022,延时队列收到消息:[B@36c9cd1【短信消息】

相关文章

暂无评论

暂无评论...