一、初识MQ
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且是基于AMQP协议的。
AMQP:Advanced Message Queuing Protocol,高级消息队列协议。是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
1、同步调用的问题
2、异步调用方案
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量消峰
异步通讯的缺点 :
- 依赖于Broker的可靠性,安全性,吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
3、各种MQ的对比
二、RabbitMQ入门
官网https://rabbitmq.com/
1、基本概念
核心概念
1.1、Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出该消息可能需要持久性存储)等。
1.2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
1.3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别
1.4、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
1.5、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基 于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
1.6、Connection
网络连接,比如一个TCP连接。
1.7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
1.8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
1.9、Virtual Host
虚拟主机,表示-一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是AMQP概念的基础,必须在连接时指定,
RabbitMQ默认的vhost是/。
1.10、Broker
表示消息队列服务器实体
2、安装
安装参考:https://blog.csdn.net/Blueeyedboy521/article/details/124001883
2、常见消息模型
2.1、基本消息类型
学习视频https://www.bilibili.com/video/BV1LQ4y127n4?p=67&spm_id_from=pageDriver
三、SpringAMQP
1、什么是SpringAMQP
官方文档:https://spring.io/projects/spring-amqp
2、简单示例
2.1、引入AMQP依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2、配置文件
spring:
rabbitmq:
addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
username: admin
password: admin
#开启消息确认模式,新版本已经弃用
#publisher-confirms: true
#开启消息送达提示
publisher-returns: true
# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
publisher-confirm-type: correlated
virtual-host: /
listener:
type: simple
simple:
acknowledge-mode: auto #确认模式
prefetch: 1 #限制每次发送一条数据。
concurrency: 3 #同一个队列启动几个消费者
max-concurrency: 3 #启动消费者最大数量
#重试策略相关配置
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型
/**
* The type of publisher confirms to use.
*/
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
————————————————
版权声明:本文为CSDN博主「OkidoGreen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/z69183787/article/details/109371628
2.3、发送消息
在publisher服务中新建一个测试类,编写测试方法
2.4、消费消息
3、WorkQueue工作队列
3.1、模拟WorkQueue实现一个队列绑定多个消费者
3.2、消息发送
3.3、消息接受
3.4、消费预期机制prefetch
配置一次只能取一条,处理完才能取下一条
spring:
rabbitmq:
host: 192.168.100.120
port: 5672
username: admin
password: admin
#开启消息确认模式,新版本已经弃用
#publisher-confirms: true
#开启消息送达提示
publisher-returns: true
# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
publisher-confirm-type: correlated
virtual-host: /
listener:
simple:
prefetch: 1 #限制每次发送一条数据。
4、发布和订阅模型
4.1、概念
4.2、Fanout广播类型
4.2.1、示例
4.2.2、声明绑定
4.2.3、消费者绑定
4.2.4、生产者发送消息
4.3、DriectExchage路由类型
通过routeKey可以实现Fanout广播类型
4.3.1、示例
4.3.2、申明Exchange、Queue
@Configuration
public class RabbitDirectConfig {
@Bean
public Queue directQueue(){
// 参数介绍
// 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("directQueue-One",true,false,false,null);
}
@Bean
public Queue directQueue2(){
// 参数介绍
// 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("directQueue-Two",true,false,false,null);
}
@Bean
public DirectExchange directExchange(){
// 参数介绍
// 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new DirectExchange("MqSendService-One",true,false,null);
}
@Bean
public Binding bingExchange(){
// 绑定队列
return BindingBuilder.bind(directQueue2())
// 队列绑定到哪个交换器
.to(directExchange())
// 绑定路由key,必须指定
.with("One");
}
@Bean
public Binding bingExchange2(){
// 绑定队列
return BindingBuilder.bind(directQueue2())
// 队列绑定到哪个交换器
.to(directExchange())
// 绑定路由key,必须指定
.with("Two");
}
}
4.3.3、在consumer消费者服务监听
4.3.4、总结
4.4、TopicExchange话题广播类型
4.4.1、示例
4.4.2、申明
@Configuration
public class RabbitTopicConfig {
@Bean
public Queue queue(){
// 参数介绍
// 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("simple.queue",true,false,false,null);
}
@Bean
public TopicExchange topicExchange(){
// 参数介绍
// 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new TopicExchange("amq.topic",true,false,null);
}
@Bean
public Binding bingExchange(){
// 绑定队列
return BindingBuilder.bind(queue())
// 队列绑定到哪个交换器
.to(topicExchange())
// 绑定路由key,必须指定
.with("simple.#");
}
}
4.4.3、生产者发送
5、消息转换器
spring的消息对象处理是有org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SpringMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果需要修改只需要定义一个MessageConverter类型的bean即可,推荐用JSON方式序列化,步骤如下
5.1、引入jackson的依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
5.2、申明Bean
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}