数据同步
2020-10-13 本文已影响0人
盗生一
package com.gxhj.safecampus.visit.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.gxhj.commontools.utils.CollectionUtils;
import com.gxhj.commontools.utils.StringUtils;
import com.gxhj.safecampus.configuration.sysconfig.CampusConfig;
import com.gxhj.safecampus.middleware.util.MiddlewareUtil;
import com.gxhj.safecampus.utils.common.HttpConnectionUtil;
import com.gxhj.safecampus.visit.entity.DataSynchronousInfo;
import com.gxhj.safecampus.visit.entity.VisitorInfo;
import com.gxhj.safecampus.visit.enums.SynchronousDataOperateTypes;
import com.gxhj.safecampus.visit.enums.SynchronousDataTypes;
import com.gxhj.safecampus.visit.enums.SynchronousStatuses;
import com.gxhj.safecampus.visit.service.*;
import com.gxhj.safecampus.visit.synchronous.result.SynchronousResult;
import com.gxhj.safecampus.visit.vo.VisitSynchronousInfoVo;
import com.gxhj.usermanage.entity.GroupInfo;
import com.gxhj.usermanage.service.IGroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* 同步访客小程序数据业务接口实现
*/
@Service
@ConditionalOnBean(DataSynchronousServiceImpl.class)
public class SynchronousVisitDataServiceImpl implements ISynchronousVisitDataService {
private Logger log = LoggerFactory.getLogger(SynchronousVisitDataServiceImpl.class);
@Autowired
private IGroupService groupService;
@Autowired
private IVisitorService visitorService;
@Autowired
private IAccessingInviteService accessingInviteService;
@Autowired
private IAccessingRecordService accessingRecordService;
@Autowired
private IDataSynchronousService dataSynchronousService;
@Override
public void synchronousVisitDataByGroupId() {
// 获取所有要同步数据的组织编号列表
List<String> lstGroupId = this.groupService.lambdaQuery()
.eq(GroupInfo::getSynchronousData, true)
.select(GroupInfo::getGroupId)
.list()
.stream()
.map(GroupInfo::getGroupId).collect(Collectors.toList());
// 如果无组织要同步数据则返回
if (CollectionUtils.isEmpty(lstGroupId)) {
return;
}
String json;
VisitSynchronousInfoVo visitSyncInfo;
// 获取当前系统下的访客编号列表
List<Long> lstVisitId = this.visitorService.lambdaQuery()
.select(VisitorInfo::getVisitorID)
.list().stream().map(VisitorInfo::getVisitorID)
.collect(Collectors.toList());
List<VisitorInfo> lstNewVisitor = new ArrayList<>();
List<VisitorInfo> lstUpdateVisitor = new ArrayList<>();
int newSize = 0;
int updateSize = 0;
for (String groupId : lstGroupId) {
json = HttpConnectionUtil.doGet(CampusConfig.getVisitorDataSyncConfig().getGetVisitorDataUrl() + groupId);
if (StringUtils.isEmpty(json)) {
log.error("未能获取到'" + groupId + "'的任何访客数据");
continue;
}
try {
// 反序列化成对象
visitSyncInfo = MiddlewareUtil.getSerialObjMapper().readValue(json, VisitSynchronousInfoVo.class);
// 保存访客列表信息
if (CollectionUtils.isNotEmpty(visitSyncInfo.getVisitorInfoList())) {
visitSyncInfo.getVisitorInfoList().forEach(visitor -> {
if (lstVisitId.contains(visitor.getVisitorID())) {
// 更新信息
lstUpdateVisitor.add(visitor);
} else {
// 新增信息
lstNewVisitor.add(visitor);
// 记录主键
lstVisitId.add(visitor.getVisitorID());
}
});
// 批量更新
if (CollectionUtils.isNotEmpty(lstUpdateVisitor)){
this.visitorService.updateBatchById(lstUpdateVisitor);
// 更新后数据移除
updateSize = lstUpdateVisitor.size();
lstUpdateVisitor.clear();
}
// 批量新增
if (CollectionUtils.isNotEmpty(lstNewVisitor)){
this.visitorService.saveBatch(lstNewVisitor);
// 新增后数据移除
newSize = lstNewVisitor.size();
lstNewVisitor.clear();
}
// 记录同步数量
log.info("同步到" + visitSyncInfo.getVisitorInfoList().size() + "条访客信息,其中更新" + updateSize + "条,新增" + newSize + "条。");
}
// 保存邀请列表信息
if (CollectionUtils.isNotEmpty(visitSyncInfo.getAccessingInviteInfoList())) {
this.accessingInviteService.saveOrUpdateBatch(visitSyncInfo.getAccessingInviteInfoList());
// 记录同步数量
log.info("同步到" + visitSyncInfo.getAccessingInviteInfoList().size() + "条邀请信息");
}
// 保存访客记录列表信息
if (CollectionUtils.isNotEmpty(visitSyncInfo.getAccessingRecordInfoList())) {
this.accessingRecordService.saveOrUpdateBatch(visitSyncInfo.getAccessingRecordInfoList());
// 记录同步数量
log.info("同步到" + visitSyncInfo.getAccessingRecordInfoList().size() + "条访客记录信息");
}
} catch (Exception ex) {
log.error("解析访客数据出错,访客数据为:" + json, ex);
}
}
}
@Override
public void sendEmployeeData(Integer size) {
Page<DataSynchronousInfo> pageInfo = new Page<>(1, size);
pageInfo.setSearchCount(false);
Page<DataSynchronousInfo> lstDataSync = this.dataSynchronousService.lambdaQuery()
.eq(DataSynchronousInfo::getDataType, SynchronousDataTypes.EmployeeType)
.eq(DataSynchronousInfo::getSynchronousStatus,SynchronousStatuses.SynchronousWait)
.orderByAsc(DataSynchronousInfo::getCreateTime)
.page(pageInfo);
sendData(lstDataSync.getRecords(), CampusConfig.getVisitorDataSyncConfig().getSendEmployeeDataUrl());
}
@Override
public void sendGroupData(Integer size) {
Page<DataSynchronousInfo> pageInfo = new Page<>(1, size);
pageInfo.setSearchCount(false);
Page<DataSynchronousInfo> lstDataSync = this.dataSynchronousService.lambdaQuery()
.eq(DataSynchronousInfo::getDataType, SynchronousDataTypes.GroupType)
.eq(DataSynchronousInfo::getSynchronousStatus,SynchronousStatuses.SynchronousWait)
.orderByAsc(DataSynchronousInfo::getCreateTime)
.page(pageInfo);
// 获取所有同步
sendData(lstDataSync.getRecords(), CampusConfig.getVisitorDataSyncConfig().getSendGroupDataUrl());
}
boolean synError = true;
// 统一发送数据调用http请求方法
private void sendData(List<DataSynchronousInfo> lstDataSync, String url) {
if (CollectionUtils.isEmpty(lstDataSync)) {
return;
}
HttpConnectionUtil.RequestMethods method = HttpConnectionUtil.RequestMethods.POST;
List<DataSynchronousInfo> lstSaveInfo = new ArrayList<>();
for (DataSynchronousInfo record : lstDataSync) {
if (record.getOperateType() == SynchronousDataOperateTypes.AddOperate) {
method = HttpConnectionUtil.RequestMethods.POST;
} else if (record.getOperateType() == SynchronousDataOperateTypes.UpdateOperate) {
method = HttpConnectionUtil.RequestMethods.PUT;
} else if (record.getOperateType() == SynchronousDataOperateTypes.DeleteOperate) {
method = HttpConnectionUtil.RequestMethods.DELETE;
}
// TODO:请求结果的判断?如果失败了怎么处理?
SynchronousResult synRst = JSONObject.toJavaObject(JSONObject.parseObject(HttpConnectionUtil.doMethod(url, method, record.getDataContent())), SynchronousResult.class);
// 判断 成功
if (synRst.isResult()&&(Integer)synRst.getData()>0) {
//
record.setSynchronousStatus(SynchronousStatuses.SynchronousSuccess);
record.setSynchronousResult("同步成功");
record.setSynchronousTime(LocalDateTime.now());
lstSaveInfo.add(record);
this.dataSynchronousService.updateBatchById(lstSaveInfo);
}
// }else {
//
// // 同步失败
// record.setSynchronousStatus(SynchronousStatuses.SynchronousFail);
// record.setSynchronousResult("同步失败:" + synRst.getMessage());
// record.setSynchronousTime(LocalDateTime.now());
// lstSaveInfo.add(record);
// }
if (!synRst.isResult()||(Integer)synRst.getData()==0){
// 立即触发一次 成功 插入成功记录
if (synError){
synError = false;
sendData(lstDataSync,url);
}
// 失败 插入失败记录
if (!synError){
this.dataSynchronousService.updateBatchById(lstSaveInfo);
}
}
}
// this.dataSynchronousService.updateBatchById(lstSaveInfo);
}
}