mq:优化 redis stream 的命名

This commit is contained in:
YunaiV 2023-11-01 22:46:02 +08:00
parent f8ed0e15f0
commit e1c34e9124
8 changed files with 21 additions and 20 deletions

View File

@ -7,8 +7,8 @@ import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -57,9 +57,9 @@ public class YudaoRedisMQAutoConfiguration {
* 创建 Redis Pub/Sub 广播消费的容器 * 创建 Redis Pub/Sub 广播消费的容器
*/ */
@Bean(initMethod = "start", destroyMethod = "stop") @Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候才需要注册 Redis pubsub 监听 @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer( public RedisMessageListenerContainer redisMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) { RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
// 创建 RedisMessageListenerContainer 对象 // 创建 RedisMessageListenerContainer 对象
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置 RedisConnection 工厂 // 设置 RedisConnection 工厂
@ -78,8 +78,8 @@ public class YudaoRedisMQAutoConfiguration {
* 创建 Redis Stream 重新消费的任务 * 创建 Redis Stream 重新消费的任务
*/ */
@Bean @Bean
@ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候才需要注册 Redis pubsub 监听 @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候才需要注册 Redis pubsub 监听
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners, public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
RedisMQTemplate redisTemplate, RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName, @Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) { RedissonClient redissonClient) {
@ -92,9 +92,9 @@ public class YudaoRedisMQAutoConfiguration {
* 基础知识<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream xreadgroup 命令</a> * 基础知识<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream xreadgroup 命令</a>
*/ */
@Bean(initMethod = "start", destroyMethod = "stop") @Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候才需要注册 Redis pubsub 监听 @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候才需要注册 Redis pubsub 监听
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer( public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) { RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate(); RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
checkRedisVersion(redisTemplate); checkRedisVersion(redisTemplate);
// 第一步创建 StreamMessageListenerContainer 容器 // 第一步创建 StreamMessageListenerContainer 容器

View File

@ -3,8 +3,8 @@ package cn.iocoder.yudao.framework.mq.redis.core;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessage; import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessage; import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.RecordId;
@ -35,7 +35,7 @@ public class RedisMQTemplate {
* *
* @param message 消息 * @param message 消息
*/ */
public <T extends AbstractChannelMessage> void send(T message) { public <T extends AbstractRedisChannelMessage> void send(T message) {
try { try {
sendMessageBefore(message); sendMessageBefore(message);
// 发送消息 // 发送消息
@ -51,7 +51,7 @@ public class RedisMQTemplate {
* @param message 消息 * @param message 消息
* @return 消息记录的编号对象 * @return 消息记录的编号对象
*/ */
public <T extends AbstractStreamMessage> RecordId send(T message) { public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
try { try {
sendMessageBefore(message); sendMessageBefore(message);
// 发送消息 // 发送消息

View File

@ -2,7 +2,7 @@ package cn.iocoder.yudao.framework.mq.redis.core.job;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock; import org.redisson.api.RLock;
@ -33,7 +33,7 @@ public class RedisPendingMessageResendJob {
*/ */
private static final int EXPIRE_TIME = 5 * 60; private static final int EXPIRE_TIME = 5 * 60;
private final List<AbstractStreamMessageListener<?>> listeners; private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate; private final RedisMQTemplate redisTemplate;
private final String groupName; private final String groupName;
private final RedissonClient redissonClient; private final RedissonClient redissonClient;

View File

@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
public abstract class AbstractChannelMessage extends AbstractRedisMessage { public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
/** /**
* 获得 Redis Channel默认使用类名 * 获得 Redis Channel默认使用类名

View File

@ -20,7 +20,7 @@ import java.util.List;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener { public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
/** /**
* 消息类型 * 消息类型
@ -37,7 +37,7 @@ public abstract class AbstractChannelMessageListener<T extends AbstractChannelMe
private RedisMQTemplate redisMQTemplate; private RedisMQTemplate redisMQTemplate;
@SneakyThrows @SneakyThrows
protected AbstractChannelMessageListener() { protected AbstractRedisChannelMessageListener() {
this.messageType = getMessageClass(); this.messageType = getMessageClass();
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel(); this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
} }

View File

@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
public abstract class AbstractStreamMessage extends AbstractRedisMessage { public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
/** /**
* 获得 Redis Stream Key默认使用类名 * 获得 Redis Stream Key默认使用类名

View File

@ -22,7 +22,7 @@ import java.util.List;
* *
* @author 芋道源码 * @author 芋道源码
*/ */
public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage> public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
implements StreamListener<String, ObjectRecord<String, String>> { implements StreamListener<String, ObjectRecord<String, String>> {
/** /**
@ -48,7 +48,7 @@ public abstract class AbstractStreamMessageListener<T extends AbstractStreamMess
private RedisMQTemplate redisMQTemplate; private RedisMQTemplate redisMQTemplate;
@SneakyThrows @SneakyThrows
protected AbstractStreamMessageListener() { protected AbstractRedisStreamMessageListener() {
this.messageType = getMessageClass(); this.messageType = getMessageClass();
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey(); this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
} }

View File

@ -0,0 +1 @@
<http://www.iocoder.cn/Spring-Boot/Event/?yudao>