SpringBatch实际业务技术总结

SpringBatch实际业务技术总结,第1张

文章目录
      • 1.实际业务实现的不一定是ItemReader接口,可能是其他的一些接口,不过这些接口都最终实现了ItemReader接口。
      • 2.默认的ItemWriter会接收到ItemReader传递过来的list的值。
      • 3. JobHandler类结合批处理的使用
      • 4.使用Split进行并行的运行Flow,一个flow里面有多个step,然后就可以实现对cpu的充分利用可以提高效率。
      • 5. Step里面的方法可以执行很多的重要的方法,lisener,reader,writer,retryLimit,faultTolerant,chunk.
      • 6.实际业务中,只要是数据量大,而且需要定时处理。就可能用到相关的springBatch和xxl-job(任务调度框架)比如csdn定时发布功能:
      • 注意一下重点: 每一个方法都@Bean,交给Spring来托管,每个Job方法都必须要唯一,JobHandler里面的value也必须唯一,然后Job的注入是直接注入你的方法名字的。

1.实际业务实现的不一定是ItemReader接口,可能是其他的一些接口,不过这些接口都最终实现了ItemReader接口。
@Component
@Slf4j
public class InspectorDayTaskReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean  {
    private Iterator<T> iterator;

    @Resource
    private ClmInspectorLcaaMapper clmInspectorLcaaMapper;

    public InspectorDayTaskReader() {
        setName(getShortName(InspectorDayTaskReader.class));

    }

    @Override
    protected T doRead() throws Exception {
        T next = null;
        if (iterator.hasNext()) {
            next = iterator.next();
        }
        return next;
    }

    @Override
    protected void doOpen() throws Exception {
        List<InspectorUphold> inspectorUpholds = clmInspectorLcaaMapper.queryInspectTypeIsValid("2");
//        使用并行流进行实现它的底层原理是fork/join 思想使用分而治之的思想
        String collect = null;
        for (InspectorUphold inspectorUphold : inspectorUpholds) {
            if (inspectorUphold.getRulid()>2 && inspectorUphold.getPerid()!=null && !"".equals(inspectorUphold.getPerid())){
                 collect = inspectorUpholds.stream()
                        .map(InspectorUphold::getPerid)
                        .collect(Collectors.joining(","));

            }
        }

        log.info("获取perid的字符串格式为:{}",collect);
        List<ClmInspect> strings = clmInspectorLcaaMapper.queryAllDayCaseBySysdocCheck(collect);
        log.info("捞取每日理赔的的案件数目:{}条",strings.size());
        XxlJobLogger.log("捞取每日理赔的的案件数目:{}条",strings.size());
        XxlJobLogger.log("捞取每日理赔的的案件的理赔号集合列表:{}",strings);
        iterator = (Iterator<T>) strings.iterator();
    }

    @Override
    protected void doClose() throws Exception {
        iterator = null;

    }

    @Override
    public void afterPropertiesSet() throws Exception {

    }


    public InspectorDayTaskReader(ClmInspectorLcaaMapper clmInspectorLcaaMapper) {
        this.clmInspectorLcaaMapper = clmInspectorLcaaMapper;
    }

    public void setClmInspectorLcaaMapper(ClmInspectorLcaaMapper clmInspectorLcaaMapper) {
        this.clmInspectorLcaaMapper = clmInspectorLcaaMapper;
    }
}


}

2.默认的ItemWriter会接收到ItemReader传递过来的list的值。
package com.xxl.job.clm.jobhandler.inspectortask;

import com.citicpru.batchclm.entity.middleground.ClmInspect;
import com.citicpru.batchclm.entity.middleground.InspectorUphold;
import com.xxl.job.clm.mapper.mysql.ClmInspectMapper;
import com.xxl.job.clm.mapper.mysql.ClmInspectorLcaaMapper;
import com.xxl.job.clm.utils.StringUtil;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

/**
 * 每日理赔案例数目进行督查
 * @author houzhicong
 * @since
 **/



@Component
@Slf4j
public class InspectorDayTaskWriter implements ItemWriter{

    @Autowired
    private ClmInspectMapper clmInspectMapper;
    @Resource
    private ClmInspectorLcaaMapper clmInspectorLcaaMapper;


    @Override
    public void write(List list) throws Exception {
        addInspectByDay(list);

    }

   /*  * 随机抽取理赔案件数,list 理赔总的案件数
     * count 抽取数
     */

    public static List<ClmInspect> getSubStringByRadom(List list, int count){
        List backList = null;
        backList = new ArrayList<String>();
        Random random = new Random();
        int backSum = 0;
        if (list.size() >= count) {
            backSum = count;
        }else {
            backSum = list.size();
        }
        for (int i = 0; i < backSum; i++) {
//			随机数的范围为0-list.size()-1
            int target = random.nextInt(list.size());
            backList.add(list.get(target));
        }

        log.info("抽取案件集合为:{}",backList);
        return backList;
    }



// 日常督查案件进行批处理 插入待督查表   list为每日所有的理赔号码
    @Transactional
    public  void  addInspectByDay(List list) throws Exception {
        log.info("写入的List集合为:{}",list);

// 日常有效督查规则查询
        List<InspectorUphold> inspectorUpholds = clmInspectorLcaaMapper.queryInspectTypeIsValid("2");
        XxlJobLogger.log("日常有效督查规则查询集合列表houzhicong:{}",inspectorUpholds);
//        System.out.println("inspectorDayList="+list);
        ClmInspect clmInspect = new ClmInspect();
        List<ClmInspect> subStringByRadom=null;
        int i = 0;

        try{

            for (InspectorUphold inspe
                    :inspectorUpholds) {

                if(inspe.getRulcon()==null){
                    log.info("日常督查规则条件为空-----");
                    throw new Exception("你的规则条件为空");
                }

                BigDecimal rulon = inspe.getRulcon();
// 把 rulcon转换为int

                BigDecimal b = new BigDecimal(1);



//              rulcon>1 按件数进行抽查
                if(rulon.compareTo(b)==1){
//      抽取案件数量比总数大
                    if(rulon.compareTo( new BigDecimal(list.size()))==1){

                        BigDecimal bigDecimal =
                                BigDecimal.valueOf(list.size());
                        rulon=bigDecimal;


                    }
                    int rulcon1 = rulon.intValue();

                    log.info("案件最终规则条件:{}",rulcon1);


//                   按件数进行随机抽样
                    subStringByRadom = getSubStringByRadom(list,rulcon1);

                    log.info("捞取到的案件的理赔号码:{}",subStringByRadom);
                    XxlJobLogger.log("随机抽样到的案件的理赔号码:{}",subStringByRadom);

                    HashSet<ClmInspect> clmInspectsList = new HashSet<>(subStringByRadom);


                    XxlJobLogger.log("去除重复之后的理赔号码:{}",subStringByRadom);
                    for (ClmInspect clmInspect1:clmInspectsList
                    ) {
//                        插入所有的这个待督查的理赔号码
                        Integer rulid = inspe.getRulid();
                        String ruldes = inspe.getRuldes();
                        clmInspect.setRulid(rulid);
                        clmInspect.setRuldes(ruldes);
                        clmInspect.setInspecttype("2");
//                    设置案件状态为待督查状态
                        clmInspect.setIscleancase(clmInspect1.getIscleancase());
                        clmInspect.setInspectflag(null);
                        clmInspect.setClmnum(clmInspect1.getClmnum());

//                    插入待督查表
                        i = clmInspectMapper.insertSelective(clmInspect);


                    }
                    if(i>0){

                        log.info("houzhicong按件数进行抽查,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
                        XxlJobLogger.log("houzhicong按件数进行抽查,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
                    }



                }
                //              按百分数进行随机抽样
                else {
                    BigDecimal count1 = rulon.multiply(BigDecimal.valueOf(list.size()));
                    int count = count1.intValue();

                    List<ClmInspect> subStringByRadom1= getSubStringByRadom(list, count);
                    Set<String> set1 = subStringByRadom1.stream()
                            .collect(Collectors.groupingBy(ClmInspect::getClmnum))
                            .keySet();
//              进行理赔号码的去除重复




                    for (String clmnum:set1
                    ) {
//                        插入所有的这个待督查的理赔号码
                        Integer rulid = inspe.getRulid();
                        String ruldes = inspe.getRuldes();
                        clmInspect.setRulid(rulid);
                        clmInspect.setRuldes(ruldes);
                        clmInspect.setInspecttype("2");
//                    设置案件状态为待督查状态
                        clmInspect.setInspectflag(null);
                        clmInspect.setClmnum(clmnum);

//                    插入待督查表
                        i = clmInspectMapper.insertSelective(clmInspect);

                    }
                    if(i>0){

                        log.info("日常督查按百分数进行随机抽样,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
                        XxlJobLogger.log("日常督查按百分数进行随机抽样,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
                    }






                }




            }

        }catch (Exception e){

            log.error("抽取案件发生异常:",e);
        }







    }



}

3. JobHandler类结合批处理的使用
package com.xxl.job.clm.jobhandler.inspectortask;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Author:
 * @Description: 理赔案件进行待督查任务插入批处理调度
 *  @Date: 2022-04-09
 */

@Slf4j
@JobHandler(value = "inspectorJobHandler")
@Component
public class InspectorTaskJobHandler extends IJobHandler {

    private static final int SLEEP = 1000;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job inspectorTaskJobInit;

/**
     * @param : param
     * @return : ReturnT
     *理赔案件批处理以后进行待督查
     */

    @Override
    public ReturnT<String> execute(String param) throws Exception {
        long time = System.currentTimeMillis();
        XxlJobLogger.log("理赔案件批处理以后进行待督查,线程:{},时间:{}", Thread.currentThread().getId(), time);
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();
        JobExecution run = jobLauncher.run(inspectorTaskJobInit, jobParameters);

        while (run.isRunning()) {
            Thread.sleep(SLEEP);
        }
        XxlJobLogger.log("理赔案件批处理以后进行待督查结束,线程:{},时间:{}", Thread.currentThread().getId(), System.currentTimeMillis());
        if (run.getStatus().isUnsuccessful()) {
            return FAIL;
        } else {
            return SUCCESS;
        }
    }
}

4.使用Split进行并行的运行Flow,一个flow里面有多个step,然后就可以实现对cpu的充分利用可以提高效率。

    @Bean
    public Job inspectorTaskJobInit() {
        return this.jobBuilderFactory.get("inspectorTaskJobInit")
                .incrementer(new RunIdIncrementer())
//              flow1 和flow2进行并发执行
                // 查询处理
                .start(flow1())
                // 重点拼接
                .split(new SimpleAsyncTaskExecutor())

                .add(flow2()).end()

                // 公用监听
                .listener(commonJobListener).build();
    }



5. Step里面的方法可以执行很多的重要的方法,lisener,reader,writer,retryLimit,faultTolerant,chunk.
package com.xxl.job.clm.jobhandler.inspectortask;

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.xxl.job.clm.biz.ErrorInfoBiz;
import com.xxl.job.clm.jobhandler.bjHealth.SendEmailExceptionWriter;
import com.xxl.job.clm.listener.CommonJobListener;
import com.xxl.job.clm.listener.CommonStepListener;
import com.xxl.job.clm.mapper.mysql.ClmInspectorLcaaMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.annotation.Resource;
import java.util.List;

/*
*
 * @Author:侯治聪
 * @Description: 理赔案件进行待督查任务插入批处理调度
 * @Date: 2022-04-09

*/


@Slf4j
@Configuration
@EnableBatchProcessing
public class InspectorTaskStartJob{
    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private CommonJobListener commonJobListener;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Resource
    private CommonStepListener commonStepListener;

    @Resource
    InspectorDayTaskWriter inspectorDayTaskWriter;


    @Resource
   InspectorTaskQuarterWriter inspectorTaskQuarterWriter;



    @Resource
    private SqlSessionFactory as400SqlSessionFactory;

    @Resource
    private ClmInspectorLcaaMapper clmInspectorLcaaMapper;

    @Resource
    private ErrorInfoBiz errorInfoBiz;

    @NacosValue("${batch.lcbaSync.chunkSize}")
    private Integer chunkSize;
    @NacosValue("${batch.retryLimit}")
    private Integer retryLimit;


    @Resource
    private SendEmailExceptionWriter sendEmailExceptionWriter;



    @Bean
    public Job inspectorTaskJobInit() {
        return this.jobBuilderFactory.get("inspectorTaskJobInit")
                .incrementer(new RunIdIncrementer())
//              flow1 和flow2进行并发执行
                // 查询处理
                .start(flow1())
                // 重点拼接
                .split(new SimpleAsyncTaskExecutor())

                .add(flow2()).end()

                // 公用监听
                .listener(commonJobListener).build();
    }



    @Bean
    public Flow flow1() {

        return new FlowBuilder<Flow>("flow01")
                .start(findDayInspectorStep())
                .build();

    }

    @Bean
    public Flow flow2(){

        return new FlowBuilder<Flow>("flow2")
                .start(findQuaterInspectorStep())
                .build();
    }


// 批处理日常待督查的案件
    @Bean
    public Step findDayInspectorStep() {
        return this.stepBuilderFactory.get("findDayInspectorStep")
                .<List,List>chunk(8)
                .reader(allDayClmCase())
                .writer(inspectorDayTaskWriter)
                //开启重试功能
                .faultTolerant()
                //中台处理推送报文时异常
                .retry(Exception.class)
                .retryLimit(3)
                .skip(Exception.class)
                .skipLimit(Integer.MAX_VALUE)
                .listener(commonJobListener)
                .build();


    }





//  批处理季度待督查的案件
    @Bean
    public Step findQuaterInspectorStep() {
     return    stepBuilderFactory.get("findQuaterInspectorStep")
                .<List,List>chunk(3)
                .reader(allQuarterClmCase())
                .writer(inspectorTaskQuarterWriter)
             .faultTolerant()
             .retry(Exception.class)
             .retryLimit(retryLimit)//重试3次
             .skip(Exception.class)//跳过异常
             .skipLimit(Integer.MAX_VALUE)//跳过int最大值
             .listener(commonJobListener)
             .build();


    }



    @Bean
    public InspectorDayTaskReader<List> allDayClmCase() {


        InspectorDayTaskReader<List> inspectorDayTaskReader = new InspectorDayTaskReader();
        System.out.println(inspectorDayTaskReader+"inspectorDayTaskReader--------------------------------");
        inspectorDayTaskReader.setClmInspectorLcaaMapper(clmInspectorLcaaMapper);
        return inspectorDayTaskReader;


    }


    @Bean
    public InspectorTaskQuarterReader<List> allQuarterClmCase() {
        InspectorTaskQuarterReader<List> inspectorTaskQuarterReader = new InspectorTaskQuarterReader();
        return inspectorTaskQuarterReader;

    }




/**
     * @Description: [监听跳过,发生跳过时发送邮件.记录下跳过数据]
     */
   /* @Bean
    SkipListener inspectorStepSkipListener() {
        return new SkipListener() {
            @Override
            public void onSkipInRead(Throwable throwable) {
                log.error("理赔受理数据同步批处理,抽取数据,读异常:{}", throwable);
//                mailBiz.sendSimpleMail("", "理赔受理数据同步批处理", "理赔受理数据同步批处理,读异常," + ThrowableUtil.getStackTrace(throwable));
//                smsBiz.send("", "理赔受理数据同步批处理,读异常,");
            }

            @Override
            public void onSkipInWrite(Lcbapf lcbapf, Throwable throwable) {
                throwable.printStackTrace();
                log.error("理赔受理数据同步批处理,抽取数据,写异常:{},任务信息:{}", throwable, lcbapf);
                //TODO 保存错误日志
//                errorInfoBiz.insertAs400Error(BatchJobStepEnum.ren_start.getValue(), lcbapf.getClmnum(), "续期任务批处理,写异常!");
//                mailBiz.sendSimpleMail("", "理赔受理数据同步批处理", "理赔受理数据同步批处理,写异常," + ThrowableUtil.getStackTrace(throwable) + "错误数据:" + lcbapf.toString());
//                smsBiz.send("", "理赔受理数据同步批处理,写异常," + "关联id:" + lcbapf.getClmnum());
            }

            @Override
            public void onSkipInProcess(Lcbapf lcbapf, Throwable throwable) {
                //理赔受理数据同步不需要数据处理
            }
        };
    }
*/


}

6.实际业务中,只要是数据量大,而且需要定时处理。就可能用到相关的springBatch和xxl-job(任务调度框架)比如csdn定时发布功能:

注意一下重点: 每一个方法都@Bean,交给Spring来托管,每个Job方法都必须要唯一,JobHandler里面的value也必须唯一,然后Job的注入是直接注入你的方法名字的。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/758064.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-01
下一篇 2022-05-01

发表评论

登录后才能评论

评论列表(0条)

保存