增量同步在业务场景中经常会遇到,比如人员同步,部门同步等等。通过可配置化来使同步任务更加灵活。
一、增量同步是在普通定时任务的基础上实现的。
→ 参见《Spring Cloud 的可配置化定时任务的设计及实现》
二、增加一张表,控制增量数据的时间范围。
-- auto-generated definition
-- Sync configuration table: stores, per sync type and optional sync key, the current
-- run status and the last run time that bounds the next incremental window.
-- (Kept on separate lines: on a single line the leading "--" comments out the whole DDL.)
create table sync_config
(
    creation_date      datetime                                  not null,
    created_by         bigint                                    not null,
    last_update_date   datetime(3)                               null,
    last_updated_by    bigint                                    not null,
    enabled_flag       varchar(1)                                not null,
    sync_config_id     bigint auto_increment
        primary key,
    sync_type          varchar(128)                              null comment '同步类型',
    sync_key           varchar(128)                              null comment '同步数据唯一键(如:部门id)',
    sync_desc          varchar(256)                              null comment '同步描述',
    current_status     varchar(128)                              null comment '当前状态(Y正在运行N未运行)',
    last_run_time      datetime(3) default CURRENT_TIMESTAMP(3)  not null,
    max_interval_hours int                                       null comment '最大时间间隔(小时)'
)
    comment '数据同步配置';

-- Supports the lookup by (sync_type, sync_key).
create index sync_config_n1
    on sync_config (sync_type, sync_key);
PS:sync_key有什么作用呢?由于业务需要,同步人员是按部门增量同步的,每一个部门的最后更新时间是不一样的,也不能全部按最大时间范围来更新,那样的话人员多的部门数据量会很庞大,对系统造成压力,所以按部门id记录每个部门的最后更新时间,对每个部门单独做增量查询。
三、写一个控制同步的方法:同步开始时,将状态改为 Y(正在运行),以免重复执行;同步结束后,再将状态改为 N(未运行)。
@Service @Transactional public class SyncConfigServiceImpl implements SyncConfigService { @Autowired private SyncConfigMapper syncConfigMapper; private static final String RUNING = "Y"; private static final String UNRUN = "N"; private static final Integer maxIntervalHours = 48; @Override public CommonResultupdateRuning(String syncType, String syncTypeName, String syncKey) { if (BeeStringUtil.isEmpty(syncType)) { return CommonResult.failWithMessage("syncType不能为空"); } FbpSyncConfigExample example = new FbpSyncConfigExample(); FbpSyncConfigExample.Criteria criteria = example.createCriteria(); criteria.andEnabledFlagEqualTo(YesOrNo.Y.val).andSyncTypeEqualTo(syncType); if (BeeStringUtil.isNotEmpty(syncKey)) { criteria.andSyncKeyEqualTo(syncKey); } else { criteria.andSyncKeyIsNull(); } SyncConfig config = BeeCollectionUtil.getObj(syncConfigMapper.selectByExample(example)); if (config != null) { if (RUNING.equals(config.getCurrentStatus())) { return CommonResult.failWithMessage("同步任务正在进行"); } config.updatebaseInfo(BeeSpecialIdConstant.TIMER.id); config.setCurrentStatus(RUNING); syncConfigMapper.updateByPrimaryKeySelective(config); return CommonResult.successWithData(config); } else { config = new SyncConfig(); config.addbaseInfo(BeeSpecialIdConstant.TIMER.id); config.setCurrentStatus(RUNING); config.setLastRunTime(BeeDateUtil.addDay(new Date(), -2)); config.setSyncType(syncType); config.setSyncDesc(syncTypeName); config.setSyncKey(syncKey); config.setMaxIntervalHours(maxIntervalHours); syncConfigMapper.insertSelective(config); return CommonResult.successWithData(config); } } @Override public CommonResult updateRuned(String syncType, String syncTypeName, String syncKey, Date lastRunTime) { if (BeeStringUtil.isEmpty(syncType)) { return CommonResult.failWithMessage("syncType不能为空"); } FbpSyncConfigExample example = new FbpSyncConfigExample(); FbpSyncConfigExample.Criteria criteria = example.createCriteria(); 
criteria.andEnabledFlagEqualTo(YesOrNo.Y.val).andSyncTypeEqualTo(syncType); if (BeeStringUtil.isNotEmpty(syncKey)) { criteria.andSyncKeyEqualTo(syncKey); } else { criteria.andSyncKeyIsNull(); } SyncConfig config = BeeCollectionUtil.getObj(syncConfigMapper.selectByExample(example)); if(config == null){ return CommonResult.failWithMessage("同步任务不存在"); } //更新 config.updatebaseInfo(BeeSpecialIdConstant.TIMER.id); config.setCurrentStatus(UNRUN); if(lastRunTime != null){ config.setLastRunTime(lastRunTime); } syncConfigMapper.updateByPrimaryKeySelective(config); return CommonResult.success(); } @Override public CommonResult release() { SyncConfig set = new SyncConfig(); set.updatebaseInfo(0L); set.setCurrentStatus(YesOrNo.N.val); FbpSyncConfigExample configExample = new FbpSyncConfigExample(); configExample.createCriteria().andLastUpdateDateLessThan(BeeDateUtil.addDay(new Date(), -1)) .andEnabledFlagEqualTo(YesOrNo.Y.val).andCurrentStatusEqualTo(YesOrNo.Y.val); syncConfigMapper.updateByExampleSelective(set, configExample); return CommonResult.success(); } }
四、定时器的代码。需要继承AbstractTimer。
public abstract class AbstractIncrementalSyncTimer extends AbstractTimer { @SuppressWarnings("rawtypes") @Override public CommonResult process(ContextInfo context, JSONObject para) throws Exception { SyncTypeBean syncTypeBean = this.getSyncType(); if (syncTypeBean == null || MyStringUtil.isEmpty(syncTypeBean.getSyncType()) || MyStringUtil.isEmpty(syncTypeBean.getSyncDesc())) { return CommonResult.failWithMessage("SyncTypeBean为空"); } CommonResult cr = null; TimerFindSyncKeyRespBean syncKeyResp = this.findSyncKey(); ListsyncKeys = syncKeyResp.getSyncKeys(); if(MyCollectionUtil.isEmpty(syncKeys) && !syncKeyResp.isRunWhenEmpty()) { return CommonResult.success(); } if (MyCollectionUtil.isEmpty(syncKeys)) { cr = this.processTask(context, para, syncTypeBean); if (!cr.isSuccess() && this.isSendMonitor()) { monitorSendByMqService.send(SysMonitorBean.buildMail("同步数据错误", JSONObject.toJSONString(syncTypeBean), null)); } return cr; } else { //多线程同步 FixedThreadProcess tp = new FixedThreadProcess (syncKeys, getThreadCount()) { @Override protected boolean process(String syncKey) throws Exception { // MyStringUtil是一个自己写的String的工具类 if (MyStringUtil.isEmpty(syncKey)) { return true; } //将线程外的变量复制 JSONObject threadPara = null; if(para != null) { threadPara = (JSONObject)para.clone(); } // MyCopyUtil是一个自己写的copy工具类 ContextInfo threadContext = MyCopyUtil.copyNew(context, ContextInfo.class); SyncTypeBean threadSyncTypeBean = MyCopyUtil.copyNew(syncTypeBean, SyncTypeBean.class); // threadSyncTypeBean.setSyncKey(syncKey); CommonResult> cr = processTask(threadContext, threadPara, threadSyncTypeBean); if (!cr.isSuccess() && isSendMonitor()) { monitorSendByMqService.send(SysMonitorBean.buildMail("同步数据错误", JSONObject.toJSONString(threadSyncTypeBean), null)); } return false; } }; tp.run(); return CommonResult.success(); } } @SuppressWarnings("rawtypes") private CommonResult processTask(ContextInfo context, JSONObject para, SyncTypeBean syncTypeBean) throws Exception { CommonResult cr; Date 
endTime = null; try { // 更新同步配置为更新中 try { cr = syncConfigService.updateRuning(syncTypeBean.getSyncType(), syncTypeBean.getSyncDesc(), syncTypeBean.getSyncKey()); if (!cr.isSuccess()) { log.error("定时器同步失败-执行updateRuning返回错误:syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), cr.getMessage(), context.getSessionId()); return cr; } } catch (Exception e) { log.error("定时器同步失败-执行updateRuning异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), e.getMessage(), context.getSessionId(), e); cr = CommonResult.failWithMessage("定时器同步失败-执行updateRuning异常"); return cr; } // 执行业务代码 try { SyncConfig SyncConfig = (SyncConfig) cr.getData(); String syncKey = syncTypeBean.getSyncKey(); Date syncStartTime = SyncConfig.getLastRunTime(); Date syncEndTime = this.getEndTime(SyncConfig); log.info("定时增量同步任务:syncType={}, syncKey={}, syncStartTime={}, syncEndTime={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), MyDateUtil.dateToString(syncStartTime, DateStyle.YYYY_MM_DD_HH_MM_SS_SSS), MyDateUtil.dateToString(syncEndTime, DateStyle.YYYY_MM_DD_HH_MM_SS_SSS), context.getSessionId()); cr = this.sync(context, para, syncKey, syncStartTime, syncEndTime); if (!cr.isSuccess()) { log.error("定时器同步失败-执行业务处理返回错误: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), cr.getMessage(), context.getSessionId()); return cr; } // 成功则取最后一条记录时间 endTime = (Date) cr.getData(); return cr; } catch (Exception e) { log.error("定时器同步失败-业务处理异常: syncType={}, syncKey={}, msg={}, sessionId={}", e.getMessage(), context.getSessionId(), e); cr = CommonResult.failWithMessage("定时器同步失败-业务处理异常"); return cr; } } catch (Exception e) { log.error("定时器同步失败-定时器运行异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), e.getMessage(), context.getSessionId(), e); cr = CommonResult.failWithMessage("定时器同步失败-定时器运行异常"); 
return cr; } finally { // endTime为空则不更新时间 try { cr = syncConfigService.updateRuned(syncTypeBean.getSyncType(), syncTypeBean.getSyncDesc(), syncTypeBean.getSyncKey(), endTime); if (!cr.isSuccess()) { log.error("定时器同步失败-执行updateRuned返回错误: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), cr.getMessage(), context.getSessionId()); return cr; } } catch (Exception e) { log.error("定时器同步失败-执行updateRuned异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), e.getMessage(), context.getSessionId(), e); cr = CommonResult.failWithMessage("定时器同步失败-执行updateRuned异常"); return cr; } } } private Date getEndTime(SyncConfig SyncConfig) { Date beginTime = SyncConfig.getLastRunTime(); Date endTime = new Date(); if (SyncConfig.getMaxIntervalHours() != null && MyDateUtil.getIntervalHours(endTime, beginTime) > SyncConfig.getMaxIntervalHours()) { endTime = MyDateUtil.addHour(beginTime, SyncConfig.getMaxIntervalHours()); } //最后时间向前退5秒,以确保事物提交时间和赋值最后更新时间的时间差 Date endTime2 = MyDateUtil.addSecond(endTime, -5); if(beginTime.before(endTime2)) { return endTime2; } else { return endTime; } } protected abstract SyncTypeBean getSyncType(); protected abstract TimerFindSyncKeyRespBean findSyncKey(); protected abstract boolean isSendMonitor(); protected int getThreadCount() { return 2; } protected abstract CommonResult sync(ContextInfo context, JSONObject para, String syncKey, Date beginTime, Date endTime) throws Exception; public static class TimerFindSyncKeyRespBean { private List syncKeys; private boolean runWhenEmpty; public List getSyncKeys() { return syncKeys; } public void setSyncKeys(List syncKeys) { this.syncKeys = syncKeys; } public boolean isRunWhenEmpty() { return runWhenEmpty; } public void setRunWhenEmpty(boolean runWhenEmpty) { this.runWhenEmpty = runWhenEmpty; } public static TimerFindSyncKeyRespBean build(List syncKeys) { TimerFindSyncKeyRespBean bean = new TimerFindSyncKeyRespBean(); 
bean.setSyncKeys(syncKeys); bean.setRunWhenEmpty(false); return bean; } public static TimerFindSyncKeyRespBean build() { TimerFindSyncKeyRespBean bean = new TimerFindSyncKeyRespBean(); bean.setRunWhenEmpty(true); return bean; } } }
多线程FixedThreadProcess类请参考:
→ 参见《记一个实现多线程的工具类》
由此,便可实现一个通过配置的增量同步定时任务。
附:SyncTypeBean类
/**
 * Describes one sync job: its type, a description, and an optional key.
 */
public class SyncTypeBean {

    /** Sync type. */
    private String syncType;

    /** Description of the sync. */
    private String syncDesc;

    /** Optional sync key; not set by either constructor. */
    private String syncKey;

    /** Creates a bean with all fields unset. */
    public SyncTypeBean() {
    }

    /**
     * Creates a bean with the given type and description.
     *
     * @param syncType sync type
     * @param syncDesc description of the sync
     */
    public SyncTypeBean(String syncType, String syncDesc) {
        this.syncType = syncType;
        this.syncDesc = syncDesc;
    }

    public String getSyncType() {
        return this.syncType;
    }

    public void setSyncType(String syncType) {
        this.syncType = syncType;
    }

    public String getSyncDesc() {
        return this.syncDesc;
    }

    public void setSyncDesc(String syncDesc) {
        this.syncDesc = syncDesc;
    }

    public String getSyncKey() {
        return this.syncKey;
    }

    public void setSyncKey(String syncKey) {
        this.syncKey = syncKey;
    }

    /** Returns the string produced by {@code JSONObject.toJSONString} for this bean. */
    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)