diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index ca662cb40..cd62f962a 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -583,6 +583,12 @@ xercesImpl ${xercesImpl.version} + + + org.springframework.boot + spring-boot-starter-websocket + ${spring.boot.version} + diff --git a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java index c3715c185..082d84756 100644 --- a/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java +++ b/yudao-framework/yudao-spring-boot-starter-security/src/main/java/cn/iocoder/yudao/framework/security/config/YudaoWebSecurityConfigurerAdapter.java @@ -129,6 +129,8 @@ public class YudaoWebSecurityConfigurerAdapter { .antMatchers(buildAppApi("/**")).permitAll() // 1.5 验证码captcha 允许匿名访问 .antMatchers("/captcha/get", "/captcha/check").permitAll() + // 1.6 webSocket 允许匿名访问 + .antMatchers("/websocket/message").permitAll() // ②:每个项目的自定义规则 .and().authorizeRequests(registry -> // 下面,循环设置自定义规则 authorizeRequestsCustomizers.forEach(customizer -> customizer.customize(registry))) diff --git a/yudao-module-infra/yudao-module-infra-biz/pom.xml b/yudao-module-infra/yudao-module-infra-biz/pom.xml index db23e697d..6d7ac0cff 100644 --- a/yudao-module-infra/yudao-module-infra-biz/pom.xml +++ b/yudao-module-infra/yudao-module-infra-biz/pom.xml @@ -111,6 +111,12 @@ cn.iocoder.boot yudao-spring-boot-starter-file + + + + org.springframework.boot + spring-boot-starter-websocket + diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java new file mode 100644 index 000000000..67a87f169 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/SemaphoreUtils.java @@ -0,0 +1,45 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.Semaphore; + +/** + * 信号量相关处理 + * + */ +@Slf4j +public class SemaphoreUtils { + + /** + * 获取信号量 + * + * @param semaphore + * @return + */ + public static boolean tryAcquire(Semaphore semaphore) { + boolean flag = false; + + try { + flag = semaphore.tryAcquire(); + } catch (Exception e) { + log.error("获取信号量异常", e); + } + + return flag; + } + + /** + * 释放信号量 + * + * @param semaphore + */ + public static void release(Semaphore semaphore) { + + try { + semaphore.release(); + } catch (Exception e) { + log.error("释放信号量异常", e); + } + } +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java new file mode 100644 index 000000000..1e73b32ed --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketConfig.java @@ -0,0 +1,20 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * websocket 配置 + * + * @author ruoyi + */ +@Configuration +public class WebSocketConfig +{ + @Bean + public ServerEndpointExporter serverEndpointExporter() + { + return new ServerEndpointExporter(); + } +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java new file mode 100644 index 000000000..f0cfdd9dc --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketServer.java @@ -0,0 +1,86 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.ServerEndpoint; +import java.util.concurrent.Semaphore; + +/** + * websocket 消息处理 + */ +@Component +@ServerEndpoint("/websocket/message") +@Slf4j +public class WebSocketServer { + + /** + * 默认最多允许同时在线用户数100 + */ + public static int socketMaxOnlineCount = 100; + + private static final Semaphore SOCKET_SEMAPHORE = new Semaphore(socketMaxOnlineCount); + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session) throws Exception { + // 尝试获取信号量 + boolean semaphoreFlag = SemaphoreUtils.tryAcquire(SOCKET_SEMAPHORE); + if (!semaphoreFlag) { + // 未获取到信号量 + log.error("当前在线人数超过限制数:{}", socketMaxOnlineCount); + WebSocketUsers.sendMessage(session, "当前在线人数超过限制数:" + socketMaxOnlineCount); + session.close(); + } else { + String userId = WebSocketUsers.getParam("userId", session); + if (userId != null) { + // 添加用户 + WebSocketUsers.addSession(userId, session); + log.info("用户【userId={}】建立连接,当前连接用户总数:{}", userId, WebSocketUsers.getUsers().size()); + WebSocketUsers.sendMessage(session, "接收内容:连接成功"); + } else { + WebSocketUsers.sendMessage(session, "接收内容:连接失败"); + } + } + } + + /** + * 连接关闭时处理 + */ + @OnClose + public void onClose(Session session) { + log.info("用户【sessionId={}】关闭连接!", session.getId()); + // 移除用户 + WebSocketUsers.removeSession(session); + // 获取到信号量则需释放 + SemaphoreUtils.release(SOCKET_SEMAPHORE); + } + + /** + * 抛出异常时处理 + */ + @OnError + public void onError(Session session, Throwable exception) throws Exception { + if (session.isOpen()) { + // 关闭连接 + session.close(); + } + String sessionId = session.getId(); + log.info("用户【sessionId={}】连接异常!异常信息:{}", sessionId, exception); + // 移出用户 + WebSocketUsers.removeSession(session); + // 获取到信号量则需释放 + SemaphoreUtils.release(SOCKET_SEMAPHORE); + } + + /** + * 收到客户端消息时调用的方法 + */ + @OnMessage + public void onMessage(Session session, String message) { + WebSocketUsers.sendMessage(session, "接收内容:" + message); + } +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java new file mode 100644 index 000000000..281a97c7d --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/WebSocketUsers.java @@ -0,0 +1,178 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.bouncycastle.util.Strings; + +import javax.validation.constraints.NotNull; +import javax.websocket.Session; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * websocket 客户端用户 + */ +@Slf4j +public class WebSocketUsers { + + /** + * 用户集 + * TODO 需要登录用户的session? + */ + private static final Map SESSION_MAP = new ConcurrentHashMap<>(); + + /** + * 存储用户 + * + * @param userId 唯一键 + * @param session 用户信息 + */ + public static void addSession(String userId, Session session) { + SESSION_MAP.put(userId, session); + } + + /** + * 移除用户 + * + * @param session 用户信息 + * @return 移除结果 + */ + public static boolean removeSession(Session session) { + String key = null; + boolean flag = SESSION_MAP.containsValue(session); + if (flag) { + Set> entries = SESSION_MAP.entrySet(); + for (Map.Entry entry : entries) { + Session value = entry.getValue(); + if (value.equals(session)) { + key = entry.getKey(); + break; + } + } + } else { + return true; + } + return removeSession(key); + } + + /** + * 移出用户 + * + * @param userId 用户id + */ + public static boolean removeSession(String userId) { + log.info("用户【userId={}】退出", userId); + Session remove = SESSION_MAP.remove(userId); + if (remove != null) { + boolean containsValue = SESSION_MAP.containsValue(remove); + log.info("用户【userId={}】退出{},当前连接用户总数:{}", userId, containsValue ? "失败" : "成功", SESSION_MAP.size()); + return containsValue; + } else { + return true; + } + } + + /** + * 获取在线用户列表 + * + * @return 返回用户集合 + */ + public static Map getUsers() { + return SESSION_MAP; + } + + /** + * 向所有在线人发送消息 + * + * @param message 消息内容 + */ + public static void sendMessageToAll(String message) { + SESSION_MAP.forEach((userId, session) -> { + if (session.isOpen()) { + sendMessage(session, message); + } + }); + } + + /** + * 异步发送文本消息 + * + * @param session 用户session + * @param message 消息内容 + */ + public static void sendMessageAsync(Session session, String message) { + if (session.isOpen()) { + // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? + session.getAsyncRemote().sendText(message); + } else { + log.warn("用户【session={}】不在线", session.getId()); + } + } + + /** + * 同步发送文本消息 + * + * @param session 用户session + * @param message 消息内容 + */ + public static void sendMessage(Session session, String message) { + try { + if (session.isOpen()) { + // TODO 需要加synchronized锁(synchronized(session))?单个session创建线程? + session.getBasicRemote().sendText(message); + } else { + log.warn("用户【session={}】不在线", session.getId()); + } + } catch (IOException e) { + log.error("发送消息异常", e); + } + + } + + /** + * 根据用户id发送消息 + * + * @param userId 用户id + * @param message 消息内容 + */ + public static void sendMessage(String userId, String message) { + Session session = SESSION_MAP.get(userId); + //判断是否存在该用户的session,并且是否在线 + if (session == null || !session.isOpen()) { + return; + } + sendMessage(session, message); + } + + + /** + * 获取session中的指定参数值 + * + * @param key 参数key + * @param session 用户session + */ + public static String getParam(@NotNull String key, Session session) { + //TODO 目前只针对获取一个key的值,后期根据情况拓展多个 或者直接在onClose onOpen上获取参数? + String value = null; + Map> parameters = session.getRequestParameterMap(); + if (MapUtil.isNotEmpty(parameters)) { + value = parameters.get(key).get(0); + } else { + String queryString = session.getQueryString(); + if (!StrUtil.isEmpty(queryString)) { + String[] params = Strings.split(queryString, '&'); + for (String paramPair : params) { + String[] nameValues = Strings.split(paramPair, '='); + if (key.equals(nameValues[0])) { + value = nameValues[1]; + } + } + } + } + return value; + } +} diff --git a/yudao-ui-admin/src/views/infra/webSocket/index.vue b/yudao-ui-admin/src/views/infra/webSocket/index.vue new file mode 100644 index 000000000..d48ffd3c1 --- /dev/null +++ b/yudao-ui-admin/src/views/infra/webSocket/index.vue @@ -0,0 +1,92 @@ + + + + + + + + + + + + + {{ ws && ws.readyState === 1 ? "已连接" : "连接" }} + + 断开 + + + + + + + + 发送消息 + + + + + + 清空消息 + + + + + +