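Contents
- 1. Environment setup
- 2. Producer
  - Plain messages
  - Synchronous messages
  - Asynchronous messages
- 3. Consumer
  - (1) Basic example
  - (2) Ordered consumption
  - (3) Consumption modes
    - a. Clustering mode
    - b. Broadcasting mode
- 4. accessKey and secretKey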
1. Environment setup
Add the dependency to pom.xml:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
Add the MQ configuration to application.properties:
rocketmq.name-server=localhost:9876
rocketmq.producer.group=ysx-group
rocketmq.producer.send-message-timeout=30000
rocketmq.producer.access-key=
rocketmq.producer.secret-key=
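2. Producer
Plain messages
A plain send via convertAndSend returns nothing and takes no callback, so the caller never sees a SendResult. (In rocketmq-spring, convertAndSend is implemented on top of syncSend and simply discards the result; for a true fire-and-forget send that does not wait for the broker's reply, use sendOneWay.)
@Resource
RocketMQTemplate rocketMQTemplate;

@RequestMapping("mq/template/send")
public void sendMessage1() {
    rocketMQTemplate.convertAndSend("ysx-topic", "this is a template message");
}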
Synchronous messages
A synchronous send returns a SendResult; the call blocks until the broker acknowledges the message or the send times out.
@RequestMapping("mq/template/send/sync")
public SendResult sendMessage2() {
    SendResult sendResult = rocketMQTemplate.syncSend("ysx-topic", "this is a template sync message");
    return sendResult;
}
syncSend() ultimately calls producer.send():
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
...
SendResult sendResult = producer.send(rocketMsg, timeout);
...
}
Asynchronous messages
An asynchronous send returns nothing and takes a callback object. The calling thread does not wait for the send to complete; the outcome is delivered to the callback.
@RequestMapping("mq/template/send/async")
public void sendMessage3() {
    rocketMQTemplate.asyncSend("ysx-topic",
            "this is a template async message", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("send success:" + sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("send fail:" + throwable.getMessage());
                }
            });
}
Likewise, it ultimately delegates to the producer:
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
...
producer.send(rocketMsg, sendCallback, timeout);
...
}
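To make the difference between the send styles concrete, here is a minimal, library-free sketch. It does not use any RocketMQ classes: `SendStylesSketch`, `deliver` and the fake acknowledgement string are hypothetical stand-ins, and the asynchronous method returns a future only so that the demo can wait for the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Library-free model of three send styles; nothing here is a RocketMQ class. */
final class SendStylesSketch {

    /** Hypothetical broker round trip: completes with a fake acknowledgement. */
    static CompletableFuture<String> deliver(String topic, String body) {
        return CompletableFuture.supplyAsync(() -> "ack:" + topic + ":" + body.length());
    }

    /** Synchronous style: block until the acknowledgement arrives or the timeout expires. */
    static String syncSend(String topic, String body, long timeoutMillis) {
        try {
            return deliver(topic, body).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the ack", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    /** Asynchronous style: return at once; the outcome reaches the caller through a callback. */
    static CompletableFuture<Void> asyncSend(String topic, String body,
                                             Consumer<String> onSuccess,
                                             Consumer<Throwable> onException) {
        return deliver(topic, body).<Void>handle((ack, error) -> {
            if (error != null) {
                onException.accept(error);
            } else {
                onSuccess.accept(ack);
            }
            return null;
        });
    }

    /** One-way style: hand the message off and ignore the outcome entirely. */
    static void sendOneWay(String topic, String body) {
        deliver(topic, body);
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + syncSend("ysx-topic", "hello", 3000));
        asyncSend("ysx-topic", "hello",
                ack -> System.out.println("async success: " + ack),
                err -> System.out.println("async fail: " + err.getMessage())).join();
        sendOneWay("ysx-topic", "hello");
    }
}
```

The trade-off is the usual one: the synchronous style is the simplest to reason about, the asynchronous style keeps the request thread free, and the one-way style gives up any knowledge of whether the send succeeded.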
3. Consumer
(1) Basic example
Points to note:
- Annotate the class with @Component so that the Spring container manages it.
- Implement the RocketMQListener interface and register the listener with @RocketMQMessageListener, which requires a consumer group and a topic.
@Component
@RocketMQMessageListener(consumerGroup = "ysx-consumer-group", topic = "ysx-topic")
public class MQConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consume message:" + s);
    }
}
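(2) Ordered consumption
A topic can have multiple message queues, and the producer can choose which queue a message is sent to. Ordering is only guaranteed within a single queue, so messages that must stay in order have to be routed to the same queue.
Producing messages:
@RequestMapping("mq/template/send/order")
public void sendMessageOrderly() {
    rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            // Any custom rule can be applied here; this example always picks the first queue
            return list.get(0);
        }
    });
    rocketMQTemplate.syncSendOrderly("order-topic", "this is order message", "hashKey001");
}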
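The hashKey passed to syncSendOrderly is what the selector receives as its third argument, so a selector can use it to keep related messages on one queue while still spreading unrelated ones across queues. The following is a minimal sketch of that idea using plain strings as queues; `HashQueueSelectorSketch` and the queue names are hypothetical, and this is not the library's own selector code.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of hash-based queue selection; queues are plain strings, not RocketMQ MessageQueue objects. */
final class HashQueueSelectorSketch {

    /** Maps a hash key to one queue; the same key always yields the same queue. */
    static <Q> Q select(List<Q> queues, Object hashKey) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // The two "order-1001" messages land on the same queue, which preserves their relative order.
        for (String orderId : Arrays.asList("order-1001", "order-1002", "order-1001")) {
            System.out.println(orderId + " -> " + select(queues, orderId));
        }
    }
}
```

Compared with always returning a fixed index, hashing the key preserves per-key ordering without funnelling every message of the topic through a single queue.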
Consuming messages:
Ordered consumption is enabled with consumeMode = ConsumeMode.ORDERLY.
consumeThreadMax sets the maximum number of consumer threads.
@Component
@RocketMQMessageListener(consumerGroup = "order-topic-consumer", topic = "order-topic",
        consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
public class MQConsumerListenerOrder implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group order message:" + s);
    }
}
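Why ordered consumption works can be illustrated without a broker: if each queue is drained by exactly one worker at a time, messages that share a queue are handled in the order they were enqueued. The sketch below models that with one single-threaded executor per queue. `OrderlyConsumeSketch` and the "key:step" message format are assumptions made for the illustration, not RocketMQ internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Models per-queue sequential consumption; messages are assumed to look like "key:step". */
final class OrderlyConsumeSketch {

    /** Routes each message to a queue by its key and returns, per queue, the order of handling. */
    static Map<Integer, List<String>> consume(List<String> messages, int queueCount) {
        List<ExecutorService> workers = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            workers.add(Executors.newSingleThreadExecutor()); // one worker per queue
        }
        Map<Integer, List<String>> handled = new ConcurrentHashMap<>();
        for (String message : messages) {
            String key = message.substring(0, message.indexOf(':'));
            int queue = Math.floorMod(key.hashCode(), queueCount);
            workers.get(queue).execute(() -> handled
                    .computeIfAbsent(queue, q -> Collections.synchronizedList(new ArrayList<>()))
                    .add(message));
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("A:create", "B:create", "A:pay", "B:pay", "A:ship");
        // Queues run in parallel, but the steps of one key always appear in send order.
        consume(messages, 4).forEach((queue, list) -> System.out.println("queue " + queue + ": " + list));
    }
}
```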
The remaining options of the annotation are as follows:
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
*
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
String consumerGroup();
/**
* Topic name.
*/
String topic();
/**
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
/**
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* Max consumer thread number.
*/
int consumeThreadMax() default 64;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
long consumeTimeout() default 15L;
/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default true;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
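(3) Consumption modes
RocketMQ has two consumption modes: clustering and broadcasting.
a. Clustering mode
This is the default mode.
Within a single ConsumerGroup, each message is consumed by only one consumer.
Across different ConsumerGroups, each message is consumed once per group, by one consumer in each group.
Example:
One producer group containing two producers.
Two consumer groups, groupA and groupB, each with two consumers: groupA-1, groupA-2, groupB-1, groupB-2.
When either producer sends a message, every consumer group receives it, and within each group exactly one consumer consumes it. The output below presumably covers more than one send, since a single message is consumed only once per group: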
consumer group a-1 message:this is a template sync message
consumer group b-1 message:this is a template sync message
consumer group a-2 message:this is a template sync message
consumer group b-2 message:this is a template sync message
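In clustering mode the consumers of a group share the work because the topic's queues are divided among them, so each queue (and therefore each message) is handled by one member of the group. Below is a minimal sketch of an even split. It is written in the spirit of an "averaged" allocation strategy and is not the library's code; `AverageAllocationSketch` and its integer queue ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of dividing a topic's queues evenly among the consumers of one group. */
final class AverageAllocationSketch {

    /** Returns the queue ids assigned to the consumer at consumerIndex (0-based). */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;   // queues every consumer gets
        int extra = queueCount % consumerCount;  // leftover queues go to the first consumers
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Five queues shared by the two consumers of one group.
        for (int consumer = 0; consumer < 2; consumer++) {
            System.out.println("consumer " + consumer + " -> queues " + allocate(5, 2, consumer));
        }
    }
}
```

One consequence of this scheme is that a group with more consumers than queues leaves the surplus consumers idle, which is worth keeping in mind when sizing a consumer group.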
b. Broadcasting mode
Every consumer receives every message, much like publish-subscribe.
The mode is configured through MessageModel.
public enum MessageModel {
BROADCASTING("BROADCASTING"),
CLUSTERING("CLUSTERING");
private final String modeCN;
MessageModel(String modeCN) {
this.modeCN = modeCN;
}
public String getModeCN() {
return this.modeCN;
}
}
Switch the two consumers in groupA to broadcasting mode and leave everything else unchanged.
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",
        messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA1 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group a-1 message:" + s);
    }
}

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",
        messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group a-2 message:" + s);
    }
}
When either producer sends a message, both consumers in groupA receive it, and one of the consumers in groupB receives it as well.
consumer group b-1 message:this is a template sync message
consumer group a-1 message:this is a template sync message
consumer group a-2 message:this is a template sync message
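The delivery rule behind this output can be modelled in a few lines. The sketch below is a simplification: `MessageModelSketch`, `Group` and the round-robin choice inside a clustering group are assumptions made for illustration, whereas a real broker distributes messages per queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of who receives one message under each message model. */
final class MessageModelSketch {

    enum Mode { CLUSTERING, BROADCASTING }

    /** A consumer group with its mode; 'next' is the round-robin cursor for clustering. */
    static final class Group {
        final Mode mode;
        final List<String> consumers;
        int next = 0;

        Group(Mode mode, String... consumers) {
            this.mode = mode;
            this.consumers = Arrays.asList(consumers);
        }
    }

    /** Returns the consumers that receive a single message sent to the topic. */
    static List<String> deliver(List<Group> groups) {
        List<String> receivers = new ArrayList<>();
        for (Group group : groups) {
            if (group.mode == Mode.BROADCASTING) {
                receivers.addAll(group.consumers);           // every member gets a copy
            } else {
                receivers.add(group.consumers.get(group.next)); // exactly one member gets it
                group.next = (group.next + 1) % group.consumers.size();
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        List<Group> groups = Arrays.asList(
                new Group(Mode.BROADCASTING, "a-1", "a-2"),
                new Group(Mode.CLUSTERING, "b-1", "b-2"));
        System.out.println("first message:  " + deliver(groups));
        System.out.println("second message: " + deliver(groups));
    }
}
```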
4. accessKey and secretKey
These work like a username and password for access. In the acl module, plain_acl.yml is an example that configures per-account permissions:
accounts:
  - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress: 192.168.0.*
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
    groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=SUB
      - groupC=SUB
  - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true
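One way to read this file: an admin account may access everything, while for other accounts a topic's explicit entry decides and defaultTopicPerm applies to topics that are not listed. The sketch below is a simplified model of that reading, covering only the values that appear in the sample (DENY, PUB, SUB, PUB|SUB). `AclPermSketch` is hypothetical and is not the broker's actual permission code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified model of topic permission checks for one account of plain_acl.yml. */
final class AclPermSketch {
    private final boolean admin;
    private final String defaultTopicPerm;
    private final Map<String, String> topicPerms = new HashMap<>();

    /** topicPermEntries use the file's "topic=PERM" form, for example "topicB=PUB|SUB". */
    AclPermSketch(boolean admin, String defaultTopicPerm, String... topicPermEntries) {
        this.admin = admin;
        this.defaultTopicPerm = defaultTopicPerm;
        for (String entry : topicPermEntries) {
            String[] parts = entry.split("=", 2);
            topicPerms.put(parts[0].trim(), parts[1].trim());
        }
    }

    /** action is "PUB" (send) or "SUB" (consume). */
    boolean isAllowed(String topic, String action) {
        if (admin) {
            return true; // an admin account can access all resources
        }
        String perm = topicPerms.getOrDefault(topic, defaultTopicPerm);
        Set<String> granted = new HashSet<>(Arrays.asList(perm.split("\\|")));
        return granted.contains(action); // "DENY" grants neither PUB nor SUB
    }

    public static void main(String[] args) {
        AclPermSketch account = new AclPermSketch(false, "DENY",
                "topicA=DENY", "topicB=PUB|SUB", "topicC=SUB");
        for (String topic : Arrays.asList("topicA", "topicB", "topicC", "topicX")) {
            System.out.println(topic + " PUB=" + account.isAllowed(topic, "PUB")
                    + " SUB=" + account.isAllowed(topic, "SUB"));
        }
    }
}
```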
Enabling AK/SK
In broker.conf, set aclEnable=true.
Using it in a project
When accessKey and secretKey are enabled, pass them in through an AclClientRPCHook when creating the DefaultMQProducer and the DefaultMQPushConsumer.
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ysx-group",
        new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")), true, "");
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ysx-acl-consumer-group",
        new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),
        new AllocateMessageQueueAveragely());
With correct keys, messages are sent and consumed normally. Otherwise the call fails with an error such as:
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [19]ms, Topic: ysx-topic-acl, BrokersSent: [broker-a, broker-a, broker-a]
Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: No accessKey is configured, org.apache.rocketmq.acl.plain.PlainPermissionManager.validate(PlainPermissionManager.java:636)