SpringBoot集成RabbitMq

2年前 (2022) 程序员胖胖胖虎阿
180 0 0

一、初识MQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且是基于AMQP协议的。

AMQP:Advanced Message Queuing Protocol,高级消息队列协议。是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

1、同步调用的问题

SpringBoot集成RabbitMq

2、异步调用方案

优势一:服务解耦
SpringBoot集成RabbitMq

优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量消峰
SpringBoot集成RabbitMq
异步通讯的缺点 :

  • 依赖于Broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

3、各种MQ的对比

SpringBoot集成RabbitMq

二、RabbitMQ入门

官网https://rabbitmq.com/

1、基本概念

SpringBoot集成RabbitMq
核心概念

1.1、Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出该消息可能需要持久性存储)等。

1.2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

1.3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别

1.4、Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

1.5、Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基 于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。

1.6、Connection

网络连接,比如一个TCP连接。

1.7、Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

1.8、Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

1.9、Virtual Host

虚拟主机,表示-一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是AMQP概念的基础,必须在连接时指定,
  RabbitMQ默认的vhost是/。

1.10、Broker

表示消息队列服务器实体

2、安装

安装参考:https://blog.csdn.net/Blueeyedboy521/article/details/124001883

2、常见消息模型

SpringBoot集成RabbitMq

2.1、基本消息类型SpringBoot集成RabbitMq

学习视频https://www.bilibili.com/video/BV1LQ4y127n4?p=67&spm_id_from=pageDriver
SpringBoot集成RabbitMq

三、SpringAMQP

1、什么是SpringAMQP

SpringBoot集成RabbitMq
官方文档:https://spring.io/projects/spring-amqp

2、简单示例

2.1、引入AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2、配置文件

spring:
  rabbitmq:
    addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
    username: admin
    password: admin
    #开启消息确认模式,新版本已经弃用
    #publisher-confirms: true
    #开启消息送达提示
    publisher-returns: true
    # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
    publisher-confirm-type: correlated
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto #确认模式
        prefetch: 1 #限制每次发送一条数据。
        concurrency: 3 #同一个队列启动几个消费者
        max-concurrency: 3 #启动消费者最大数量
        #重试策略相关配置
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型

/**
 * The type of publisher confirms to use.
 */
public enum ConfirmType {

	/**
	 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
	 * within scoped operations.
	 */
	SIMPLE,

	/**
	 * Use with {@code CorrelationData} to correlate confirmations with sent
	 * messsages.
	 */
	CORRELATED,

	/**
	 * Publisher confirms are disabled (default).
	 */
	NONE

}

NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
————————————————
版权声明:本文为CSDN博主「OkidoGreen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/z69183787/article/details/109371628

2.3、发送消息

在publisher服务中新建一个测试类,编写测试方法
SpringBoot集成RabbitMq

2.4、消费消息

SpringBoot集成RabbitMq

3、WorkQueue工作队列

3.1、模拟WorkQueue实现一个队列绑定多个消费者

SpringBoot集成RabbitMq

3.2、消息发送

SpringBoot集成RabbitMq

3.3、消息接受

SpringBoot集成RabbitMq

3.4、消费预期机制prefetch

配置一次只能取一条,处理完才能取下一条

spring:
  rabbitmq:
    host: 192.168.100.120
    port: 5672
    username: admin
    password: admin
    #开启消息确认模式,新版本已经弃用
    #publisher-confirms: true
    #开启消息送达提示
    publisher-returns: true
    # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
    publisher-confirm-type: correlated
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #限制每次发送一条数据。

4、发布和订阅模型

4.1、概念

SpringBoot集成RabbitMq

4.2、Fanout广播类型

SpringBoot集成RabbitMq

4.2.1、示例

SpringBoot集成RabbitMq

4.2.2、声明绑定

SpringBoot集成RabbitMq
SpringBoot集成RabbitMq
SpringBoot集成RabbitMq

4.2.3、消费者绑定

SpringBoot集成RabbitMq

4.2.4、生产者发送消息

SpringBoot集成RabbitMq

4.3、DriectExchage路由类型

通过routeKey可以实现Fanout广播类型
SpringBoot集成RabbitMq

4.3.1、示例

SpringBoot集成RabbitMq

4.3.2、申明Exchange、Queue

@Configuration
public class RabbitDirectConfig {

    @Bean
    public Queue directQueue(){
        // 参数介绍
        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-One",true,false,false,null);
    }

    @Bean
    public Queue directQueue2(){
        // 参数介绍
        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-Two",true,false,false,null);
    }

    @Bean
    public DirectExchange directExchange(){
        // 参数介绍
        // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new DirectExchange("MqSendService-One",true,false,null);
    }

    @Bean
    public Binding bingExchange(){
        // 绑定队列
        return BindingBuilder.bind(directQueue2()) 
                // 队列绑定到哪个交换器
                .to(directExchange())
                // 绑定路由key,必须指定
                .with("One");
    }

    @Bean
    public Binding bingExchange2(){
		// 绑定队列
        return BindingBuilder.bind(directQueue2()) 
                // 队列绑定到哪个交换器
                .to(directExchange())
                // 绑定路由key,必须指定
                .with("Two");
    }
}

4.3.3、在consumer消费者服务监听

SpringBoot集成RabbitMq

4.3.4、总结SpringBoot集成RabbitMq

4.4、TopicExchange话题广播类型

SpringBoot集成RabbitMq

4.4.1、示例

SpringBoot集成RabbitMq

4.4.2、申明

@Configuration
public class RabbitTopicConfig {
    @Bean
    public Queue queue(){
        // 参数介绍
        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("simple.queue",true,false,false,null);
    }

    @Bean
    public TopicExchange topicExchange(){
        // 参数介绍
        // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new TopicExchange("amq.topic",true,false,null);
    }

    @Bean
    public Binding bingExchange(){
        // 绑定队列
        return BindingBuilder.bind(queue())
                // 队列绑定到哪个交换器
                .to(topicExchange())
                // 绑定路由key,必须指定
                .with("simple.#");
    }
}

SpringBoot集成RabbitMqSpringBoot集成RabbitMq

4.4.3、生产者发送

SpringBoot集成RabbitMq

5、消息转换器

spring的消息对象处理是有org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SpringMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果需要修改只需要定义一个MessageConverter类型的bean即可,推荐用JSON方式序列化,步骤如下

5.1、引入jackson的依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

5.2、申明Bean

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter();
}

SpringBoot集成RabbitMq

版权声明:程序员胖胖胖虎阿 发表于 2022年9月27日 上午11:56。
转载请注明:SpringBoot集成RabbitMq | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...