mp:实现 user 的批量同步

This commit is contained in:
YunaiV 2023-01-08 15:23:40 +08:00
parent a341c44c4d
commit f05e086aab
9 changed files with 157 additions and 25 deletions

View File

@ -63,8 +63,8 @@ public class MpTagController {
}
@PostMapping("/sync")
@ApiOperation("同步公众标签")
@ApiImplicitParam(name = "id", value = "公众号账号的编号", required = true, dataTypeClass = Long.class)
@ApiOperation("同步公众标签")
@ApiImplicitParam(name = "accountId", value = "公众号账号的编号", required = true, dataTypeClass = Long.class)
@PreAuthorize("@ss.hasPermission('mp:tag:sync')")
public CommonResult<Boolean> syncTag(@RequestParam("accountId") Long accountId) {
mpTagService.syncTag(accountId);

View File

@ -0,0 +1,5 @@
### 请求 /mp/user/sync 接口 => 成功
POST {{baseUrl}}/mp/user/sync?accountId=1
Content-Type: application/json
Authorization: Bearer {{token}}
tenant-id: {{adminTenentId}}

View File

@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.mp.convert.user.MpUserConvert;
import cn.iocoder.yudao.module.mp.dal.dataobject.user.MpUserDO;
import cn.iocoder.yudao.module.mp.service.user.MpUserService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
@ -34,4 +35,13 @@ public class MpUserController {
return success(MpUserConvert.INSTANCE.convertPage(pageResult));
}
@PostMapping("/sync")
@ApiOperation("同步公众号粉丝")
@ApiImplicitParam(name = "accountId", value = "公众号账号的编号", required = true, dataTypeClass = Long.class)
@PreAuthorize("@ss.hasPermission('mp:user:sync')")
public CommonResult<Boolean> syncUser(@RequestParam("accountId") Long accountId) {
mpUserService.syncUser(accountId);
return success(true);
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.mp.convert.user;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.mp.controller.admin.user.vo.MpUserRespVO;
import cn.iocoder.yudao.module.mp.dal.dataobject.account.MpAccountDO;
import cn.iocoder.yudao.module.mp.dal.dataobject.user.MpUserDO;
@ -44,4 +45,8 @@ public interface MpUserConvert {
return user;
}
default List<MpUserDO> convertList(MpAccountDO account, List<WxMpUser> wxUsers) {
return CollectionUtils.convertList(wxUsers, wxUser -> convert(account, wxUser));
}
}

View File

@ -17,6 +17,9 @@ import me.chanjar.weixin.mp.bean.tag.WxUserTag;
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MpTagDO extends BaseDO {
/**

View File

@ -2,20 +2,24 @@ package cn.iocoder.yudao.module.mp.dal.dataobject.user;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.framework.mybatis.core.type.LongListTypeHandler;
import cn.iocoder.yudao.module.mp.dal.dataobject.account.MpAccountDO;
import cn.iocoder.yudao.module.mp.dal.dataobject.tag.MpTagDO;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.*;
import java.time.LocalDateTime;
import java.util.List;
/**
* 微信公众号粉丝 DO
*
* @author 芋道源码
*/
@TableName("mp_user")
@TableName(value = "mp_user", autoResultMap = true)
@KeySequence("mp_user_seq") // 用于 OraclePostgreSQLKingbaseDB2H2 数据库的主键自增如果是 MySQL 等数据库可不写
@Data
@EqualsAndHashCode(callSuper = true)
@ -82,6 +86,13 @@ public class MpUserDO extends BaseDO {
* 备注
*/
private String remark;
/**
* 标签编号数组
*
* 注意对应的是 {@link MpTagDO#getTagId()} 字段
*/
@TableField(typeHandler = LongListTypeHandler.class)
private List<Long> tagIds;
/**
* 微信公众号 ID

View File

@ -7,6 +7,8 @@ import cn.iocoder.yudao.module.mp.controller.admin.user.vo.MpUserPageReqVO;
import cn.iocoder.yudao.module.mp.dal.dataobject.user.MpUserDO;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface MpUserMapper extends BaseMapperX<MpUserDO> {
@ -18,9 +20,16 @@ public interface MpUserMapper extends BaseMapperX<MpUserDO> {
.orderByDesc(MpUserDO::getId));
}
default MpUserDO selectByAppIdAndOpenid(String appId, String openId) {
default MpUserDO selectByAppIdAndOpenid(String appId, String openid) {
return selectOne(MpUserDO::getAppId, appId,
MpUserDO::getOpenid, openId);
MpUserDO::getOpenid, openid);
}
default List<MpUserDO> selectListByAppIdAndOpenid(String appId, List<String> openids) {
return selectList(new LambdaQueryWrapperX<MpUserDO>()
.eq(MpUserDO::getAppId, appId)
.in(MpUserDO::getOpenid, openids));
}
}

View File

@ -9,61 +9,68 @@ import java.util.Collection;
import java.util.List;
/**
* 微信公众号粉丝 Service 接口
* 公众号粉丝 Service 接口
*
* @author 芋道源码
*/
public interface MpUserService {
/**
* 获得微信公众号粉丝
* 获得公众号粉丝
*
* @param id 编号
* @return 微信公众号粉丝
* @return 公众号粉丝
*/
MpUserDO getUser(Long id);
/**
* 使用 appId + openId获得微信公众号粉丝
* 使用 appId + openId获得公众号粉丝
*
* @param appId 微信公众号 appId
* @param openId 微信公众号 openId
* @return 微信公众号粉丝
* @param appId 公众号 appId
* @param openId 公众号 openId
* @return 公众号粉丝
*/
MpUserDO getUser(String appId, String openId);
/**
* 获得微信公众号粉丝列表
* 获得公众号粉丝列表
*
* @param ids 编号
* @return 微信公众号粉丝列表
* @return 公众号粉丝列表
*/
List<MpUserDO> getUserList(Collection<Long> ids);
/**
* 获得微信公众号粉丝分页
* 获得公众号粉丝分页
*
* @param pageReqVO 分页查询
* @return 微信公众号粉丝分页
* @return 公众号粉丝分页
*/
PageResult<MpUserDO> getUserPage(MpUserPageReqVO pageReqVO);
/**
* 保存微信公众号粉丝
* 保存公众号粉丝
*
* 新增或更新根据是否存在数据库中
*
* @param appId 微信公众号 appId
* @param wxMpUser 微信公众号粉丝的信息
* @return 微信公众号粉丝
* @param appId 公众号 appId
* @param wxMpUser 公众号粉丝的信息
* @return 公众号粉丝
*/
MpUserDO saveUser(String appId, WxMpUser wxMpUser);
/**
* 更新微信公众号粉丝取消关注
* 同步一个公众号粉丝
*
* @param appId 微信公众号 appId
* @param openId 微信公众号粉丝的 openid
* @param accountId 公众号账号的编号
*/
void syncUser(Long accountId);
/**
* 更新公众号粉丝取消关注
*
* @param appId 公众号 appId
* @param openId 公众号粉丝的 openid
*/
void updateUserUnsubscribe(String appId, String openId);

View File

@ -1,23 +1,33 @@
package cn.iocoder.yudao.module.mp.service.user;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.mp.controller.admin.user.vo.MpUserPageReqVO;
import cn.iocoder.yudao.module.mp.convert.user.MpUserConvert;
import cn.iocoder.yudao.module.mp.dal.dataobject.account.MpAccountDO;
import cn.iocoder.yudao.module.mp.dal.dataobject.user.MpUserDO;
import cn.iocoder.yudao.module.mp.dal.mysql.user.MpUserMapper;
import cn.iocoder.yudao.module.mp.framework.mp.core.MpServiceFactory;
import cn.iocoder.yudao.module.mp.service.account.MpAccountService;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.result.WxMpUser;
import me.chanjar.weixin.mp.bean.result.WxMpUserList;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* 微信公众号粉丝 Service 实现类
@ -31,7 +41,11 @@ public class MpUserServiceImpl implements MpUserService {
@Resource
@Lazy // 延迟加载解决循环依赖的问题
private MpAccountService accountService;
private MpAccountService mpAccountService;
@Resource
@Lazy // 延迟加载解决循环依赖的问题
private MpServiceFactory mpServiceFactory;
@Resource
private MpUserMapper mpUserMapper;
@ -59,7 +73,7 @@ public class MpUserServiceImpl implements MpUserService {
@Override
public MpUserDO saveUser(String appId, WxMpUser wxMpUser) {
// 构建保存的 MpUserDO 对象
MpAccountDO account = accountService.getAccountFromCache(appId);
MpAccountDO account = mpAccountService.getAccountFromCache(appId);
MpUserDO user = MpUserConvert.INSTANCE.convert(account, wxMpUser);
// 根据情况插入或更新
@ -73,6 +87,74 @@ public class MpUserServiceImpl implements MpUserService {
return user;
}
@Override
@Async
public void syncUser(Long accountId) {
MpAccountDO account = mpAccountService.getRequiredAccount(accountId);
// for 循环避免递归出意外问题导致死循环
String nextOpenid = null;
for (int i = 0; i < Short.MAX_VALUE; i++) {
log.info("[syncUser][第({}) 次加载公众号用户列表nextOpenid({})]", i, nextOpenid);
try {
nextOpenid = syncUser0(account, nextOpenid);
} catch (WxErrorException e) {
log.error("[syncUser][第({}) 次同步用户异常]", i, e);
break;
}
// 如果 nextOpenid 为空表示已经同步完毕
if (StrUtil.isEmpty(nextOpenid)) {
break;
}
}
}
private String syncUser0(MpAccountDO account, String nextOpenid) throws WxErrorException {
// 第一步从公众号流式加载用户
WxMpService mpService = mpServiceFactory.getRequiredMpService(account.getId());
WxMpUserList wxUserList = mpService.getUserService().userList(nextOpenid);
if (CollUtil.isEmpty(wxUserList.getOpenids())) {
return null;
}
// 第二步分批加载用户信息
List<List<String>> openidsList = CollUtil.split(wxUserList.getOpenids(), 100);
for (List<String> openids : openidsList) {
log.info("[syncUser][批量加载用户信息openids({})]", openids);
List<WxMpUser> wxUsers = mpService.getUserService().userInfoList(openids);
batchSaveUser(account, wxUsers);
}
// 返回下一次的 nextOpenId
return wxUserList.getNextOpenid();
}
private void batchSaveUser(MpAccountDO account, List<WxMpUser> wxUsers) {
if (CollUtil.isEmpty(wxUsers)) {
return;
}
// 1. 获得数据库已保存的用户列表
List<MpUserDO> dbUsers = mpUserMapper.selectListByAppIdAndOpenid(account.getAppId(),
CollectionUtils.convertList(wxUsers, WxMpUser::getOpenId));
Map<String, MpUserDO> openId2Users = CollectionUtils.convertMap(dbUsers, MpUserDO::getOpenid);
// 2.1 根据情况插入或更新
List<MpUserDO> users = MpUserConvert.INSTANCE.convertList(account, wxUsers);
List<MpUserDO> newUsers = new ArrayList<>();
for (MpUserDO user : users) {
MpUserDO dbUser = openId2Users.get(user.getOpenid());
if (dbUser == null) { // 新增稍后批量插入
newUsers.add(user);
} else { // 更新直接执行更新
user.setId(dbUser.getId());
mpUserMapper.updateById(user);
}
}
// 2.2 批量插入
if (CollUtil.isNotEmpty(newUsers)) {
mpUserMapper.insertBatch(newUsers);
}
}
@Override
public void updateUserUnsubscribe(String appId, String openId) {
MpUserDO dbUser = mpUserMapper.selectByAppIdAndOpenid(appId, openId);