【功能优化】支付:钱包余额更新时,加锁避免并发更新,导致流水不连续的问题

This commit is contained in:
YunaiV 2024-09-23 13:45:21 +08:00
parent 9b5d3f01ca
commit dbb674b24f
4 changed files with 123 additions and 52 deletions

View File

@ -16,6 +16,15 @@ public interface RedisKeyConstants {
*/ */
String PAY_NOTIFY_LOCK = "pay_notify:lock:%d"; String PAY_NOTIFY_LOCK = "pay_notify:lock:%d";
/**
* 支付钱包的分布式锁
*
* KEY 格式pay_wallet:lock:%d
* VALUE 数据格式HASH // RLock.classRedisson Lock 使用 Hash 数据结构
* 过期时间不固定
*/
String PAY_WALLET_LOCK = "pay_wallet:lock:%d";
/** /**
* 支付序号的缓存 * 支付序号的缓存
* *

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.pay.dal.redis.wallet;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Repository;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.module.pay.dal.redis.RedisKeyConstants.PAY_WALLET_LOCK;
/**
* 支付钱包的锁 Redis DAO
*
* @author 芋道源码
*/
@Repository
public class PayWalletLockRedisDAO {
@Resource
private RedissonClient redissonClient;
public <V> V lock(Long id, Long timeoutMillis, Callable<V> callable) throws Exception {
String lockKey = formatKey(id);
RLock lock = redissonClient.getLock(lockKey);
try {
lock.lock(timeoutMillis, TimeUnit.MILLISECONDS);
// 执行逻辑
return callable.call();
} catch (Exception e) {
throw e;
} finally {
lock.unlock();
}
}
private static String formatKey(Long id) {
return String.format(PAY_WALLET_LOCK, id);
}
}

View File

@ -2,17 +2,20 @@ package cn.iocoder.yudao.module.pay.service.wallet;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.date.DateUtils;
import cn.iocoder.yudao.module.pay.controller.admin.wallet.vo.wallet.PayWalletPageReqVO; import cn.iocoder.yudao.module.pay.controller.admin.wallet.vo.wallet.PayWalletPageReqVO;
import cn.iocoder.yudao.module.pay.dal.dataobject.order.PayOrderExtensionDO; import cn.iocoder.yudao.module.pay.dal.dataobject.order.PayOrderExtensionDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.refund.PayRefundDO; import cn.iocoder.yudao.module.pay.dal.dataobject.refund.PayRefundDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletDO; import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletTransactionDO; import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletTransactionDO;
import cn.iocoder.yudao.module.pay.dal.mysql.wallet.PayWalletMapper; import cn.iocoder.yudao.module.pay.dal.mysql.wallet.PayWalletMapper;
import cn.iocoder.yudao.module.pay.dal.redis.wallet.PayWalletLockRedisDAO;
import cn.iocoder.yudao.module.pay.enums.wallet.PayWalletBizTypeEnum; import cn.iocoder.yudao.module.pay.enums.wallet.PayWalletBizTypeEnum;
import cn.iocoder.yudao.module.pay.service.order.PayOrderService; import cn.iocoder.yudao.module.pay.service.order.PayOrderService;
import cn.iocoder.yudao.module.pay.service.refund.PayRefundService; import cn.iocoder.yudao.module.pay.service.refund.PayRefundService;
import cn.iocoder.yudao.module.pay.service.wallet.bo.WalletTransactionCreateReqBO; import cn.iocoder.yudao.module.pay.service.wallet.bo.WalletTransactionCreateReqBO;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -32,10 +35,17 @@ import static cn.iocoder.yudao.module.pay.enums.wallet.PayWalletBizTypeEnum.PAYM
*/ */
@Service @Service
@Slf4j @Slf4j
public class PayWalletServiceImpl implements PayWalletService { public class PayWalletServiceImpl implements PayWalletService {
/**
* 通知超时时间单位毫秒
*/
public static final long UPDATE_TIMEOUT_MILLIS = 120 * DateUtils.SECOND_MILLIS;
@Resource @Resource
private PayWalletMapper walletMapper; private PayWalletMapper walletMapper;
@Resource
private PayWalletLockRedisDAO lockRedisDAO;
@Resource @Resource
@Lazy // 延迟加载避免循环依赖 @Lazy // 延迟加载避免循环依赖
@ -121,75 +131,87 @@ public class PayWalletServiceImpl implements PayWalletService {
} }
@Override @Override
@Transactional(rollbackFor = Exception.class)
@SneakyThrows
public PayWalletTransactionDO reduceWalletBalance(Long walletId, Long bizId, public PayWalletTransactionDO reduceWalletBalance(Long walletId, Long bizId,
PayWalletBizTypeEnum bizType, Integer price) { PayWalletBizTypeEnum bizType, Integer price) {
// 1. 获取钱包 // 1. 获取钱包
PayWalletDO payWallet = getWallet(walletId); PayWalletDO payWallet = getWallet(walletId);
if (payWallet == null) { if (payWallet == null) {
log.error("[reduceWalletBalance],用户钱包({})不存在.", walletId); log.error("[reduceWalletBalance][用户钱包({})不存在]", walletId);
throw exception(WALLET_NOT_FOUND); throw exception(WALLET_NOT_FOUND);
} }
// 2.1 扣除余额 // 2. 加锁更新钱包余额目的避免钱包流水的并发更新时余额变化不连贯
int updateCounts; return lockRedisDAO.lock(walletId, UPDATE_TIMEOUT_MILLIS, () -> {
switch (bizType) { // 2. 扣除余额
case PAYMENT: { int updateCounts;
updateCounts = walletMapper.updateWhenConsumption(payWallet.getId(), price); switch (bizType) {
break; case PAYMENT: {
updateCounts = walletMapper.updateWhenConsumption(payWallet.getId(), price);
break;
}
case RECHARGE_REFUND: {
updateCounts = walletMapper.updateWhenRechargeRefund(payWallet.getId(), price);
break;
}
default: {
// TODO 其它类型待实现
throw new UnsupportedOperationException("待实现");
}
} }
case RECHARGE_REFUND: { if (updateCounts == 0) {
updateCounts = walletMapper.updateWhenRechargeRefund(payWallet.getId(), price); throw exception(WALLET_BALANCE_NOT_ENOUGH);
break;
} }
default: {
// TODO 其它类型待实现 // 3. 生成钱包流水
throw new UnsupportedOperationException("待实现"); Integer afterBalance = payWallet.getBalance() - price;
} WalletTransactionCreateReqBO bo = new WalletTransactionCreateReqBO().setWalletId(payWallet.getId())
} .setPrice(-price).setBalance(afterBalance).setBizId(String.valueOf(bizId))
if (updateCounts == 0) { .setBizType(bizType.getType()).setTitle(bizType.getDescription());
throw exception(WALLET_BALANCE_NOT_ENOUGH); return walletTransactionService.createWalletTransaction(bo);
} });
// 2.2 生成钱包流水
Integer afterBalance = payWallet.getBalance() - price;
WalletTransactionCreateReqBO bo = new WalletTransactionCreateReqBO().setWalletId(payWallet.getId())
.setPrice(-price).setBalance(afterBalance).setBizId(String.valueOf(bizId))
.setBizType(bizType.getType()).setTitle(bizType.getDescription());
return walletTransactionService.createWalletTransaction(bo);
} }
@Override @Override
@Transactional(rollbackFor = Exception.class)
@SneakyThrows
public PayWalletTransactionDO addWalletBalance(Long walletId, String bizId, public PayWalletTransactionDO addWalletBalance(Long walletId, String bizId,
PayWalletBizTypeEnum bizType, Integer price) { PayWalletBizTypeEnum bizType, Integer price) {
// 1.1 获取钱包 // 1. 获取钱包
PayWalletDO payWallet = getWallet(walletId); PayWalletDO payWallet = getWallet(walletId);
if (payWallet == null) { if (payWallet == null) {
log.error("[addWalletBalance],用户钱包({})不存在.", walletId); log.error("[addWalletBalance][用户钱包({})不存在]", walletId);
throw exception(WALLET_NOT_FOUND); throw exception(WALLET_NOT_FOUND);
} }
// 1.2 更新钱包金额
switch (bizType) {
case PAYMENT_REFUND: { // 退款更新
walletMapper.updateWhenConsumptionRefund(payWallet.getId(), price);
break;
}
case RECHARGE: { // 充值更新
walletMapper.updateWhenRecharge(payWallet.getId(), price);
break;
}
case UPDATE_BALANCE: // 更新余额
walletMapper.updateWhenRecharge(payWallet.getId(), price);
break;
default: {
// TODO 其它类型待实现
throw new UnsupportedOperationException("待实现");
}
}
// 2. 生成钱包流水 // 2. 加锁更新钱包余额目的避免钱包流水的并发更新时余额变化不连贯
WalletTransactionCreateReqBO transactionCreateReqBO = new WalletTransactionCreateReqBO() return lockRedisDAO.lock(walletId, UPDATE_TIMEOUT_MILLIS, () -> {
.setWalletId(payWallet.getId()).setPrice(price).setBalance(payWallet.getBalance() + price) // 2. 更新钱包金额
.setBizId(bizId).setBizType(bizType.getType()).setTitle(bizType.getDescription()); switch (bizType) {
return walletTransactionService.createWalletTransaction(transactionCreateReqBO); case PAYMENT_REFUND: { // 退款更新
walletMapper.updateWhenConsumptionRefund(payWallet.getId(), price);
break;
}
case RECHARGE: { // 充值更新
walletMapper.updateWhenRecharge(payWallet.getId(), price);
break;
}
case UPDATE_BALANCE: // 更新余额
walletMapper.updateWhenRecharge(payWallet.getId(), price);
break;
default: {
// TODO 其它类型待实现
throw new UnsupportedOperationException("待实现");
}
}
// 3. 生成钱包流水
WalletTransactionCreateReqBO transactionCreateReqBO = new WalletTransactionCreateReqBO()
.setWalletId(payWallet.getId()).setPrice(price).setBalance(payWallet.getBalance() + price)
.setBizId(bizId).setBizType(bizType.getType()).setTitle(bizType.getDescription());
return walletTransactionService.createWalletTransaction(transactionCreateReqBO);
});
} }
@Override @Override

View File

@ -87,8 +87,6 @@ public class PayWalletTransactionServiceImpl implements PayWalletTransactionServ
@Override @Override
public AppPayWalletTransactionSummaryRespVO getWalletTransactionSummary(Long userId, Integer userType, LocalDateTime[] createTime) { public AppPayWalletTransactionSummaryRespVO getWalletTransactionSummary(Long userId, Integer userType, LocalDateTime[] createTime) {
PayWalletDO wallet = payWalletService.getOrCreateWallet(userId, userType); PayWalletDO wallet = payWalletService.getOrCreateWallet(userId, userType);
AppPayWalletTransactionSummaryRespVO summary = new AppPayWalletTransactionSummaryRespVO()
.setTotalExpense(1).setTotalIncome(100);
return new AppPayWalletTransactionSummaryRespVO() return new AppPayWalletTransactionSummaryRespVO()
.setTotalExpense(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_EXPENSE, createTime)) .setTotalExpense(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_EXPENSE, createTime))
.setTotalIncome(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_INCOME, createTime)); .setTotalIncome(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_INCOME, createTime));