diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java index 38d8df3bb..1d5825a10 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java @@ -3,6 +3,8 @@ package cn.iocoder.yudao.framework.mq.scheduler; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.Consumer; @@ -17,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 这个定时器用于处理,crash 之后的消费者未消费完的消息 @@ -24,19 +27,35 @@ import java.util.Map; @Slf4j @EnableScheduling public class PendingMessageScheduler { - + private static final String LOCK_KEY = "redis:pending:msg:lock"; @Autowired private List> listeners; @Autowired private RedisMQTemplate redisTemplate; @Value("${spring.application.name}") private String groupName; + @Autowired + private RedissonClient redissonClient; /** * 一分钟执行一次 */ @Scheduled(fixedRate = 60 * 1000) public void processPendingMessage() { + final RLock lock = redissonClient.getLock(LOCK_KEY); + try { + // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁 + boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS); + if (lockFlag) { + execute(); + } + } catch (InterruptedException e) { + log.error("获取锁失败", e); + } + + } + + private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); for (AbstractStreamMessageListener listener : listeners) {