diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index 3def221f3..488734947 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -18,6 +18,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import java.util.List; @@ -81,11 +82,12 @@ public class YudaoMQAutoConfiguration { .build(); // 创建 container 对象 StreamMessageListenerContainer> container = - StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); +// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); + DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); - listeners.forEach(listener -> { + listeners.parallelStream().forEach(listener -> { // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java new file mode 100644 index 000000000..b4cf4c55e --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java @@ -0,0 +1,62 @@ +package org.springframework.data.redis.stream; + +import cn.hutool.core.util.ReflectUtil; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.Record; +import org.springframework.util.Assert; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。 + * 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006 + * 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽! + * + * @author 芋道源码 + */ +public class DefaultStreamMessageListenerContainerX> extends DefaultStreamMessageListenerContainer { + + /** + * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现 + */ + public static > StreamMessageListenerContainer create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions options) { + Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!"); + Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!"); + return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options); + } + + public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions containerOptions) { + super(connectionFactory, containerOptions); + } + + /** + * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现 + */ + @Override + public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { + return this.doRegisterX(getReadTaskX(streamRequest, listener)); + } + + @SuppressWarnings("unchecked") + private StreamPollTask getReadTaskX(StreamReadRequest streamRequest, StreamListener listener) { + StreamPollTask task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener); + // 修改 readFunction 方法 + Function> readFunction = (Function>) ReflectUtil.getFieldValue(task, "readFunction"); + ReflectUtil.setFieldValue(task, "readFunction", (Function>) readOffset -> { + List records = readFunction.apply(readOffset); + //【重点】保证 records 不是空,避免 NPE 的问题!!! + return records != null ? records : Collections.emptyList(); + }); + return task; + } + + private Subscription doRegisterX(Task task) { + return ReflectUtil.invoke(this, "doRegister", task); + } + +} + diff --git a/更新日志.md b/更新日志.md index 78e26f06c..7685d0acb 100644 --- a/更新日志.md +++ b/更新日志.md @@ -37,6 +37,7 @@ TODO * 【修复】Knife4j 接口文档 404 的问题,原因是 `spring.mvc.static-path-pattern` 配置项,影响了基础路径 [commit](https://gitee.com/zhijiantianya/ruoyi-vue-pro/commit/f966fae0606178ec5ffa47dd487f7984867da557) * 【修复】修复文件访问地址错误 [#68](https://github.com/YunaiV/ruoyi-vue-pro/pull/68) * 【修复】工作流程发起以及审批异常,由 `@NotEmpty` 校验、和 Long 类型异常导致 [#73](https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/73) +* 【修复】自定义 DefaultStreamMessageListenerContainerX 实现,解决 Redisson Stream 读取不到数据返回 `null` 导致 NPE 问题 [commit]() ### 🔨 Dependency Upgrades