念念不忘
必会回响

SpringBoot与Redis Stream整合实现消息队列

最近需要做一个简单的埋点工作,考虑到发送数据比较密集,每次都将数据实时写入那肯定不合理,于是就考虑利用消息队列做一下缓冲,避免过多的写入造成对系统的影响,这种场景拍脑门一想就是利用kafka或者rabbitmq来实现,但目前现状是申请网络策略非常麻烦,为了一个小功能再引入一个新的中间件也比较浪费,于是就想着利用redis stream来实现了。

1. 环境要求

2. 具体实现

2.1. 依赖

确认项目中依赖有spring-boot-starter-data-redis,如果没有的话请添加如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.2. 配置application.yml

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password: passwd123 # 有密码就填,没有就不填
    lettuce:  # 我使用了lettuce,你也可以用jedis
      pool:
        max-idle: 8 # 连接池中的最大空闲连接
        min-idle: 1 # 连接池中的最小空闲连接
        max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
        max-wait: -1  # 连接池最大阻塞等待时间(使用负值表示没有限制)
    timeout: 1000  #毫秒
    channel-topic: buried_point:dev  # 自定义一个

2.3. 创建监听器

创建一个监听器,实现StreamListener接口,用来消费生产的数据。

@Log4j2
@Component
public class BuriedPointListener implements StreamListener<String, MapRecord<String, String, String>> {

    private RedisTemplate<String, String> redisTemplate;

    private ObjectMapper objectMapper;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    public BuriedPointListener(RedisTemplate<String, String> redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Map<String, String> map = message.getValue();
        log.debug("[手动] group:[group-a] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);
        // 以下将数据写入es的逻辑省略。。。
        // 写入数据后删除该消息
        // 手动确认消息,如果是 streamMessageListenerContainer.receiveAutoAck 则为自动确认,不需要这一步
        redisTemplate.opsForStream().acknowledge(Objects.requireNonNull(stream), "group-a", id.getValue());
        // 删除消息
        redisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
    }
}

2.4. 创建RedisStreamConfiguration配置类

@Configuration
public class RedisStreamConfiguration {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private BuriedPointListener buriedPointListener;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(3)
                        // 运行 Stream 的 poll task
                        .executor(executor)
                        // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                        .pollTimeout(Duration.ofSeconds(3))
                        // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                        .errorHandler(new StreamErrorHandler())
                        .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 消费组 group-a ,不自动ack group-a需要在接到消息以后先创建,才能消费。创建命令: xgroup create topicname[例如:testtopic] 组名[例如:group-a] offset从哪开始消费 [例如:0]
        // 从消费组中没有分配给消费者的消息开始消费
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(topic, ReadOffset.lastConsumed()), buriedPointListener);

        // 如果需要 自动ack,请使用receiveAutoAck()方法
        //streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-a", "consumer-a"),
        //        StreamOffset.create(topic, ReadOffset.lastConsumed()), buriedPointListener);

        return streamMessageListenerContainer;
    }
}

2.5. 创建消费组

127.0.0.1:6379> xgroup create buried_point:dev group-a 0
OK

2.6. 创建生产者

只需要调用 redisTemplate.opsForStream().add(topic, data);方法往里写数据就行。

@Log4j2
@Service
public class CollectorServiceImpl implements CollectorService {
    private RedisTemplate<String, String> redisTemplate;

    private ObjectMapper objectMapper;

    @Value("${spring.redis.channel-topic:default}")
    private String topic;

    public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
        this.objectMapper = objectMapper;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void sendToMsg(BuriedPointDTO buriedPointDTO) {
        try {
            //redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO));
            Map<String, Object> data = new HashMap<>();
            data.put("url", buriedPointDTO.getUrl());
            data.put("title", buriedPointDTO.getTitle());
            data.put("uid", buriedPointDTO.getUserId());
            data.put("userName", buriedPointDTO.getUserName());
            data.put("accessTime", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(buriedPointDTO.getAccessTime()));
            redisTemplate.opsForStream().add(topic, data);
            log.info(objectMapper.writeValueAsString(buriedPointDTO));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

此时,启动项目,当redis stream中有数据的时候就会开始消费数据。

赞(5) 打赏
未经允许不得转载:堆上小栈 » SpringBoot与Redis Stream整合实现消息队列

评论 抢沙发

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册