目录
为什么需要SpringCloud Stream消息驱动呢?
Binder
案例
消息驱动之生产者
消息驱动之消费者
分组消费与持久化
为什么需要SpringCloud Stream消息驱动呢?
比如说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic、partitions分区,这些中间件的差异性导致在实际项目开发中给我们造成了一定的困扰,我们如果用了两种消息队列的其中一种,后面的业务需求,如果我想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西都要重新推倒重新编写,因为它跟我们的系统耦合了,这时候SpringSloud Stream出场了!
SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!
SpringCloud Stream由一个中间件中立的核组成,应用通过SpringCloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是发送消息到队列中的)通道与外界交流
通道通过指定中间件的Binder实现与外部代理连接,业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可!
Binder
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder
通过binder,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码
案例
消息驱动之生产者
1.新建模块cloud-stream-rabbitmq-provider8801
2.pom文件
<dependencies> <!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
3.yml文件
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: # 表示定义的名称,用于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关环境配置 spring: rabbitmq: host: 112.164.16.82 # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址 port: 5673 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错) eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S) lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
注意:我之前按照上述中的配置来连接ribbitmq时报错了 !!
上述的application.yml中使用了
spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.xx
来配置rabbitmq的环境,如果你用的是远程服务器上的rabbitmq,比如我使用的是我自己的阿里云服务器然后在docker容器中运行的rabbitmq,按照上述配置方式的话,启动时会试图连接两次rabbitmq程序,第一次试图连接访问的就是application.yml中配置的地址,此时已经订阅成功了,但是程序还会在之后进行第二次连接,此时访问的地址就是localhost:5673,在我的环境中,我本地没有rabbitmq环境,所以直接报异常,因此,如果是使用自己的服务器来配置,则需要修改配置文件,将rabbitmq的配置信息移动到application.yml中的spring节点下!异常信息:
看一下是什么原因:
修改后的yml文件(最终版):
server: port: 8801 spring: application: name: cloud-stream-provider rabbitmq: host: 112.124.16.82 # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址 port: 5673 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: # 表示定义的名称,用于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错) eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S) lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
4.主启动类
@SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
5.业务类
- 新建service.IMessageProvider接口
package service; public interface IMessageProvider { public String send(); }
- 在service下新建impl.IMessageProviderImpl实现类
package service.impl; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import service.IMessageProvider; import javax.annotation.Resource; import java.util.UUID; @EnableBinding(Source.class) // 定义消息的推送管道(Source是spring的) public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); // MessageBuilder是spring的integration.support.MessageBuilder output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*******serial: " + serial); return null; } }
- 新建controller.SendMessageController
package com.IT.springcloud.controller; import com.IT.springcloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class SendMessageController { @Resource private IMessageProvider iMessageProvider; @GetMapping("/sendMessage") public String sendMessage(){ return iMessageProvider.send(); } }
消息驱动之消费者
1.新建模块cloud-stream-rabbitmq-consumer8802
2.pom文件与8801相同
3.yml文件
server: port: 8802 spring: application: name: cloud-stream-consumer rabbitmq: host: 112.124.16.82 # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址 port: 5673 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: # 表示定义的名称,用于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错) eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S) lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s instance-id: send-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
4.主启动类StreamMQMain8802(这里省略代码)
5.新建controller.ReceiveMessageListenerController
package com.IT.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Controller; @EnableBinding(Sink.class) @Controller public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) // 监听 public void input(Message<String> message){ System.out.println("消费者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort); } }
6.测试,启动7001,8801,8802
浏览器地址栏输入:localhost:8801/sendMessage
发送方8801
接收方8802
分组消费与持久化
1.按照8802克隆一个新模块8803
2.将8802/8803实现轮询分组,每次只有一个消费者收到消息,也就是说,8801发出一条消息,只能被8802和8803中的其中一个接收到,不能同时被接收,这样就可以避免重复消费,只需要在8802和8803的yml文件中:bindings/input下设置为同一个分组即可!
bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称 content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错) group: ITA # 设置分组
3.测试,现在我们发送两条消息
8801发送方
8802接收方1
8803接收方2
看一下rabbitmq管理界面