Spring Boot集成RocketMQ以及RocketMQ的基本使用
- RocketMQ概述
- 搭建RocketMQ服务
- RocketMQ的基本使用
-
- 添加相关依赖
- 原生方式操作
-
- 生产者
- 消费者
- Spring方式操作
-
- 生产者
- 配置rocketmq
- 执行测试
-
- 生成者发送消息
- 消费者消费消息
- 发送不同类型的消息
-
- 普通消息
- 顺序消息
- 延时消息
- 批量消息
- 过滤消息
- 事务消息
- RocketMQ分布式事务消息
-
- 事务相关术语
- 事务消息发送与回查过程
- 分布式事务实现
RocketMQ概述
Apache RocketMQ官网:https://rocketmq.apache.org/
Apache RocketMQ开发者指南:https://github.com/apache/rocketmq/blob/master/docs/cn/README.md
Spring Cloud Alibaba :https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ
搭建RocketMQ服务
Docker搭建RocketMQ服务参考:https://blog.csdn.net/qq_38628046/article/details/106875278
RocketMQ的基本使用
添加相关依赖
Spring Cloud Alibaba、Spring Boot集成RocketMQ,添加如下相关依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<!--整合spring cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--整合spring cloud alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.6.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
原生方式操作
生产者
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
// 指定Nameserver地址
producer.setNamesrvAddr("IP:9876");
// 启动生产者
producer.start();
// 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
// 发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
// 关闭生产者
producer.shutdown();
}
SendResult [sendStatus=SEND_OK, msgId=7F0000010AB000B4AAC28D5DBB010000, offsetMsgId=704A609600002A9F00000000000000C8, messageQueue=MessageQueue [topic=myTopic, brokerName=broker_name, queueId=3], queueOffset=0]
10:48:50.315 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[IP:10911] result: true
10:48:50.319 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[IP:9876] result: true
消费者
public static void main(String[] args) throws MQClientException {
// 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
// 指定Nameserver地址
consumer.setNamesrvAddr("IP:9876");
// 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
// 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("Receive Messages: " + msgs);
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息消费者
consumer.start();
System.out.println("Consumer Started.....");
}
Consumer Started.....
Receive Messages: [MessageExt [brokerName=broker_name, queueId=3, storeSize=204, queueOffset=0, sysFlag=0, bornTimestamp=1640659730178, bornHost=/125.71.203.164:2170, storeTimestamp=1640659730105, storeHost=/IP:10911, msgId=704A609600002A9F00000000000000C8, commitLogOffset=200, bodyCRC=2037941969, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='myTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1640659730229, UNIQ_KEY=7F0000010AB000B4AAC28D5DBB010000, CLUSTER=DefaultCluster, WAIT=true, TAGS=myTag}, body=[82, 111, 99, 107, 101, 116, 77, 81, 32, 77, 101, 115, 115, 97, 103, 101, 32, 49, 50, 51], transactionId='null'}]]
Spring方式操作
生产者
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/produce")
public String testRocket(){
HashMap<String, Object> map = new HashMap<>();
map.put("name","小白");
map.put("age",22);
map.put("sex","男");
rocketMQTemplate.convertAndSend("topic-produce",map);
return "success";
}
配置rocketmq
rocketmq:
name-server: IP:9876 # rocketMQ服务的地址
producer:
group: rocket-produce # 生产者组
执行测试
生成者发送消息
发送消息后,请求http://IP:8082/produce
,查看Rocket控制台
查看消息
消费者消费消息
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "topic-produce")
public class MyRocketMQListener implements RocketMQListener<HashMap<String, Object>> {
@Override
public void onMessage(HashMap<String, Object> map) {
String name = map.get("name").toString();
String age = map.get("age").toString();
String sex = map.get("sex").toString();
System.out.println("name: " + name + ",age: " + age + ",sex: " + sex);
}
}
启动项目
INFO 15800 --- [ restartedMain] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', nameServer='IP:9876', topic='topic-produce', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
INFO 15800 --- [ restartedMain] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:myRocketMQListener, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
name: 小白,age: 22,sex: 男
查看控制台
发送不同类型的消息
普通消息
RocketMQ提供三种方式来发送普通消息:可靠同步消息发送、可靠异步消息发送和单向消息发送。
同步消息
指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
使用比较广泛,如:重要的消息通知,短信通知
@Test
public void testSyncSend() {
// param1: topic; 若添加tag: topic:tag
// param2: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic", "这是一条同步消息");
System.out.println(sendResult);
}
发送异步消息
指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
@Test
public void testAsyncSend() throws InterruptedException {
//param1: topic; 若添加tag: topic:tag
//param2: 消息内容
//param3: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic", "这是一条异步消息", new
SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
Thread.sleep(100000);
}
单向发送消息
指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
@Test
public void testOneWay() {
rocketMQTemplate.sendOneWay("test-topic", "这是一条单向消息");
}
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
@Test
public void testSyncSend() {
// 同步顺序消息
// param1: topic; 若添加tag: topic:tag
// param2: 消息内容
// param3: 用于队列的选择
SendResult sendResult = rocketMQTemplate.syncSendOrderly("test-topic", "这是一条同步顺序消息", "order");
// 异步顺序消息
// rocketMQTemplate.asyncSendOrderly();
// 单向顺序消息
// rocketMQTemplate.sendOneWayOrderly(, , , , , );
System.out.println(sendResult);
}
延时消息
@Test
public void testDelay(){
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
SendResult result=rocketMQTemplate.syncSend("myTopic", MessageBuilder.withPayload("hello delay").build(),1000,3);
System.out.println(result);
}
延时消息的使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
批量消息
@Test
public void testBatch(){
List<Message> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("Hello 1").build());
messages.add(MessageBuilder.withPayload("Hello 2").build());
messages.add(MessageBuilder.withPayload("Hello 6").build());
rocketMQTemplate.syncSend("myTopic",messages);
}
过滤消息
@Test
public void testTag(){
rocketMQTemplate.syncSend("myTopic:tag1",MessageBuilder.withPayload("hello tag1").build());
rocketMQTemplate.syncSend("myTopic:tag2",MessageBuilder.withPayload("hello tag2").build());
rocketMQTemplate.syncSend("myTopic:tag3",MessageBuilder.withPayload("hello tag3").build());
}
消费者将接收包含tag1或tag2的消息。
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "myTopic",selectorExpression = "tag1 || tag2",selectorType = SelectorType.TAG)
public class MyRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println(msg);
}
}
但是限制是一个消息只能有一个标签,对于复杂的场景可能不起作用,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。
例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
事务消息
RocketMQ分布式事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务相关术语
事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态
半消息(Half(Prepare) Message)
暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查(Message Status Check)
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该
消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
事务消息发送与回查过程
事务消息发送步骤:
1. 发送方将半事务消息发送至RocketMQ服务端
2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息
3. 发送方开始执行本地事务逻辑
4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状 态则删除半事务消息,订阅方将不会接受该消息
事务消息回查步骤:
1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作
分布式事务实现
生产者
@GetMapping("/testRocketTx")
public String testRocketTx() {
HashMap<String, Object> map = new HashMap<>();
map.put("name", "小白");
map.put("age", 22);
map.put("sex", "男");
rocketMQTemplate.sendMessageInTransaction("topic-produce",
MessageBuilder.withPayload(map)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.setHeader("my_data", map)
.build(), map
);
return "success";
}
消费者
@RocketMQTransactionListener
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
/**
* 消费消息,消费成功提交事务,失败则回滚丢弃消息不消费
*
* @param msg
* @param arg
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
byte[] payloadByte = (byte[]) msg.getPayload();
String payload = new String(payloadByte);
System.out.println("payload = " + payload);
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
System.out.println("transactionId = " + transactionId);
String myData = headers.get("my_data").toString();
System.out.println("myData = " + myData);
String argData = arg.toString();
System.out.println("argData = " + argData);
try {
// TODO 记录MQ日志
int a = 10;
int b = a / 0;
//可以消费该消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 继续查询该消息的状态
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 检查本地事务状态
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// TODO 根据事务id查询数据库,判断消息是否消费
Object DbRocketMQLog = "DB RocketMQ Log Object";
if (DbRocketMQLog != null) {
// 消息已被消费,删除该消息
return RocketMQLocalTransactionState.ROLLBACK;
}
// TODO 进行消费逻辑,记录MQ日志
return RocketMQLocalTransactionState.COMMIT;
}
}