diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index dbd6e2f3c..bbae1a273 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -175,6 +175,12 @@ ${revision} + + cn.iocoder.boot + yudao-spring-boot-starter-websocket + ${revision} + + com.github.xiaoymin knife4j-openapi3-spring-boot-starter diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java similarity index 92% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java index bbc63b719..d02e84b14 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java @@ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.framework.common.enums.DocumentEnum; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; @@ -23,7 +22,6 @@ import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; @@ -33,30 +31,19 @@ import java.util.List; import java.util.Properties; /** - * 消息队列配置类 + * Redis 消息队列 Consumer 配置类 * * @author 芋道源码 */ @Slf4j @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) -public class YudaoRedisMQAutoConfiguration { - - @Bean - public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, - List interceptors) { - RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); - // 添加拦截器 - interceptors.forEach(redisMQTemplate::addInterceptor); - return redisMQTemplate; - } - - // ========== 消费者相关 ========== +public class YudaoRedisMQConsumerAutoConfiguration { /** * 创建 Redis Pub/Sub 广播消费的容器 */ - @Bean(initMethod = "start", destroyMethod = "stop") + @Bean @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisMessageListenerContainer redisMessageListenerContainer( RedisMQTemplate redisMQTemplate, List> listeners) { diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQProducerAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQProducerAutoConfiguration.java new file mode 100644 index 000000000..c1950c489 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQProducerAutoConfiguration.java @@ -0,0 +1,31 @@ +package cn.iocoder.yudao.framework.mq.redis.config; + +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.data.redis.core.StringRedisTemplate; + +import java.util.List; + +/** + * Redis 消息队列 Producer 配置类 + * + * @author 芋道源码 + */ +@Slf4j +@AutoConfiguration(after = YudaoRedisAutoConfiguration.class) +public class YudaoRedisMQProducerAutoConfiguration { + + @Bean + public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, + List interceptors) { + RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); + // 添加拦截器 + interceptors.forEach(redisMQTemplate::addInterceptor); + return redisMQTemplate; + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 660865453..213850b94 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,3 @@ -cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration +cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQProducerAutoConfiguration +cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java index d95dd5ff5..3d19f32a6 100644 --- a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java +++ b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/SecurityProperties.java @@ -19,6 +19,13 @@ public class SecurityProperties { */ @NotEmpty(message = "Token Header 不能为空") private String tokenHeader = "Authorization"; + /** + * HTTP 请求时,访问令牌的请求参数 + * + * 初始目的:解决 WebSocket 无法通过 header 传参,只能通过 token 参数拼接 + */ + @NotEmpty(message = "Token Parameter 不能为空") + private String tokenParameter = "token"; /** * mock 模式的开关 diff --git a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java index 78aa328ad..06a8f9be2 100644 --- a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java +++ b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java @@ -129,8 +129,6 @@ public class YudaoWebSecurityConfigurerAdapter { .antMatchers(buildAppApi("/**")).permitAll() // 1.5 验证码captcha 允许匿名访问 .antMatchers("/captcha/get", "/captcha/check").permitAll() - // 1.6 webSocket 允许匿名访问 - .antMatchers("/websocket/message").permitAll() // ②:每个项目的自定义规则 .and().authorizeRequests(registry -> // 下面,循环设置自定义规则 authorizeRequestsCustomizers.forEach(customizer -> customizer.customize(registry))) diff --git a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java index e87f5bc44..601415c00 100644 --- a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java +++ b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/filter/TokenAuthenticationFilter.java @@ -41,7 +41,8 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { @SuppressWarnings("NullableProblems") protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException { - String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader()); + String token = SecurityFrameworkUtils.obtainAuthorization(request, + securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); if (StrUtil.isNotEmpty(token)) { Integer userType = WebFrameworkUtils.getLoginUserType(request); try { @@ -74,7 +75,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { return null; } // 用户类型不匹配,无权限 - if (ObjectUtil.notEqual(accessToken.getUserType(), userType)) { + // 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型 + // TODO 芋艿:ws 要不要区分开? + if (userType != null + && ObjectUtil.notEqual(accessToken.getUserType(), userType)) { throw new AccessDeniedException("错误的用户类型"); } // 构建登录用户 diff --git a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java index 5dc17b626..3caa7f98b 100644 --- a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/core/util/SecurityFrameworkUtils.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.framework.security.core.util; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.security.core.LoginUser; import cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils; import org.springframework.lang.Nullable; @@ -20,6 +21,9 @@ import java.util.Collections; */ public class SecurityFrameworkUtils { + /** + * HEADER 认证头 value 的前缀 + */ public static final String AUTHORIZATION_BEARER = "Bearer"; private SecurityFrameworkUtils() {} @@ -28,19 +32,23 @@ public class SecurityFrameworkUtils { * 从请求中,获得认证 Token * * @param request 请求 - * @param header 认证 Token 对应的 Header 名字 + * @param headerName 认证 Token 对应的 Header 名字 + * @param parameterName 认证 Token 对应的 Parameter 名字 * @return 认证 Token */ - public static String obtainAuthorization(HttpServletRequest request, String header) { - String authorization = request.getHeader(header); - if (!StringUtils.hasText(authorization)) { + public static String obtainAuthorization(HttpServletRequest request, + String headerName, String parameterName) { + // 1. 获得 Token。优先级:Header > Parameter + String token = request.getHeader(headerName); + if (StrUtil.isEmpty(token)) { + token = request.getParameter(parameterName); + } + if (!StringUtils.hasText(token)) { return null; } - int index = authorization.indexOf(AUTHORIZATION_BEARER + " "); - if (index == -1) { // 未找到 - return null; - } - return authorization.substring(index + 7).trim(); + // 2. 去除 Token 中带的 Bearer + int index = token.indexOf(AUTHORIZATION_BEARER + " "); + return index >= 0 ? token.substring(index + 7).trim() : token; } /** diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml index 320e52c48..b18ee4783 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml @@ -12,26 +12,73 @@ jar ${project.artifactId} - WebSocket + WebSocket 框架,支持多节点的广播 https://github.com/YunaiV/ruoyi-vue-pro - cn.iocoder.boot yudao-common + + cn.iocoder.boot yudao-spring-boot-starter-security + provided org.springframework.boot spring-boot-starter-websocket + + + + + cn.iocoder.boot + yudao-spring-boot-starter-security + provided + + + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + + + org.springframework.kafka + spring-kafka + true + + + org.springframework.amqp + spring-rabbit + true + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + + + + + + cn.iocoder.boot + yudao-spring-boot-starter-biz-tenant + provided + \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java deleted file mode 100644 index 02c3415d5..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java +++ /dev/null @@ -1,14 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.config; - -import cn.iocoder.yudao.framework.websocket.core.UserHandshakeInterceptor; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.web.socket.server.HandshakeInterceptor; - -@EnableConfigurationProperties(WebSocketProperties.class) -public class WebSocketHandlerConfig { - @Bean - public HandshakeInterceptor handshakeInterceptor() { - return new UserHandshakeInterceptor(); - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java index 0ab1b498f..aa618fb04 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java @@ -4,6 +4,9 @@ import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + /** * WebSocket 配置项 * @@ -15,15 +18,17 @@ import org.springframework.validation.annotation.Validated; public class WebSocketProperties { /** - * 路径 + * WebSocket 的连接路径 */ - private String path = ""; + @NotEmpty(message = "WebSocket 的连接路径不能为空") + private String path = "/ws"; + /** - * 默认最多允许同时在线用户数 + * 消息发送器的类型 + * + * 可选值:local、redis、rocketmq、kafka、rabbitmq */ - private int maxOnlineCount = 0; - /** - * 是否保存session - */ - private boolean sessionMap = true; + @NotNull(message = "WebSocket 的消息发送者不能为空") + private String senderType = "local"; + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java index f8c50ae6a..0f08b7cf5 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java @@ -1,11 +1,34 @@ package cn.iocoder.yudao.framework.websocket.config; +import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; @@ -16,19 +39,139 @@ import java.util.List; * * @author xingyu4j */ -@AutoConfiguration -// 允许使用 yudao.websocket.enable=false 禁用websocket -@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) +@AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是,需要保证 RedisWebSocketMessageConsumer 先创建,才能创建 RedisMessageListenerContainer +@EnableWebSocket // 开启 websocket +@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket @EnableConfigurationProperties(WebSocketProperties.class) public class YudaoWebSocketAutoConfiguration { + @Bean - @ConditionalOnMissingBean - public WebSocketConfigurer webSocketConfigurer(List handshakeInterceptor, + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { - return registry -> registry + // 添加 WebSocketHandler .addHandler(webSocketHandler, webSocketProperties.getPath()) - .addInterceptors(handshakeInterceptor.toArray(new HandshakeInterceptor[0])); + .addInterceptors(handshakeInterceptors) + // 允许跨域,否则前端连接会直接断开 + .setAllowedOriginPatterns("*"); } -} + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new LoginUserHandshakeInterceptor(); + } + + @Bean + public WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager, + List> messageListeners) { + // 1. 创建 JsonWebSocketMessageHandler 对象,处理消息 + JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners); + // 2. 创建 WebSocketSessionHandlerDecorator 对象,处理连接 + return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager); + } + + @Bean + public WebSocketSessionManager webSocketSessionManager() { + return new WebSocketSessionManagerImpl(); + } + + // ==================== Sender 相关 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true) + public class LocalWebSocketMessageSenderConfiguration { + + @Bean + public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) { + return new LocalWebSocketMessageSender(sessionManager); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true) + public class RedisWebSocketMessageSenderConfiguration { + + @Bean + public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); + } + + @Bean + public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer( + RedisWebSocketMessageSender redisWebSocketMessageSender) { + return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true) + public class RocketMQWebSocketMessageSenderConfiguration { + + @Bean + public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate, + @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) { + return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic); + } + + @Bean + public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer( + RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) { + return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true) + public class RabbitMQWebSocketMessageSenderConfiguration { + + @Bean + public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate, + TopicExchange websocketTopicExchange) { + return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange); + } + + @Bean + public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer( + RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) { + return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender); + } + + /** + * 创建 Topic Exchange + */ + @Bean + public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) { + return new TopicExchange(exchange, + true, // durable: 是否持久化 + false); // exclusive: 是否排它 + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true) + public class KafkaWebSocketMessageSenderConfiguration { + + @Bean + public KafkaWebSocketMessageSender kafkaWebSocketMessageSender( + WebSocketSessionManager sessionManager, KafkaTemplate kafkaTemplate, + @Value("${yudao.websocket.sender-kafka.topic}") String topic) { + return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic); + } + + @Bean + public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer( + KafkaWebSocketMessageSender kafkaWebSocketMessageSender) { + return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender); + } + + } + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java deleted file mode 100644 index 3f2fa4ec3..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import cn.iocoder.yudao.framework.security.core.LoginUser; -import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; -import org.springframework.http.server.ServerHttpRequest; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.HandshakeInterceptor; - -import java.util.Map; - -public class UserHandshakeInterceptor implements HandshakeInterceptor { - @Override - public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { - LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); - attributes.put(WebSocketKeyDefine.LOGIN_USER, loginUser); - return true; - } - - @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java deleted file mode 100644 index f75ebc41c..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java +++ /dev/null @@ -1,9 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - - -import lombok.Data; - -@Data -public class WebSocketKeyDefine { - public static final String LOGIN_USER ="LOGIN_USER"; -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java deleted file mode 100644 index 7bb348e99..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.List; - -@Data -@Accessors(chain = true) -public class WebSocketMessageDO { - /** - * 接收消息的seesion - */ - private List seesionKeyList; - /** - * 发送消息 - */ - private String msgText; - - public static WebSocketMessageDO build(List seesionKeyList, String msgText) { - return new WebSocketMessageDO().setMsgText(msgText).setSeesionKeyList(seesionKeyList); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java deleted file mode 100644 index 2747f8192..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import org.springframework.web.socket.WebSocketSession; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public final class WebSocketSessionHandler { - private WebSocketSessionHandler() { - } - - private static final Map SESSION_MAP = new ConcurrentHashMap<>(); - - public static void addSession(Object sessionKey, WebSocketSession session) { - SESSION_MAP.put(sessionKey.toString(), session); - } - - public static void removeSession(Object sessionKey) { - SESSION_MAP.remove(sessionKey.toString()); - } - - public static WebSocketSession getSession(Object sessionKey) { - return SESSION_MAP.get(sessionKey.toString()); - } - - public static Collection getSessions() { - return SESSION_MAP.values(); - } - - public static Set getSessionKeys() { - return SESSION_MAP.keySet(); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java deleted file mode 100644 index 816e664cc..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java +++ /dev/null @@ -1,31 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; - -@Slf4j -public class WebSocketUtils { - public static boolean sendMessage(WebSocketSession seesion, String message) { - if (seesion == null) { - log.error("seesion 不存在"); - return false; - } - if (seesion.isOpen()) { - try { - seesion.sendMessage(new TextMessage(message)); - } catch (IOException e) { - log.error("WebSocket 消息发送异常 Session={} | msg= {} | exception={}", seesion, message, e); - return false; - } - } - return true; - } - - public static boolean sendMessage(Object sessionKey, String message) { - WebSocketSession session = WebSocketSessionHandler.getSession(sessionKey); - return sendMessage(session, message); - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java deleted file mode 100644 index dd8dc602e..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java +++ /dev/null @@ -1,49 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import cn.iocoder.yudao.framework.security.core.LoginUser; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.WebSocketHandlerDecorator; - -public class YudaoWebSocketHandlerDecorator extends WebSocketHandlerDecorator { - public YudaoWebSocketHandlerDecorator(WebSocketHandler delegate) { - super(delegate); - } - - /** - * websocket 连接时执行的动作 - * @param session websocket session 对象 - * @throws Exception 异常对象 - */ - @Override - public void afterConnectionEstablished(final WebSocketSession session) throws Exception { - Object sessionKey = sessionKeyGen(session); - WebSocketSessionHandler.addSession(sessionKey, session); - } - - /** - * websocket 关闭连接时执行的动作 - * @param session websocket session 对象 - * @param closeStatus 关闭状态对象 - * @throws Exception 异常对象 - */ - @Override - public void afterConnectionClosed(final WebSocketSession session, CloseStatus closeStatus) throws Exception { - Object sessionKey = sessionKeyGen(session); - WebSocketSessionHandler.removeSession(sessionKey); - } - - public Object sessionKeyGen(WebSocketSession webSocketSession) { - - Object obj = webSocketSession.getAttributes().get(WebSocketKeyDefine.LOGIN_USER); - - if (obj instanceof LoginUser) { - LoginUser loginUser = (LoginUser) obj; - // userId 作为唯一区分 - return String.valueOf(loginUser.getId()); - } - - return null; - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java new file mode 100644 index 000000000..120f529c2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java @@ -0,0 +1,83 @@ +package cn.iocoder.yudao.framework.websocket.core.handler; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * JSON 格式 {@link WebSocketHandler} 实现类 + * + * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。 + * + * @author 芋道源码 + */ +@Slf4j +public class JsonWebSocketMessageHandler extends TextWebSocketHandler { + + /** + * type 与 WebSocketMessageListener 的映射 + */ + private final Map> listeners = new HashMap<>(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + public JsonWebSocketMessageHandler(List listenersList) { + listenersList.forEach((Consumer) + listener -> listeners.put(listener.getType(), listener)); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + // 1.1 空消息,跳过 + if (message.getPayloadLength() == 0) { + return; + } + // 1.2 ping 心跳消息,直接返回 pong 消息。 + if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { + session.sendMessage(new TextMessage("pong")); + return; + } + + // 2.1 解析消息 + try { + JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class); + if (jsonMessage == null) { + log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); + return; + } + if (StrUtil.isEmpty(jsonMessage.getType())) { + log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); + return; + } + // 2.2 获得对应的 WebSocketMessageListener + WebSocketMessageListener messageListener = listeners.get(jsonMessage.getType()); + if (messageListener == null) { + log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload()); + return; + } + // 2.3 处理消息 + Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); + Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); + Long tenantId = WebSocketFrameworkUtils.getTenantId(session); + TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); + } catch (Throwable ex) { + log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java new file mode 100644 index 000000000..f3a62cc39 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java @@ -0,0 +1,31 @@ +package cn.iocoder.yudao.framework.websocket.core.listener; + +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +/** + * WebSocket 消息监听器接口 + * + * 目的:前端发送消息给后端后,处理对应 {@link #getType()} 类型的消息 + * + * @param 泛型,消息类型 + */ +public interface WebSocketMessageListener { + + /** + * 处理消息 + * + * @param session Session + * @param message 消息 + */ + void onMessage(WebSocketSession session, T message); + + /** + * 获得消息类型 + * + * @see JsonWebSocketMessage#getType() + * @return 消息类型 + */ + String getType(); + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java new file mode 100644 index 000000000..0a55cd691 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java @@ -0,0 +1,29 @@ +package cn.iocoder.yudao.framework.websocket.core.message; + +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import lombok.Data; + +import java.io.Serializable; + +/** + * JSON 格式的 WebSocket 消息帧 + * + * @author 芋道源码 + */ +@Data +public class JsonWebSocketMessage implements Serializable { + + /** + * 消息类型 + * + * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类 + */ + private String type; + /** + * 消息内容 + * + * 要求 JSON 对象 + */ + private String content; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java new file mode 100644 index 000000000..3a31825f5 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java @@ -0,0 +1,42 @@ +package cn.iocoder.yudao.framework.websocket.core.security; + +import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.security.core.filter.TokenAuthenticationFilter; +import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + +/** + * 登录用户的 {@link HandshakeInterceptor} 实现类 + * + * 流程如下: + * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过 + * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中 + * + * @author 芋道源码 + */ +public class LoginUserHandshakeInterceptor implements HandshakeInterceptor { + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Map attributes) { + LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); + if (loginUser != null) { + WebSocketFrameworkUtils.setLoginUser(loginUser, attributes); + } + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Exception exception) { + // do nothing + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java new file mode 100644 index 000000000..5614f05ce --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.framework.websocket.core.security; + +import cn.iocoder.yudao.framework.security.config.AuthorizeRequestsCustomizer; +import cn.iocoder.yudao.framework.websocket.config.WebSocketProperties; +import lombok.RequiredArgsConstructor; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer; + +/** + * WebSocket 的权限自定义 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer { + + private final WebSocketProperties webSocketProperties; + + @Override + public void customize(ExpressionUrlAuthorizationConfigurer.ExpressionInterceptUrlRegistry registry) { + registry.antMatchers(webSocketProperties.getPath()).permitAll(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java new file mode 100644 index 000000000..4e0db44c9 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java @@ -0,0 +1,104 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * WebSocketMessageSender 实现类 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { + + private final WebSocketSessionManager sessionManager; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + send(null, userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + send(null, userType, null, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + send(sessionId, null, null, messageType, messageContent); + } + + /** + * 发送消息 + * + * @param sessionId Session 编号 + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { + // 1. 获得 Session 列表 + List sessions = Collections.emptyList(); + if (StrUtil.isNotEmpty(sessionId)) { + WebSocketSession session = sessionManager.getSession(sessionId); + if (session != null) { + sessions = Collections.singletonList(session); + } + } else if (userType != null && userId != null) { + sessions = (List) sessionManager.getSessionList(userType, userId); + } else if (userType != null) { + sessions = (List) sessionManager.getSessionList(userType); + } + if (CollUtil.isEmpty(sessions)) { + log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]", + sessionId, userType, userId, messageType, messageContent); + } + // 2. 执行发送 + doSend(sessions, messageType, messageContent); + } + + /** + * 发送消息的具体实现 + * + * @param sessions Session 列表 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + public void doSend(Collection sessions, String messageType, String messageContent) { + JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); + String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化 + sessions.forEach(session -> { + // 1. 各种校验,保证 Session 可以被发送 + if (session == null) { + log.error("[doSend][session 为空, message({})]", message); + return; + } + if (!session.isOpen()) { + log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); + return; + } + // 2. 执行发送 + try { + session.sendMessage(new TextMessage(payload)); + log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message); + } catch (IOException ex) { + log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex); + } + }); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java new file mode 100644 index 000000000..9f75ad52d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java @@ -0,0 +1,52 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket 消息的发送器接口 + * + * @author 芋道源码 + */ +public interface WebSocketMessageSender { + + /** + * 发送消息给指定用户 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * 发送消息给指定用户类型 + * + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * 发送消息给指定 Session + * + * @param sessionId Session 编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java new file mode 100644 index 000000000..5a4cf5311 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.Data; + +/** + * Kafka 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class KafkaWebSocketMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java new file mode 100644 index 000000000..201e65d81 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.kafka.annotation.KafkaListener; + +/** + * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class KafkaWebSocketMessageConsumer { + + private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + @KafkaListener( + topics = "${yudao.websocket.sender-kafka.topic}", + // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的 + groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}") + public void onMessage(KafkaWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java new file mode 100644 index 000000000..47bb598ad --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.concurrent.ExecutionException; + +/** + * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final KafkaTemplate kafkaTemplate; + + private final String topic; + + public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, + KafkaTemplate kafkaTemplate, + String topic) { + super(sessionManager); + this.kafkaTemplate = kafkaTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendKafkaMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendKafkaMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendKafkaMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Kafka 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendKafkaMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + try { + kafkaTemplate.send(topic, mqMessage).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java new file mode 100644 index 000000000..66640ef34 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java @@ -0,0 +1,20 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.local; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; + +/** + * 本地的 {@link WebSocketMessageSender} 实现类 + * + * 注意:仅仅适合单机场景!!! + * + * @author 芋道源码 + */ +public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender { + + public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) { + super(sessionManager); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java new file mode 100644 index 000000000..80a4bc176 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.Data; + +import java.io.Serializable; + +/** + * RabbitMQ 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class RabbitMQWebSocketMessage implements Serializable { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java new file mode 100644 index 000000000..59e382421 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.*; + +/** + * {@link RabbitMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RabbitListener( + bindings = @QueueBinding( + value = @Queue( + // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的 + name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", + // Consumer 关闭时,该队列就可以被自动删除了 + autoDelete = "true" + ), + exchange = @Exchange( + name = "${yudao.websocket.sender-rabbitmq.exchange}", + type = ExchangeTypes.TOPIC, + declare = "false" + ) + ) +) +@RequiredArgsConstructor +public class RabbitMQWebSocketMessageConsumer { + + private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + public void onMessage(RabbitMQWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java new file mode 100644 index 000000000..065a5d6bf --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java @@ -0,0 +1,62 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * 基于 RabbitMQ 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RabbitTemplate rabbitTemplate; + + private final TopicExchange topicExchange; + + public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RabbitTemplate rabbitTemplate, + TopicExchange topicExchange) { + super(sessionManager); + this.rabbitTemplate = rabbitTemplate; + this.topicExchange = topicExchange; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRabbitMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRabbitMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRabbitMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RabbitMQ 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java new file mode 100644 index 000000000..fb9ea0ca0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import lombok.Data; + +/** + * Redis 广播 WebSocket 的消息 + */ +@Data +public class RedisWebSocketMessage extends AbstractRedisChannelMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java new file mode 100644 index 000000000..abce00695 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; +import lombok.RequiredArgsConstructor; + +/** + * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener { + + private final RedisWebSocketMessageSender redisWebSocketMessageSender; + + @Override + public void onMessage(RedisWebSocketMessage message) { + redisWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java new file mode 100644 index 000000000..d6004ac6d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java @@ -0,0 +1,57 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; + +/** + * 基于 Redis 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RedisMQTemplate redisMQTemplate; + + public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + super(sessionManager); + this.redisMQTemplate = redisMQTemplate; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRedisMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRedisMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRedisMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Redis 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRedisMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RedisWebSocketMessage mqMessage = new RedisWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + redisMQTemplate.send(mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java new file mode 100644 index 000000000..91570e3e3 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.Data; + +/** + * RocketMQ 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class RocketMQWebSocketMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java new file mode 100644 index 000000000..ab2e2c4dc --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java @@ -0,0 +1,30 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; + +/** + * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic + topic = "${yudao.websocket.sender-rocketmq.topic}", + consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}", + messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息 +) +@RequiredArgsConstructor +public class RocketMQWebSocketMessageConsumer implements RocketMQListener { + + private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender; + + @Override + public void onMessage(RocketMQWebSocketMessage message) { + rocketMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java new file mode 100644 index 000000000..ed059bac4 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java @@ -0,0 +1,61 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RocketMQTemplate rocketMQTemplate; + + private final String topic; + + public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RocketMQTemplate rocketMQTemplate, + String topic) { + super(sessionManager); + this.rocketMQTemplate = rocketMQTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRocketMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRocketMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRocketMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RocketMQ 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRocketMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rocketMQTemplate.syncSend(topic, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java new file mode 100644 index 000000000..600a4dd96 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java @@ -0,0 +1,49 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; + +/** + * {@link WebSocketHandler} 的装饰类,实现了以下功能: + * + * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理 + * 2. 封装 {@link WebSocketSession} 支持并发操作 + * + * @author 芋道源码 + */ +public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator { + + /** + * 发送时间的限制,单位:毫秒 + */ + private static final Integer SEND_TIME_LIMIT = 1000 * 5; + /** + * 发送消息缓冲上线,单位:bytes + */ + private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100; + + private final WebSocketSessionManager sessionManager; + + public WebSocketSessionHandlerDecorator(WebSocketHandler delegate, + WebSocketSessionManager sessionManager) { + super(delegate); + this.sessionManager = sessionManager; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149 + session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT); + // 添加到 WebSocketSessionManager 中 + sessionManager.addSession(session); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { + sessionManager.removeSession(session); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java new file mode 100644 index 000000000..ad1de23c2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java @@ -0,0 +1,53 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import org.springframework.web.socket.WebSocketSession; + +import java.util.Collection; + +/** + * {@link WebSocketSession} 管理器的接口 + * + * @author 芋道源码 + */ +public interface WebSocketSessionManager { + + /** + * 添加 Session + * + * @param session Session + */ + void addSession(WebSocketSession session); + + /** + * 移除 Session + * + * @param session Session + */ + void removeSession(WebSocketSession session); + + /** + * 获得指定编号的 Session + * + * @param id Session 编号 + * @return Session + */ + WebSocketSession getSession(String id); + + /** + * 获得指定用户类型的 Session 列表 + * + * @param userType 用户类型 + * @return Session 列表 + */ + Collection getSessionList(Integer userType); + + /** + * 获得指定用户编号的 Session 列表 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @return Session 列表 + */ + Collection getSessionList(Integer userType, Long userId); + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java new file mode 100644 index 000000000..aca572f90 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java @@ -0,0 +1,125 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import org.springframework.web.socket.WebSocketSession; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 默认的 {@link WebSocketSessionManager} 实现类 + * + * @author 芋道源码 + */ +public class WebSocketSessionManagerImpl implements WebSocketSessionManager { + + /** + * id 与 WebSocketSession 映射 + * + * key:Session 编号 + */ + private final ConcurrentMap idSessions = new ConcurrentHashMap<>(); + + /** + * user 与 WebSocketSession 映射 + * + * key1:用户类型 + * key2:用户编号 + */ + private final ConcurrentMap>> userSessions + = new ConcurrentHashMap<>(); + + @Override + public void addSession(WebSocketSession session) { + // 添加到 idSessions 中 + idSessions.put(session.getId(), session); + // 添加到 userSessions 中 + LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); + if (user == null) { + return; + } + ConcurrentMap> userSessionsMap = userSessions.get(user.getUserType()); + if (userSessionsMap == null) { + userSessionsMap = new ConcurrentHashMap<>(); + if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) { + userSessionsMap = userSessions.get(user.getUserType()); + } + } + CopyOnWriteArrayList sessions = userSessionsMap.get(user.getId()); + if (sessions == null) { + sessions = new CopyOnWriteArrayList<>(); + if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) { + sessions = userSessionsMap.get(user.getId()); + } + } + sessions.add(session); + } + + @Override + public void removeSession(WebSocketSession session) { + // 移除从 idSessions 中 + idSessions.remove(session.getId(), session); + // 移除从 idSessions 中 + LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); + if (user == null) { + return; + } + ConcurrentMap> userSessionsMap = userSessions.get(user.getUserType()); + if (userSessionsMap == null) { + return; + } + CopyOnWriteArrayList sessions = userSessionsMap.get(user.getId()); + sessions.removeIf(session0 -> session0.getId().equals(session.getId())); + if (CollUtil.isEmpty(sessions)) { + userSessionsMap.remove(user.getId(), sessions); + } + } + + @Override + public WebSocketSession getSession(String id) { + return idSessions.get(id); + } + + @Override + public Collection getSessionList(Integer userType) { + ConcurrentMap> userSessionsMap = userSessions.get(userType); + if (CollUtil.isEmpty(userSessionsMap)) { + return new ArrayList<>(); + } + LinkedList result = new LinkedList<>(); // 避免扩容 + Long contextTenantId = TenantContextHolder.getTenantId(); + for (List sessions : userSessionsMap.values()) { + if (CollUtil.isEmpty(sessions)) { + continue; + } + // 特殊:如果租户不匹配,则直接排除 + if (contextTenantId != null) { + Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0)); + if (!contextTenantId.equals(userTenantId)) { + continue; + } + } + result.addAll(sessions); + } + return result; + } + + @Override + public Collection getSessionList(Integer userType, Long userId) { + ConcurrentMap> userSessionsMap = userSessions.get(userType); + if (CollUtil.isEmpty(userSessionsMap)) { + return new ArrayList<>(); + } + CopyOnWriteArrayList sessions = userSessionsMap.get(userId); + return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java new file mode 100644 index 000000000..58cdedc29 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.framework.websocket.core.util; + +import cn.iocoder.yudao.framework.security.core.LoginUser; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; + +/** + * 专属于 web 包的工具类 + * + * @author 芋道源码 + */ +public class WebSocketFrameworkUtils { + + public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER"; + + /** + * 设置当前用户 + * + * @param loginUser 登录用户 + * @param attributes Session + */ + public static void setLoginUser(LoginUser loginUser, Map attributes) { + attributes.put(ATTRIBUTE_LOGIN_USER, loginUser); + } + + /** + * 获取当前用户 + * + * @return 当前用户 + */ + public static LoginUser getLoginUser(WebSocketSession session) { + return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER); + } + + /** + * 获得当前用户的编号 + * + * @return 用户编号 + */ + public static Long getLoginUserId(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getId() : null; + } + + /** + * 获得当前用户的类型 + * + * @return 用户编号 + */ + public static Integer getLoginUserType(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getUserType() : null; + } + + /** + * 获得当前用户的租户编号 + * + * @param session Session + * @return 租户编号 + */ + public static Long getTenantId(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getTenantId() : null; + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java index c771dfaac..97bc5f951 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java @@ -1 +1,4 @@ +/** + * WebSocket 框架,支持多节点的广播 + */ package cn.iocoder.yudao.framework.websocket; diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md b/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md new file mode 100644 index 000000000..8df5a7758 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java new file mode 100644 index 000000000..38582c2f3 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java @@ -0,0 +1,54 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket 发送器的 API 接口 + * + * 对 WebSocketMessageSender 进行封装,提供给其它模块使用 + * + * @author 芋道源码 + */ +public interface WebSocketSenderApi { + + /** + * 发送消息给指定用户 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * 发送消息给指定用户类型 + * + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * 发送消息给指定 Session + * + * @param sessionId Session 编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/pom.xml b/yudao-module-infra/yudao-module-infra-biz/pom.xml index af5fb9ac2..4c6cd401b 100644 --- a/yudao-module-infra/yudao-module-infra-biz/pom.xml +++ b/yudao-module-infra/yudao-module-infra-biz/pom.xml @@ -46,6 +46,11 @@ yudao-spring-boot-starter-security + + cn.iocoder.boot + yudao-spring-boot-starter-websocket + + cn.iocoder.boot @@ -116,11 +121,6 @@ yudao-spring-boot-starter-file - - - org.springframework.boot - spring-boot-starter-websocket - diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java new file mode 100644 index 000000000..046cd2fc1 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * WebSocket 发送器的 API 实现类 + * + * @author 芋道源码 + */ +@Component +public class WebSocketSenderApiImpl implements WebSocketSenderApi { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + webSocketMessageSender.send(userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + webSocketMessageSender.send(userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + webSocketMessageSender.send(sessionId, messageType, messageContent); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java new file mode 100644 index 000000000..9ccf6070e --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java @@ -0,0 +1,48 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import cn.iocoder.yudao.module.infra.websocket.message.DemoReceiveMessage; +import cn.iocoder.yudao.module.infra.websocket.message.DemoSendMessage; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import javax.annotation.Resource; + +/** + * WebSocket 示例:单发消息 + * + * @author 芋道源码 + */ +@Component +public class DemoWebSocketMessageListener implements WebSocketMessageListener { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void onMessage(WebSocketSession session, DemoSendMessage message) { + Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); + // 情况一:单发 + if (message.getToUserId() != null) { + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(true); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户 + "demo-message-receive", toMessage); + return; + } + // 情况二:群发 + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(false); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户 + "demo-message-receive", toMessage); + } + + @Override + public String getType() { + return "demo-message-send"; + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java deleted file mode 100644 index 67a87f169..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.Semaphore; - -/** - * 信号量相关处理 - * - */ -@Slf4j -public class SemaphoreUtils { - - /** - * 获取信号量 - * - * @param semaphore - * @return - */ - public static boolean tryAcquire(Semaphore semaphore) { - boolean flag = false; - - try { - flag = semaphore.tryAcquire(); - } catch (Exception e) { - log.error("获取信号量异常", e); - } - - return flag; - } - - /** - * 释放信号量 - * - * @param semaphore - */ - public static void release(Semaphore semaphore) { - - try { - semaphore.release(); - } catch (Exception e) { - log.error("释放信号量异常", e); - } - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java deleted file mode 100644 index 380bc9317..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java +++ /dev/null @@ -1,16 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -/** - * websocket 配置 - */ -@Configuration -public class WebSocketConfig { - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java deleted file mode 100644 index f0cfdd9dc..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java +++ /dev/null @@ -1,86 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.ServerEndpoint; -import java.util.concurrent.Semaphore; - -/** - * websocket 消息处理 - */ -@Component -@ServerEndpoint("/websocket/message") -@Slf4j -public class WebSocketServer { - - /** - * 默认最多允许同时在线用户数100 - */ - public static int socketMaxOnlineCount = 100; - - private static final Semaphore SOCKET_SEMAPHORE = new Semaphore(socketMaxOnlineCount); - - /** - * 连接建立成功调用的方法 - */ - @OnOpen - public void onOpen(Session session) throws Exception { - // 尝试获取信号量 - boolean semaphoreFlag = SemaphoreUtils.tryAcquire(SOCKET_SEMAPHORE); - if (!semaphoreFlag) { - // 未获取到信号量 - log.error("当前在线人数超过限制数:{}", socketMaxOnlineCount); - WebSocketUsers.sendMessage(session, "当前在线人数超过限制数:" + socketMaxOnlineCount); - session.close(); - } else { - String userId = WebSocketUsers.getParam("userId", session); - if (userId != null) { - // 添加用户 - WebSocketUsers.addSession(userId, session); - log.info("用户【userId={}】建立连接,当前连接用户总数:{}", userId, WebSocketUsers.getUsers().size()); - WebSocketUsers.sendMessage(session, "接收内容:连接成功"); - } else { - WebSocketUsers.sendMessage(session, "接收内容:连接失败"); - } - } - } - - /** - * 连接关闭时处理 - */ - @OnClose - public void onClose(Session session) { - log.info("用户【sessionId={}】关闭连接!", session.getId()); - // 移除用户 - WebSocketUsers.removeSession(session); - // 获取到信号量则需释放 - SemaphoreUtils.release(SOCKET_SEMAPHORE); - } - - /** - * 抛出异常时处理 - */ - @OnError - public void onError(Session session, Throwable exception) throws Exception { - if (session.isOpen()) { - // 关闭连接 - session.close(); - } - String sessionId = session.getId(); - log.info("用户【sessionId={}】连接异常!异常信息:{}", sessionId, exception); - // 移出用户 - WebSocketUsers.removeSession(session); - // 获取到信号量则需释放 - SemaphoreUtils.release(SOCKET_SEMAPHORE); - } - - /** - * 收到客户端消息时调用的方法 - */ - @OnMessage - public void onMessage(Session session, String message) { - WebSocketUsers.sendMessage(session, "接收内容:" + message); - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java deleted file mode 100644 index 281a97c7d..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java +++ /dev/null @@ -1,178 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; -import org.bouncycastle.util.Strings; - -import javax.validation.constraints.NotNull; -import javax.websocket.Session; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * websocket 客户端用户 - */ -@Slf4j -public class WebSocketUsers { - - /** - * 用户集 - * TODO 需要登录用户的session? - */ - private static final Map SESSION_MAP = new ConcurrentHashMap<>(); - - /** - * 存储用户 - * - * @param userId 唯一键 - * @param session 用户信息 - */ - public static void addSession(String userId, Session session) { - SESSION_MAP.put(userId, session); - } - - /** - * 移除用户 - * - * @param session 用户信息 - * @return 移除结果 - */ - public static boolean removeSession(Session session) { - String key = null; - boolean flag = SESSION_MAP.containsValue(session); - if (flag) { - Set> entries = SESSION_MAP.entrySet(); - for (Map.Entry entry : entries) { - Session value = entry.getValue(); - if (value.equals(session)) { - key = entry.getKey(); - break; - } - } - } else { - return true; - } - return removeSession(key); - } - - /** - * 移出用户 - * - * @param userId 用户id - */ - public static boolean removeSession(String userId) { - log.info("用户【userId={}】退出", userId); - Session remove = SESSION_MAP.remove(userId); - if (remove != null) { - boolean containsValue = SESSION_MAP.containsValue(remove); - log.info("用户【userId={}】退出{},当前连接用户总数:{}", userId, containsValue ? "失败" : "成功", SESSION_MAP.size()); - return containsValue; - } else { - return true; - } - } - - /** - * 获取在线用户列表 - * - * @return 返回用户集合 - */ - public static Map getUsers() { - return SESSION_MAP; - } - - /** - * 向所有在线人发送消息 - * - * @param message 消息内容 - */ - public static void sendMessageToAll(String message) { - SESSION_MAP.forEach((userId, session) -> { - if (session.isOpen()) { - sendMessage(session, message); - } - }); - } - - /** - * 异步发送文本消息 - * - * @param session 用户session - * @param message 消息内容 - */ - public static void sendMessageAsync(Session session, String message) { - if (session.isOpen()) { - // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? - session.getAsyncRemote().sendText(message); - } else { - log.warn("用户【session={}】不在线", session.getId()); - } - } - - /** - * 同步发送文本消息 - * - * @param session 用户session - * @param message 消息内容 - */ - public static void sendMessage(Session session, String message) { - try { - if (session.isOpen()) { - // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? - session.getBasicRemote().sendText(message); - } else { - log.warn("用户【session={}】不在线", session.getId()); - } - } catch (IOException e) { - log.error("发送消息异常", e); - } - - } - - /** - * 根据用户id发送消息 - * - * @param userId 用户id - * @param message 消息内容 - */ - public static void sendMessage(String userId, String message) { - Session session = SESSION_MAP.get(userId); - //判断是否存在该用户的session,并且是否在线 - if (session == null || !session.isOpen()) { - return; - } - sendMessage(session, message); - } - - - /** - * 获取session中的指定参数值 - * - * @param key 参数key - * @param session 用户session - */ - public static String getParam(@NotNull String key, Session session) { - //TODO 目前只针对获取一个key的值,后期根据情况拓展多个 或者直接在onClose onOpen上获取参数? - String value = null; - Map> parameters = session.getRequestParameterMap(); - if (MapUtil.isNotEmpty(parameters)) { - value = parameters.get(key).get(0); - } else { - String queryString = session.getQueryString(); - if (!StrUtil.isEmpty(queryString)) { - String[] params = Strings.split(queryString, '&'); - for (String paramPair : params) { - String[] nameValues = Strings.split(paramPair, '='); - if (key.equals(nameValues[0])) { - value = nameValues[1]; - } - } - } - } - return value; - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java new file mode 100644 index 000000000..03a246cf9 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:server -> client 同步消息 + * + * @author 芋道源码 + */ +@Data +public class DemoReceiveMessage { + + /** + * 接收人的编号 + */ + private Long fromUserId; + /** + * 内容 + */ + private String text; + + /** + * 是否单聊 + */ + private Boolean single; + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java new file mode 100644 index 000000000..f0c14f5d3 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:client -> server 发送消息 + * + * @author 芋道源码 + */ +@Data +public class DemoSendMessage { + + /** + * 发送给谁 + * + * 如果为空,说明发送给所有人 + */ + private Long toUserId; + /** + * 内容 + */ + private String text; + +} diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java index ed3963e90..4b6ed7b26 100644 --- a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.java @@ -53,7 +53,8 @@ public class AppAuthController { @PermitAll @Operation(summary = "登出系统") public CommonResult logout(HttpServletRequest request) { - String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader()); + String token = SecurityFrameworkUtils.obtainAuthorization(request, + securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); if (StrUtil.isNotBlank(token)) { authService.logout(token); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java index c2eae8d38..061539494 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/auth/AuthController.java @@ -7,6 +7,7 @@ import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.operatelog.core.annotations.OperateLog; import cn.iocoder.yudao.framework.security.config.SecurityProperties; +import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; import cn.iocoder.yudao.module.system.controller.admin.auth.vo.*; import cn.iocoder.yudao.module.system.convert.auth.AuthConvert; import cn.iocoder.yudao.module.system.dal.dataobject.permission.MenuDO; @@ -38,7 +39,6 @@ import java.util.Set; import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.getLoginUserId; -import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.obtainAuthorization; @Tag(name = "管理后台 - 认证") @RestController @@ -76,7 +76,8 @@ public class AuthController { @Operation(summary = "登出系统") @OperateLog(enable = false) // 避免 Post 请求被记录操作日志 public CommonResult logout(HttpServletRequest request) { - String token = obtainAuthorization(request, securityProperties.getTokenHeader()); + String token = SecurityFrameworkUtils.obtainAuthorization(request, + securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); if (StrUtil.isNotBlank(token)) { authService.logout(token, LoginLogTypeEnum.LOGOUT_SELF.getType()); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java index 0e1957785..5a566702f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java @@ -1,12 +1,16 @@ package cn.iocoder.yudao.module.system.controller.admin.notice; +import cn.hutool.core.lang.Assert; +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO; import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert; +import cn.iocoder.yudao.module.system.dal.dataobject.notice.NoticeDO; import cn.iocoder.yudao.module.system.service.notice.NoticeService; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.Parameter; @@ -29,6 +33,9 @@ public class NoticeController { @Resource private NoticeService noticeService; + @Resource + private WebSocketSenderApi webSocketSenderApi; + @PostMapping("/create") @Operation(summary = "创建通知公告") @PreAuthorize("@ss.hasPermission('system:notice:create')") @@ -69,4 +76,16 @@ public class NoticeController { return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id))); } + @PostMapping("/push") + @Operation(summary = "推送通知公告", description = "只发送给 websocket 连接在线的用户") + @Parameter(name = "id", description = "编号", required = true, example = "1024") + @PreAuthorize("@ss.hasPermission('system:notice:update')") + public CommonResult push(@RequestParam("id") Long id) { + NoticeDO notice = noticeService.getNotice(id); + Assert.notNull(notice, "公告不能为空"); + // 通过 websocket 推送给在线的用户 + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), "notice-push", notice); + return success(true); + } + } diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index f15b0cf8d..6dfe13fff 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -123,8 +123,8 @@ spring: rabbitmq: host: 127.0.0.1 # RabbitMQ 服务的地址 port: 5672 # RabbitMQ 服务的端口 - username: guest # RabbitMQ 服务的账号 - password: guest # RabbitMQ 服务的密码 + username: rabbit # RabbitMQ 服务的账号 + password: rabbit # RabbitMQ 服务的密码 # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index a51fa9bbb..954411022 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -146,9 +146,17 @@ yudao: - /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录 websocket: enable: true # websocket的开关 - path: /websocket/message # 路径 - maxOnlineCount: 0 # 最大连接人数 - sessionMap: true # 保存sessionMap + path: /infra/ws # 路径 + sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq + sender-rocketmq: + topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group + sender-rabbitmq: + exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange + queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue + sender-kafka: + topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group swagger: title: 芋道快速开发平台 description: 提供管理后台、用户 App 的所有功能