念念不忘
必会回响

SpringBoot与Redis Pub/Sub模型整合实现订阅发布

简单的介绍下Redis中的Pub/Sub模型的用法,前几年还用过,自从Redis Stream出来以后,这个应该就很少用。

1. Redis Pub/Sub优缺点

先来看看优缺点吧,从以下就可以看出,这个模型可使用的场景实在是非常非常少了。

优点

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息

缺点

  1. 消费者下线,数据会丢失
  2. 不支持数据持久化,Redis 宕机,数据也会丢失
  3. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

2. 实现方式

实现方式比较简单,只需要创建于给监听器,实现MessageListener接口即可。

2.1. 创建消费者监听器

@Log4j2
@Configuration
public class RedisListener implements MessageListener {

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    public RedisListener(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String topic = new String(pattern);
        String context = new String(message.getBody());
        log.info("topic:{},context:{}", topic, context);
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory, RedisListener redisListener) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        //订阅topic - subscribe
        redisMessageListenerContainer.addMessageListener(redisListener, new ChannelTopic(topic));
        return redisMessageListenerContainer;
    }
}

2.2. 创建生产者代码

@Service
public class CollectorServiceImpl implements CollectorService {
    private RedisTemplate<String, String> redisTemplate;
    private ObjectMapper objectMapper;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
        this.objectMapper = objectMapper;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void sendToMsg(BuriedPointDTO buriedPointDTO) {
        try {
            redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

关于Redis Pub/Sub和Redis Stream,可参考这篇:把Redis当作队列来用,真的合适吗?-redis作为队列 (51cto.com)

赞(5) 打赏
未经允许不得转载:堆上小栈 » SpringBoot与Redis Pub/Sub模型整合实现订阅发布

评论 抢沙发

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册