websocket:修复 RedisWebSocketMessageConsumer 注册失效的问题

This commit is contained in:
YunaiV 2023-11-24 18:56:19 +08:00
parent 0233b092b8
commit a597eb17d4
4 changed files with 38 additions and 19 deletions

View File

@ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
@ -23,7 +22,6 @@ import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
@ -33,30 +31,19 @@ import java.util.List;
import java.util.Properties;
/**
* 消息队列配置类
* Redis 消息队列 Consumer 配置类
*
* @author 芋道源码
*/
@Slf4j
@EnableScheduling // 启用定时任务用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}
// ========== 消费者相关 ==========
public class YudaoRedisMQConsumerAutoConfiguration {
/**
* 创建 Redis Pub/Sub 广播消费的容器
*/
@Bean(initMethod = "start", destroyMethod = "stop")
@Bean
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {

View File

@ -0,0 +1,31 @@
package cn.iocoder.yudao.framework.mq.redis.config;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.List;
/**
* Redis 消息队列 Producer 配置类
*
* @author 芋道源码
*/
@Slf4j
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}
}

View File

@ -1,2 +1,3 @@
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQProducerAutoConfiguration
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration
cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.framework.websocket.config;
import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
@ -38,7 +39,7 @@ import java.util.List;
*
* @author xingyu4j
*/
@AutoConfiguration
@AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是需要保证 RedisWebSocketMessageConsumer 先创建才能创建 RedisMessageListenerContainer
@EnableWebSocket // 开启 websocket
@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
@EnableConfigurationProperties(WebSocketProperties.class)
@ -98,7 +99,6 @@ public class YudaoWebSocketAutoConfiguration {
return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
}
// TODO 芋艿需要额外删除 YudaoRedisMQAutoConfiguration RedisMessageListenerContainer Bean 上的 @ConditionalOnBean 注解可能是 spring boot bug
@Bean
public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(
RedisWebSocketMessageSender redisWebSocketMessageSender) {