优化支付通知的逻辑,解决并发的问题。

同时,收到支付结果时,立马回调业务,避免延迟
This commit is contained in:
YunaiV 2021-10-27 10:06:49 +08:00
parent 72b64dc526
commit dd2d8a2ba9
16 changed files with 363 additions and 10 deletions

View File

@ -0,0 +1,29 @@
package cn.iocoder.yudao.adminserver.modules.pay.job.notify;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.PayNotifyCoreService;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 支付通知 Job
* 通过不断扫描待通知的 PayNotifyTaskDO 记录回调业务线的回调接口
*
* @author 芋道源码
*/
@Component
@Slf4j
public class PayNotifyJob implements JobHandler {
@Resource
private PayNotifyCoreService payNotifyCoreService;
@Override
public String execute(String param) throws Exception {
int notifyCount = payNotifyCoreService.executeNotify();
return String.format("执行支付通知 %s 个", notifyCount);
}
}

View File

@ -0,0 +1 @@
package cn.iocoder.yudao.adminserver.modules.pay.job;

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.adminserver.modules.system.enums;
import cn.iocoder.yudao.framework.common.exception.ErrorCode;
import javafx.beans.binding.MapExpression;
/**
* System 错误码枚举类

View File

@ -166,6 +166,9 @@ yudao:
exclude-urls: # 如下两个 url仅仅是为了演示去掉配置也没关系
- ${spring.boot.admin.context-path}/** # 不处理 Spring Boot Admin 的请求
- ${management.endpoints.web.base-path}/** # 不处理 Actuator 的请求
pay:
pay-notify-url: http://niubi.natapp1.cc/api/pay/order/notify
refund-notify-url: http://niubi.natapp1.cc/api/pay/refund/notify
demo: false # 关闭演示模式
justauth:

View File

@ -77,6 +77,12 @@
<artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>
<!-- 服务保障相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-protection</artifactId>
</dependency>
<!-- Test 测试相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>

View File

@ -34,15 +34,11 @@ public class PayNotifyLogDO extends BaseDO {
*/
private Integer notifyTimes;
/**
* 请求参数
*/
private String request;
/**
* 响应结果
* HTTP 响应结果
*/
private String response;
/**
* 状态
* 支付通知状态
*
* 外键 {@link PayNotifyStatusEnum}
*/

View File

@ -1,10 +1,30 @@
package cn.iocoder.yudao.coreservice.modules.pay.dal.mysql.notify;
import cn.iocoder.yudao.coreservice.modules.pay.dal.dataobject.merchant.PayAppDO;
import cn.iocoder.yudao.coreservice.modules.pay.dal.dataobject.notify.PayNotifyTaskDO;
import cn.iocoder.yudao.coreservice.modules.pay.enums.notify.PayNotifyStatusEnum;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import org.apache.ibatis.annotations.Mapper;
import java.util.Date;
import java.util.List;
@Mapper
public interface PayNotifyTaskCoreMapper extends BaseMapperX<PayNotifyTaskDO> {
/**
* 获得需要通知的 PayNotifyTaskDO 记录需要满足如下条件
*
* 1. status 非成功
* 2. nextNotifyTime 小于当前时间
*
* @return PayTransactionNotifyTaskDO 数组
*/
default List<PayNotifyTaskDO> selectListByNotify() {
return selectList(new QueryWrapper<PayNotifyTaskDO>()
.in("status", PayNotifyStatusEnum.WAITING.getStatus(), PayNotifyStatusEnum.REQUEST_SUCCESS.getStatus(),
PayNotifyStatusEnum.REQUEST_FAILURE.getStatus())
.le("next_notify_time", new Date()));
}
}

View File

@ -0,0 +1,19 @@
package cn.iocoder.yudao.coreservice.modules.pay.dal.redis;
import cn.iocoder.yudao.framework.redis.core.RedisKeyDefine;
import org.redisson.api.RLock;
import static cn.iocoder.yudao.framework.redis.core.RedisKeyDefine.KeyTypeEnum.HASH;
/**
* Lock4j Redis Key 枚举类
*
* @author 芋道源码
*/
public interface PayRedisKeyCoreConstants {
RedisKeyDefine PAY_NOTIFY_LOCK = new RedisKeyDefine("通知任务的分布式锁",
"pay_notify:lock:", // 参数来自 DefaultLockKeyBuilder
HASH, RLock.class, RedisKeyDefine.TimeoutTypeEnum.DYNAMIC); // Redisson Lock 使用 Hash 数据结构
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.coreservice.modules.pay.dal.redis.notify;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.coreservice.modules.pay.dal.redis.PayRedisKeyCoreConstants.PAY_NOTIFY_LOCK;
/**
* 支付通知的锁 Redis DAO
*
* @author 芋道源码
*/
@Repository
public class PayNotifyLockCoreRedisDAO {
@Resource
private RedissonClient redissonClient;
public void lock(Long id, Long timeoutMillis, Runnable runnable) {
String lockKey = formatKey(id);
RLock lock = redissonClient.getLock(lockKey);
try {
lock.lock(timeoutMillis, TimeUnit.MILLISECONDS);
// 执行逻辑
runnable.run();
} finally {
lock.unlock();
}
}
private static String formatKey(Long id) {
return String.format(PAY_NOTIFY_LOCK.getKeyTemplate(), id);
}
}

View File

@ -22,7 +22,8 @@ public interface PayNotifyCoreService {
* 执行支付通知
*
* 注意该方法提供给定时任务调用目前是 yudao-admin-server 进行调用
* @return 通知数量
*/
void executeNotify();
int executeNotify() throws InterruptedException;
}

View File

@ -1,22 +1,38 @@
package cn.iocoder.yudao.coreservice.modules.pay.service.notify.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.http.HttpUtil;
import cn.iocoder.yudao.coreservice.modules.pay.dal.dataobject.notify.PayNotifyTaskDO;
import cn.iocoder.yudao.coreservice.modules.pay.dal.dataobject.order.PayOrderDO;
import cn.iocoder.yudao.coreservice.modules.pay.dal.mysql.notify.PayNotifyTaskCoreMapper;
import cn.iocoder.yudao.coreservice.modules.pay.dal.redis.notify.PayNotifyLockCoreRedisDAO;
import cn.iocoder.yudao.coreservice.modules.pay.enums.notify.PayNotifyStatusEnum;
import cn.iocoder.yudao.coreservice.modules.pay.enums.notify.PayNotifyTypeEnum;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.PayNotifyCoreService;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.dto.PayNotifyTaskCreateReqDTO;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo.PayNotifyOrderReqVO;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo.PayRefundOrderReqVO;
import cn.iocoder.yudao.coreservice.modules.pay.service.order.PayOrderCoreService;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.date.DateUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.SECOND_MILLIS;
import static cn.iocoder.yudao.framework.common.util.json.JsonUtils.toJsonString;
/**
* 支付通知 Core Service 实现类
@ -28,6 +44,15 @@ import java.util.Objects;
@Slf4j
public class PayNotifyCoreServiceImpl implements PayNotifyCoreService {
/**
* 通知超时时间单位
*/
public static final int NOTIFY_TIMEOUT = 120;
/**
* {@link #NOTIFY_TIMEOUT} 的毫秒
*/
public static final long NOTIFY_TIMEOUT_MILLIS = 120 * SECOND_MILLIS;
@Resource
@Lazy // 循环依赖避免报错
private PayOrderCoreService payOrderCoreService;
@ -38,6 +63,13 @@ public class PayNotifyCoreServiceImpl implements PayNotifyCoreService {
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor; // TODO 芋艿未来提供独立的线程池
@Resource
private PayNotifyLockCoreRedisDAO payNotifyLockCoreRedisDAO;
@Resource
@Lazy // 循环依赖自己依赖自己避免报错
private PayNotifyCoreServiceImpl self;
@Override
public void createPayNotifyTask(PayNotifyTaskCreateReqDTO reqDTO) {
PayNotifyTaskDO task = new PayNotifyTaskDO();
@ -56,11 +88,132 @@ public class PayNotifyCoreServiceImpl implements PayNotifyCoreService {
// 执行插入
payNotifyTaskCoreMapper.insert(task);
// 异步直接发起任务虽然会有定时任务扫描但是会导致延迟
self.executeNotifyAsync(task);
}
@Override
public void executeNotify() {
public int executeNotify() throws InterruptedException {
// 获得需要通知的任务
List<PayNotifyTaskDO> tasks = payNotifyTaskCoreMapper.selectListByNotify();
if (CollUtil.isEmpty(tasks)) {
return 0;
}
// 遍历逐个通知
CountDownLatch latch = new CountDownLatch(tasks.size());
tasks.forEach(task -> threadPoolTaskExecutor.execute(() -> {
try {
executeNotify(task);
} finally {
latch.countDown();
}
}));
// 等待完成
this.awaitExecuteNotify(latch);
// 返回执行完成的任务数成功 + 失败)
return tasks.size();
}
/**
* 等待全部支付通知的完成
* 1 秒会打印一次剩余任务数量
*
* @param latch Latch
* @throws InterruptedException 如果被打断
*/
private void awaitExecuteNotify(CountDownLatch latch) throws InterruptedException {
long size = latch.getCount();
for (int i = 0; i < NOTIFY_TIMEOUT; i++) {
if (latch.await(1L, TimeUnit.SECONDS)) {
return;
}
log.info("[awaitExecuteNotify][任务处理中, 总任务数({}) 剩余任务数({})]", size, latch.getCount());
}
log.error("[awaitExecuteNotify][任务未处理完,总任务数({}) 剩余任务数({})]", size, latch.getCount());
}
/**
* 异步执行单个支付通知
*
* @param task 通知任务
*/
@Async
public void executeNotifyAsync(PayNotifyTaskDO task) {
self.executeNotify(task); // 使用 self避免事务不发起
}
/**
* 同步执行单个支付通知
*
* @param task 通知任务
*/
public void executeNotify(PayNotifyTaskDO task) {
// 分布式锁避免并发问题
payNotifyLockCoreRedisDAO.lock(task.getId(), NOTIFY_TIMEOUT_MILLIS, () -> {
// 校验当前任务是否已经被通知过
// 虽然已经通过分布式加锁但是可能同时满足通知的条件然后都去获得锁此时第一个执行完后第二个还是能拿到锁然后会再执行一次
PayNotifyTaskDO dbTask = payNotifyTaskCoreMapper.selectById(task.getId());
if (DateUtils.afterNow(dbTask.getNextNotifyTime())) {
log.info("[executeNotify][dbTask({}) 任务被忽略,原因是未到达下次通知时间,可能是因为并发执行了]", toJsonString(dbTask));
return;
}
// 执行通知
executeNotify0(dbTask);
});
}
@Transactional
public void executeNotify0(PayNotifyTaskDO task) {
// 发起回调
CommonResult<?> invokeResult = null;
Throwable invokeException = null;
try {
invokeResult = executeNotifyInvoke(task);
} catch (Throwable e) {
invokeException = e;
}
// 设置通用的更新 PayNotifyTaskDO 的字段
PayNotifyTaskDO updateTask = new PayNotifyTaskDO()
.setId(task.getId())
.setLastExecuteTime(new Date())
.setNotifyTimes(task.getNotifyTimes() + 1);
// 情况一调用成功
// 情况二调用失败
// 调用三调用异常
// 记录 PayNotifyLog 日志
}
/**
* 执行单个支付任务的 HTTP 调用
*
* @param task 通知任务
* @return HTTP 响应
*/
private CommonResult<?> executeNotifyInvoke(PayNotifyTaskDO task) {
// 拼接参数
Object request;
if (Objects.equals(task.getType(), PayNotifyTypeEnum.ORDER.getType())) {
request = PayNotifyOrderReqVO.builder().merchantOrderId(task.getMerchantOrderId())
.payOrderId(task.getDataId()).build();
} else if (Objects.equals(task.getType(), PayNotifyTypeEnum.REFUND.getType())) {
request = PayRefundOrderReqVO.builder().merchantOrderId(task.getMerchantOrderId())
.payRefundId(task.getDataId()).build();
} else {
throw new RuntimeException("未知的通知任务类型:" + toJsonString(task));
}
// 请求地址
String response = HttpUtil.post(task.getNotifyUrl(), toJsonString(request),
(int) NOTIFY_TIMEOUT_MILLIS);
// 解析结果
return JsonUtils.parseObject(response, CommonResult.class);
}
}

View File

@ -0,0 +1,28 @@
package cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
@ApiModel(value = "支付单的通知 Request VO", description = "业务方接入支付回调时,使用该 VO 对象")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PayNotifyOrderReqVO {
@ApiModelProperty(value = "商户订单编号", required = true, example = "10")
@NotEmpty(message = "商户订单号不能为空")
private String merchantOrderId;
@ApiModelProperty(value = "支付订单编号", required = true, example = "20")
@NotNull(message = "支付订单编号不能为空")
private Long payOrderId;
}

View File

@ -0,0 +1,28 @@
package cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
@ApiModel(value = "退款单的通知 Request VO", description = "业务方接入退款回调时,使用该 VO 对象")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PayRefundOrderReqVO {
@ApiModelProperty(value = "商户订单编号", required = true, example = "10")
@NotEmpty(message = "商户订单号不能为空")
private String merchantOrderId;
@ApiModelProperty(value = "支付退款编号", required = true, example = "20")
@NotNull(message = "支付退款编号不能为空")
private Long payRefundId;
}

View File

@ -0,0 +1,6 @@
/**
* 这里的 VO 包有点特殊是提供给接入支付模块的业务提供回调接口时可以直接使用 VO
*
* 例如说支付单的回调使用
*/
package cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo;

View File

@ -6,6 +6,8 @@ import java.util.Date;
/**
* 时间工具类
*
* @author 芋道源码
*/
public class DateUtils {
@ -14,6 +16,11 @@ public class DateUtils {
*/
public static final String TIME_ZONE_DEFAULT = "GMT+8";
/**
* 秒转换成毫秒
*/
public static final long SECOND_MILLIS = 1000;
public static final String FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND = "yyyy-MM-dd HH:mm:ss";
public static Date addTime(Duration duration) {
@ -74,4 +81,12 @@ public class DateUtils {
return a.compareTo(b) > 0 ? a : b;
}
public static boolean beforeNow(Date date) {
return date.getTime() < System.currentTimeMillis();
}
public static boolean afterNow(Date date) {
return date.getTime() >= System.currentTimeMillis();
}
}

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.userserver.modules.shop.controller;
import cn.iocoder.yudao.coreservice.modules.pay.service.notify.vo.PayNotifyOrderReqVO;
import cn.iocoder.yudao.coreservice.modules.pay.service.order.PayOrderCoreService;
import cn.iocoder.yudao.coreservice.modules.pay.service.order.dto.PayOrderCreateReqDTO;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
@ -10,10 +11,12 @@ import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.time.Duration;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
@ -52,4 +55,11 @@ public class ShopOrderController {
.payOrderId(payOrderId).build());
}
@PostMapping("/pay-notify")
@ApiOperation("支付回调")
public CommonResult<Boolean> payNotify(@RequestBody @Valid PayNotifyOrderReqVO reqVO) {
log.info("[payNotify][回调成功]");
return success(true);
}
}