优化 mp 账号的刷新机制,使用 Job 轮询,替换 MQ 广播

This commit is contained in:
YunaiV 2023-07-27 20:25:15 +08:00
parent 5d6308e4bb
commit c33186569d
14 changed files with 62 additions and 115 deletions

View File

@ -164,7 +164,7 @@ public class CollectionUtils {
return from.stream().filter(predicate).findFirst().orElse(null);
}
public static <T, V extends Comparable<? super V>> V getMaxValue(List<T> from, Function<T, V> valueFunc) {
public static <T, V extends Comparable<? super V>> V getMaxValue(Collection<T> from, Function<T, V> valueFunc) {
if (CollUtil.isEmpty(from)) {
return null;
}

View File

@ -15,8 +15,8 @@ import cn.iocoder.yudao.module.bpm.convert.definition.BpmTaskAssignRuleConvert;
import cn.iocoder.yudao.module.bpm.dal.dataobject.definition.BpmTaskAssignRuleDO;
import cn.iocoder.yudao.module.bpm.dal.dataobject.definition.BpmUserGroupDO;
import cn.iocoder.yudao.module.bpm.dal.mysql.definition.BpmTaskAssignRuleMapper;
import cn.iocoder.yudao.module.bpm.enums.definition.BpmTaskAssignRuleTypeEnum;
import cn.iocoder.yudao.module.bpm.enums.DictTypeConstants;
import cn.iocoder.yudao.module.bpm.enums.definition.BpmTaskAssignRuleTypeEnum;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.behavior.script.BpmTaskAssignScript;
import cn.iocoder.yudao.module.system.api.dept.DeptApi;
import cn.iocoder.yudao.module.system.api.dept.PostApi;
@ -89,7 +89,7 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
@Override
public List<BpmTaskAssignRuleDO> getTaskAssignRuleListByProcessDefinitionId(String processDefinitionId,
String taskDefinitionKey) {
String taskDefinitionKey) {
return taskRuleMapper.selectListByProcessDefinitionId(processDefinitionId, taskDefinitionKey);
}
@ -128,14 +128,14 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
validTaskAssignRuleOptions(reqVO.getType(), reqVO.getOptions());
// 校验是否已经配置
BpmTaskAssignRuleDO existRule =
taskRuleMapper.selectListByModelIdAndTaskDefinitionKey(reqVO.getModelId(), reqVO.getTaskDefinitionKey());
taskRuleMapper.selectListByModelIdAndTaskDefinitionKey(reqVO.getModelId(), reqVO.getTaskDefinitionKey());
if (existRule != null) {
throw exception(TASK_ASSIGN_RULE_EXISTS, reqVO.getModelId(), reqVO.getTaskDefinitionKey());
}
// 存储
BpmTaskAssignRuleDO rule = BpmTaskAssignRuleConvert.INSTANCE.convert(reqVO)
.setProcessDefinitionId(BpmTaskAssignRuleDO.PROCESS_DEFINITION_ID_NULL); // 只有流程模型才允许新建
.setProcessDefinitionId(BpmTaskAssignRuleDO.PROCESS_DEFINITION_ID_NULL); // 只有流程模型才允许新建
taskRuleMapper.insert(rule);
return rule.getId();
}
@ -169,14 +169,14 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
// 遍历匹配对应的规则
Map<String, BpmTaskAssignRuleRespVO> processInstanceRuleMap =
CollectionUtils.convertMap(processInstanceRules, BpmTaskAssignRuleRespVO::getTaskDefinitionKey);
CollectionUtils.convertMap(processInstanceRules, BpmTaskAssignRuleRespVO::getTaskDefinitionKey);
for (BpmTaskAssignRuleRespVO modelRule : modelRules) {
BpmTaskAssignRuleRespVO processInstanceRule = processInstanceRuleMap.get(modelRule.getTaskDefinitionKey());
if (processInstanceRule == null) {
return false;
}
if (!ObjectUtil.equals(modelRule.getType(), processInstanceRule.getType()) || !ObjectUtil.equal(
modelRule.getOptions(), processInstanceRule.getOptions())) {
modelRule.getOptions(), processInstanceRule.getOptions())) {
return false;
}
}
@ -192,7 +192,7 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
// 开始复制
List<BpmTaskAssignRuleDO> newRules = BpmTaskAssignRuleConvert.INSTANCE.convertList2(rules);
newRules.forEach(rule -> rule.setProcessDefinitionId(toProcessDefinitionId).setId(null).setCreateTime(null)
.setUpdateTime(null));
.setUpdateTime(null));
taskRuleMapper.insertBatch(newRules);
}
@ -215,7 +215,7 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
if (Objects.equals(type, BpmTaskAssignRuleTypeEnum.ROLE.getType())) {
roleApi.validRoleList(options);
} else if (ObjectUtils.equalsAny(type, BpmTaskAssignRuleTypeEnum.DEPT_MEMBER.getType(),
BpmTaskAssignRuleTypeEnum.DEPT_LEADER.getType())) {
BpmTaskAssignRuleTypeEnum.DEPT_LEADER.getType())) {
deptApi.validateDeptList(options);
} else if (Objects.equals(type, BpmTaskAssignRuleTypeEnum.POST.getType())) {
postApi.validPostList(options);
@ -225,7 +225,7 @@ public class BpmTaskAssignRuleServiceImpl implements BpmTaskAssignRuleService {
userGroupService.validUserGroups(options);
} else if (Objects.equals(type, BpmTaskAssignRuleTypeEnum.SCRIPT.getType())) {
dictDataApi.validateDictDataList(DictTypeConstants.TASK_ASSIGN_SCRIPT,
CollectionUtils.convertSet(options, String::valueOf));
CollectionUtils.convertSet(options, String::valueOf));
} else {
throw new IllegalArgumentException(format("未知的规则类型({})", type));
}

View File

@ -40,6 +40,8 @@ public class BpmTaskAssignLeaderX2ScriptTest extends BaseMockitoUnitTest {
// mock 方法(startUser)
AdminUserRespDTO startUser = randomPojo(AdminUserRespDTO.class, o -> o.setDeptId(10L));
when(adminUserApi.getUser(eq(1L))).thenReturn(startUser);
// mock 方法(getStartUserDept)没有部门
when(deptApi.getDept(eq(10L))).thenReturn(null);
// 调用
Set<Long> result = script.calculateTaskCandidateUsers(execution);
@ -56,7 +58,9 @@ public class BpmTaskAssignLeaderX2ScriptTest extends BaseMockitoUnitTest {
when(adminUserApi.getUser(eq(1L))).thenReturn(startUser);
DeptRespDTO startUserDept = randomPojo(DeptRespDTO.class, o -> o.setId(10L).setParentId(100L)
.setLeaderUserId(20L));
// mock 方法getDept
when(deptApi.getDept(eq(10L))).thenReturn(startUserDept);
when(deptApi.getDept(eq(100L))).thenReturn(null);
// 调用
Set<Long> result = script.calculateTaskCandidateUsers(execution);

View File

@ -6,6 +6,9 @@ import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.mp.controller.admin.account.vo.MpAccountPageReqVO;
import cn.iocoder.yudao.module.mp.dal.dataobject.account.MpAccountDO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.time.LocalDateTime;
@Mapper
public interface MpAccountMapper extends BaseMapperX<MpAccountDO> {
@ -22,4 +25,7 @@ public interface MpAccountMapper extends BaseMapperX<MpAccountDO> {
return selectOne(MpAccountDO::getAppId, appId);
}
@Select("SELECT COUNT(*) FROM pay_account WHERE update_time > #{maxUpdateTime}")
Long selectCountByUpdateTimeGt(LocalDateTime maxUpdateTime);
}

View File

@ -1,29 +0,0 @@
package cn.iocoder.yudao.module.mp.mq.consumer;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.mp.mq.message.MpAccountRefreshMessage;
import cn.iocoder.yudao.module.mp.service.account.MpAccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 针对 {@link MpAccountRefreshMessage} 的消费者
*
* @author 芋道源码
*/
@Component
@Slf4j
public class MpAccountRefreshConsumer extends AbstractChannelMessageListener<MpAccountRefreshMessage> {
@Resource
private MpAccountService mpAccountService;
@Override
public void onMessage(MpAccountRefreshMessage message) {
log.info("[onMessage][收到 Account 刷新消息]");
mpAccountService.initLocalCache();
}
}

View File

@ -1,21 +0,0 @@
package cn.iocoder.yudao.module.mp.mq.message;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 公众号账号刷新 Message
*
* @author 芋道源码
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class MpAccountRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "mp.account.refresh";
}
}

View File

@ -1,28 +0,0 @@
package cn.iocoder.yudao.module.mp.mq.producer;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.module.mp.mq.message.MpAccountRefreshMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 公众号账号 Producer
*
* @author 芋道源码
*/
@Component
public class MpAccountProducer {
@Resource
private RedisMQTemplate redisMQTemplate;
/**
* 发送 {@link MpAccountRefreshMessage} 消息
*/
public void sendAccountRefreshMessage() {
MpAccountRefreshMessage message = new MpAccountRefreshMessage();
redisMQTemplate.send(message);
}
}

View File

@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.mp.service.account;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.mp.controller.admin.account.vo.MpAccountCreateReqVO;
import cn.iocoder.yudao.module.mp.controller.admin.account.vo.MpAccountPageReqVO;
@ -13,7 +13,6 @@ import cn.iocoder.yudao.module.mp.dal.dataobject.account.MpAccountDO;
import cn.iocoder.yudao.module.mp.dal.mysql.account.MpAccountMapper;
import cn.iocoder.yudao.module.mp.enums.ErrorCodeConstants;
import cn.iocoder.yudao.module.mp.framework.mp.core.MpServiceFactory;
import cn.iocoder.yudao.module.mp.mq.producer.MpAccountProducer;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -21,15 +20,20 @@ import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.result.WxMpQrCodeTicket;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.getMaxValue;
import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.USER_USERNAME_EXISTS;
/**
@ -58,9 +62,6 @@ public class MpAccountServiceImpl implements MpAccountService {
@Lazy // 延迟加载解决循环依赖的问题
private MpServiceFactory mpServiceFactory;
@Resource
private MpAccountProducer mpAccountProducer;
@Override
@PostConstruct
public void initLocalCache() {
@ -72,7 +73,30 @@ public class MpAccountServiceImpl implements MpAccountService {
// 第二步构建缓存创建或更新支付 Client
mpServiceFactory.init(accounts);
accountCache = CollectionUtils.convertMap(accounts, MpAccountDO::getAppId);
accountCache = convertMap(accounts, MpAccountDO::getAppId);
});
}
/**
* 通过定时任务轮询刷新缓存
*
* 目的多节点部署时通过轮询通知所有节点进行刷新
*/
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void refreshLocalCache() {
// 注意忽略自动多租户因为要全局初始化缓存
TenantUtils.executeIgnore(() -> {
// 情况一如果缓存里没有数据则直接刷新缓存
if (CollUtil.isEmpty(accountCache)) {
initLocalCache();
return;
}
// 情况二如果缓存里数据则通过 updateTime 判断是否有数据变更有变更则刷新缓存
LocalDateTime maxTime = getMaxValue(accountCache.values(), MpAccountDO::getUpdateTime);
if (mpAccountMapper.selectCountByUpdateTimeGt(maxTime) > 0) {
initLocalCache();
}
});
}
@ -85,8 +109,8 @@ public class MpAccountServiceImpl implements MpAccountService {
MpAccountDO account = MpAccountConvert.INSTANCE.convert(createReqVO);
mpAccountMapper.insert(account);
// 发送刷新消息
mpAccountProducer.sendAccountRefreshMessage();
// 刷新缓存
initLocalCache();
return account.getId();
}
@ -101,8 +125,8 @@ public class MpAccountServiceImpl implements MpAccountService {
MpAccountDO updateObj = MpAccountConvert.INSTANCE.convert(updateReqVO);
mpAccountMapper.updateById(updateObj);
// 发送刷新消息
mpAccountProducer.sendAccountRefreshMessage();
// 刷新缓存
initLocalCache();
}
@Override
@ -112,8 +136,8 @@ public class MpAccountServiceImpl implements MpAccountService {
// 删除
mpAccountMapper.deleteById(id);
// 发送刷新消息
mpAccountProducer.sendAccountRefreshMessage();
// 刷新缓存
initLocalCache();
}
private MpAccountDO validateAccountExists(Long id) {

View File

@ -6,8 +6,6 @@ import lombok.Data;
/**
* 支付单信息 Response DTO
*
* TODO 芋艿还没定好字段
*
* @author 芋道源码
*/
@Data

View File

@ -8,8 +8,6 @@ import java.time.LocalDateTime;
/**
* 退款单信息 Response DTO
*
* TODO 芋艿还没定好字段
*
* @author 芋道源码
*/
@Data

View File

@ -61,12 +61,6 @@
<artifactId>yudao-spring-boot-starter-job</artifactId>
</dependency>
<!-- 消息队列相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>
<!-- Test 测试相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>

View File

@ -15,7 +15,6 @@ import cn.iocoder.yudao.module.pay.controller.admin.channel.vo.PayChannelUpdateR
import cn.iocoder.yudao.module.pay.convert.channel.PayChannelConvert;
import cn.iocoder.yudao.module.pay.dal.dataobject.channel.PayChannelDO;
import cn.iocoder.yudao.module.pay.dal.mysql.channel.PayChannelMapper;
import cn.iocoder.yudao.module.pay.enums.ErrorCodeConstants;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -112,7 +111,7 @@ public class PayChannelServiceImpl implements PayChannelService {
channelMapper.insert(channel);
// 刷新缓存
refreshLocalCache();
initLocalCache();
return channel.getId();
}
@ -127,7 +126,7 @@ public class PayChannelServiceImpl implements PayChannelService {
channelMapper.updateById(channel);
// 刷新缓存
refreshLocalCache();
initLocalCache();
}
/**
@ -160,7 +159,7 @@ public class PayChannelServiceImpl implements PayChannelService {
channelMapper.deleteById(id);
// 刷新缓存
refreshLocalCache();
initLocalCache();
}
private PayChannelDO validateChannelExists(Long id) {

View File

@ -167,7 +167,8 @@ yudao:
- ${spring.boot.admin.context-path}/** # 不处理 Spring Boot Admin 的请求
- ${management.endpoints.web.base-path}/** # 不处理 Actuator 的请求
pay:
callback-url: http://yunai.natapp1.cc/admin-api/pay/notify/callback
order-notify-url: http://yunai.natapp1.cc/admin-api/pay/notify/order # 支付渠道的【支付】回调地址
refund-notify-url: http://yunai.natapp1.cc/admin-api/pay/notify/refund # 支付渠道的【退款】回调地址
demo: true # 开启演示模式
justauth: