简单的介绍下Redis中的Pub/Sub模型的用法,前几年还用过,自从Redis Stream出来以后,这个应该就很少用。
1. Redis Pub/Sub优缺点
先来看看优缺点吧,从以下就可以看出,这个模型可使用的场景实在是非常非常少了。
优点
- 支持发布 / 订阅,支持多组生产者、消费者处理消息
缺点
- 消费者下线,数据会丢失
- 不支持数据持久化,Redis 宕机,数据也会丢失
- 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
2. 实现方式
实现方式比较简单,只需要创建于给监听器,实现MessageListener
接口即可。
2.1. 创建消费者监听器
@Log4j2
@Configuration
public class RedisListener implements MessageListener {
@Value("${spring.redis.channel-topic:default}")
private String topic;
@Autowired
private RestHighLevelClient restHighLevelClient;
public RedisListener(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}
@Override
public void onMessage(Message message, byte[] pattern) {
String topic = new String(pattern);
String context = new String(message.getBody());
log.info("topic:{},context:{}", topic, context);
}
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory, RedisListener redisListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//订阅topic - subscribe
redisMessageListenerContainer.addMessageListener(redisListener, new ChannelTopic(topic));
return redisMessageListenerContainer;
}
}
2.2. 创建生产者代码
@Service
public class CollectorServiceImpl implements CollectorService {
private RedisTemplate<String, String> redisTemplate;
private ObjectMapper objectMapper;
@Value("${spring.redis.channel-topic:default}")
private String topic;
public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
this.objectMapper = objectMapper;
this.redisTemplate = redisTemplate;
}
@Override
public void sendToMsg(BuriedPointDTO buriedPointDTO) {
try {
redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
关于Redis Pub/Sub和Redis Stream,可参考这篇:把Redis当作队列来用,真的合适吗?-redis作为队列 (51cto.com)