博主在学习 Redis 实现发布订阅功能的时候,踩了太多的坑。
不是讲解不详细,看的一知半解;就是代码有问题,实际压根跑不起来!
于是博主萌生了自己写一个最新版且全程无错的博客供各位参考。希望各位不要把我才过的坑再踩一遍。(实战篇的所有代码均由本人测试,全程无Bug。)
废话不多说,让我们进入实战篇的学习!
在开始实战篇的之前,我们先一起回顾下原理篇的内容。
正如上述中Redis的缺点,Redis的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择使用MQ实现发布订阅。
Redis的发布/订阅应用场景:
命令 | 描述 |
|---|---|
Redis Unsubscribe 命令 | 指退订给定的频道 |
Redis Subscribe 命令 | 订阅给定的一个或多个频道的信息 |
Redis Pubsub 命令 | 查看订阅与发布系统状态 |
Redis Punsubscribe 命令 | 退订所有给定模式的频道 |
Redis Publish 命令 | 将信息发送到指定的频道 |
Redis Psubscribe 命令 | 订阅一个或多个符合给定模式的频道 |
首先,确保你已经安装并配置好 Redis 服务器,并创建了 Spring Boot 项目,在pom.xml中引入依赖。
<!-- 所需依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>spring:
# 项目名称
application:
name: test-redis-boot
# Redis 配置
data:
redis:
host: 填写自己的主机IP
port: 8000
password: 有则填,没有去掉这个属性
database: 1
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 5
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 20
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms创建一个配置类,用于配置 Redis连接工厂和消息监听器监听通道信息。
注:此配置类无需死记硬背。只需大致了解每个方法的作用即可。
/**
* @Description Redis 配置类,用于配置 Redis 连接工厂和消息监听器监听通道信息
* @Author gongming.Zhang
* @Date 2024/9/11 18:27
* @Version 1.0
*/
@Configuration
@Slf4j
public class RedisConfig {
/**
* 自定义 RedisTemplate 序列化方式
*
* @param redisConnectionFactory Redis 连接的线程安全工厂
* @return 模板类
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
// 绑定 RedisConnectionFactory
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型。
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(objectMapper, Object.class);
// 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash key 序列化规则
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
// 属性设置后操作
redisTemplate.afterPropertiesSet();
log.info("RedisTemplate 自定义序列化配置完毕...");
return redisTemplate;
}
/**
* 配置主题订阅
*
* 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
*
* @param connectionFactory Redis 连接的线程安全工厂
* @return 容器对象
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
container.setConnectionFactory(connectionFactory);
// 订阅名称叫 cache 的通道, 类似 Redis 中的 subscribe 命令
container.addMessageListener(listenerAdapter, new ChannelTopic("cache"));
// 订阅名称以 'test-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
container.addMessageListener(listenerAdapter, new PatternTopic("test-*"));
log.info("消息监听器和通道绑定完毕...");
return container;
}
/**
* 配置消息监听适配器
*
* @param redisReceiveListener
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisReceiveListener redisReceiveListener) {
return new MessageListenerAdapter(redisReceiveListener);
}
}创建一个消息发布类,用于向客户端发布消息。
/**
* @Description 消息发布服务端
* @Author gongming.Zhang
* @Date 2024/9/12 9:42
* @Version 1.0
*/
@Component
@Slf4j
public class MessagePublisher {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 服务端发布消息
*
* @param channel 通道名
* @param message 待发送的消息
*/
public void sendMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
log.info("消息发送成功... channel={}, message={}", channel, message);
}
}用于监听服务端发送的消息,每次服务端发送新消息时,都会自动调用onMessage()方法。
/**
* @Description Redis 消息监听器
* @Author gongming.Zhang
* @Date 2024/9/11 18:53
* @Version 1.0
*/
/*
当收到订阅的消息时,会将消息交给这个类处理。
* 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
* 自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
*/
@Component
@Slf4j
public class RedisReceiveListener implements MessageListener {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 处理回调逻辑。每次新消息到达时,都会调用此方法。通过 onMessage 方法执行业务代码
* <p>
* 该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。
* 此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。
*
* @param message 消息对象,不能为 null
* @param pattern 与通道匹配的模式(如果指定),可以为 null
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 1.获取消息所属的通道 --> 首先获取 字符串序列化器,再从给定的二进制数据中反序列化对象。
String channel = redisTemplate.getStringSerializer().deserialize(message.getChannel());
// 2.获取客户端发送的消息内容 --> 后期可以根据自己项目中 消息 的类型,来确定用什么序列化器
Object msg = redisTemplate.getValueSerializer().deserialize(message.getBody());
log.info("收到Redis订阅消息: channel={} msg={}", channel, msg);
}
}/**
* @Description 测试订阅发布功能
* @Author gongming.Zhang
* @Date 2024/9/12 10:13
* @Version 1.0
*/
@RestController
@RequestMapping(value = "/api/v1/publish")
public class PublisherController {
@Autowired
private MessagePublisher publisher;
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "channel") String channel) {
publisher.sendMessage(channel, message);
return "Message published: " + message;
}
}

至此,我们 SpringBoot 整合 Redis 实现发布订阅功能已经完成!
通过本文,我们详细介绍了如何在 SpringBoot 中整合 Redis 实现发布/订阅功能,并提供了详细的代码示例。Redis 发布/订阅模式以其高性能和简单易用的特点,在实时消息传递场景中有着广泛的应用,但同时也需要注意其消息丢失和无法持久化等缺点,需要根据实际业务需求选择。