From a6c92816f0fb5801c27f6dc74cc0ac9542622b86 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Thu, 15 Dec 2022 15:33:15 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/scheduler/PendingMessageScheduler.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java index 38d8df3bb..1d5825a10 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java @@ -3,6 +3,8 @@ package cn.iocoder.yudao.framework.mq.scheduler; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.Consumer; @@ -17,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 这个定时器用于处理,crash 之后的消费者未消费完的消息 @@ -24,19 +27,35 @@ import java.util.Map; @Slf4j @EnableScheduling public class PendingMessageScheduler { - + private static final String LOCK_KEY = "redis:pending:msg:lock"; @Autowired private List> listeners; @Autowired private RedisMQTemplate redisTemplate; @Value("${spring.application.name}") private String groupName; + @Autowired + private RedissonClient redissonClient; /** * 一分钟执行一次 */ @Scheduled(fixedRate = 60 * 1000) public void processPendingMessage() { + final RLock lock = redissonClient.getLock(LOCK_KEY); + try { + // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁 + boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS); + if (lockFlag) { + execute(); + } + } catch (InterruptedException e) { + log.error("获取锁失败", e); + } + + } + + private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); for (AbstractStreamMessageListener listener : listeners) {