diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index c369d49d6..042a4f736 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -8,8 +8,11 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; @@ -24,7 +27,7 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.List; import java.util.Properties; @@ -35,6 +38,7 @@ import java.util.Properties; * @author 芋道源码 */ @Slf4j +@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) public class YudaoMQAutoConfiguration { @@ -69,9 +73,20 @@ public class YudaoMQAutoConfiguration { return container; } + /** + * 创建 Redis Stream 重新消费的任务 + */ + @Bean + public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, + RedisMQTemplate redisTemplate, + @Value("${spring.application.name}") String groupName, + RedissonClient redissonClient) { + return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); + } + /** * 创建 Redis Stream 集群消费的容器 - * + *

* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html */ @Bean(initMethod = "start", destroyMethod = "stop") @@ -99,7 +114,8 @@ public class YudaoMQAutoConfiguration { // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) {} + } catch (Exception ignore) { + } // 设置 listener 对应的 redisTemplate listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java new file mode 100644 index 000000000..f4ba050c0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java @@ -0,0 +1,80 @@ +package cn.iocoder.yudao.framework.mq.job; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessagesSummary; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.StreamOperations; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.List; +import java.util.Map; + +/** + * 这个任务用于处理,crash 之后的消费者未消费完的消息 + */ +@Slf4j +@AllArgsConstructor +public class RedisPendingMessageResendJob { + + private static final String LOCK_KEY = "redis:pending:msg:lock"; + + private final List> listeners; + private final RedisMQTemplate redisTemplate; + private final String groupName; + private final RedissonClient redissonClient; + + /** + * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 + */ + @Scheduled(cron = "35 * * * * ?") + public void messageResend() { + RLock lock = redissonClient.getLock(LOCK_KEY); + // 尝试加锁 + if (lock.tryLock()) { + try { + execute(); + } catch (Exception ex) { + log.error("[messageResend][执行异常]", ex); + } finally { + lock.unlock(); + } + } + } + + private void execute() { + StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); + listeners.forEach(listener -> { + PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); + // 每个消费者的 pending 队列消息数量 + Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); + pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { + log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); + + // 从消费者的 pending 队列中读取消息 + List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isEmpty(records)) { + return; + } + for (MapRecord record : records) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); + + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + } + }); + }); + } +}