From 9c76fd4b69110a5ad802dd77ed89fc31d5364473 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 20 Mar 2021 23:47:05 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E5=8D=87=E7=BA=A7=20springboot=20?= =?UTF-8?q?=E5=88=B0=E6=9C=80=E6=96=B0=E7=9A=84=E7=89=88=E6=9C=AC=EF=BC=8C?= =?UTF-8?q?=E8=A7=A3=E5=86=B3=20spring=20data=20redis=20=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E7=9A=84=20bug=202.=20=E6=A2=B3=E7=90=86=20StreamMessageListen?= =?UTF-8?q?erContainer=20Bean=20=E7=9A=84=E5=88=9B=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 30 +----- .../framework/redis/config/RedisConfig.java | 95 ++++++++++--------- .../stream/AbstractStreamMessageListener.java | 3 + .../redis/core/util/RedisMessageUtils.java | 2 +- .../redis/core/stream/RedisStreamTest.java | 16 +++- 5 files changed, 70 insertions(+), 76 deletions(-) diff --git a/pom.xml b/pom.xml index bba8d18cc..96e0e08ce 100644 --- a/pom.xml +++ b/pom.xml @@ -22,15 +22,15 @@ ${java.version} 3.8.0 - 2.4.2 + 2.4.4 3.0.2 1.5.22 5.1.46 1.2.4 - 3.4.1 - 3.14.1 + 3.4.2 + 3.15.1 1.7.0 @@ -42,7 +42,7 @@ 1.16.14 1.4.1.Final - 5.5.6 + 5.6.1 2.2.7 2.2 1.0.5 @@ -249,27 +249,7 @@ cn.hutool - hutool-core - ${hutool.version} - - - cn.hutool - hutool-extra - ${hutool.version} - - - cn.hutool - hutool-captcha - ${hutool.version} - - - cn.hutool - hutool-http - ${hutool.version} - - - cn.hutool - hutool-crypto + hutool-all ${hutool.version} diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java index 13cda9dbb..f069738e5 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java @@ -1,21 +1,22 @@ package cn.iocoder.dashboard.framework.redis.config; -import cn.hutool.core.net.NetUtil; +import cn.hutool.system.SystemUtil; import cn.iocoder.dashboard.framework.redis.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.util.ErrorHandler; -import java.time.Duration; import java.util.List; /** @@ -25,6 +26,9 @@ import java.util.List; @Slf4j public class RedisConfig { + /** + * 创建 RedisTemplate Bean,使用 JSON 序列化方式 + */ @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { // 创建 RedisTemplate 对象 @@ -33,11 +37,16 @@ public class RedisConfig { template.setConnectionFactory(factory); // 使用 String 序列化方式,序列化 KEY 。 template.setKeySerializer(RedisSerializer.string()); + template.setHashKeySerializer(RedisSerializer.string()); // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。 template.setValueSerializer(RedisSerializer.json()); + template.setHashValueSerializer(RedisSerializer.json()); return template; } + /** + * 创建 Redis Pub/Sub 广播消费的容器 + */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, List> listeners) { @@ -54,52 +63,48 @@ public class RedisConfig { return container; } + /** + * 创建 Redis Stream 集群消费的容器 + * + * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html + */ @Bean(initMethod = "start", destroyMethod = "stop") - public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisConnectionFactory factory, List> listeners) { - // 创建配置对象 - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> - streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - // 一次性最多拉取多少条消息 - .batchSize(10) - // 执行消息轮询的执行器 - // .executor(this.threadPoolTaskExecutor) - // 消息消费异常的handler - .errorHandler(new ErrorHandler() { - @Override - public void handleError(Throwable t) { - // throw new RuntimeException(t); - t.printStackTrace(); - } - }) - // 超时时间,设置为0,表示不超时(超时后会抛出异常) - .pollTimeout(Duration.ZERO) - // 序列化器 - .serializer(RedisSerializer.string()) - .targetType(String.class) - .build(); + public StreamMessageListenerContainer> redisStreamMessageListenerContainer(RedisTemplate redisTemplate, + List> listeners) { + // 第一步,创建 StreamMessageListenerContainer 容器 + // 创建 options 配置 + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() + .batchSize(10) // 一次性最多拉取多少条消息 + .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 + .build(); + // 创建 container 对象 + StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( + redisTemplate.getRequiredConnectionFactory(), containerOptions); - // 根据配置对象创建监听容器对象 - StreamMessageListenerContainer> container = StreamMessageListenerContainer - .create(factory, streamMessageListenerContainerOptions); - - RedisTemplate redisTemplate = redisTemplate(factory); - - // 使用监听容器对象开始监听消费(使用的是手动确认方式) - String consumerName = NetUtil.getLocalHostName(); // TODO 需要优化下,晚点参考下 rocketmq consumer 的 - for (AbstractStreamMessageListener listener : listeners) { + // 第二步,注册监听器,消费对应的 Stream 主题 + String consumerName = buildConsumerName(); + listeners.forEach(listener -> { + // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) { -// ignore.printStackTrace(); - } - - container.receive(Consumer.from(listener.getGroup(), consumerName), - StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()), listener); - } - + } catch (Exception ignore) {} + // 创建 Consumer 对象 + Consumer consumer = Consumer.from(listener.getGroup(), consumerName); + // 设置 Consumer 消费进度,以最小消费进度为准 + StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); + // 设置 Consumer 监听 + StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest + .builder(streamOffset).consumer(consumer) + .autoAcknowledge(false) // 不自动 ack + .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false + container.register(builder.build(), listener); + }); return container; } + private static String buildConsumerName() { + return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); + } + } diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java index 6a029507c..7621d3638 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java @@ -46,6 +46,9 @@ public abstract class AbstractStreamMessageListener @Override public void onMessage(ObjectRecord message) { System.out.println(message); + if (true) { +// throw new IllegalStateException("测试下"); + } } /** diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java index 8d01d4cf0..9331606af 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java @@ -31,7 +31,7 @@ public class RedisMessageUtils { * @param message 消息 * @return 消息记录的编号对象 */ - public static RecordId sendStreamMessage(RedisTemplate redisTemplate, T message) { + public static RecordId sendStreamMessage(RedisTemplate redisTemplate, T message) { return redisTemplate.opsForStream().add(StreamRecords.newRecord() .ofObject(JsonUtils.toJsonString(message)) // 设置内容 .withStreamKey(message.getStreamKey())); // 设置 stream key diff --git a/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java b/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java index 022f45bfd..3d0d8a249 100644 --- a/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java +++ b/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java @@ -9,6 +9,7 @@ import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage; import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage; import org.junit.jupiter.api.Test; import org.springframework.context.annotation.Import; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import javax.annotation.Resource; @@ -31,13 +32,18 @@ public class RedisStreamTest { @Resource private StringRedisTemplate stringRedisTemplate; + @Resource + private RedisTemplate redisTemplate; + @Test public void testProducer01() { - // 创建消息 - SysSmsSendMessage message = new SysSmsSendMessage(); - message.setMobile("15601691300").setTemplateCode("test"); - // 发送消息 - RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message); + for (int i = 0; i < 100; i++) { + // 创建消息 + SysSmsSendMessage message = new SysSmsSendMessage(); + message.setMobile("15601691300").setTemplateCode("test:" + i); + // 发送消息 + RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message); + } } @Test