diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml index 3e9cecf3c..b18ee4783 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml @@ -30,12 +30,55 @@ --> cn.iocoder.boot yudao-spring-boot-starter-security + provided org.springframework.boot spring-boot-starter-websocket + + + + + cn.iocoder.boot + yudao-spring-boot-starter-security + provided + + + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + + + org.springframework.kafka + spring-kafka + true + + + org.springframework.amqp + spring-rabbit + true + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + + + + + + cn.iocoder.boot + yudao-spring-boot-starter-biz-tenant + provided + \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java index 7c1bd5abe..aa618fb04 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java @@ -4,6 +4,9 @@ import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + /** * WebSocket 配置项 * @@ -17,6 +20,15 @@ public class WebSocketProperties { /** * WebSocket 的连接路径 */ + @NotEmpty(message = "WebSocket 的连接路径不能为空") private String path = "/ws"; + /** + * 消息发送器的类型 + * + * 可选值:local、redis、rocketmq、kafka、rabbitmq + */ + @NotNull(message = "WebSocket 的消息发送者不能为空") + private String senderType = "local"; + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java index e116f3c2c..80692c222 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java @@ -1,15 +1,31 @@ package cn.iocoder.yudao.framework.websocket.config; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; @@ -25,7 +41,6 @@ import java.util.List; @AutoConfiguration @EnableWebSocket // 开启 websocket @ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket - @EnableConfigurationProperties(WebSocketProperties.class) public class YudaoWebSocketAutoConfiguration { @@ -60,4 +75,103 @@ public class YudaoWebSocketAutoConfiguration { return new WebSocketSessionManagerImpl(); } + // ==================== Sender 相关 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true) + public class LocalWebSocketMessageSenderConfiguration { + + @Bean + public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) { + return new LocalWebSocketMessageSender(sessionManager); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true) + public class RedisWebSocketMessageSenderConfiguration { + + @Bean + public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); + } + + // TODO 芋艿:需要额外删除 YudaoRedisMQAutoConfiguration 的 RedisMessageListenerContainer Bean 上的 @ConditionalOnBean 注解。可能是 spring boot 的 bug! + @Bean + public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer( + RedisWebSocketMessageSender redisWebSocketMessageSender) { + return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true) + public class RocketMQWebSocketMessageSenderConfiguration { + + @Bean + public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate, + @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) { + return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic); + } + + @Bean + public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer( + RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) { + return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true) + public class RabbitMQWebSocketMessageSenderConfiguration { + + @Bean + public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate, + TopicExchange websocketTopicExchange) { + return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange); + } + + @Bean + public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer( + RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) { + return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender); + } + + /** + * 创建 Topic Exchange + */ + @Bean + public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) { + return new TopicExchange(exchange, + true, // durable: 是否持久化 + false); // exclusive: 是否排它 + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true) + public class KafkaWebSocketMessageSenderConfiguration { + + @Bean + public KafkaWebSocketMessageSender kafkaWebSocketMessageSender( + WebSocketSessionManager sessionManager, KafkaTemplate kafkaTemplate, + @Value("${yudao.websocket.sender-kafka.topic}") String topic) { + return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic); + } + + @Bean + public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer( + KafkaWebSocketMessageSender kafkaWebSocketMessageSender) { + return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender); + } + + } + } \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java index 06bc6c0fb..120f529c2 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java @@ -3,8 +3,10 @@ package cn.iocoder.yudao.framework.websocket.core.handler; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHandler; @@ -70,8 +72,9 @@ public class JsonWebSocketMessageHandler extends TextWebSocketHandler { } // 2.3 处理消息 Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); - Object messageObj = JsonUtils.parseObject(jsonMessage.getMessage(), type); - messageListener.onMessage(session, messageObj); + Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); + Long tenantId = WebSocketFrameworkUtils.getTenantId(session); + TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); } catch (Throwable ex) { log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java index 2257760c9..0a55cd691 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java @@ -3,13 +3,15 @@ package cn.iocoder.yudao.framework.websocket.core.message; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import lombok.Data; +import java.io.Serializable; + /** * JSON 格式的 WebSocket 消息帧 * * @author 芋道源码 */ @Data -public class JsonWebSocketMessage { +public class JsonWebSocketMessage implements Serializable { /** * 消息类型 @@ -22,6 +24,6 @@ public class JsonWebSocketMessage { * * 要求 JSON 对象 */ - private String message; + private String content; } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java new file mode 100644 index 000000000..4e0db44c9 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java @@ -0,0 +1,104 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * WebSocketMessageSender 实现类 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { + + private final WebSocketSessionManager sessionManager; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + send(null, userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + send(null, userType, null, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + send(sessionId, null, null, messageType, messageContent); + } + + /** + * 发送消息 + * + * @param sessionId Session 编号 + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { + // 1. 获得 Session 列表 + List sessions = Collections.emptyList(); + if (StrUtil.isNotEmpty(sessionId)) { + WebSocketSession session = sessionManager.getSession(sessionId); + if (session != null) { + sessions = Collections.singletonList(session); + } + } else if (userType != null && userId != null) { + sessions = (List) sessionManager.getSessionList(userType, userId); + } else if (userType != null) { + sessions = (List) sessionManager.getSessionList(userType); + } + if (CollUtil.isEmpty(sessions)) { + log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]", + sessionId, userType, userId, messageType, messageContent); + } + // 2. 执行发送 + doSend(sessions, messageType, messageContent); + } + + /** + * 发送消息的具体实现 + * + * @param sessions Session 列表 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + public void doSend(Collection sessions, String messageType, String messageContent) { + JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); + String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化 + sessions.forEach(session -> { + // 1. 各种校验,保证 Session 可以被发送 + if (session == null) { + log.error("[doSend][session 为空, message({})]", message); + return; + } + if (!session.isOpen()) { + log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); + return; + } + // 2. 执行发送 + try { + session.sendMessage(new TextMessage(payload)); + log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message); + } catch (IOException ex) { + log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex); + } + }); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java new file mode 100644 index 000000000..9f75ad52d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java @@ -0,0 +1,52 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket 消息的发送器接口 + * + * @author 芋道源码 + */ +public interface WebSocketMessageSender { + + /** + * 发送消息给指定用户 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * 发送消息给指定用户类型 + * + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * 发送消息给指定 Session + * + * @param sessionId Session 编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java new file mode 100644 index 000000000..5a4cf5311 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.Data; + +/** + * Kafka 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class KafkaWebSocketMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java new file mode 100644 index 000000000..201e65d81 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.kafka.annotation.KafkaListener; + +/** + * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class KafkaWebSocketMessageConsumer { + + private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + @KafkaListener( + topics = "${yudao.websocket.sender-kafka.topic}", + // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的 + groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}") + public void onMessage(KafkaWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java new file mode 100644 index 000000000..47bb598ad --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.concurrent.ExecutionException; + +/** + * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final KafkaTemplate kafkaTemplate; + + private final String topic; + + public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, + KafkaTemplate kafkaTemplate, + String topic) { + super(sessionManager); + this.kafkaTemplate = kafkaTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendKafkaMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendKafkaMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendKafkaMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Kafka 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendKafkaMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + try { + kafkaTemplate.send(topic, mqMessage).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java new file mode 100644 index 000000000..66640ef34 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java @@ -0,0 +1,20 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.local; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; + +/** + * 本地的 {@link WebSocketMessageSender} 实现类 + * + * 注意:仅仅适合单机场景!!! + * + * @author 芋道源码 + */ +public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender { + + public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) { + super(sessionManager); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java new file mode 100644 index 000000000..80a4bc176 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.Data; + +import java.io.Serializable; + +/** + * RabbitMQ 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class RabbitMQWebSocketMessage implements Serializable { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java new file mode 100644 index 000000000..59e382421 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.*; + +/** + * {@link RabbitMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RabbitListener( + bindings = @QueueBinding( + value = @Queue( + // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的 + name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", + // Consumer 关闭时,该队列就可以被自动删除了 + autoDelete = "true" + ), + exchange = @Exchange( + name = "${yudao.websocket.sender-rabbitmq.exchange}", + type = ExchangeTypes.TOPIC, + declare = "false" + ) + ) +) +@RequiredArgsConstructor +public class RabbitMQWebSocketMessageConsumer { + + private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + public void onMessage(RabbitMQWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java new file mode 100644 index 000000000..065a5d6bf --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java @@ -0,0 +1,62 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * 基于 RabbitMQ 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RabbitTemplate rabbitTemplate; + + private final TopicExchange topicExchange; + + public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RabbitTemplate rabbitTemplate, + TopicExchange topicExchange) { + super(sessionManager); + this.rabbitTemplate = rabbitTemplate; + this.topicExchange = topicExchange; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRabbitMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRabbitMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRabbitMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RabbitMQ 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java new file mode 100644 index 000000000..fb9ea0ca0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import lombok.Data; + +/** + * Redis 广播 WebSocket 的消息 + */ +@Data +public class RedisWebSocketMessage extends AbstractRedisChannelMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java new file mode 100644 index 000000000..abce00695 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; +import lombok.RequiredArgsConstructor; + +/** + * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener { + + private final RedisWebSocketMessageSender redisWebSocketMessageSender; + + @Override + public void onMessage(RedisWebSocketMessage message) { + redisWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java new file mode 100644 index 000000000..d6004ac6d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java @@ -0,0 +1,57 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; + +/** + * 基于 Redis 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RedisMQTemplate redisMQTemplate; + + public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + super(sessionManager); + this.redisMQTemplate = redisMQTemplate; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRedisMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRedisMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRedisMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Redis 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRedisMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RedisWebSocketMessage mqMessage = new RedisWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + redisMQTemplate.send(mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java new file mode 100644 index 000000000..91570e3e3 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.Data; + +/** + * RocketMQ 广播 WebSocket 的消息 + * + * @author 芋道源码 + */ +@Data +public class RocketMQWebSocketMessage { + + /** + * Session 编号 + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * 用户编号 + */ + private Long userId; + + /** + * 消息类型 + */ + private String messageType; + /** + * 消息内容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java new file mode 100644 index 000000000..ab2e2c4dc --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java @@ -0,0 +1,30 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; + +/** + * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 + * + * @author 芋道源码 + */ +@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic + topic = "${yudao.websocket.sender-rocketmq.topic}", + consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}", + messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息 +) +@RequiredArgsConstructor +public class RocketMQWebSocketMessageConsumer implements RocketMQListener { + + private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender; + + @Override + public void onMessage(RocketMQWebSocketMessage message) { + rocketMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java new file mode 100644 index 000000000..ed059bac4 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java @@ -0,0 +1,61 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RocketMQTemplate rocketMQTemplate; + + private final String topic; + + public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RocketMQTemplate rocketMQTemplate, + String topic) { + super(sessionManager); + this.rocketMQTemplate = rocketMQTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRocketMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRocketMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRocketMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RocketMQ 广播消息 + * + * @param sessionId Session 编号 + * @param userId 用户编号 + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容 + */ + private void sendRocketMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rocketMQTemplate.syncSend(topic, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java index 6004cc5da..aca572f90 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java @@ -2,10 +2,14 @@ package cn.iocoder.yudao.framework.websocket.core.session; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; import org.springframework.web.socket.WebSocketSession; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -91,10 +95,18 @@ public class WebSocketSessionManagerImpl implements WebSocketSessionManager { return new ArrayList<>(); } LinkedList result = new LinkedList<>(); // 避免扩容 + Long contextTenantId = TenantContextHolder.getTenantId(); for (List sessions : userSessionsMap.values()) { - if (CollUtil.isNotEmpty(sessions)) { + if (CollUtil.isEmpty(sessions)) { continue; } + // 特殊:如果租户不匹配,则直接排除 + if (contextTenantId != null) { + Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0)); + if (!contextTenantId.equals(userTenantId)) { + continue; + } + } result.addAll(sessions); } return result; diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java index a77028df2..58cdedc29 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.framework.websocket.core.util; import cn.iocoder.yudao.framework.security.core.LoginUser; -import org.springframework.lang.Nullable; import org.springframework.web.socket.WebSocketSession; import java.util.Map; @@ -39,7 +38,6 @@ public class WebSocketFrameworkUtils { * * @return 用户编号 */ - @Nullable public static Long getLoginUserId(WebSocketSession session) { LoginUser loginUser = getLoginUser(session); return loginUser != null ? loginUser.getId() : null; @@ -50,10 +48,20 @@ public class WebSocketFrameworkUtils { * * @return 用户编号 */ - @Nullable public static Integer getLoginUserType(WebSocketSession session) { LoginUser loginUser = getLoginUser(session); return loginUser != null ? loginUser.getUserType() : null; } + /** + * 获得当前用户的租户编号 + * + * @param session Session + * @return 租户编号 + */ + public static Long getTenantId(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getTenantId() : null; + } + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md b/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md new file mode 100644 index 000000000..8df5a7758 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/《芋道 Spring Boot WebSocket 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java new file mode 100644 index 000000000..38582c2f3 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java @@ -0,0 +1,54 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket 发送器的 API 接口 + * + * 对 WebSocketMessageSender 进行封装,提供给其它模块使用 + * + * @author 芋道源码 + */ +public interface WebSocketSenderApi { + + /** + * 发送消息给指定用户 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * 发送消息给指定用户类型 + * + * @param userType 用户类型 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * 发送消息给指定 Session + * + * @param sessionId Session 编号 + * @param messageType 消息类型 + * @param messageContent 消息内容,JSON 格式 + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java new file mode 100644 index 000000000..046cd2fc1 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * WebSocket 发送器的 API 实现类 + * + * @author 芋道源码 + */ +@Component +public class WebSocketSenderApiImpl implements WebSocketSenderApi { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + webSocketMessageSender.send(userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + webSocketMessageSender.send(userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + webSocketMessageSender.send(sessionId, messageType, messageContent); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java new file mode 100644 index 000000000..9ccf6070e --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java @@ -0,0 +1,48 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import cn.iocoder.yudao.module.infra.websocket.message.DemoReceiveMessage; +import cn.iocoder.yudao.module.infra.websocket.message.DemoSendMessage; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import javax.annotation.Resource; + +/** + * WebSocket 示例:单发消息 + * + * @author 芋道源码 + */ +@Component +public class DemoWebSocketMessageListener implements WebSocketMessageListener { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void onMessage(WebSocketSession session, DemoSendMessage message) { + Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); + // 情况一:单发 + if (message.getToUserId() != null) { + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(true); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户 + "demo-message-receive", toMessage); + return; + } + // 情况二:群发 + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(false); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户 + "demo-message-receive", toMessage); + } + + @Override + public String getType() { + return "demo-message-send"; + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java new file mode 100644 index 000000000..03a246cf9 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:server -> client 同步消息 + * + * @author 芋道源码 + */ +@Data +public class DemoReceiveMessage { + + /** + * 接收人的编号 + */ + private Long fromUserId; + /** + * 内容 + */ + private String text; + + /** + * 是否单聊 + */ + private Boolean single; + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java new file mode 100644 index 000000000..f0c14f5d3 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:client -> server 发送消息 + * + * @author 芋道源码 + */ +@Data +public class DemoSendMessage { + + /** + * 发送给谁 + * + * 如果为空,说明发送给所有人 + */ + private Long toUserId; + /** + * 内容 + */ + private String text; + +} diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java index 0e1957785..5a566702f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java @@ -1,12 +1,16 @@ package cn.iocoder.yudao.module.system.controller.admin.notice; +import cn.hutool.core.lang.Assert; +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO; import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert; +import cn.iocoder.yudao.module.system.dal.dataobject.notice.NoticeDO; import cn.iocoder.yudao.module.system.service.notice.NoticeService; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.Parameter; @@ -29,6 +33,9 @@ public class NoticeController { @Resource private NoticeService noticeService; + @Resource + private WebSocketSenderApi webSocketSenderApi; + @PostMapping("/create") @Operation(summary = "创建通知公告") @PreAuthorize("@ss.hasPermission('system:notice:create')") @@ -69,4 +76,16 @@ public class NoticeController { return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id))); } + @PostMapping("/push") + @Operation(summary = "推送通知公告", description = "只发送给 websocket 连接在线的用户") + @Parameter(name = "id", description = "编号", required = true, example = "1024") + @PreAuthorize("@ss.hasPermission('system:notice:update')") + public CommonResult push(@RequestParam("id") Long id) { + NoticeDO notice = noticeService.getNotice(id); + Assert.notNull(notice, "公告不能为空"); + // 通过 websocket 推送给在线的用户 + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), "notice-push", notice); + return success(true); + } + } diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index f15b0cf8d..6dfe13fff 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -123,8 +123,8 @@ spring: rabbitmq: host: 127.0.0.1 # RabbitMQ 服务的地址 port: 5672 # RabbitMQ 服务的端口 - username: guest # RabbitMQ 服务的账号 - password: guest # RabbitMQ 服务的密码 + username: rabbit # RabbitMQ 服务的账号 + password: rabbit # RabbitMQ 服务的密码 # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index 605eef273..954411022 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -147,6 +147,16 @@ yudao: websocket: enable: true # websocket的开关 path: /infra/ws # 路径 + sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq + sender-rocketmq: + topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group + sender-rabbitmq: + exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange + queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue + sender-kafka: + topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group swagger: title: 芋道快速开发平台 description: 提供管理后台、用户 App 的所有功能