From c066ea46f9881e8ccf7b7abf3b2add8a38f9ea84 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 30 Oct 2023 21:15:22 +0800 Subject: [PATCH] =?UTF-8?q?mq=20=E9=87=8D=E6=9E=84=EF=BC=9A=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E7=9A=84=20redis=20=E5=AE=9E=E7=8E=B0=EF=BC=9A=201?= =?UTF-8?q?=EF=BC=89=E9=BB=98=E8=AE=A4=20channel=20=E5=92=8C=20stream=20ke?= =?UTF-8?q?y=202=EF=BC=89=E7=A7=BB=E9=99=A4=20enabled=20=E5=BC=80=E5=85=B3?= =?UTF-8?q?=EF=BC=8C=E9=80=9A=E8=BF=87=20listener=20=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E6=9D=A5=E5=AE=9E=E7=8E=B0=203=EF=BC=89?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=8C=85=E5=90=8D=EF=BC=8C=E4=B8=BA=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=20rocketmq=20=E4=BD=9C=E4=B8=BA=20mq=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=81=9A=E5=87=86=E5=A4=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/TenantRedisMessageInterceptor.java | 4 +- .../yudao/framework/mq/package-info.java | 1 + .../YudaoRedisMQAutoConfiguration.java} | 26 +++----- .../mq/{ => redis}/core/RedisMQTemplate.java | 10 +-- .../interceptor/RedisMessageInterceptor.java | 4 +- .../job/RedisPendingMessageResendJob.java | 6 +- .../core/message/AbstractRedisMessage.java | 2 +- .../core/pubsub/AbstractChannelMessage.java | 10 +-- .../AbstractChannelMessageListener.java | 8 +-- .../core/stream/AbstractStreamMessage.java | 10 +-- .../stream/AbstractStreamMessageListener.java | 8 +-- .../framework/mq/redis/package-info.java | 6 ++ ...efaultStreamMessageListenerContainerX.java | 62 ------------------- ...ot.autoconfigure.AutoConfiguration.imports | 2 +- .../src/main/resources/application.yaml | 6 -- 15 files changed, 51 insertions(+), 114 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{config/YudaoMQAutoConfiguration.java => redis/config/YudaoRedisMQAutoConfiguration.java} (84%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/RedisMQTemplate.java (86%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/interceptor/RedisMessageInterceptor.java (79%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis/core}/job/RedisPendingMessageResendJob.java (95%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/message/AbstractRedisMessage.java (88%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/pubsub/AbstractChannelMessage.java (57%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/pubsub/AbstractChannelMessageListener.java (90%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/stream/AbstractStreamMessage.java (51%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{ => redis}/core/stream/AbstractStreamMessageListener.java (92%) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java index c493b41d1..e7abb6749 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.framework.tenant.core.mq; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java index 48eaf2386..62e7ce498 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java @@ -1,4 +1,5 @@ /** + * TODO 芋艿:调整注释 * 消息队列,基于 Redis 提供: * 1. 基于 Pub/Sub 实现广播消费 * 2. 基于 Stream 实现集群消费 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java similarity index 84% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java index e300b1ad5..7ecd87964 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java @@ -1,21 +1,20 @@ -package cn.iocoder.yudao.framework.mq.config; +package cn.iocoder.yudao.framework.mq.redis.config; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.framework.common.enums.DocumentEnum; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; @@ -27,7 +26,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.scheduling.annotation.EnableScheduling; @@ -42,7 +40,7 @@ import java.util.Properties; @Slf4j @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) -public class YudaoMQAutoConfiguration { +public class YudaoRedisMQAutoConfiguration { @Bean public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, @@ -60,7 +58,6 @@ public class YudaoMQAutoConfiguration { */ @Bean(initMethod = "start", destroyMethod = "stop") @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户 public RedisMessageListenerContainer redisMessageListenerContainer( RedisMQTemplate redisMQTemplate, List> listeners) { // 创建 RedisMessageListenerContainer 对象 @@ -82,7 +79,6 @@ public class YudaoMQAutoConfiguration { */ @Bean @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户 public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, @Value("${spring.application.name}") String groupName, @@ -92,12 +88,11 @@ public class YudaoMQAutoConfiguration { /** * 创建 Redis Stream 集群消费的容器 - *

- * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html + * + * 基础知识:Redis Stream 的 xreadgroup 命令 */ @Bean(initMethod = "start", destroyMethod = "stop") @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户 public StreamMessageListenerContainer> redisStreamMessageListenerContainer( RedisMQTemplate redisMQTemplate, List> listeners) { RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); @@ -111,8 +106,7 @@ public class YudaoMQAutoConfiguration { .build(); // 创建 container 对象 StreamMessageListenerContainer> container = -// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); - DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); + StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java similarity index 86% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java index 8a31feda7..92a0f772e 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java @@ -1,10 +1,10 @@ -package cn.iocoder.yudao.framework.mq.core; +package cn.iocoder.yudao.framework.mq.redis.core; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessage; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessage; import lombok.AllArgsConstructor; import lombok.Getter; import org.springframework.data.redis.connection.stream.RecordId; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java similarity index 79% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java index 11d8e1337..dbcee7fe2 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java @@ -1,6 +1,6 @@ -package cn.iocoder.yudao.framework.mq.core.interceptor; +package cn.iocoder.yudao.framework.mq.redis.core.interceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; /** * {@link AbstractRedisMessage} 消息拦截器 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java similarity index 95% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java index ea0f53d19..02ede126e 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java @@ -1,8 +1,8 @@ -package cn.iocoder.yudao.framework.mq.job; +package cn.iocoder.yudao.framework.mq.redis.core.job; import cn.hutool.core.collection.CollUtil; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java similarity index 88% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java index f02e89d6f..ee40814dd 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.mq.core.message; +package cn.iocoder.yudao.framework.mq.redis.core.message; import lombok.Data; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java similarity index 57% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java index fbc2a2826..ce03efcf3 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java @@ -1,6 +1,6 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; +package cn.iocoder.yudao.framework.mq.redis.core.pubsub; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import com.fasterxml.jackson.annotation.JsonIgnore; /** @@ -11,11 +11,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore; public abstract class AbstractChannelMessage extends AbstractRedisMessage { /** - * 获得 Redis Channel + * 获得 Redis Channel,默认使用类名 * * @return Channel */ @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 - public abstract String getChannel(); + public String getChannel() { + return getClass().getSimpleName(); + } } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java similarity index 90% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java index e7d737d1b..bae90a9b8 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java @@ -1,10 +1,10 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; +package cn.iocoder.yudao.framework.mq.redis.core.pubsub; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import lombok.Setter; import lombok.SneakyThrows; import org.springframework.data.redis.connection.Message; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessage.java similarity index 51% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessage.java index 29ea833f3..72c6aac3f 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessage.java @@ -1,6 +1,6 @@ -package cn.iocoder.yudao.framework.mq.core.stream; +package cn.iocoder.yudao.framework.mq.redis.core.stream; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import com.fasterxml.jackson.annotation.JsonIgnore; /** @@ -11,11 +11,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore; public abstract class AbstractStreamMessage extends AbstractRedisMessage { /** - * 获得 Redis Stream Key + * 获得 Redis Stream Key,默认使用类名 * * @return Channel */ @JsonIgnore // 避免序列化 - public abstract String getStreamKey(); + public String getStreamKey() { + return getClass().getSimpleName(); + } } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessageListener.java similarity index 92% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessageListener.java index f216885ae..4a4ea00c7 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractStreamMessageListener.java @@ -1,10 +1,10 @@ -package cn.iocoder.yudao.framework.mq.core.stream; +package cn.iocoder.yudao.framework.mq.redis.core.stream; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java new file mode 100644 index 000000000..6621fc1ea --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java @@ -0,0 +1,6 @@ +/** + * 消息队列,基于 Redis 提供: + * 1. 基于 Pub/Sub 实现广播消费 + * 2. 基于 Stream 实现集群消费 + */ +package cn.iocoder.yudao.framework.mq.redis; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java deleted file mode 100644 index b4cf4c55e..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.springframework.data.redis.stream; - -import cn.hutool.core.util.ReflectUtil; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.stream.ByteRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.Record; -import org.springframework.util.Assert; - -import java.util.Collections; -import java.util.List; -import java.util.function.Function; - -/** - * 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。 - * 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006 - * 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽! - * - * @author 芋道源码 - */ -public class DefaultStreamMessageListenerContainerX> extends DefaultStreamMessageListenerContainer { - - /** - * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现 - */ - public static > StreamMessageListenerContainer create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions options) { - Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!"); - Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!"); - return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options); - } - - public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions containerOptions) { - super(connectionFactory, containerOptions); - } - - /** - * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现 - */ - @Override - public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { - return this.doRegisterX(getReadTaskX(streamRequest, listener)); - } - - @SuppressWarnings("unchecked") - private StreamPollTask getReadTaskX(StreamReadRequest streamRequest, StreamListener listener) { - StreamPollTask task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener); - // 修改 readFunction 方法 - Function> readFunction = (Function>) ReflectUtil.getFieldValue(task, "readFunction"); - ReflectUtil.setFieldValue(task, "readFunction", (Function>) readOffset -> { - List records = readFunction.apply(readOffset); - //【重点】保证 records 不是空,避免 NPE 的问题!!! - return records != null ? records : Collections.emptyList(); - }); - return task; - } - - private Subscription doRegisterX(Task task) { - return ReflectUtil.invoke(this, "doRegister", task); - } - -} - diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index c47aa4d7b..29def72ff 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1 @@ -cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration \ No newline at end of file +cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index 1e507c8ac..ef195f34a 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -145,12 +145,6 @@ yudao: - cn.iocoder.yudao.module.pay.enums.ErrorCodeConstants - cn.iocoder.yudao.module.system.enums.ErrorCodeConstants - cn.iocoder.yudao.module.mp.enums.ErrorCodeConstants - mq: - redis: - pubsub: - enable: false # 是否开启 Redis pubsub 广播消费,默认为 true。这里设置成 false,可以按需开启 - stream: - enable: false # 是否开启 Redis stream 集群消费,默认为 true。这里设置成 false,可以按需开启 tenant: # 多租户相关配置项 enable: true ignore-urls: