springcloud的增量同步定时任务的设计及实现

springcloud的增量同步定时任务的设计及实现,第1张

springcloud的增量同步定时任务的设计及实现

增量同步在业务场景中经常会遇到,比如人员同步,部门同步等等。通过可配置化来使同步任务更加灵活。

一、增量同步是在普通定时任务的基础上实现的。
–>springcloud的可配置化定时任务的设计及实现

二、增加一张表,控制增量数据的时间范围。

-- auto-generated definition
-- Bookkeeping table for incremental sync jobs: one row per (sync_type, sync_key)
-- records whether that job is currently running and the time its last run covered up to.
create table sync_config
(
    -- Audit columns.
    creation_date      datetime                                 not null,
    created_by         bigint                                   not null,
    last_update_date   datetime(3)                              null,
    last_updated_by    bigint                                   not null,
    -- The service code only looks up rows whose flag equals YesOrNo.Y.val (presumably 'Y' -- confirm).
    enabled_flag       varchar(1)                               not null,
    sync_config_id     bigint auto_increment
        primary key,
    -- Sync type.
    sync_type          varchar(128)                             null comment '同步类型',
    -- Unique key of the data being synced (e.g. a department id); NULL for jobs that are not split by key.
    sync_key           varchar(128)                             null comment '同步数据唯一键(如:部门id)',
    -- Human-readable description of the sync job.
    sync_desc          varchar(256)                             null comment '同步描述',
    -- Current status: 'Y' = running, 'N' = not running.
    current_status     varchar(128)                             null comment '当前状态(Y正在运行N未运行)',
    -- Start of the next incremental window; the timer reads it as the sync start time.
    last_run_time      datetime(3) default CURRENT_TIMESTAMP(3) not null,
    -- Maximum width, in hours, of a single incremental window.
    max_interval_hours int                                      null comment '最大时间间隔(小时)'
)
    comment '数据同步配置';

-- Non-unique index backing the (sync_type, sync_key) lookup done by the service.
create index sync_config_n1
    on sync_config (sync_type, sync_key);


PS:sync_key有什么作用呢?由于业务需要,人员同步是按部门增量进行的,而每个部门的最后更新时间各不相同。如果所有部门都统一按最大时间范围来同步,人员多的部门数据量会非常庞大,给系统造成压力。因此按部门id分别记录每个部门的最后更新时间,对每个部门单独做增量查询。

三、写一个控制同步的方法,当同步开始时,状态要改为Y正在运行,以免重复执行,当同步结束,状态改为N未运行。

/**
 * Maintains the {@code sync_config} rows that guard incremental sync jobs.
 *
 * <p>A job calls {@link #updateRuning} before it starts (status becomes "Y"),
 * {@link #updateRuned} when it finishes (status becomes "N", optionally moving
 * {@code lastRunTime} forward), and {@link #release} resets rows that were left
 * in the running state.
 *
 * <p>NOTE(review): the "is it running?" check and the following update are two
 * separate statements, so two nodes starting at the same instant could both pass
 * the check. Confirm whether the scheduler already guarantees a single runner.
 */
@Service
@Transactional
public class SyncConfigServiceImpl implements SyncConfigService {

    @Autowired
    private SyncConfigMapper syncConfigMapper;

    /** {@code current_status} value while a sync is in progress. */
    private static final String RUNNING = "Y";

    /** {@code current_status} value while no sync is in progress. */
    private static final String NOT_RUNNING = "N";

    /** Default maximum width (hours) of one incremental window for newly created rows. */
    private static final Integer DEFAULT_MAX_INTERVAL_HOURS = 48;

    /**
     * Marks the sync job identified by {@code syncType}/{@code syncKey} as running.
     * When no enabled row exists yet, one is created with {@code lastRunTime} set
     * two days in the past.
     *
     * @param syncType     sync type; must not be empty
     * @param syncTypeName description stored on a newly created row
     * @param syncKey      optional key (e.g. a department id); empty means "no key"
     * @return failure when {@code syncType} is empty or the job is already running;
     *         otherwise success carrying the {@code SyncConfig} row
     */
    @Override
    public CommonResult updateRuning(String syncType, String syncTypeName, String syncKey) {
        if (BeeStringUtil.isEmpty(syncType)) {
            return CommonResult.failWithMessage("syncType不能为空");
        }
        SyncConfig config = findEnabledConfig(syncType, syncKey);
        if (config == null) {
            // First run for this type/key: create the row already in the running state.
            config = new SyncConfig();
            config.addbaseInfo(BeeSpecialIdConstant.TIMER.id);
            config.setCurrentStatus(RUNNING);
            config.setLastRunTime(BeeDateUtil.addDay(new Date(), -2));
            config.setSyncType(syncType);
            config.setSyncDesc(syncTypeName);
            // An empty key is looked up with IS NULL, so it must also be stored as NULL;
            // otherwise the row would never be found again and a duplicate would be
            // inserted on every run.
            config.setSyncKey(BeeStringUtil.isNotEmpty(syncKey) ? syncKey : null);
            config.setMaxIntervalHours(DEFAULT_MAX_INTERVAL_HOURS);
            syncConfigMapper.insertSelective(config);
            return CommonResult.successWithData(config);
        }
        if (RUNNING.equals(config.getCurrentStatus())) {
            return CommonResult.failWithMessage("同步任务正在进行");
        }
        config.updatebaseInfo(BeeSpecialIdConstant.TIMER.id);
        config.setCurrentStatus(RUNNING);
        syncConfigMapper.updateByPrimaryKeySelective(config);
        return CommonResult.successWithData(config);
    }

    /**
     * Marks the sync job as no longer running and optionally advances its
     * {@code lastRunTime}.
     *
     * @param syncType     sync type; must not be empty
     * @param syncTypeName unused by this method
     * @param syncKey      optional key; empty means "no key"
     * @param lastRunTime  new value for {@code lastRunTime}; {@code null} leaves it unchanged
     * @return failure when {@code syncType} is empty or no enabled row exists; otherwise success
     */
    @Override
    public CommonResult updateRuned(String syncType, String syncTypeName, String syncKey, Date lastRunTime) {
        if (BeeStringUtil.isEmpty(syncType)) {
            return CommonResult.failWithMessage("syncType不能为空");
        }
        SyncConfig config = findEnabledConfig(syncType, syncKey);
        if (config == null) {
            return CommonResult.failWithMessage("同步任务不存在");
        }
        config.updatebaseInfo(BeeSpecialIdConstant.TIMER.id);
        config.setCurrentStatus(NOT_RUNNING);
        if (lastRunTime != null) {
            config.setLastRunTime(lastRunTime);
        }
        syncConfigMapper.updateByPrimaryKeySelective(config);
        return CommonResult.success();
    }

    /**
     * Resets to "not running" every enabled row that is marked running and whose
     * last update is more than one day old.
     *
     * @return always success
     */
    @Override
    public CommonResult release() {
        SyncConfig set = new SyncConfig();
        set.updatebaseInfo(0L);
        set.setCurrentStatus(YesOrNo.N.val);
        FbpSyncConfigExample configExample = new FbpSyncConfigExample();
        configExample.createCriteria().andLastUpdateDateLessThan(BeeDateUtil.addDay(new Date(), -1))
                .andEnabledFlagEqualTo(YesOrNo.Y.val).andCurrentStatusEqualTo(YesOrNo.Y.val);
        syncConfigMapper.updateByExampleSelective(set, configExample);
        return CommonResult.success();
    }

    /**
     * Looks up the enabled config row for a sync type and optional key.
     * An empty {@code syncKey} matches rows whose {@code sync_key} is NULL.
     *
     * @return the value {@code BeeCollectionUtil.getObj} extracts from the query result
     *         (presumably the first row, or {@code null} when there is none -- confirm)
     */
    private SyncConfig findEnabledConfig(String syncType, String syncKey) {
        FbpSyncConfigExample example = new FbpSyncConfigExample();
        FbpSyncConfigExample.Criteria criteria = example.createCriteria();
        criteria.andEnabledFlagEqualTo(YesOrNo.Y.val).andSyncTypeEqualTo(syncType);
        if (BeeStringUtil.isNotEmpty(syncKey)) {
            criteria.andSyncKeyEqualTo(syncKey);
        } else {
            criteria.andSyncKeyIsNull();
        }
        return BeeCollectionUtil.getObj(syncConfigMapper.selectByExample(example));
    }
}

四、定时器的代码。需要继承AbstractTimer。


/**
 * Base class for timers that synchronise data incrementally.
 *
 * <p>Each run asks the subclass for its sync type and, optionally, a list of sync
 * keys. For every type/key pair it marks the matching sync config as running,
 * calls {@link #sync} with the time window {@code [lastRunTime, endTime]}, and
 * finally marks the config as not running, storing the time returned by
 * {@link #sync} as the new {@code lastRunTime}.
 *
 * <p>{@code log}, {@code syncConfigService} and {@code monitorSendByMqService} are
 * not declared here; they are presumably inherited from {@code AbstractTimer}.
 */
public abstract class AbstractIncrementalSyncTimer extends AbstractTimer {

    /**
     * Entry point invoked by the timer framework.
     *
     * @param context execution context of this run
     * @param para    timer parameters; may be {@code null}
     * @return the task result when running without sync keys; otherwise success once
     *         all per-key tasks have been handed to the worker threads and {@code run()} returned
     */
    @SuppressWarnings("rawtypes")
    @Override
    public CommonResult process(ContextInfo context, JSONObject para) throws Exception {
        SyncTypeBean syncTypeBean = this.getSyncType();
        if (syncTypeBean == null || MyStringUtil.isEmpty(syncTypeBean.getSyncType()) || MyStringUtil.isEmpty(syncTypeBean.getSyncDesc())) {
            return CommonResult.failWithMessage("SyncTypeBean为空");
        }
        TimerFindSyncKeyRespBean syncKeyResp = this.findSyncKey();
        List syncKeys = syncKeyResp.getSyncKeys();

        if (MyCollectionUtil.isEmpty(syncKeys)) {
            if (!syncKeyResp.isRunWhenEmpty()) {
                // Key-based timer with nothing to do in this run.
                return CommonResult.success();
            }
            // Timer without sync keys: run a single task on the calling thread.
            CommonResult cr = this.processTask(context, para, syncTypeBean);
            if (!cr.isSuccess() && this.isSendMonitor()) {
                monitorSendByMqService.send(SysMonitorBean.buildMail("同步数据错误", JSONObject.toJSONString(syncTypeBean), null));
            }
            return cr;
        }

        // One task per sync key, spread over a fixed number of worker threads.
        FixedThreadProcess tp = new FixedThreadProcess(syncKeys, getThreadCount()) {
            @Override
            protected boolean process(String syncKey) throws Exception {
                if (MyStringUtil.isEmpty(syncKey)) {
                    return true;
                }
                // Give every task its own copies so threads never share mutable state.
                JSONObject threadPara = null;
                if (para != null) {
                    threadPara = (JSONObject) para.clone();
                }
                ContextInfo threadContext = MyCopyUtil.copyNew(context, ContextInfo.class);
                SyncTypeBean threadSyncTypeBean = MyCopyUtil.copyNew(syncTypeBean, SyncTypeBean.class);
                threadSyncTypeBean.setSyncKey(syncKey);

                CommonResult result = processTask(threadContext, threadPara, threadSyncTypeBean);
                if (!result.isSuccess() && isSendMonitor()) {
                    monitorSendByMqService.send(SysMonitorBean.buildMail("同步数据错误", JSONObject.toJSONString(threadSyncTypeBean), null));
                }
                // NOTE(review): the meaning of this return value is defined by
                // FixedThreadProcess, which is not visible here; kept as in the original.
                return false;
            }
        };
        tp.run();
        return CommonResult.success();
    }

    /**
     * Runs one incremental sync for a single type/key pair.
     *
     * <p>The running flag is released only if this call acquired it. When
     * {@code updateRuning} is refused (for example because another run is still in
     * progress) the method returns immediately and leaves the flag untouched, so it
     * cannot clear a flag held by a different run.
     *
     * @return the result of {@link #sync}, or a failure describing which step went wrong
     */
    private CommonResult processTask(ContextInfo context, JSONObject para, SyncTypeBean syncTypeBean) throws Exception {
        String syncType = syncTypeBean.getSyncType();
        String syncKey = syncTypeBean.getSyncKey();

        // Step 1: mark the sync config as running.
        CommonResult acquired;
        try {
            acquired = syncConfigService.updateRuning(syncType, syncTypeBean.getSyncDesc(), syncKey);
        } catch (Exception e) {
            log.error("定时器同步失败-执行updateRuning异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncType, syncKey, e.getMessage(), context.getSessionId(), e);
            return CommonResult.failWithMessage("定时器同步失败-执行updateRuning异常");
        }
        if (!acquired.isSuccess()) {
            log.error("定时器同步失败-执行updateRuning返回错误:syncType={}, syncKey={}, msg={}, sessionId={}", syncType, syncKey, acquired.getMessage(), context.getSessionId());
            return acquired;
        }

        // Step 2: run the business sync. endTime stays null unless the sync succeeded,
        // in which case lastRunTime is left unchanged by step 3.
        Date endTime = null;
        CommonResult result;
        CommonResult released;
        try {
            try {
                SyncConfig syncConfig = (SyncConfig) acquired.getData();
                Date syncStartTime = syncConfig.getLastRunTime();
                Date syncEndTime = this.getEndTime(syncConfig);
                log.info("定时增量同步任务:syncType={}, syncKey={}, syncStartTime={}, syncEndTime={}, sessionId={}", syncType, syncKey,
                        MyDateUtil.dateToString(syncStartTime, DateStyle.YYYY_MM_DD_HH_MM_SS_SSS), MyDateUtil.dateToString(syncEndTime, DateStyle.YYYY_MM_DD_HH_MM_SS_SSS), context.getSessionId());
                result = this.sync(context, para, syncKey, syncStartTime, syncEndTime);
                if (result.isSuccess()) {
                    // On success the result carries the time of the last synced record.
                    endTime = (Date) result.getData();
                } else {
                    log.error("定时器同步失败-执行业务处理返回错误: syncType={}, syncKey={}, msg={}, sessionId={}", syncType, syncKey, result.getMessage(), context.getSessionId());
                }
            } catch (Exception e) {
                log.error("定时器同步失败-业务处理异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncType, syncKey, e.getMessage(), context.getSessionId(), e);
                result = CommonResult.failWithMessage("定时器同步失败-业务处理异常");
            }
        } finally {
            // Step 3: always release the flag we acquired, even if an Error is propagating.
            released = this.finishTask(context, syncTypeBean, endTime);
        }
        // A failed release takes precedence over the business result.
        return released.isSuccess() ? result : released;
    }

    /**
     * Marks the sync config as not running. Never throws an {@code Exception};
     * failures are logged and reported through the returned result.
     *
     * @param endTime new {@code lastRunTime}, or {@code null} to leave it unchanged
     */
    private CommonResult finishTask(ContextInfo context, SyncTypeBean syncTypeBean, Date endTime) {
        try {
            CommonResult cr = syncConfigService.updateRuned(syncTypeBean.getSyncType(), syncTypeBean.getSyncDesc(), syncTypeBean.getSyncKey(), endTime);
            if (!cr.isSuccess()) {
                log.error("定时器同步失败-执行updateRuned返回错误: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), cr.getMessage(), context.getSessionId());
            }
            return cr;
        } catch (Exception e) {
            log.error("定时器同步失败-执行updateRuned异常: syncType={}, syncKey={}, msg={}, sessionId={}", syncTypeBean.getSyncType(), syncTypeBean.getSyncKey(), e.getMessage(), context.getSessionId(), e);
            return CommonResult.failWithMessage("定时器同步失败-执行updateRuned异常");
        }
    }

    /**
     * Computes the upper bound of the incremental window that starts at the config's
     * {@code lastRunTime}: "now", capped at {@code lastRunTime + maxIntervalHours}
     * when a maximum interval is configured.
     */
    private Date getEndTime(SyncConfig syncConfig) {
        Date beginTime = syncConfig.getLastRunTime();
        Date endTime = new Date();
        Integer maxHours = syncConfig.getMaxIntervalHours();
        if (maxHours != null && MyDateUtil.getIntervalHours(endTime, beginTime) > maxHours) {
            endTime = MyDateUtil.addHour(beginTime, maxHours);
        }
        // Step the end time back by 5 seconds to allow for the gap between the moment a
        // record's last-update time is assigned and the moment its transaction commits.
        Date adjustedEndTime = MyDateUtil.addSecond(endTime, -5);
        // Only use the adjusted value if it is still after the window start.
        return beginTime.before(adjustedEndTime) ? adjustedEndTime : endTime;
    }

    /**
     * Supplies the sync type and description of this timer; both must be non-empty.
     */
    protected abstract SyncTypeBean getSyncType();

    /**
     * Supplies the sync keys to process in this run; see {@link TimerFindSyncKeyRespBean}.
     */
    protected abstract TimerFindSyncKeyRespBean findSyncKey();

    /**
     * Whether a monitor mail should be sent when a task fails.
     */
    protected abstract boolean isSendMonitor();

    /**
     * Number of worker threads used for per-key syncs; defaults to 2.
     */
    protected int getThreadCount() {
        return 2;
    }

    /**
     * Performs the actual sync for the given window.
     *
     * @param syncKey   key being synced, or {@code null} for timers without keys
     * @param beginTime start of the incremental window
     * @param endTime   end of the incremental window
     * @return on success, a result whose data is a {@code Date} (the time of the last
     *         synced record) that becomes the next {@code lastRunTime}; {@code null}
     *         data leaves {@code lastRunTime} unchanged
     */
    protected abstract CommonResult sync(ContextInfo context, JSONObject para, String syncKey, Date beginTime, Date endTime) throws Exception;

    /**
     * Answer of {@link #findSyncKey()}: the keys to process plus a flag telling the
     * timer what to do when that list is empty.
     */
    public static class TimerFindSyncKeyRespBean {
        /** Keys to sync; the timer hands each element to a worker as a {@code String}. */
        private List syncKeys;

        /** When {@code true}, an empty key list triggers one run without a key. */
        private boolean runWhenEmpty;

        public List getSyncKeys() {
            return syncKeys;
        }

        public void setSyncKeys(List syncKeys) {
            this.syncKeys = syncKeys;
        }

        public boolean isRunWhenEmpty() {
            return runWhenEmpty;
        }

        public void setRunWhenEmpty(boolean runWhenEmpty) {
            this.runWhenEmpty = runWhenEmpty;
        }

        /**
         * Response for key-based timers: nothing runs when {@code syncKeys} is empty.
         */
        public static TimerFindSyncKeyRespBean build(List syncKeys) {
            TimerFindSyncKeyRespBean bean = new TimerFindSyncKeyRespBean();
            bean.setSyncKeys(syncKeys);
            bean.setRunWhenEmpty(false);
            return bean;
        }

        /**
         * Response for timers without keys: a single run without a key is executed.
         */
        public static TimerFindSyncKeyRespBean build() {
            TimerFindSyncKeyRespBean bean = new TimerFindSyncKeyRespBean();
            bean.setRunWhenEmpty(true);
            return bean;
        }

    }
}

多线程FixedThreadProcess类请参考:
–>记一个实现多线程的工具类

由此,便可实现一个可配置的增量同步定时任务。

附:SyncTypeBean类

/**
 * Identifies one incremental sync job: its type, a human-readable description
 * and, optionally, the key of the data slice being synced.
 */
public class SyncTypeBean {

    /** Sync type. */
    private String syncType;

    /** Description of the sync job. */
    private String syncDesc;

    /** Optional key of the data being synced; not set by the two-argument constructor. */
    private String syncKey;

    /** Creates an empty bean; all fields start as {@code null}. */
    public SyncTypeBean() {
    }

    /**
     * Creates a bean with a type and description and no sync key.
     *
     * @param syncType sync type
     * @param syncDesc description of the sync job
     */
    public SyncTypeBean(String syncType, String syncDesc) {
        this.syncType = syncType;
        this.syncDesc = syncDesc;
    }

    // ---- readers ----

    public String getSyncType() {
        return this.syncType;
    }

    public String getSyncDesc() {
        return this.syncDesc;
    }

    public String getSyncKey() {
        return this.syncKey;
    }

    // ---- writers ----

    public void setSyncType(String syncType) {
        this.syncType = syncType;
    }

    public void setSyncDesc(String syncDesc) {
        this.syncDesc = syncDesc;
    }

    public void setSyncKey(String syncKey) {
        this.syncKey = syncKey;
    }

    /**
     * Returns this bean serialised to a JSON string.
     */
    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5438171.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存