增加websocket模块

This commit is contained in:
dataprince 2024-01-17 20:24:09 +08:00
parent 17eb0212fd
commit 74853f4d9c
16 changed files with 545 additions and 1 deletions

View File

@ -43,7 +43,7 @@
<mapstruct-plus.version>1.3.6</mapstruct-plus.version>
<mapstruct-plus.lombok.version>0.2.0</mapstruct-plus.lombok.version>
<hutool.version>5.8.24</hutool.version>
<redisson.version>3.25.2</redisson.version>
<redisson.version>3.26.0</redisson.version>
<lock4j.version>2.2.7</lock4j.version>
<alibaba-ttl.version>2.14.4</alibaba-ttl.version>
<spring-boot-admin.version>3.2.0</spring-boot-admin.version>

View File

@ -134,6 +134,7 @@ public class AuthController {
StringUtils.equals(vo.getDomain(), host));
// 返回对象
LoginTenantVo vo = new LoginTenantVo();
vo.setTenantEnabled(true);
vo.setVoList(CollUtil.isNotEmpty(list) ? list : voList);
return R.ok(vo);
}

View File

@ -12,6 +12,11 @@ import java.util.List;
@Data
public class LoginTenantVo {
/**
* 租户开关
*/
private Boolean tenantEnabled;
/**
* 租户对象列表
*/

View File

@ -26,6 +26,7 @@
<module>ruoyi-common-springdoc</module>
<module>ruoyi-common-tenant</module>
<module>ruoyi-common-web</module>
<module>ruoyi-common-websocket</module>
</modules>
<artifactId>ruoyi-common</artifactId>

View File

@ -117,6 +117,13 @@
<version>${revision}</version>
</dependency>
<!-- WebSocket模块 -->
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-websocket</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ruoyi-common-websocket</artifactId>
<description>
ruoyi-common-websocket 模块
</description>
<dependencies>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-redis</artifactId>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,59 @@
package com.ruoyi.common.websocket.config;
import cn.hutool.core.util.StrUtil;
import com.ruoyi.common.websocket.config.properties.WebSocketProperties;
import com.ruoyi.common.websocket.handler.FlexWebSocketHandler;
import com.ruoyi.common.websocket.interceptor.FlexWebSocketInterceptor;
import com.ruoyi.common.websocket.listener.WebSocketTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* WebSocket 配置
*
* @author zendwang
*/
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfig {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
org.springframework.web.socket.WebSocketHandler webSocketHandler,
WebSocketProperties webSocketProperties) {
if (StrUtil.isBlank(webSocketProperties.getPath())) {
webSocketProperties.setPath("/websocket");
}
if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) {
webSocketProperties.setAllowedOrigins("*");
}
return registry -> registry
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(webSocketProperties.getAllowedOrigins());
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new FlexWebSocketInterceptor();
}
@Bean
public org.springframework.web.socket.WebSocketHandler webSocketHandler() {
return new FlexWebSocketHandler();
}
@Bean
public WebSocketTopicListener topicListener() {
return new WebSocketTopicListener();
}
}

View File

@ -0,0 +1,26 @@
package com.ruoyi.common.websocket.config.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* WebSocket 配置项
*
* @author zendwang
*/
@ConfigurationProperties("websocket")
@Data
public class WebSocketProperties {
private Boolean enabled;
/**
* 路径
*/
private String path;
/**
* 设置访问源地址
*/
private String allowedOrigins;
}

View File

@ -0,0 +1,28 @@
package com.ruoyi.common.websocket.constant;
/**
* websocket的常量配置
*
* @author zendwang
*/
public interface WebSocketConstants {
/**
* websocketSession中的参数的key
*/
String LOGIN_USER_KEY = "loginUser";
/**
* 订阅的频道
*/
String WEB_SOCKET_TOPIC = "global:websocket";
/**
* 前端心跳检查的命令
*/
String PING = "ping";
/**
* 服务端心跳恢复的字符串
*/
String PONG = "pong";
}

View File

@ -0,0 +1,29 @@
package com.ruoyi.common.websocket.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
/**
* 消息的dto
*
* @author zendwang
*/
@Data
public class WebSocketMessageDto implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的session key 列表
*/
private List<Long> sessionKeys;
/**
* 需要发送的消息
*/
private String message;
}

View File

@ -0,0 +1,101 @@
package com.ruoyi.common.websocket.handler;
import com.ruoyi.common.core.core.domain.model.LoginUser;
import com.ruoyi.common.websocket.constant.WebSocketConstants;
import com.ruoyi.common.websocket.dto.WebSocketMessageDto;
import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
import com.ruoyi.common.websocket.utils.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.List;
/**
* WebSocketHandler 实现类
*
* @author zendwang
*/
@Slf4j
public class FlexWebSocketHandler extends AbstractWebSocketHandler {
/**
* 连接成功后
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstants.LOGIN_USER_KEY);
WebSocketSessionHolder.addSession(loginUser.getUserId(), session);
log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
}
/**
* 处理发送来的文本消息
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstants.LOGIN_USER_KEY);
List<Long> userIds = List.of(loginUser.getUserId());
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setSessionKeys(userIds);
webSocketMessageDto.setMessage(message.getPayload());
WebSocketUtils.publishMessage(webSocketMessageDto);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
/**
* 心跳监测的回复
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
WebSocketUtils.sendPongMessage(session);
}
/**
* 连接出错时
*
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
/**
* 连接关闭后
*
* @param session
* @param status
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstants.LOGIN_USER_KEY);
WebSocketSessionHolder.removeSession(loginUser.getUserId());
log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
}
/**
* 是否支持分片消息
*
* @return
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}

View File

@ -0,0 +1,42 @@
package com.ruoyi.common.websocket.holder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocketSession 用于保存当前所有在线的会话信息
*
* @author zendwang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionHolder {
private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
public static void addSession(Long sessionKey, WebSocketSession session) {
USER_SESSION_MAP.put(sessionKey, session);
}
public static void removeSession(Long sessionKey) {
if (USER_SESSION_MAP.containsKey(sessionKey)) {
USER_SESSION_MAP.remove(sessionKey);
}
}
public static WebSocketSession getSessions(Long sessionKey) {
return USER_SESSION_MAP.get(sessionKey);
}
public static Set<Long> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
public static Boolean existSession(Long sessionKey) {
return USER_SESSION_MAP.containsKey(sessionKey);
}
}

View File

@ -0,0 +1,51 @@
package com.ruoyi.common.websocket.interceptor;
import com.ruoyi.common.core.core.domain.model.LoginUser;
import com.ruoyi.common.security.utils.LoginHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocket握手请求的拦截器
*
* @author zendwang
*/
@Slf4j
public class FlexWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param attributes attributes
* @return 是否握手成功
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = LoginHelper.getLoginUser();
attributes.put(LOGIN_USER_KEY, loginUser);
return true;
}
/**
* 握手后
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

View File

@ -0,0 +1,43 @@
package com.ruoyi.common.websocket.listener;
import cn.hutool.core.collection.CollUtil;
import com.ruoyi.common.websocket.utils.WebSocketUtils;
import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
/**
* WebSocket 主题订阅监听器
*
* @author zendwang
*/
@Slf4j
public class WebSocketTopicListener implements ApplicationRunner, Ordered {
@Override
public void run(ApplicationArguments args) throws Exception {
WebSocketUtils.subscribeMessage((message) -> {
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.getMessage());
}
});
} else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.getMessage());
});
}
});
log.info("初始化WebSocket主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}

View File

@ -0,0 +1,110 @@
package com.ruoyi.common.websocket.utils;
import cn.hutool.core.collection.CollUtil;
import com.ruoyi.common.websocket.dto.WebSocketMessageDto;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import com.ruoyi.common.redis.utils.RedisUtils;
import com.ruoyi.common.websocket.holder.WebSocketSessionHolder;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static com.ruoyi.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
/**
* 工具类
*
* @author zendwang
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
/**
* 发送消息
*
* @param sessionKey session主键 一般为用户id
* @param message 消息文本
*/
public static void sendMessage(Long sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, message);
}
/**
* 订阅消息
*
* @param consumer 自定义处理
*/
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}
/**
* 发布订阅的消息
*
* @param webSocketMessage 消息对象
*/
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>();
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue;
}
unsentSessionKeys.add(sessionKey);
}
// 不在当前服务内session,发布订阅消息
if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
});
}
}
/**
* 发布订阅的消息(群发)
*
* @param message 消息内容
*/
public static void publishAll(String message) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
});
}
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
}
}
}
}

View File

@ -0,0 +1 @@
com.ruoyi.common.websocket.config.WebSocketConfig