- 1.实际业务实现的不一定是ItemReader接口,可能是其他的一些接口,不过这些接口都最终实现了ItemReader接口。
- 2.默认的ItemWriter会接收到ItemReader传递过来的list的值。
- 3. JobHandler类结合批处理的使用
- 4.使用Split进行并行的运行Flow,一个flow里面有多个step,然后就可以实现对cpu的充分利用可以提高效率。
- 5. Step里面可以配置很多重要的方法: listener,reader,writer,retryLimit,faultTolerant,chunk.
- 6.实际业务中,只要是数据量大,而且需要定时处理。就可能用到相关的springBatch和xxl-job(任务调度框架)比如csdn定时发布功能:
- 注意一下重点: 每一个方法都@Bean,交给Spring来托管,每个Job方法都必须要唯一,JobHandler里面的value也必须唯一,然后Job的注入是直接注入你的方法名字的。
@Component
@Slf4j
public class InspectorDayTaskReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

    /** Iterator over the cases loaded in {@link #doOpen()}; null after {@link #doClose()}. */
    private Iterator<T> iterator;

    @Resource
    private ClmInspectorLcaaMapper clmInspectorLcaaMapper;

    public InspectorDayTaskReader() {
        // ItemStream components need a unique name for their ExecutionContext keys.
        setName(getShortName(InspectorDayTaskReader.class));
    }

    /**
     * Returns the next case, or null when the data set is exhausted
     * (null is Spring Batch's "end of input" signal).
     */
    @Override
    protected T doRead() throws Exception {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /**
     * Loads the daily claim cases to inspect and prepares the iterator.
     * Rules of inspect type "2" are queried; the perid values of rules with
     * rulid > 2 and a non-empty perid are joined into a comma-separated list
     * that is then used to fetch the day's cases.
     */
    @Override
    protected void doOpen() throws Exception {
        List<InspectorUphold> inspectorUpholds = clmInspectorLcaaMapper.queryInspectTypeIsValid("2");
        // Build the comma-separated perid list ONCE over the filtered rules.
        // (The original re-joined the whole UNFILTERED list on every matching
        // element, ignoring the filter and risking an NPE on a null perid
        // inside Collectors.joining.)
        String collect = inspectorUpholds.stream()
                .filter(u -> u.getRulid() > 2 && u.getPerid() != null && !"".equals(u.getPerid()))
                .map(InspectorUphold::getPerid)
                .collect(Collectors.joining(","));
        if (collect.isEmpty()) {
            // Preserve the original behavior of passing null when no rule matched.
            collect = null;
        }
        log.info("获取perid的字符串格式为:{}", collect);
        List<ClmInspect> strings = clmInspectorLcaaMapper.queryAllDayCaseBySysdocCheck(collect);
        log.info("捞取每日理赔的的案件数目:{}条", strings.size());
        XxlJobLogger.log("捞取每日理赔的的案件数目:{}条", strings.size());
        XxlJobLogger.log("捞取每日理赔的的案件的理赔号集合列表:{}", strings);
        // Unchecked cast: callers bind T to the element type returned by the mapper.
        iterator = (Iterator<T>) strings.iterator();
    }

    @Override
    protected void doClose() throws Exception {
        // Drop the reference so the loaded list can be garbage collected.
        iterator = null;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // No mandatory properties to validate; mapper may be injected via @Resource or setter.
    }

    public InspectorDayTaskReader(ClmInspectorLcaaMapper clmInspectorLcaaMapper) {
        this.clmInspectorLcaaMapper = clmInspectorLcaaMapper;
    }

    public void setClmInspectorLcaaMapper(ClmInspectorLcaaMapper clmInspectorLcaaMapper) {
        this.clmInspectorLcaaMapper = clmInspectorLcaaMapper;
    }
}
2.默认的ItemWriter会接收到ItemReader传递过来的list的值。

package com.xxl.job.clm.jobhandler.inspectortask;
import com.citicpru.batchclm.entity.middleground.ClmInspect;
import com.citicpru.batchclm.entity.middleground.InspectorUphold;
import com.xxl.job.clm.mapper.mysql.ClmInspectMapper;
import com.xxl.job.clm.mapper.mysql.ClmInspectorLcaaMapper;
import com.xxl.job.clm.utils.StringUtil;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;
/**
* 每日理赔案例数目进行督查
* @author houzhicong
* @since
**/
@Component
@Slf4j
public class InspectorDayTaskWriter implements ItemWriter{
@Autowired
private ClmInspectMapper clmInspectMapper;
@Resource
private ClmInspectorLcaaMapper clmInspectorLcaaMapper;
@Override
public void write(List list) throws Exception {
addInspectByDay(list);
}
/* * 随机抽取理赔案件数,list 理赔总的案件数
* count 抽取数
*/
public static List<ClmInspect> getSubStringByRadom(List list, int count){
List backList = null;
backList = new ArrayList<String>();
Random random = new Random();
int backSum = 0;
if (list.size() >= count) {
backSum = count;
}else {
backSum = list.size();
}
for (int i = 0; i < backSum; i++) {
// 随机数的范围为0-list.size()-1
int target = random.nextInt(list.size());
backList.add(list.get(target));
}
log.info("抽取案件集合为:{}",backList);
return backList;
}
// 日常督查案件进行批处理 插入待督查表 list为每日所有的理赔号码
@Transactional
public void addInspectByDay(List list) throws Exception {
log.info("写入的List集合为:{}",list);
// 日常有效督查规则查询
List<InspectorUphold> inspectorUpholds = clmInspectorLcaaMapper.queryInspectTypeIsValid("2");
XxlJobLogger.log("日常有效督查规则查询集合列表houzhicong:{}",inspectorUpholds);
// System.out.println("inspectorDayList="+list);
ClmInspect clmInspect = new ClmInspect();
List<ClmInspect> subStringByRadom=null;
int i = 0;
try{
for (InspectorUphold inspe
:inspectorUpholds) {
if(inspe.getRulcon()==null){
log.info("日常督查规则条件为空-----");
throw new Exception("你的规则条件为空");
}
BigDecimal rulon = inspe.getRulcon();
// 把 rulcon转换为int
BigDecimal b = new BigDecimal(1);
// rulcon>1 按件数进行抽查
if(rulon.compareTo(b)==1){
// 抽取案件数量比总数大
if(rulon.compareTo( new BigDecimal(list.size()))==1){
BigDecimal bigDecimal =
BigDecimal.valueOf(list.size());
rulon=bigDecimal;
}
int rulcon1 = rulon.intValue();
log.info("案件最终规则条件:{}",rulcon1);
// 按件数进行随机抽样
subStringByRadom = getSubStringByRadom(list,rulcon1);
log.info("捞取到的案件的理赔号码:{}",subStringByRadom);
XxlJobLogger.log("随机抽样到的案件的理赔号码:{}",subStringByRadom);
HashSet<ClmInspect> clmInspectsList = new HashSet<>(subStringByRadom);
XxlJobLogger.log("去除重复之后的理赔号码:{}",subStringByRadom);
for (ClmInspect clmInspect1:clmInspectsList
) {
// 插入所有的这个待督查的理赔号码
Integer rulid = inspe.getRulid();
String ruldes = inspe.getRuldes();
clmInspect.setRulid(rulid);
clmInspect.setRuldes(ruldes);
clmInspect.setInspecttype("2");
// 设置案件状态为待督查状态
clmInspect.setIscleancase(clmInspect1.getIscleancase());
clmInspect.setInspectflag(null);
clmInspect.setClmnum(clmInspect1.getClmnum());
// 插入待督查表
i = clmInspectMapper.insertSelective(clmInspect);
}
if(i>0){
log.info("houzhicong按件数进行抽查,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
XxlJobLogger.log("houzhicong按件数进行抽查,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
}
}
// 按百分数进行随机抽样
else {
BigDecimal count1 = rulon.multiply(BigDecimal.valueOf(list.size()));
int count = count1.intValue();
List<ClmInspect> subStringByRadom1= getSubStringByRadom(list, count);
Set<String> set1 = subStringByRadom1.stream()
.collect(Collectors.groupingBy(ClmInspect::getClmnum))
.keySet();
// 进行理赔号码的去除重复
for (String clmnum:set1
) {
// 插入所有的这个待督查的理赔号码
Integer rulid = inspe.getRulid();
String ruldes = inspe.getRuldes();
clmInspect.setRulid(rulid);
clmInspect.setRuldes(ruldes);
clmInspect.setInspecttype("2");
// 设置案件状态为待督查状态
clmInspect.setInspectflag(null);
clmInspect.setClmnum(clmnum);
// 插入待督查表
i = clmInspectMapper.insertSelective(clmInspect);
}
if(i>0){
log.info("日常督查按百分数进行随机抽样,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
XxlJobLogger.log("日常督查按百分数进行随机抽样,插入每日理赔的进行待督查的案件成功数目:{}条",subStringByRadom.size());
}
}
}
}catch (Exception e){
log.error("抽取案件发生异常:",e);
}
}
}
3. JobHandler类结合批处理的使用
package com.xxl.job.clm.jobhandler.inspectortask;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Author:
* @Description: 理赔案件进行待督查任务插入批处理调度
* @Date: 2022-04-09
*/
@Slf4j
@JobHandler(value = "inspectorJobHandler")
@Component
public class InspectorTaskJobHandler extends IJobHandler {

    /** Poll interval (ms) while waiting for the batch job to finish. */
    private static final int SLEEP = 1000;

    @Autowired
    private JobLauncher jobLauncher;

    /** Injected by bean-method name from the batch configuration class. */
    @Autowired
    private Job inspectorTaskJobInit;

    /**
     * Launches the pending-inspection batch job and blocks until it completes.
     *
     * @param param scheduler parameter (unused)
     * @return SUCCESS when the job ended successfully, FAIL otherwise
     */
    @Override
    public ReturnT<String> execute(String param) throws Exception {
        long startMillis = System.currentTimeMillis();
        XxlJobLogger.log("理赔案件批处理以后进行待督查,线程:{},时间:{}", Thread.currentThread().getId(), startMillis);
        // A fresh timestamp parameter makes every launch a distinct JobInstance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(inspectorTaskJobInit, jobParameters);
        // Poll until the (possibly asynchronous) execution finishes.
        while (execution.isRunning()) {
            Thread.sleep(SLEEP);
        }
        XxlJobLogger.log("理赔案件批处理以后进行待督查结束,线程:{},时间:{}", Thread.currentThread().getId(), System.currentTimeMillis());
        return execution.getStatus().isUnsuccessful() ? FAIL : SUCCESS;
    }
}
4.使用Split进行并行的运行Flow,一个flow里面有多个step,然后就可以实现对cpu的充分利用可以提高效率。
@Bean
public Job inspectorTaskJobInit() {
return this.jobBuilderFactory.get("inspectorTaskJobInit")
.incrementer(new RunIdIncrementer())
// flow1 and flow2 are executed concurrently
// flow1: the daily-inspection query/processing step
.start(flow1())
// split() forks the flows onto the given task executor
.split(new SimpleAsyncTaskExecutor())
.add(flow2()).end()
// shared job-level listener
.listener(commonJobListener).build();
}
5. Step里面可以配置很多重要的方法: listener,reader,writer,retryLimit,faultTolerant,chunk.
package com.xxl.job.clm.jobhandler.inspectortask;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.xxl.job.clm.biz.ErrorInfoBiz;
import com.xxl.job.clm.jobhandler.bjHealth.SendEmailExceptionWriter;
import com.xxl.job.clm.listener.CommonJobListener;
import com.xxl.job.clm.listener.CommonStepListener;
import com.xxl.job.clm.mapper.mysql.ClmInspectorLcaaMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.annotation.Resource;
import java.util.List;
/*
*
* @Author:侯治聪
* @Description: 理赔案件进行待督查任务插入批处理调度
* @Date: 2022-04-09
*/
@Slf4j
@Configuration
@EnableBatchProcessing
public class InspectorTaskStartJob {

    @Resource
    private JobBuilderFactory jobBuilderFactory;
    @Resource
    private CommonJobListener commonJobListener;
    @Resource
    private StepBuilderFactory stepBuilderFactory;
    @Resource
    private CommonStepListener commonStepListener;
    @Resource
    InspectorDayTaskWriter inspectorDayTaskWriter;
    @Resource
    InspectorTaskQuarterWriter inspectorTaskQuarterWriter;
    @Resource
    private SqlSessionFactory as400SqlSessionFactory;
    @Resource
    private ClmInspectorLcaaMapper clmInspectorLcaaMapper;
    @Resource
    private ErrorInfoBiz errorInfoBiz;

    // NOTE(review): property key says "lcbaSync", i.e. it belongs to another job;
    // left unwired here — confirm the right key before using it in chunk().
    @NacosValue("${batch.lcbaSync.chunkSize}")
    private Integer chunkSize;

    /** Shared retry limit for fault-tolerant steps, configured in Nacos. */
    @NacosValue("${batch.retryLimit}")
    private Integer retryLimit;

    @Resource
    private SendEmailExceptionWriter sendEmailExceptionWriter;

    /**
     * Job definition: runs flow1 (daily inspection) and flow2 (quarterly
     * inspection) in parallel via split() on an async task executor.
     */
    @Bean
    public Job inspectorTaskJobInit() {
        return this.jobBuilderFactory.get("inspectorTaskJobInit")
                .incrementer(new RunIdIncrementer())
                // flow1 and flow2 are executed concurrently
                .start(flow1())
                // split() forks the flows onto the given task executor
                .split(new SimpleAsyncTaskExecutor())
                .add(flow2()).end()
                // shared job-level listener
                .listener(commonJobListener).build();
    }

    /** Flow wrapping the daily-inspection step. */
    @Bean
    public Flow flow1() {
        return new FlowBuilder<Flow>("flow01")
                .start(findDayInspectorStep())
                .build();
    }

    /** Flow wrapping the quarterly-inspection step. */
    @Bean
    public Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(findQuaterInspectorStep())
                .build();
    }

    /** Step that batches the daily cases pending inspection. */
    @Bean
    public Step findDayInspectorStep() {
        return this.stepBuilderFactory.get("findDayInspectorStep")
                .<List, List>chunk(8)
                .reader(allDayClmCase())
                .writer(inspectorDayTaskWriter)
                // Enable retry/skip fault tolerance.
                .faultTolerant()
                .retry(Exception.class)
                // Was hard-coded to 3; use the configured limit like the quarter step.
                .retryLimit(retryLimit)
                .skip(Exception.class)
                .skipLimit(Integer.MAX_VALUE)
                // Step-level listener (was the job-level CommonJobListener,
                // leaving the injected CommonStepListener unused).
                .listener(commonStepListener)
                .build();
    }

    /** Step that batches the quarterly cases pending inspection. */
    @Bean
    public Step findQuaterInspectorStep() {
        return stepBuilderFactory.get("findQuaterInspectorStep")
                .<List, List>chunk(3)
                .reader(allQuarterClmCase())
                .writer(inspectorTaskQuarterWriter)
                .faultTolerant()
                .retry(Exception.class)
                .retryLimit(retryLimit) // configured retry count
                .skip(Exception.class)  // skip on exception
                .skipLimit(Integer.MAX_VALUE) // effectively unlimited skips
                // Step-level listener (see findDayInspectorStep).
                .listener(commonStepListener)
                .build();
    }

    /** Reader bean for the daily cases; mapper wired in manually. */
    @Bean
    public InspectorDayTaskReader<List> allDayClmCase() {
        InspectorDayTaskReader<List> inspectorDayTaskReader = new InspectorDayTaskReader<>();
        // Replaced a System.out.println debug print with proper logging.
        log.debug("created inspectorDayTaskReader: {}", inspectorDayTaskReader);
        inspectorDayTaskReader.setClmInspectorLcaaMapper(clmInspectorLcaaMapper);
        return inspectorDayTaskReader;
    }

    /** Reader bean for the quarterly cases. */
    @Bean
    public InspectorTaskQuarterReader<List> allQuarterClmCase() {
        return new InspectorTaskQuarterReader<>();
    }

    /**
     * @Description: [skip listener: on skip, send a mail and record the skipped data]
     */
    /* @Bean
    SkipListener inspectorStepSkipListener() {
        return new SkipListener() {
            @Override
            public void onSkipInRead(Throwable throwable) {
                log.error("理赔受理数据同步批处理,抽取数据,读异常:{}", throwable);
                // mailBiz.sendSimpleMail("", "理赔受理数据同步批处理", "理赔受理数据同步批处理,读异常," + ThrowableUtil.getStackTrace(throwable));
                // smsBiz.send("", "理赔受理数据同步批处理,读异常,");
            }
            @Override
            public void onSkipInWrite(Lcbapf lcbapf, Throwable throwable) {
                throwable.printStackTrace();
                log.error("理赔受理数据同步批处理,抽取数据,写异常:{},任务信息:{}", throwable, lcbapf);
                //TODO save error log
                // errorInfoBiz.insertAs400Error(BatchJobStepEnum.ren_start.getValue(), lcbapf.getClmnum(), "续期任务批处理,写异常!");
                // mailBiz.sendSimpleMail("", "理赔受理数据同步批处理", "理赔受理数据同步批处理,写异常," + ThrowableUtil.getStackTrace(throwable) + "错误数据:" + lcbapf.toString());
                // smsBiz.send("", "理赔受理数据同步批处理,写异常," + "关联id:" + lcbapf.getClmnum());
            }
            @Override
            public void onSkipInProcess(Lcbapf lcbapf, Throwable throwable) {
                // no processing needed for this sync job
            }
        };
    }
    */
}
6.实际业务中,只要是数据量大,而且需要定时处理。就可能用到相关的springBatch和xxl-job(任务调度框架)比如csdn定时发布功能:
注意一下重点: 每一个方法都@Bean,交给Spring来托管,每个Job方法都必须要唯一,JobHandler里面的value也必须唯一,然后Job的注入是直接注入你的方法名字的。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)