diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index dbd6e2f3c..bbae1a273 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -175,6 +175,12 @@ ${revision} + + cn.iocoder.boot + yudao-spring-boot-starter-websocket + ${revision} + + com.github.xiaoymin knife4j-openapi3-spring-boot-starter diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml index 320e52c48..3e9cecf3c 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml @@ -12,18 +12,22 @@ jar ${project.artifactId} - WebSocket + WebSocket 框架,支持多节点的广播 https://github.com/YunaiV/ruoyi-vue-pro - cn.iocoder.boot yudao-common + + cn.iocoder.boot yudao-spring-boot-starter-security diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java deleted file mode 100644 index 02c3415d5..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketHandlerConfig.java +++ /dev/null @@ -1,14 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.config; - -import cn.iocoder.yudao.framework.websocket.core.UserHandshakeInterceptor; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.web.socket.server.HandshakeInterceptor; - -@EnableConfigurationProperties(WebSocketProperties.class) -public class WebSocketHandlerConfig { - @Bean - public HandshakeInterceptor handshakeInterceptor() { - return new UserHandshakeInterceptor(); - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java index 0ab1b498f..7c1bd5abe 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java @@ -15,15 +15,8 @@ import org.springframework.validation.annotation.Validated; public class WebSocketProperties { /** - * 路径 + * WebSocket 的连接路径 */ - private String path = ""; - /** - * 默认最多允许同时在线用户数 - */ - private int maxOnlineCount = 0; - /** - * 是否保存session - */ - private boolean sessionMap = true; + private String path = "/ws"; + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java index f8c50ae6a..e116f3c2c 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java @@ -1,11 +1,17 @@ package cn.iocoder.yudao.framework.websocket.config; +import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; @@ -17,18 +23,41 @@ import java.util.List; * @author xingyu4j */ @AutoConfiguration -// 允许使用 yudao.websocket.enable=false 禁用websocket -@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) +@EnableWebSocket // 开启 websocket +@ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket + @EnableConfigurationProperties(WebSocketProperties.class) public class YudaoWebSocketAutoConfiguration { + @Bean - @ConditionalOnMissingBean - public WebSocketConfigurer webSocketConfigurer(List handshakeInterceptor, + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { - return registry -> registry + // 添加 WebSocketHandler .addHandler(webSocketHandler, webSocketProperties.getPath()) - .addInterceptors(handshakeInterceptor.toArray(new HandshakeInterceptor[0])); + .addInterceptors(handshakeInterceptors) + // 允许跨域,否则前端连接会直接断开 + .setAllowedOriginPatterns("*"); } -} + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new LoginUserHandshakeInterceptor(); + } + + @Bean + public WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager, + List> messageListeners) { + // 1. 创建 JsonWebSocketMessageHandler 对象,处理消息 + JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners); + // 2. 创建 WebSocketSessionHandlerDecorator 对象,处理连接 + return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager); + } + + @Bean + public WebSocketSessionManager webSocketSessionManager() { + return new WebSocketSessionManagerImpl(); + } + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java deleted file mode 100644 index 3f2fa4ec3..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/UserHandshakeInterceptor.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import cn.iocoder.yudao.framework.security.core.LoginUser; -import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; -import org.springframework.http.server.ServerHttpRequest; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.HandshakeInterceptor; - -import java.util.Map; - -public class UserHandshakeInterceptor implements HandshakeInterceptor { - @Override - public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { - LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); - attributes.put(WebSocketKeyDefine.LOGIN_USER, loginUser); - return true; - } - - @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java deleted file mode 100644 index f75ebc41c..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketKeyDefine.java +++ /dev/null @@ -1,9 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - - -import lombok.Data; - -@Data -public class WebSocketKeyDefine { - public static final String LOGIN_USER ="LOGIN_USER"; -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java deleted file mode 100644 index 7bb348e99..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketMessageDO.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.List; - -@Data -@Accessors(chain = true) -public class WebSocketMessageDO { - /** - * 接收消息的seesion - */ - private List seesionKeyList; - /** - * 发送消息 - */ - private String msgText; - - public static WebSocketMessageDO build(List seesionKeyList, String msgText) { - return new WebSocketMessageDO().setMsgText(msgText).setSeesionKeyList(seesionKeyList); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java deleted file mode 100644 index 2747f8192..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketSessionHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import org.springframework.web.socket.WebSocketSession; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public final class WebSocketSessionHandler { - private WebSocketSessionHandler() { - } - - private static final Map SESSION_MAP = new ConcurrentHashMap<>(); - - public static void addSession(Object sessionKey, WebSocketSession session) { - SESSION_MAP.put(sessionKey.toString(), session); - } - - public static void removeSession(Object sessionKey) { - SESSION_MAP.remove(sessionKey.toString()); - } - - public static WebSocketSession getSession(Object sessionKey) { - return SESSION_MAP.get(sessionKey.toString()); - } - - public static Collection getSessions() { - return SESSION_MAP.values(); - } - - public static Set getSessionKeys() { - return SESSION_MAP.keySet(); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java deleted file mode 100644 index 816e664cc..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/WebSocketUtils.java +++ /dev/null @@ -1,31 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; - -@Slf4j -public class WebSocketUtils { - public static boolean sendMessage(WebSocketSession seesion, String message) { - if (seesion == null) { - log.error("seesion 不存在"); - return false; - } - if (seesion.isOpen()) { - try { - seesion.sendMessage(new TextMessage(message)); - } catch (IOException e) { - log.error("WebSocket 消息发送异常 Session={} | msg= {} | exception={}", seesion, message, e); - return false; - } - } - return true; - } - - public static boolean sendMessage(Object sessionKey, String message) { - WebSocketSession session = WebSocketSessionHandler.getSession(sessionKey); - return sendMessage(session, message); - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java deleted file mode 100644 index dd8dc602e..000000000 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/YudaoWebSocketHandlerDecorator.java +++ /dev/null @@ -1,49 +0,0 @@ -package cn.iocoder.yudao.framework.websocket.core; - -import cn.iocoder.yudao.framework.security.core.LoginUser; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.WebSocketHandlerDecorator; - -public class YudaoWebSocketHandlerDecorator extends WebSocketHandlerDecorator { - public YudaoWebSocketHandlerDecorator(WebSocketHandler delegate) { - super(delegate); - } - - /** - * websocket 连接时执行的动作 - * @param session websocket session 对象 - * @throws Exception 异常对象 - */ - @Override - public void afterConnectionEstablished(final WebSocketSession session) throws Exception { - Object sessionKey = sessionKeyGen(session); - WebSocketSessionHandler.addSession(sessionKey, session); - } - - /** - * websocket 关闭连接时执行的动作 - * @param session websocket session 对象 - * @param closeStatus 关闭状态对象 - * @throws Exception 异常对象 - */ - @Override - public void afterConnectionClosed(final WebSocketSession session, CloseStatus closeStatus) throws Exception { - Object sessionKey = sessionKeyGen(session); - WebSocketSessionHandler.removeSession(sessionKey); - } - - public Object sessionKeyGen(WebSocketSession webSocketSession) { - - Object obj = webSocketSession.getAttributes().get(WebSocketKeyDefine.LOGIN_USER); - - if (obj instanceof LoginUser) { - LoginUser loginUser = (LoginUser) obj; - // userId 作为唯一区分 - return String.valueOf(loginUser.getId()); - } - - return null; - } -} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java new file mode 100644 index 000000000..06bc6c0fb --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java @@ -0,0 +1,80 @@ +package cn.iocoder.yudao.framework.websocket.core.handler; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * JSON 格式 {@link WebSocketHandler} 实现类 + * + * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。 + * + * @author 芋道源码 + */ +@Slf4j +public class JsonWebSocketMessageHandler extends TextWebSocketHandler { + + /** + * type 与 WebSocketMessageListener 的映射 + */ + private final Map> listeners = new HashMap<>(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + public JsonWebSocketMessageHandler(List listenersList) { + listenersList.forEach((Consumer) + listener -> listeners.put(listener.getType(), listener)); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + // 1.1 空消息,跳过 + if (message.getPayloadLength() == 0) { + return; + } + // 1.2 ping 心跳消息,直接返回 pong 消息。 + if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { + session.sendMessage(new TextMessage("pong")); + return; + } + + // 2.1 解析消息 + try { + JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class); + if (jsonMessage == null) { + log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); + return; + } + if (StrUtil.isEmpty(jsonMessage.getType())) { + log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); + return; + } + // 2.2 获得对应的 WebSocketMessageListener + WebSocketMessageListener messageListener = listeners.get(jsonMessage.getType()); + if (messageListener == null) { + log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload()); + return; + } + // 2.3 处理消息 + Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); + Object messageObj = JsonUtils.parseObject(jsonMessage.getMessage(), type); + messageListener.onMessage(session, messageObj); + } catch (Throwable ex) { + log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java new file mode 100644 index 000000000..f3a62cc39 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/listener/WebSocketMessageListener.java @@ -0,0 +1,31 @@ +package cn.iocoder.yudao.framework.websocket.core.listener; + +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +/** + * WebSocket 消息监听器接口 + * + * 目的:前端发送消息给后端后,处理对应 {@link #getType()} 类型的消息 + * + * @param 泛型,消息类型 + */ +public interface WebSocketMessageListener { + + /** + * 处理消息 + * + * @param session Session + * @param message 消息 + */ + void onMessage(WebSocketSession session, T message); + + /** + * 获得消息类型 + * + * @see JsonWebSocketMessage#getType() + * @return 消息类型 + */ + String getType(); + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java new file mode 100644 index 000000000..2257760c9 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.framework.websocket.core.message; + +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import lombok.Data; + +/** + * JSON 格式的 WebSocket 消息帧 + * + * @author 芋道源码 + */ +@Data +public class JsonWebSocketMessage { + + /** + * 消息类型 + * + * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类 + */ + private String type; + /** + * 消息内容 + * + * 要求 JSON 对象 + */ + private String message; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java new file mode 100644 index 000000000..3a31825f5 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/LoginUserHandshakeInterceptor.java @@ -0,0 +1,42 @@ +package cn.iocoder.yudao.framework.websocket.core.security; + +import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.security.core.filter.TokenAuthenticationFilter; +import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + +/** + * 登录用户的 {@link HandshakeInterceptor} 实现类 + * + * 流程如下: + * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过 + * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中 + * + * @author 芋道源码 + */ +public class LoginUserHandshakeInterceptor implements HandshakeInterceptor { + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Map attributes) { + LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); + if (loginUser != null) { + WebSocketFrameworkUtils.setLoginUser(loginUser, attributes); + } + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler wsHandler, Exception exception) { + // do nothing + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java new file mode 100644 index 000000000..5614f05ce --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/security/WebSocketAuthorizeRequestsCustomizer.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.framework.websocket.core.security; + +import cn.iocoder.yudao.framework.security.config.AuthorizeRequestsCustomizer; +import cn.iocoder.yudao.framework.websocket.config.WebSocketProperties; +import lombok.RequiredArgsConstructor; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer; + +/** + * WebSocket 的权限自定义 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer { + + private final WebSocketProperties webSocketProperties; + + @Override + public void customize(ExpressionUrlAuthorizationConfigurer.ExpressionInterceptUrlRegistry registry) { + registry.antMatchers(webSocketProperties.getPath()).permitAll(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java new file mode 100644 index 000000000..600a4dd96 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionHandlerDecorator.java @@ -0,0 +1,49 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; + +/** + * {@link WebSocketHandler} 的装饰类,实现了以下功能: + * + * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理 + * 2. 封装 {@link WebSocketSession} 支持并发操作 + * + * @author 芋道源码 + */ +public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator { + + /** + * 发送时间的限制,单位:毫秒 + */ + private static final Integer SEND_TIME_LIMIT = 1000 * 5; + /** + * 发送消息缓冲上线,单位:bytes + */ + private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100; + + private final WebSocketSessionManager sessionManager; + + public WebSocketSessionHandlerDecorator(WebSocketHandler delegate, + WebSocketSessionManager sessionManager) { + super(delegate); + this.sessionManager = sessionManager; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149 + session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT); + // 添加到 WebSocketSessionManager 中 + sessionManager.addSession(session); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { + sessionManager.removeSession(session); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java new file mode 100644 index 000000000..ad1de23c2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManager.java @@ -0,0 +1,53 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import org.springframework.web.socket.WebSocketSession; + +import java.util.Collection; + +/** + * {@link WebSocketSession} 管理器的接口 + * + * @author 芋道源码 + */ +public interface WebSocketSessionManager { + + /** + * 添加 Session + * + * @param session Session + */ + void addSession(WebSocketSession session); + + /** + * 移除 Session + * + * @param session Session + */ + void removeSession(WebSocketSession session); + + /** + * 获得指定编号的 Session + * + * @param id Session 编号 + * @return Session + */ + WebSocketSession getSession(String id); + + /** + * 获得指定用户类型的 Session 列表 + * + * @param userType 用户类型 + * @return Session 列表 + */ + Collection getSessionList(Integer userType); + + /** + * 获得指定用户编号的 Session 列表 + * + * @param userType 用户类型 + * @param userId 用户编号 + * @return Session 列表 + */ + Collection getSessionList(Integer userType, Long userId); + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java new file mode 100644 index 000000000..6004cc5da --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java @@ -0,0 +1,113 @@ +package cn.iocoder.yudao.framework.websocket.core.session; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import org.springframework.web.socket.WebSocketSession; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 默认的 {@link WebSocketSessionManager} 实现类 + * + * @author 芋道源码 + */ +public class WebSocketSessionManagerImpl implements WebSocketSessionManager { + + /** + * id 与 WebSocketSession 映射 + * + * key:Session 编号 + */ + private final ConcurrentMap idSessions = new ConcurrentHashMap<>(); + + /** + * user 与 WebSocketSession 映射 + * + * key1:用户类型 + * key2:用户编号 + */ + private final ConcurrentMap>> userSessions + = new ConcurrentHashMap<>(); + + @Override + public void addSession(WebSocketSession session) { + // 添加到 idSessions 中 + idSessions.put(session.getId(), session); + // 添加到 userSessions 中 + LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); + if (user == null) { + return; + } + ConcurrentMap> userSessionsMap = userSessions.get(user.getUserType()); + if (userSessionsMap == null) { + userSessionsMap = new ConcurrentHashMap<>(); + if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) { + userSessionsMap = userSessions.get(user.getUserType()); + } + } + CopyOnWriteArrayList sessions = userSessionsMap.get(user.getId()); + if (sessions == null) { + sessions = new CopyOnWriteArrayList<>(); + if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) { + sessions = userSessionsMap.get(user.getId()); + } + } + sessions.add(session); + } + + @Override + public void removeSession(WebSocketSession session) { + // 移除从 idSessions 中 + idSessions.remove(session.getId(), session); + // 移除从 idSessions 中 + LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); + if (user == null) { + return; + } + ConcurrentMap> userSessionsMap = userSessions.get(user.getUserType()); + if (userSessionsMap == null) { + return; + } + CopyOnWriteArrayList sessions = userSessionsMap.get(user.getId()); + sessions.removeIf(session0 -> session0.getId().equals(session.getId())); + if (CollUtil.isEmpty(sessions)) { + userSessionsMap.remove(user.getId(), sessions); + } + } + + @Override + public WebSocketSession getSession(String id) { + return idSessions.get(id); + } + + @Override + public Collection getSessionList(Integer userType) { + ConcurrentMap> userSessionsMap = userSessions.get(userType); + if (CollUtil.isEmpty(userSessionsMap)) { + return new ArrayList<>(); + } + LinkedList result = new LinkedList<>(); // 避免扩容 + for (List sessions : userSessionsMap.values()) { + if (CollUtil.isNotEmpty(sessions)) { + continue; + } + result.addAll(sessions); + } + return result; + } + + @Override + public Collection getSessionList(Integer userType, Long userId) { + ConcurrentMap> userSessionsMap = userSessions.get(userType); + if (CollUtil.isEmpty(userSessionsMap)) { + return new ArrayList<>(); + } + CopyOnWriteArrayList sessions = userSessionsMap.get(userId); + return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java new file mode 100644 index 000000000..a77028df2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java @@ -0,0 +1,59 @@ +package cn.iocoder.yudao.framework.websocket.core.util; + +import cn.iocoder.yudao.framework.security.core.LoginUser; +import org.springframework.lang.Nullable; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; + +/** + * 专属于 web 包的工具类 + * + * @author 芋道源码 + */ +public class WebSocketFrameworkUtils { + + public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER"; + + /** + * 设置当前用户 + * + * @param loginUser 登录用户 + * @param attributes Session + */ + public static void setLoginUser(LoginUser loginUser, Map attributes) { + attributes.put(ATTRIBUTE_LOGIN_USER, loginUser); + } + + /** + * 获取当前用户 + * + * @return 当前用户 + */ + public static LoginUser getLoginUser(WebSocketSession session) { + return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER); + } + + /** + * 获得当前用户的编号 + * + * @return 用户编号 + */ + @Nullable + public static Long getLoginUserId(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getId() : null; + } + + /** + * 获得当前用户的类型 + * + * @return 用户编号 + */ + @Nullable + public static Integer getLoginUserType(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getUserType() : null; + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java index c771dfaac..97bc5f951 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/package-info.java @@ -1 +1,4 @@ +/** + * WebSocket 框架,支持多节点的广播 + */ package cn.iocoder.yudao.framework.websocket; diff --git a/yudao-module-infra/yudao-module-infra-biz/pom.xml b/yudao-module-infra/yudao-module-infra-biz/pom.xml index af5fb9ac2..4c6cd401b 100644 --- a/yudao-module-infra/yudao-module-infra-biz/pom.xml +++ b/yudao-module-infra/yudao-module-infra-biz/pom.xml @@ -46,6 +46,11 @@ yudao-spring-boot-starter-security + + cn.iocoder.boot + yudao-spring-boot-starter-websocket + + cn.iocoder.boot @@ -116,11 +121,6 @@ yudao-spring-boot-starter-file - - - org.springframework.boot - spring-boot-starter-websocket - diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java deleted file mode 100644 index 67a87f169..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.Semaphore; - -/** - * 信号量相关处理 - * - */ -@Slf4j -public class SemaphoreUtils { - - /** - * 获取信号量 - * - * @param semaphore - * @return - */ - public static boolean tryAcquire(Semaphore semaphore) { - boolean flag = false; - - try { - flag = semaphore.tryAcquire(); - } catch (Exception e) { - log.error("获取信号量异常", e); - } - - return flag; - } - - /** - * 释放信号量 - * - * @param semaphore - */ - public static void release(Semaphore semaphore) { - - try { - semaphore.release(); - } catch (Exception e) { - log.error("释放信号量异常", e); - } - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java deleted file mode 100644 index 380bc9317..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java +++ /dev/null @@ -1,16 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -/** - * websocket 配置 - */ -@Configuration -public class WebSocketConfig { - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java deleted file mode 100644 index f0cfdd9dc..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java +++ /dev/null @@ -1,86 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.ServerEndpoint; -import java.util.concurrent.Semaphore; - -/** - * websocket 消息处理 - */ -@Component -@ServerEndpoint("/websocket/message") -@Slf4j -public class WebSocketServer { - - /** - * 默认最多允许同时在线用户数100 - */ - public static int socketMaxOnlineCount = 100; - - private static final Semaphore SOCKET_SEMAPHORE = new Semaphore(socketMaxOnlineCount); - - /** - * 连接建立成功调用的方法 - */ - @OnOpen - public void onOpen(Session session) throws Exception { - // 尝试获取信号量 - boolean semaphoreFlag = SemaphoreUtils.tryAcquire(SOCKET_SEMAPHORE); - if (!semaphoreFlag) { - // 未获取到信号量 - log.error("当前在线人数超过限制数:{}", socketMaxOnlineCount); - WebSocketUsers.sendMessage(session, "当前在线人数超过限制数:" + socketMaxOnlineCount); - session.close(); - } else { - String userId = WebSocketUsers.getParam("userId", session); - if (userId != null) { - // 添加用户 - WebSocketUsers.addSession(userId, session); - log.info("用户【userId={}】建立连接,当前连接用户总数:{}", userId, WebSocketUsers.getUsers().size()); - WebSocketUsers.sendMessage(session, "接收内容:连接成功"); - } else { - WebSocketUsers.sendMessage(session, "接收内容:连接失败"); - } - } - } - - /** - * 连接关闭时处理 - */ - @OnClose - public void onClose(Session session) { - log.info("用户【sessionId={}】关闭连接!", session.getId()); - // 移除用户 - WebSocketUsers.removeSession(session); - // 获取到信号量则需释放 - SemaphoreUtils.release(SOCKET_SEMAPHORE); - } - - /** - * 抛出异常时处理 - */ - @OnError - public void onError(Session session, Throwable exception) throws Exception { - if (session.isOpen()) { - // 关闭连接 - session.close(); - } - String sessionId = session.getId(); - log.info("用户【sessionId={}】连接异常!异常信息:{}", sessionId, exception); - // 移出用户 - WebSocketUsers.removeSession(session); - // 获取到信号量则需释放 - SemaphoreUtils.release(SOCKET_SEMAPHORE); - } - - /** - * 收到客户端消息时调用的方法 - */ - @OnMessage - public void onMessage(Session session, String message) { - WebSocketUsers.sendMessage(session, "接收内容:" + message); - } -} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java deleted file mode 100644 index 281a97c7d..000000000 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java +++ /dev/null @@ -1,178 +0,0 @@ -package cn.iocoder.yudao.module.infra.websocket; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; -import org.bouncycastle.util.Strings; - -import javax.validation.constraints.NotNull; -import javax.websocket.Session; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * websocket 客户端用户 - */ -@Slf4j -public class WebSocketUsers { - - /** - * 用户集 - * TODO 需要登录用户的session? - */ - private static final Map SESSION_MAP = new ConcurrentHashMap<>(); - - /** - * 存储用户 - * - * @param userId 唯一键 - * @param session 用户信息 - */ - public static void addSession(String userId, Session session) { - SESSION_MAP.put(userId, session); - } - - /** - * 移除用户 - * - * @param session 用户信息 - * @return 移除结果 - */ - public static boolean removeSession(Session session) { - String key = null; - boolean flag = SESSION_MAP.containsValue(session); - if (flag) { - Set> entries = SESSION_MAP.entrySet(); - for (Map.Entry entry : entries) { - Session value = entry.getValue(); - if (value.equals(session)) { - key = entry.getKey(); - break; - } - } - } else { - return true; - } - return removeSession(key); - } - - /** - * 移出用户 - * - * @param userId 用户id - */ - public static boolean removeSession(String userId) { - log.info("用户【userId={}】退出", userId); - Session remove = SESSION_MAP.remove(userId); - if (remove != null) { - boolean containsValue = SESSION_MAP.containsValue(remove); - log.info("用户【userId={}】退出{},当前连接用户总数:{}", userId, containsValue ? "失败" : "成功", SESSION_MAP.size()); - return containsValue; - } else { - return true; - } - } - - /** - * 获取在线用户列表 - * - * @return 返回用户集合 - */ - public static Map getUsers() { - return SESSION_MAP; - } - - /** - * 向所有在线人发送消息 - * - * @param message 消息内容 - */ - public static void sendMessageToAll(String message) { - SESSION_MAP.forEach((userId, session) -> { - if (session.isOpen()) { - sendMessage(session, message); - } - }); - } - - /** - * 异步发送文本消息 - * - * @param session 用户session - * @param message 消息内容 - */ - public static void sendMessageAsync(Session session, String message) { - if (session.isOpen()) { - // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? - session.getAsyncRemote().sendText(message); - } else { - log.warn("用户【session={}】不在线", session.getId()); - } - } - - /** - * 同步发送文本消息 - * - * @param session 用户session - * @param message 消息内容 - */ - public static void sendMessage(Session session, String message) { - try { - if (session.isOpen()) { - // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? - session.getBasicRemote().sendText(message); - } else { - log.warn("用户【session={}】不在线", session.getId()); - } - } catch (IOException e) { - log.error("发送消息异常", e); - } - - } - - /** - * 根据用户id发送消息 - * - * @param userId 用户id - * @param message 消息内容 - */ - public static void sendMessage(String userId, String message) { - Session session = SESSION_MAP.get(userId); - //判断是否存在该用户的session,并且是否在线 - if (session == null || !session.isOpen()) { - return; - } - sendMessage(session, message); - } - - - /** - * 获取session中的指定参数值 - * - * @param key 参数key - * @param session 用户session - */ - public static String getParam(@NotNull String key, Session session) { - //TODO 目前只针对获取一个key的值,后期根据情况拓展多个 或者直接在onClose onOpen上获取参数? - String value = null; - Map> parameters = session.getRequestParameterMap(); - if (MapUtil.isNotEmpty(parameters)) { - value = parameters.get(key).get(0); - } else { - String queryString = session.getQueryString(); - if (!StrUtil.isEmpty(queryString)) { - String[] params = Strings.split(queryString, '&'); - for (String paramPair : params) { - String[] nameValues = Strings.split(paramPair, '='); - if (key.equals(nameValues[0])) { - value = nameValues[1]; - } - } - } - } - return value; - } -} diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index a51fa9bbb..605eef273 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -146,9 +146,7 @@ yudao: - /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录 websocket: enable: true # websocket的开关 - path: /websocket/message # 路径 - maxOnlineCount: 0 # 最大连接人数 - sessionMap: true # 保存sessionMap + path: /infra/ws # 路径 swagger: title: 芋道快速开发平台 description: 提供管理后台、用户 App 的所有功能