messages = context.getMsgList();
+ Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
+ // 设置租户编号
+ String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
+ if (StrUtil.isNotEmpty(tenantId)) {
+ TenantContextHolder.setTenantId(Long.parseLong(tenantId));
+ }
+ }
+
+ @Override
+ public void consumeMessageAfter(ConsumeMessageContext context) {
+ TenantContextHolder.clear();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
new file mode 100644
index 000000000..7f12ac520
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * 多租户的 RocketMQ 初始化器
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQInitializer implements BeanPostProcessor {
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof DefaultRocketMQListenerContainer) {
+ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
+ initTenantConsumer(container.getConsumer());
+ } else if (bean instanceof RocketMQTemplate) {
+ RocketMQTemplate template = (RocketMQTemplate) bean;
+ initTenantProducer(template.getProducer());
+ }
+ return bean;
+ }
+
+ private void initTenantProducer(DefaultMQProducer producer) {
+ if (producer == null) {
+ return;
+ }
+ DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+ if (producerImpl == null) {
+ return;
+ }
+ producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
+ }
+
+ private void initTenantConsumer(DefaultMQPushConsumer consumer) {
+ if (consumer == null) {
+ return;
+ }
+ DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
+ if (consumerImpl == null) {
+ return;
+ }
+ consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
new file mode 100644
index 000000000..4f0307465
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
+ *
+ * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQSendMessageHook implements SendMessageHook {
+
+ @Override
+ public String hookName() {
+ return getClass().getSimpleName();
+ }
+
+ @Override
+ public void sendMessageBefore(SendMessageContext sendMessageContext) {
+ Long tenantId = TenantContextHolder.getTenantId();
+ if (tenantId == null) {
+ return;
+ }
+ sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
+ }
+
+ @Override
+ public void sendMessageAfter(SendMessageContext sendMessageContext) {
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
new file mode 100644
index 000000000..059d8f97f
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2002-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.handler.invocation;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import org.springframework.core.DefaultParameterNameDiscoverer;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ParameterNameDiscoverer;
+import org.springframework.core.ResolvableType;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.HandlerMethod;
+import org.springframework.util.ObjectUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Extension of {@link HandlerMethod} that invokes the underlying method with
+ * argument values resolved from the current HTTP request through a list of
+ * {@link HandlerMethodArgumentResolver}.
+ *
+ * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中
+ * TODO 芋艿:持续跟进,看看有没新的拓展点
+ *
+ * @author Rossen Stoyanchev
+ * @author Juergen Hoeller
+ * @since 4.0
+ */
+public class InvocableHandlerMethod extends HandlerMethod {
+
+ private static final Object[] EMPTY_ARGS = new Object[0];
+
+ private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
+
+ private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
+
+ /**
+ * Create an instance from a {@code HandlerMethod}.
+ */
+ public InvocableHandlerMethod(HandlerMethod handlerMethod) {
+ super(handlerMethod);
+ }
+
+ /**
+ * Create an instance from a bean instance and a method.
+ */
+ public InvocableHandlerMethod(Object bean, Method method) {
+ super(bean, method);
+ }
+
+ /**
+ * Construct a new handler method with the given bean instance, method name and parameters.
+ * @param bean the object bean
+ * @param methodName the method name
+ * @param parameterTypes the method parameter types
+ * @throws NoSuchMethodException when the method cannot be found
+ */
+ public InvocableHandlerMethod(Object bean, String methodName, Class>... parameterTypes)
+ throws NoSuchMethodException {
+
+ super(bean, methodName, parameterTypes);
+ }
+
+ /**
+ * Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
+ */
+ public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
+ this.resolvers = argumentResolvers;
+ }
+
+ /**
+ * Set the ParameterNameDiscoverer for resolving parameter names when needed
+ * (e.g. default request attribute name).
+ * Default is a {@link DefaultParameterNameDiscoverer}.
+ */
+ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
+ this.parameterNameDiscoverer = parameterNameDiscoverer;
+ }
+
+ /**
+ * Invoke the method after resolving its argument values in the context of the given message.
+ *
Argument values are commonly resolved through
+ * {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
+ * The {@code providedArgs} parameter however may supply argument values to be used directly,
+ * i.e. without argument resolution.
+ *
Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
+ * resolved arguments.
+ * @param message the current message being processed
+ * @param providedArgs "given" arguments matched by type, not resolved
+ * @return the raw value returned by the invoked method
+ * @throws Exception raised if no suitable argument resolver can be found,
+ * or if the method raised an exception
+ * @see #getMethodArgumentValues
+ * @see #doInvoke
+ */
+ @Nullable
+ public Object invoke(Message> message, Object... providedArgs) throws Exception {
+ Object[] args = getMethodArgumentValues(message, providedArgs);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Arguments: " + Arrays.toString(args));
+ }
+ // 注意:如下是本类的改动点!!!
+ // 情况一:无租户编号的情况
+ Long tenantId= parseTenantId(message);
+ if (tenantId == null) {
+ return doInvoke(args);
+ }
+ // 情况二:有租户的情况下
+ return TenantUtils.execute(tenantId, () -> doInvoke(args));
+ }
+
+ private Long parseTenantId(Message> message) {
+ Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
+ if (tenantId == null) {
+ return null;
+ }
+ if (tenantId instanceof Long) {
+ return (Long) tenantId;
+ }
+ if (tenantId instanceof Number) {
+ return ((Number) tenantId).longValue();
+ }
+ if (tenantId instanceof String) {
+ return Long.parseLong((String) tenantId);
+ }
+ if (tenantId instanceof byte[]) {
+ return Long.parseLong(new String((byte[]) tenantId));
+ }
+ throw new IllegalArgumentException("未知的数据类型:" + tenantId);
+ }
+
+ /**
+ * Get the method argument values for the current message, checking the provided
+ * argument values and falling back to the configured argument resolvers.
+ *
The resulting array will be passed into {@link #doInvoke}.
+ * @since 5.1.2
+ */
+ protected Object[] getMethodArgumentValues(Message> message, Object... providedArgs) throws Exception {
+ MethodParameter[] parameters = getMethodParameters();
+ if (ObjectUtils.isEmpty(parameters)) {
+ return EMPTY_ARGS;
+ }
+
+ Object[] args = new Object[parameters.length];
+ for (int i = 0; i < parameters.length; i++) {
+ MethodParameter parameter = parameters[i];
+ parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
+ args[i] = findProvidedArgument(parameter, providedArgs);
+ if (args[i] != null) {
+ continue;
+ }
+ if (!this.resolvers.supportsParameter(parameter)) {
+ throw new MethodArgumentResolutionException(
+ message, parameter, formatArgumentError(parameter, "No suitable resolver"));
+ }
+ try {
+ args[i] = this.resolvers.resolveArgument(parameter, message);
+ }
+ catch (Exception ex) {
+ // Leave stack trace for later, exception may actually be resolved and handled...
+ if (logger.isDebugEnabled()) {
+ String exMsg = ex.getMessage();
+ if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
+ logger.debug(formatArgumentError(parameter, exMsg));
+ }
+ }
+ throw ex;
+ }
+ }
+ return args;
+ }
+
+ /**
+ * Invoke the handler method with the given argument values.
+ */
+ @Nullable
+ protected Object doInvoke(Object... args) throws Exception {
+ try {
+ return getBridgedMethod().invoke(getBean(), args);
+ }
+ catch (IllegalArgumentException ex) {
+ assertTargetBean(getBridgedMethod(), getBean(), args);
+ String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
+ throw new IllegalStateException(formatInvokeError(text, args), ex);
+ }
+ catch (InvocationTargetException ex) {
+ // Unwrap for HandlerExceptionResolvers ...
+ Throwable targetException = ex.getTargetException();
+ if (targetException instanceof RuntimeException) {
+ throw (RuntimeException) targetException;
+ }
+ else if (targetException instanceof Error) {
+ throw (Error) targetException;
+ }
+ else if (targetException instanceof Exception) {
+ throw (Exception) targetException;
+ }
+ else {
+ throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
+ }
+ }
+ }
+
+ MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
+ return new AsyncResultMethodParameter(returnValue);
+ }
+
+ private class AsyncResultMethodParameter extends HandlerMethodParameter {
+
+ @Nullable
+ private final Object returnValue;
+
+ private final ResolvableType returnType;
+
+ public AsyncResultMethodParameter(@Nullable Object returnValue) {
+ super(-1);
+ this.returnValue = returnValue;
+ this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
+ }
+
+ protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
+ super(original);
+ this.returnValue = original.returnValue;
+ this.returnType = original.returnType;
+ }
+
+ @Override
+ public Class> getParameterType() {
+ if (this.returnValue != null) {
+ return this.returnValue.getClass();
+ }
+ if (!ResolvableType.NONE.equals(this.returnType)) {
+ return this.returnType.toClass();
+ }
+ return super.getParameterType();
+ }
+
+ @Override
+ public Type getGenericParameterType() {
+ return this.returnType.getType();
+ }
+
+ @Override
+ public AsyncResultMethodParameter clone() {
+ return new AsyncResultMethodParameter(this);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..a495842a0
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.env.EnvironmentPostProcessor=\
+ cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
index 75303d4e3..c8972f16b 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -12,7 +12,7 @@
jar
${project.artifactId}
- 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费
+ 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
https://github.com/YunaiV/ruoyi-vue-pro
@@ -21,6 +21,23 @@
cn.iocoder.boot
yudao-spring-boot-starter-redis