Redis发布订阅
- 基础知识
- 相关命令
-
- 订阅者/等待接收消息
- 发布者/发送消息
- 订阅者/成功接收消息
- 常用命令汇总
- 原理
- Spring boot整合redis
-
- 导入依赖
- Redis配置
- 消息封装类(MessageDto)
- Redis配置类
- 测试类
- 订阅方实现一:RedisMessageListener
- 订阅方实现二:PrintMessageReceiver
- MessageListenerAdapter源码分析
以下是Redis相关笔记总结,方便自己以后复习,同时也希望对大家有所帮助。
内容 | 地址链接 |
---|---|
Redis在Linux环境下的详细安装教程 | https://blog.csdn.net/BBQ__ZXB/article/details/123905061 |
Redis中五大基本数据类型和三种特殊数据类型 | https://blog.csdn.net/BBQ__ZXB/article/details/124169168 |
Redis中基本事务操作及乐观锁的实现 | https://blog.csdn.net/BBQ__ZXB/article/details/124192251 |
Java中使用JedisAPI操作Redis中五大基本数据类型 | https://blog.csdn.net/BBQ__ZXB/article/details/124194220 |
Spring boot整合Redis(入门教程) | https://blog.csdn.net/BBQ__ZXB/article/details/124301234 |
Redis主从复制详解(入门教程) | https://blog.csdn.net/BBQ__ZXB/article/details/124802602 |
Spring boot整合Redis实现发布订阅(超详细) | https://blog.csdn.net/BBQ__ZXB/article/details/124980860 |
Redis执行save命令时报错ERR | https://blog.csdn.net/BBQ__ZXB/article/details/123995289 |
基础知识
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统
Redis客户端可以订阅任意数量的频道
订阅/发布消息图:
剖析:
1.消息发送者,2.频道,3.消息订阅者
下图展示频道channel1,以及订阅这个频道的三个客户端–client2,client5和client1之间的关系
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端
相关命令
订阅者/等待接收消息
首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:
127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。
发布者/发送消息
下面再启动一个 Redis 客户端,输入如下命令:
127.0.0.1:6379> PUBLISH bbx hello
(integer) 1
127.0.0.1:6379> PUBLISH bbx world
(integer) 1
127.0.0.1:6379>
订阅者/成功接收消息
127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
#等待读取推送消息
1) "message" #消息
2) "bbx" #频道
3) "hello" #消息具体内容
1) "message"
2) "bbx"
3) "world"
常用命令汇总
命令 | 说明 |
---|---|
PSUBSCRIBE pattern [pattern …] | 订阅一个或多个符合指定模式的频道 |
PUBSUB subcommand [argument [argument …]] | 查看发布/订阅系统状态,可选参数 1) channel 返回在线状态的频道 2) numpat 返回指定模式的订阅者数量 3) numsub 返回指定频道的订阅者数量 |
PUBSUB subcommand [argument [argument …]] | 将信息发送到指定的频道 |
PUNSUBSCRIBE [pattern [pattern …]] | 退订所有指定模式的频道 |
SUBSCRIBE channel [channel …] | 订阅一个或者多个频道的消息 |
UNSUBSCRIBE [channel [channel …]] | 退订指定的频道 |
原理
Redis是使用C实现的,可以通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现
Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能
通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。
通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者
Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。
Spring boot整合redis
导入依赖
<!--操作redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Redis配置
#SpringBoot 所有的配置类,都有一个自动配置类 RedisAutoConfiguration
#自动配置类都每绑定一个properties配置文件 RedisProperties
#配置redis
spring.redis.host=localhost
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=*****
#默认是数据库0
spring.redis.database= 0
# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0
消息封装类(MessageDto)
@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {
private String data;
private String title;
private String content;
}
Redis配置类
@Configuration
public class RedisConfig {
//编写配置类,可模仿RedisAutoConfiguration配置类,该类在开发中可直接使用
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
//由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>
RedisTemplate<String, Object> template = new RedisTemplate();
template.setConnectionFactory(factory);
//Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//String的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采用string的序列化
template.setKeySerializer(stringRedisSerializer);
//hash的key采用string的序列化
template.setKeySerializer(stringRedisSerializer);
//value序列化采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
//hash的value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* Redis消息监听器容器
* 这个容器加载了RedisConnectionFactory和消息监听器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param redisConnectionFactory 连接工厂
* @param adapter 适配器
* @return redis消息监听容器
*/
@Bean
@SuppressWarnings("all")
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
RedisMessageListener listener,
MessageListenerAdapter adapter) {
final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 监听所有库的key过期事件
container.setConnectionFactory(redisConnectionFactory);
// 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息
// 可以添加多个 messageListener,配置不同的通道
container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));
/**
* 设置序列化对象
* 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
* 2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
*/
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
/**
* 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
* 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage
*
* @param printMessageReceiver
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(PrintMessageReceiver printMessageReceiver) {
MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "receiveMessage");
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
receiveMessage.setSerializer(seria);
return receiveMessage;
}
}
该类中,可以通过调用消息接收容器(container)的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;在这里我们分别使用两种实现方式去订阅两个不通的频道(channel)。
- RedisMessageListener 通过实现MessageListener接口,从而实现该接口中的onMessage(Message message, byte[] pattern)方法。
- MessageListenerAdapter 通过适配器的方式,自定义一个消息接收类PrintMessageReceiver和接收消息的方法
container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));
分别使用listener去订阅主题TOPIC_NAME1,adapter去订阅TOPIC_NAME2。
接下来分别探讨测试这两种方式。
测试类
@Slf4j
@SpringBootTest
public class RedisMessageTest {
@Autowired
private RedisUtils redisUtils;
@Test
public void test(){
final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
// 发布消息
MessageDto dto = new MessageDto();
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
dto.setData(timeFormatter.format(now));
dto.setTitle("日常信息");
dto.setContent("hello world!");
redisUtils.convertAndSend(TOPIC_NAME1, dto);
}
}
该类中的RedisUtils是之前自己封装的一个工具类,在该类中新增convertAndSend()方法。
RedisUtils中其他方法可跳转此连接查看
/**
* 向通道发布消息
*/
public boolean convertAndSend(String channel, Object message) {
if (!StringUtils.hasText(channel)) {
return false;
}
try {
redisTemplate.convertAndSend(channel, message);
log.info("发送消息成功,channel:{},message:{}", channel, message);
return true;
} catch (Exception e) {
log.info("发送消息失败,channel:{},message:{}", channel, message);
e.printStackTrace();
}
return false;
}
订阅方实现一:RedisMessageListener
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 接收的topic
log.info("channel:" + new String(pattern));
//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
MessageDto messageDto = (MessageDto) redisTemplate.getValueSerializer().deserialize(message.getBody());
log.info(messageDto.getData()+","+messageDto.getContent());
}
}
对使用RedisMessageListener 进行接收消息测试。
测试结果:
订阅方实现二:PrintMessageReceiver
@Slf4j
@Component
public class PrintMessageReceiver {
@Autowired
private RedisTemplate redisTemplate;
public void receiveMessage(MessageDto messageDto , String channel) {
// 接收的topic
log.info("channel:" + channel);
log.info("message:" + messageDto.getTitle());
}
}
注意:该方法的接收参数类型以及顺序,查阅源码得知,该方法的参数可以是一个(只有消息message),也可是两个(message,channel)并且顺序不能变。
在测试类中将 redisUtils.convertAndSend(TOPIC_NAME1, dto);中的TOPIC_NAME1改为TOPIC_NAME2,
测试结果:
MessageListenerAdapter源码分析
- 构造函数
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
this(delegate);
setDefaultListenerMethod(defaultListenerMethod);
}
其中this()方法中,初始化了序列化方式,该适配器默认的序列化方式是UTF-8的字符串序列化。
2.onMessage()
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
try {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
if (delegate != this) {
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message, pattern);
return;
}
}
// Regular case: find a handler method reflectively.
Object convertedMessage = extractMessage(message);
String convertedChannel = stringSerializer.deserialize(pattern);
// Invoke the handler method with appropriate arguments.
Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };
invokeListenerMethod(invoker.getMethodName(), listenerArguments);
} catch (Throwable th) {
handleListenerException(th);
}
}
该方法当订阅频道有消息时默认执行,首先,if(delegate instanceof MessageListener)判断该对象的类是不是实现了MessageListener接口,如果是,就会执行它实现的onMessage()。很显然,我们是自定义的
PrintMessageReceiver 对象,,所以接着往下看。
Object convertedMessage = extractMessage(message);会将message反序列化,如未自定义序列化方式,就会用使用默认的字符串序列化,这就是为什么我们在RedisConfig类中注入listenerAdapter对象时,自定义了Jackson2JsonRedisSerializer 。
3. invokeListenerMethod(invoker.getMethodName(), listenerArguments);
通过反射查找定义对象中处理消息的方法。我们会看到如下的方法实现。
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {
Object[] message = new Object[] { arguments[0] };
for (Method m : methods) {
Class<?>[] types = m.getParameterTypes();
Object[] args = //
types.length == 2 //
&& types[0].isInstance(arguments[0]) //
&& types[1].isInstance(arguments[1]) ? arguments : message;
if (!types[0].(args[0])) {
continue;
}
m.invoke(delegate, args);
return;
}
}
从而得知我们自定义方法中参数个数可以是一个也可以是两个,如两个参数时,第一个参数接收消息(message),第二个参数接收频道(channel),也可得知为什么自定义方法中,接收消息参数类型我们可以直接写MessageDto。
以上内容如有不对之处,还望不吝赐教。