diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 2f64ab498..59d9a0489 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -61,17 +61,17 @@ public class RedisPendingMessageResendJob { listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); - // 每个消费者的pending消息数量 + // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.entrySet().forEach(entry -> { String consumerName = entry.getKey(); Long pendingMessageCount = entry.getValue(); log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); - // 从消费者的pending队列中读取消息 - List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); - if (CollUtil.isNotEmpty(retVal)) { - for (MapRecord record : retVal) { + // 从消费者的 pending 队列中读取消息 + List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isNotEmpty(records)) { + for (MapRecord record : records) { // 重新投递消息 redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() .ofObject(record.getValue()) // 设置内容