优化 Redis pub/sub 的封装,提升使用的便利性

This commit is contained in:
YunaiV 2021-01-22 21:50:23 +08:00
parent 2a349972b4
commit c10ab1753a
4 changed files with 83 additions and 17 deletions

View File

@ -1,14 +1,31 @@
package cn.iocoder.dashboard.framework.redis.core.listener; package cn.iocoder.dashboard.framework.redis.core.listener;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.dashboard.util.json.JSONUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Type;
/** /**
* Redis Pub/Sub 监听器抽象类用于实现广播消费 * Redis Pub/Sub 监听器抽象类用于实现广播消费
* *
* @param <T> 消息类型一定要填写噢不然会报错
*
* @author 芋道源码 * @author 芋道源码
*/ */
public abstract class AbstractMessageListener<T> implements MessageListener { public abstract class AbstractMessageListener<T> implements MessageListener {
/**
* 消息类型
*/
private final Class<T> messageType;
protected AbstractMessageListener() {
this.messageType = getMessageClass();
}
/** /**
* 获得 Sub 订阅的 Redis Channel 通道 * 获得 Sub 订阅的 Redis Channel 通道
* *
@ -16,4 +33,41 @@ public abstract class AbstractMessageListener<T> implements MessageListener {
*/ */
public abstract String getChannel(); public abstract String getChannel();
@Override
public final void onMessage(Message message, byte[] bytes) {
T messageObj = JSONUtils.parseObject(message.getBody(), messageType);
this.onMessage(messageObj);
}
/**
* 处理消息
*
* @param message 消息
*/
public abstract void onMessage(T message);
/**
* 通过解析类上的泛型获得消息类型
*
* @return 消息类型
*/
@SuppressWarnings("unchecked")
private Class<T> getMessageClass() {
Class<?> targetClass = getClass();
while (targetClass.getSuperclass() != null) {
// 如果不是 AbstractMessageListener 父类继续向上查找
if (targetClass.getSuperclass() != AbstractMessageListener.class) {
targetClass = targetClass.getSuperclass();
continue;
}
// 如果是 AbstractMessageListener 父类则解析泛型
Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments();
if (ArrayUtil.isEmpty(types)) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) types[0];
}
throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractMessageListener 父类", getClass().getName()));
}
} }

View File

@ -2,15 +2,23 @@ package cn.iocoder.dashboard.modules.system.mq.consumer;
import cn.iocoder.dashboard.framework.redis.core.listener.AbstractMessageListener; import cn.iocoder.dashboard.framework.redis.core.listener.AbstractMessageListener;
import cn.iocoder.dashboard.modules.system.mq.message.permission.SysMenuRefreshMessage; import cn.iocoder.dashboard.modules.system.mq.message.permission.SysMenuRefreshMessage;
import org.springframework.data.redis.connection.Message; import cn.iocoder.dashboard.modules.system.service.permission.SysMenuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component @Component
public class SysMenuRefreshConsumer extends AbstractMessageListener { @Slf4j
public class SysMenuRefreshConsumer extends AbstractMessageListener<SysMenuRefreshMessage> {
@Resource
private SysMenuService menuService;
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(SysMenuRefreshMessage message) {
System.out.println(message); log.info("[onMessage][收到 Menu 刷新消息]");
menuService.init();
} }
@Override @Override

View File

@ -1,7 +1,8 @@
package cn.iocoder.dashboard.modules.system.mq.producer.permission; package cn.iocoder.dashboard.modules.system.mq.producer.permission;
import cn.iocoder.dashboard.modules.system.mq.message.permission.SysMenuRefreshMessage; import cn.iocoder.dashboard.modules.system.mq.message.permission.SysMenuRefreshMessage;
import org.springframework.data.redis.core.RedisTemplate; import cn.iocoder.dashboard.util.json.JSONUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -13,14 +14,14 @@ import javax.annotation.Resource;
public class SysMenuProducer { public class SysMenuProducer {
@Resource @Resource
private RedisTemplate<String, Object> redisTemplate; private StringRedisTemplate stringRedisTemplate;
/** /**
* 发送 {@link SysMenuRefreshMessage} 消息 * 发送 {@link SysMenuRefreshMessage} 消息
*/ */
public void sendMenuRefreshMessage() { public void sendMenuRefreshMessage() {
SysMenuRefreshMessage message = new SysMenuRefreshMessage(); SysMenuRefreshMessage message = new SysMenuRefreshMessage();
redisTemplate.convertAndSend(SysMenuRefreshMessage.TOPIC, message); stringRedisTemplate.convertAndSend(SysMenuRefreshMessage.TOPIC, JSONUtils.toJSONString(message));
} }
} }

View File

@ -1,5 +1,7 @@
package cn.iocoder.dashboard.util.json; package cn.iocoder.dashboard.util.json;
import com.alibaba.fastjson.JSON;
/** /**
* JSON 工具类 * JSON 工具类
* *
@ -7,15 +9,16 @@ package cn.iocoder.dashboard.util.json;
*/ */
public class JSONUtils { public class JSONUtils {
// public static Map<String, Object> toJSONMap(Object javaObject) { public static String toJSONString(Object object) {
// return (Map<String, Object>) JSON.toJSON(javaObject); return JSON.toJSONString(object);
// } }
//
// public static void main(String[] args) { public static <T> T parseObject(String text, Class<T> clazz) {
// SysDictTypeCreateReqVO createReqVO = new SysDictTypeCreateReqVO(); return JSON.parseObject(text, clazz);
// createReqVO.setType("1"); }
// createReqVO.setRemark("2");
// System.out.println(toJSONMap(createReqVO)); public static <T> T parseObject(byte[] bytes, Class<T> clazz) {
// } return JSON.parseObject(bytes, clazz);
}
} }