完成基础的 stream 封装

This commit is contained in:
YunaiV 2021-03-21 00:50:33 +08:00
parent 9c76fd4b69
commit d6cc9e23a3
5 changed files with 39 additions and 41 deletions

View File

@ -83,12 +83,15 @@ public class RedisConfig {
redisTemplate.getRequiredConnectionFactory(), containerOptions); redisTemplate.getRequiredConnectionFactory(), containerOptions);
// 第二步注册监听器消费对应的 Stream 主题 // 第二步注册监听器消费对应的 Stream 主题
String consumerName = buildConsumerName(); // String consumerName = buildConsumerName();
String consumerName = "110";
listeners.forEach(listener -> { listeners.forEach(listener -> {
// 创建 listener 对应的消费者分组 // 创建 listener 对应的消费者分组
try { try {
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
} catch (Exception ignore) {} } catch (Exception ignore) {}
// 设置 listener 对应的 redisTemplate
listener.setRedisTemplate(redisTemplate);
// 创建 Consumer 对象 // 创建 Consumer 对象
Consumer consumer = Consumer.from(listener.getGroup(), consumerName); Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
// 设置 Consumer 消费进度以最小消费进度为准 // 设置 Consumer 消费进度以最小消费进度为准

View File

@ -1,11 +1,10 @@
package cn.iocoder.dashboard.framework.redis.core.pubsub; package cn.iocoder.dashboard.framework.redis.core.pubsub;
import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.TypeUtil;
import cn.iocoder.dashboard.util.json.JsonUtils; import cn.iocoder.dashboard.util.json.JsonUtils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Type; import java.lang.reflect.Type;
@ -62,21 +61,11 @@ public abstract class AbstractChannelMessageListener<T extends ChannelMessage> i
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Class<T> getMessageClass() { private Class<T> getMessageClass() {
Class<?> targetClass = getClass(); Type type = TypeUtil.getTypeArgument(getClass(), 0);
while (targetClass.getSuperclass() != null) { if (type == null) {
// 如果不是 AbstractMessageListener 父类继续向上查找 throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
if (targetClass.getSuperclass() != AbstractChannelMessageListener.class) {
targetClass = targetClass.getSuperclass();
continue;
}
// 如果是 AbstractMessageListener 父类则解析泛型
Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments();
if (ArrayUtil.isEmpty(types)) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) types[0];
} }
throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractMessageListener 父类", getClass().getName())); return (Class<T>) type;
} }
} }

View File

@ -1,12 +1,14 @@
package cn.iocoder.dashboard.framework.redis.core.stream; package cn.iocoder.dashboard.framework.redis.core.stream;
import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.TypeUtil;
import cn.iocoder.dashboard.util.json.JsonUtils;
import lombok.Getter; import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Type; import java.lang.reflect.Type;
@ -33,9 +35,14 @@ public abstract class AbstractStreamMessageListener<T extends StreamMessage>
/** /**
* Redis 消费者分组默认使用 spring.application.name 名字 * Redis 消费者分组默认使用 spring.application.name 名字
*/ */
@Value("spring.application.name") @Value("${spring.application.name}")
@Getter @Getter
private String group; private String group;
/**
*
*/
@Setter
private RedisTemplate<String, ?> redisTemplate;
@SneakyThrows @SneakyThrows
protected AbstractStreamMessageListener() { protected AbstractStreamMessageListener() {
@ -45,10 +52,16 @@ public abstract class AbstractStreamMessageListener<T extends StreamMessage>
@Override @Override
public void onMessage(ObjectRecord<String, String> message) { public void onMessage(ObjectRecord<String, String> message) {
System.out.println(message); // 消费消息
if (true) { T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
// throw new IllegalStateException("测试下"); this.onMessage(messageObj);
} // ack 消息消费完成
redisTemplate.opsForStream().acknowledge(group, message);
// TODO 芋艿需要额外考虑以下几个点
// 1. 处理异常的情况
// 2. 发送日志以及事务的结合
// 3. 消费日志以及通用的幂等性
// 4. 消费失败的重试https://zhuanlan.zhihu.com/p/60501638
} }
/** /**
@ -64,23 +77,12 @@ public abstract class AbstractStreamMessageListener<T extends StreamMessage>
* @return 消息类型 * @return 消息类型
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
// TODO 芋艿稍后重构
private Class<T> getMessageClass() { private Class<T> getMessageClass() {
Class<?> targetClass = getClass(); Type type = TypeUtil.getTypeArgument(getClass(), 0);
while (targetClass.getSuperclass() != null) { if (type == null) {
// 如果不是 AbstractMessageListener 父类继续向上查找 throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
if (targetClass.getSuperclass() != AbstractStreamMessageListener.class) {
targetClass = targetClass.getSuperclass();
continue;
}
// 如果是 AbstractMessageListener 父类则解析泛型
Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments();
if (ArrayUtil.isEmpty(types)) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) types[0];
} }
throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractStreamMessageListener 父类", getClass().getName())); return (Class<T>) type;
} }
} }

View File

@ -2,14 +2,16 @@ package cn.iocoder.dashboard.modules.system.mq.consumer.mail;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage; import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@Slf4j
public class SysMailSendConsumer extends AbstractStreamMessageListener<SysMailSendMessage> { public class SysMailSendConsumer extends AbstractStreamMessageListener<SysMailSendMessage> {
@Override @Override
public void onMessage(SysMailSendMessage message) { public void onMessage(SysMailSendMessage message) {
log.info("[onMessage][消息内容({})]", message);
} }
} }

View File

@ -2,14 +2,16 @@ package cn.iocoder.dashboard.modules.system.mq.consumer.sms;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage; import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@Slf4j
public class SysSmsSendConsumer extends AbstractStreamMessageListener<SysSmsSendMessage> { public class SysSmsSendConsumer extends AbstractStreamMessageListener<SysSmsSendMessage> {
@Override @Override
public void onMessage(SysSmsSendMessage message) { public void onMessage(SysSmsSendMessage message) {
System.out.println(message); log.info("[onMessage][消息内容({})]", message);
} }
} }