diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index cd6739c5a..3def221f3 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.mq.config; import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; @@ -9,7 +10,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; @@ -33,8 +33,12 @@ import java.util.List; public class YudaoMQAutoConfiguration { @Bean - public RedisMQTemplate redisMQTemplate(StringRedisTemplate stringRedisTemplate) { - return new RedisMQTemplate(stringRedisTemplate); + public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, + List interceptors) { + RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); + // 添加拦截器 + interceptors.forEach(redisMQTemplate::addInterceptor); + return redisMQTemplate; } // ========== 消费者相关 ========== @@ -44,13 +48,14 @@ public class YudaoMQAutoConfiguration { */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer( - RedisConnectionFactory factory, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { // 创建 RedisMessageListenerContainer 对象 RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 设置 RedisConnection 工厂。 - container.setConnectionFactory(factory); + container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); // 添加监听器 listeners.forEach(listener -> { + listener.setRedisMQTemplate(redisMQTemplate); container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", listener.getChannel(), listener.getClass().getName()); @@ -65,7 +70,8 @@ public class YudaoMQAutoConfiguration { */ @Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisTemplate redisTemplate, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { + RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); // 第一步,创建 StreamMessageListenerContainer 容器 // 创建 options 配置 StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = @@ -74,8 +80,8 @@ public class YudaoMQAutoConfiguration { .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 .build(); // 创建 container 对象 - StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( - redisTemplate.getRequiredConnectionFactory(), containerOptions); + StreamMessageListenerContainer> container = + StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); @@ -85,7 +91,7 @@ public class YudaoMQAutoConfiguration { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); } catch (Exception ignore) {} // 设置 listener 对应的 redisTemplate - listener.setRedisTemplate(redisTemplate); + listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 Consumer consumer = Consumer.from(listener.getGroup(), consumerName); // 设置 Consumer 消费进度,以最小消费进度为准 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java index 45d1874ea..8a31feda7 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java @@ -1,13 +1,19 @@ package cn.iocoder.yudao.framework.mq.core; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.AllArgsConstructor; +import lombok.Getter; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; +import java.util.ArrayList; +import java.util.List; + /** * Redis MQ 操作模板类 * @@ -16,7 +22,13 @@ import org.springframework.data.redis.core.RedisTemplate; @AllArgsConstructor public class RedisMQTemplate { + @Getter private final RedisTemplate redisTemplate; + /** + * 拦截器数组 + */ + @Getter + private final List interceptors = new ArrayList<>(); /** * 发送 Redis 消息,基于 Redis pub/sub 实现 @@ -24,7 +36,13 @@ public class RedisMQTemplate { * @param message 消息 */ public void send(T message) { - redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); + try { + sendMessageBefore(message); + // 发送消息 + redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); + } finally { + sendMessageAfter(message); + } } /** @@ -34,9 +52,36 @@ public class RedisMQTemplate { * @return 消息记录的编号对象 */ public RecordId send(T message) { - return redisTemplate.opsForStream().add(StreamRecords.newRecord() - .ofObject(JsonUtils.toJsonString(message)) // 设置内容 - .withStreamKey(message.getStreamKey())); // 设置 stream key + try { + sendMessageBefore(message); + // 发送消息 + return redisTemplate.opsForStream().add(StreamRecords.newRecord() + .ofObject(JsonUtils.toJsonString(message)) // 设置内容 + .withStreamKey(message.getStreamKey())); // 设置 stream key + } finally { + sendMessageAfter(message); + } + } + + /** + * 添加拦截器 + * + * @param interceptor 拦截器 + */ + public void addInterceptor(RedisMessageInterceptor interceptor) { + interceptors.add(interceptor); + } + + private void sendMessageBefore(AbstractRedisMessage message) { + // 正序 + interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); + } + + private void sendMessageAfter(AbstractRedisMessage message) { + // 倒序 + for (int i = interceptors.size() - 1; i >= 0; i--) { + interceptors.get(i).sendMessageAfter(message); + } } } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java new file mode 100644 index 000000000..11d8e1337 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java @@ -0,0 +1,26 @@ +package cn.iocoder.yudao.framework.mq.core.interceptor; + +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; + +/** + * {@link AbstractRedisMessage} 消息拦截器 + * 通过拦截器,作为插件机制,实现拓展。 + * 例如说,多租户场景下的 MQ 消息处理 + * + * @author 芋道源码 + */ +public interface RedisMessageInterceptor { + + default void sendMessageBefore(AbstractRedisMessage message) { + } + + default void sendMessageAfter(AbstractRedisMessage message) { + } + + default void consumeMessageBefore(AbstractRedisMessage message) { + } + + default void consumeMessageAfter(AbstractRedisMessage message) { + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java index ff0725580..8585aafe6 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java @@ -2,11 +2,16 @@ package cn.iocoder.yudao.framework.mq.core.pubsub; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import lombok.Setter; import lombok.SneakyThrows; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import java.lang.reflect.Type; +import java.util.List; /** * Redis Pub/Sub 监听器抽象类,用于实现广播消费 @@ -25,6 +30,11 @@ public abstract class AbstractChannelMessageListener) type; } + private void consumeMessageBefore(AbstractRedisMessage message) { + assert redisMQTemplate != null; + List interceptors = redisMQTemplate.getInterceptors(); + // 正序 + interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); + } + + private void consumeMessageAfter(AbstractRedisMessage message) { + assert redisMQTemplate != null; + List interceptors = redisMQTemplate.getInterceptors(); + // 倒序 + for (int i = interceptors.size() - 1; i >= 0; i--) { + interceptors.get(i).consumeMessageAfter(message); + } + } + } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java index bafa685b3..29ea833f3 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.framework.mq.core.stream; +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; import com.fasterxml.jackson.annotation.JsonIgnore; /** @@ -7,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; * * @author 芋道源码 */ -public abstract class AbstractStreamMessage { +public abstract class AbstractStreamMessage extends AbstractRedisMessage { /** * 获得 Redis Stream Key diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java index 8ccac0ce5..1c4d91606 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java @@ -2,15 +2,18 @@ package cn.iocoder.yudao.framework.mq.core.stream; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamListener; import java.lang.reflect.Type; +import java.util.List; /** * Redis Stream 监听器抽象类,用于实现集群消费 @@ -39,10 +42,10 @@ public abstract class AbstractStreamMessageListener redisTemplate; + private RedisMQTemplate redisMQTemplate; @SneakyThrows protected AbstractStreamMessageListener() { @@ -54,14 +57,20 @@ public abstract class AbstractStreamMessageListener message) { // 消费消息 T messageObj = JsonUtils.parseObject(message.getValue(), messageType); - this.onMessage(messageObj); - // ack 消息消费完成 - redisTemplate.opsForStream().acknowledge(group, message); - // TODO 芋艿:需要额外考虑以下几个点: - // 1. 处理异常的情况 - // 2. 发送日志;以及事务的结合 - // 3. 消费日志;以及通用的幂等性 - // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 + try { + consumeMessageBefore(messageObj); + // 消费消息 + this.onMessage(messageObj); + // ack 消息消费完成 + redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message); + // TODO 芋艿:需要额外考虑以下几个点: + // 1. 处理异常的情况 + // 2. 发送日志;以及事务的结合 + // 3. 消费日志;以及通用的幂等性 + // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 + } finally { + consumeMessageAfter(messageObj); + } } /** @@ -85,4 +94,20 @@ public abstract class AbstractStreamMessageListener) type; } + private void consumeMessageBefore(AbstractRedisMessage message) { + assert redisMQTemplate != null; + List interceptors = redisMQTemplate.getInterceptors(); + // 正序 + interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); + } + + private void consumeMessageAfter(AbstractRedisMessage message) { + assert redisMQTemplate != null; + List interceptors = redisMQTemplate.getInterceptors(); + // 倒序 + for (int i = interceptors.size() - 1; i >= 0; i--) { + interceptors.get(i).consumeMessageAfter(message); + } + } + }