最近需要做一个简单的埋点工作,考虑到发送数据比较密集,每次都将数据实时写入那肯定不合理,于是就考虑利用消息队列做一下缓冲,避免过多的写入造成对系统的影响,这种场景拍脑门一想就是利用kafka或者rabbitmq来实现,但目前现状是申请网络策略非常麻烦,为了一个小功能再引入一个新的中间件也比较浪费,于是就想着利用redis stream
来实现了。
1. 环境要求
- Redis 5.0以上,因为Redis5才新增的stream数据类型,具体可参考Redis Streams tutorial | Redis
2. 具体实现
2.1. 依赖
确认项目中依赖有spring-boot-starter-data-redis
,如果没有的话请添加如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.2. 配置application.yml
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
password: passwd123 # 有密码就填,没有就不填
lettuce: # 我使用了lettuce,你也可以用jedis
pool:
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 1 # 连接池中的最小空闲连接
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
timeout: 1000 #毫秒
channel-topic: buried_point:dev # 自定义一个
2.3. 创建监听器
创建一个监听器,实现StreamListener接口,用来消费生产的数据。
@Log4j2
@Component
public class BuriedPointListener implements StreamListener<String, MapRecord<String, String, String>> {
private RedisTemplate<String, String> redisTemplate;
private ObjectMapper objectMapper;
@Value("${spring.redis.channel-topic:default}")
private String topic;
public BuriedPointListener(RedisTemplate<String, String> redisTemplate, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
Map<String, String> map = message.getValue();
log.debug("[手动] group:[group-a] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);
// 以下将数据写入es的逻辑省略。。。
// 写入数据后删除该消息
// 手动确认消息,如果是 streamMessageListenerContainer.receiveAutoAck 则为自动确认,不需要这一步
redisTemplate.opsForStream().acknowledge(Objects.requireNonNull(stream), "group-a", id.getValue());
// 删除消息
redisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
}
}
2.4. 创建RedisStreamConfiguration配置类
@Configuration
public class RedisStreamConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private BuriedPointListener buriedPointListener;
@Value("${spring.redis.channel-topic:default}")
private String topic;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(3)
// 运行 Stream 的 poll task
.executor(executor)
// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
.pollTimeout(Duration.ofSeconds(3))
// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
.errorHandler(new StreamErrorHandler())
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 消费组 group-a ,不自动ack group-a需要在接到消息以后先创建,才能消费。创建命令: xgroup create topicname[例如:testtopic] 组名[例如:group-a] offset从哪开始消费 [例如:0]
// 从消费组中没有分配给消费者的消息开始消费
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
StreamOffset.create(topic, ReadOffset.lastConsumed()), buriedPointListener);
// 如果需要 自动ack,请使用receiveAutoAck()方法
//streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-a", "consumer-a"),
// StreamOffset.create(topic, ReadOffset.lastConsumed()), buriedPointListener);
return streamMessageListenerContainer;
}
}
2.5. 创建消费组
127.0.0.1:6379> xgroup create buried_point:dev group-a 0
OK
2.6. 创建生产者
只需要调用 redisTemplate.opsForStream().add(topic, data);
方法往里写数据就行。
@Log4j2
@Service
public class CollectorServiceImpl implements CollectorService {
private RedisTemplate<String, String> redisTemplate;
private ObjectMapper objectMapper;
@Value("${spring.redis.channel-topic:default}")
private String topic;
public CollectorServiceImpl(ObjectMapper objectMapper, RedisTemplate<String, String> redisTemplate) {
this.objectMapper = objectMapper;
this.redisTemplate = redisTemplate;
}
@Override
public void sendToMsg(BuriedPointDTO buriedPointDTO) {
try {
//redisTemplate.convertAndSend(topic, objectMapper.writeValueAsString(buriedPointDTO));
Map<String, Object> data = new HashMap<>();
data.put("url", buriedPointDTO.getUrl());
data.put("title", buriedPointDTO.getTitle());
data.put("uid", buriedPointDTO.getUserId());
data.put("userName", buriedPointDTO.getUserName());
data.put("accessTime", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(buriedPointDTO.getAccessTime()));
redisTemplate.opsForStream().add(topic, data);
log.info(objectMapper.writeValueAsString(buriedPointDTO));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
此时,启动项目,当redis stream
中有数据的时候就会开始消费数据。