From e9645d7054db531e02910c7ea89f35b09aea2fe4 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:31:41 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/scheduler/RedisPendingMessageResendJob.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 2f64ab498..59d9a0489 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -61,17 +61,17 @@ public class RedisPendingMessageResendJob { listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); - // 每个消费者的pending消息数量 + // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.entrySet().forEach(entry -> { String consumerName = entry.getKey(); Long pendingMessageCount = entry.getValue(); log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); - // 从消费者的pending队列中读取消息 - List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); - if (CollUtil.isNotEmpty(retVal)) { - for (MapRecord record : retVal) { + // 从消费者的 pending 队列中读取消息 + List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isNotEmpty(records)) { + for (MapRecord record : records) { // 重新投递消息 redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() .ofObject(record.getValue()) // 设置内容