Implementing Batch Processing with Spring Boot & Spring Batch


Problem Domain

Batch processing means the automated, complex handling of large volumes of information, processed most efficiently without user interaction. Typical cases include time-based events (such as month-end calculations, notifications, or correspondence), the repeated application of complex business rules across very large data sets (such as insurance benefit determination or rate adjustments), and the integration of information received from internal and external systems, which usually has to be formatted, validated, and written transactionally into the system of record. Batch processing is used to handle billions of transactions for enterprises every day.

Introduction to Spring Batch

The core flow, in outline: a Job is composed of one or more Steps, and each chunk-oriented Step reads, processes, and writes items.

Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital to the daily operations of enterprise systems. It provides reusable functions that are essential for processing large volumes of records, and it supports both simple use cases (such as reading a file into a database or running a stored procedure) and complex, high-volume ones (such as moving large amounts of data between databases and transforming it along the way). High-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.

In short: it is a sharp tool for high-volume, complex, pipeline-style data processing!
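
Conceptually, a Job is broken into Steps, and a chunk-oriented Step wires together three small contracts. A simplified sketch of those interfaces (condensed from Spring Batch's actual signatures) shows the whole model:

// The three chunk-oriented contracts at the heart of Spring Batch (simplified):
interface ItemReader<T> {
    T read() throws Exception;               // returning null signals end of input
}
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;      // returning null filters the item out
}
interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;  // receives one chunk at a time
}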

Requirements

1. Fetch the raw alert data from a remote API.

2. Clean and aggregate the data, then perform local operations on it, such as publishing to MQ and saving to the local database.

Core Dependencies


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.1</version>
        <relativePath/>
    </parent>
    <groupId>***</groupId>
    <artifactId>***</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>***</name>

    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>

        <!-- Aliyun OSS SDK -->
        <dependency>
            <groupId>com.aliyun.oss</groupId>
            <artifactId>aliyun-sdk-oss</artifactId>
            <version>3.10.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.jettison</groupId>
                    <artifactId>jettison</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>
    </dependencies>
</project>
Spring Configuration
spring:
  application:
    name: ***
  batch:
    job:
      # By default Spring Boot launches every defined Job on startup (true).
      # Set to false so jobs only run when triggered via jobLauncher.run(...)
      enabled: false
    jdbc:
      # Let Spring Batch create its default metadata tables in the database;
      # anything other than 'always' leads to "table does not exist" errors
      initialize-schema: always
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: ***
    username: ***
    password: ***
    hikari:
      connection-test-query: SELECT 1
  rabbitmq:
    listener:
      simple:
        missing-queues-fatal: false
    addresses: ***
    port: ***
    username: ***
    password: ***
    virtual-host: ***


#-------------------- MyBatis configuration ----------------------------------
mybatis-plus:
  mapper-locations:
    - classpath*:mapper

Data Beans

The beans below model the raw alert payload (RunningOverVO), the intermediate cleaned record (WarningInfoItem), and the final alert entity (WarningInfo):
// Raw alert bean mapped from the remote API's JSON
@Data
public class RunningOverVO implements Serializable {

    private static final long serialVersionUID = 1L;

    private Annotations annotations;
    private Date endsAt;
    private String fingerprint;
    private List<Receivers> receivers;
    private Date startsAt;
    private Status status;
    private Date updatedAt;
    private String generatorURL;
    private Labels labels;

    // Nested classes are static so Jackson can instantiate them during deserialization
    @Data
    public static class Labels {
        private String acpus;
        private String alertname;
        private String comment;
        private String dc;
        private String to;
        private String instance;
        private String job;
        private String jobid;
        private String jobname;
        private String lc;
        private String lnodes_s;
        private String nnodes;
        private String priority;
        private String queue;
        private String service;
        private String severity;
        private String ss;
        private Date start;
        private String state;
        private Date submit;
        private String user;
    }

    @Data
    public static class Receivers {
        private String name;
    }

    @Data
    public static class Annotations {
        private String description;
        private String summary;
    }

    @Data
    public static class Status {
        private List<String> inhibitedBy;
        private List<String> silencedBy;
        private String state;
    }

}
// Intermediate (temporary-table) record produced by the first cleaning pass
@Data
public class WarningInfoItem implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer id;

    /** alert type id */
    private Integer typeId;

    /** HPC cluster id */
    private String clusterId;

    /** cluster account */
    private String clusterUser;

    /** job id */
    private String jobId;

    /** job start time */
    private Date startTime;

    /** job detail as a JSON string */
    private String job;

    /** record creation time */
    private Date createAt;
}
// Final aggregated alert entity persisted to warning_info
@Data
public class WarningInfo implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(type = IdType.AUTO)
    private BigInteger id;

    /** paying user id */
    private String userId;

    /** alert type id */
    private Integer typeId;

    /** alert message */
    private String msg;

    /** alert time */
    private Date datetime;

    /** record creation time */
    private Date createAt;

    /** acknowledgement time */
    private Date ackAt;

    /** "1" when the job detail has been offloaded to OSS, otherwise "0" */
    private String isOss;

    /** job detail JSON, or the OSS file URL when isOss = "1" */
    private String jobs;

    /** alert detail rendered as an HTML table */
    private String remark;

    private String desc = "Log in to the system for full details";

    private Boolean isDetailInfo;

    @Override
    public String toString() {
        return "{" +
                "\"id\":" + id +
                ", \"userId\":\"" + userId + '"' +
                ", \"typeId\":" + typeId +
                ", \"msg\":\"" + msg + '"' +
                ", \"datetime\":\"" + datetime + '"' +
                ", \"createAt\":\"" + createAt + '"' +
                ", \"ackAt\":\"" + ackAt + '"' +
                ", \"jobs\":\"" + jobs + '"' +
                '}';
    }
}
Batch Configuration
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    // JobRepository backed by the MySQL datasource (stores job/step execution metadata)
    @Bean
    public JobRepository batchJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.afterPropertiesSet();
        return jobRepositoryFactoryBean.getObject();
    }

    // JobLauncher used to trigger jobs manually (spring.batch.job.enabled is false)
    @Bean
    public SimpleJobLauncher batchJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(batchJobRepository(dataSource, transactionManager));
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
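
If a job is triggered from a web thread (or you simply do not want the caller to block), the launcher can be given a task executor. A sketch of that optional variant, not part of the original setup:

// Optional: run jobs asynchronously so the triggering thread is not blocked
@Bean
public SimpleJobLauncher asyncBatchJobLauncher(JobRepository jobRepository) throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    // Each job runs on its own thread; prefer a bounded pool in production
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

With this launcher, jobLauncher.run(...) returns immediately with the JobExecution instead of waiting for the job to finish.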
Step Definitions

Step 1 Reader

Read the remote API and convert the raw payload into raw alert beans. JsonItemReader is an implementation of the ItemReader interface; its type parameter is the entity bean type that the raw data is converted into.

@Bean("runningOverFirstReader")
@StepScope
public JsonItemReader<RunningOverVO> runningOverReaderFirst() {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    // Return the body as a String for 2xx responses, otherwise fail fast
    ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
        @Override
        public String handleResponse(final HttpResponse response)
                throws IOException {
            int status = response.getStatusLine().getStatusCode();
            if (status >= 200 && status < 300) {
                HttpEntity entity = response.getEntity();
                return entity != null ? EntityUtils.toString(entity) : null;
            } else {
                throw new ClientProtocolException(
                        "Unexpected response status: " + status);
            }
        }
    };
    HttpGet httpGet = new HttpGet(warningRunningOverUrl);
    httpGet.addHeader("Accept", "application/json;charset=UTF-8");
    String response = null;
    try {
        response = httpclient.execute(httpGet, responseHandler);
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
    if (!StringUtils.hasText(response)) {
        return null;
    }
    // Wrap the JSON response in a Resource and let Jackson stream RunningOverVO items
    ByteArrayResource byteArrayResource = new ByteArrayResource(response.getBytes());
    JacksonJsonObjectReader<RunningOverVO> jsonObjectReader = new JacksonJsonObjectReader<>(RunningOverVO.class);
    jsonObjectReader.setMapper(new JsonMapper());
    JsonItemReader<RunningOverVO> jsonItemReader = new JsonItemReader<>(byteArrayResource, jsonObjectReader);
    jsonItemReader.setName("runningOverReaderFirst");
    jsonItemReader.setResource(byteArrayResource);
    return jsonItemReader;
}
Process

Clean the raw data into temporary alert-item objects. Note that in ItemProcessor<RunningOverVO, WarningInfoItem>, RunningOverVO is the type before cleaning and WarningInfoItem is the type after.

@Bean("runningOverFirstProcess")
public ItemProcessor<RunningOverVO, WarningInfoItem> runningOverProcessFirst() {
    ItemProcessor<RunningOverVO, WarningInfoItem> itemProcessor = ehrOrg -> {
        RunningOverVO.Labels labels = ehrOrg.getLabels();
        WarningInfoItem warningInfoItem = new WarningInfoItem();
        warningInfoItem.setClusterId(labels.getDc());
        warningInfoItem.setClusterUser(labels.getUser());
        warningInfoItem.setJobId(labels.getJobid());
        warningInfoItem.setTypeId(2);
        warningInfoItem.setStartTime(labels.getStart());
        warningInfoItem.setCreateAt(Calendar.getInstance().getTime());
        // Assemble the job JSON string
        StringBuffer sb = new StringBuffer();
        sb.append("{");
        sb.append("\"job_id\":\"" + labels.getJobid() + "\",");
        sb.append("\"job_name\":\"" + labels.getJobname() + "\",");
        sb.append("\"partition\":\"" + labels.getQueue() + "\",");
        sb.append("\"node_num\":\"" + labels.getNnodes() + "\",");
        sb.append("\"cores_num\":\"" + labels.getAcpus() + "\",");
        sb.append("\"run_time\":\"" + getRunningDay(labels.getStart()) + "\"");
        sb.append("}");
        warningInfoItem.setJob(sb.toString());
        return warningInfoItem;
    };
    return itemProcessor;
}


// Whole days elapsed between the job's start time and now
private static long getRunningDay(Date sTime) throws ParseException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Round both instants to second precision via a format/parse round trip
    Date nowTime = sdf.parse(sdf.format(Calendar.getInstance().getTime()));
    Date startTime = sdf.parse(sdf.format(sTime));
    long betweenDays = (nowTime.getTime() - startTime.getTime()) / (1000 * 3600 * 24);
    return betweenDays;
}
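
As an aside, hand-rolled JSON like the StringBuffer assembly above is easy to get wrong with all the escaped quotes. A sketch of the same assembly using Jackson (buildJobJson is a hypothetical helper; it assumes the Labels bean and the getRunningDay method above):

// Hypothetical alternative: let Jackson handle the quoting and escaping
private static String buildJobJson(RunningOverVO.Labels labels) throws Exception {
    Map<String, String> job = new LinkedHashMap<>();   // keeps field order stable
    job.put("job_id", labels.getJobid());
    job.put("job_name", labels.getJobname());
    job.put("partition", labels.getQueue());
    job.put("node_num", labels.getNnodes());
    job.put("cores_num", labels.getAcpus());
    job.put("run_time", String.valueOf(getRunningDay(labels.getStart())));
    return new ObjectMapper().writeValueAsString(job); // => {"job_id":"...","job_name":"...",...}
}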
Writer

A custom writer implements the ItemWriter interface and performs the batch insert of the temporary-table data. In its public void write(List<? extends WarningInfoItem> list) method, list is the chunk of items to persist; the chunk size is configured later, when the Step is created. You can also use one of the many ready-made ItemWriter implementations; essentially everything you can think of is already provided.


@Component
public class RunningOverFirstWriter implements ItemWriter<WarningInfoItem> {

    private final JdbcTemplate jdbcTemplate;

    public RunningOverFirstWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(List<? extends WarningInfoItem> list) throws Exception {
        // INSERT IGNORE silently skips rows that would violate a unique key
        String batchSQL = "INSERT IGNORE INTO warning_info_item(type_id,cluster_id,cluster_user,job_id,start_time,job,create_at) values (?,?,?,?,?,?,?)";
        List<Object[]> batchArgs = new ArrayList<>();
        for (WarningInfoItem item : list) {
            Object[] objs = {item.getTypeId(), item.getClusterId(), item.getClusterUser(), item.getJobId(), item.getStartTime(), item.getJob(), item.getCreateAt()};
            batchArgs.add(objs);
        }
        jdbcTemplate.batchUpdate(batchSQL, batchArgs);
    }
}
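
For a plain JDBC insert like this one, the stock JdbcBatchItemWriter can replace the custom class entirely. A sketch, assuming the named parameters map onto the WarningInfoItem properties:

// Equivalent writer built from Spring Batch's own JdbcBatchItemWriter (sketch)
@Bean
public JdbcBatchItemWriter<WarningInfoItem> runningOverFirstJdbcWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<WarningInfoItem>()
            .dataSource(dataSource)
            .sql("INSERT IGNORE INTO warning_info_item" +
                    "(type_id,cluster_id,cluster_user,job_id,start_time,job,create_at)" +
                    " VALUES (:typeId,:clusterId,:clusterUser,:jobId,:startTime,:job,:createAt)")
            .beanMapped()  // bind :typeId etc. from the item's getters
            .build();
}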
Step 2 Reader

Aggregate the temporary-table rows that Step 1 persisted, according to the business rules. This part depends entirely on your own needs; there is no need to read through the long SQL.

@Bean("runningOverSecondReader")
@StepScope
public ListItemReader<Map<String, Object>> runningOverReaderSecond() {
    // Temporarily raise GROUP_CONCAT's max length; only affects the current session
    jdbcTemplate.execute("SET SESSION group_concat_max_len=1024000");
    // LEFT JOIN cluster_user with warning_info_item to collect, per paying user id,
    // all job alerts for the HPC centers & accounts that user pays for
    List<Map<String, Object>> lstWarning = jdbcTemplate.queryForList("""
            SELECT
            e.user_id,
            e.type_id,
            e.clusterId,
            e.clusterUser,
            CONCAT('[', GROUP_CONCAT(e.jobs), ']') jobs,
            e.date_time,
            e.createAt
            FROM (SELECT
                    a.pay_user_id AS user_id,
                    b.type_id,
                    b.cluster_id clusterId,
                    b.cluster_user clusterUser,
                    GROUP_CONCAT(b.job) jobs,
                    b.start_time date_time,
                    b.create_at createAt
            FROM
                    cluster_user a
                    LEFT JOIN warning_info_item b ON a.cluster_id = b.cluster_id AND a.`user` = b.cluster_user
            WHERE
                    b.type_id = 2
            GROUP BY
                    a.pay_user_id
            UNION ALL
            SELECT
                    d.pay_user_id AS user_id,
                    b.type_id,
                    b.cluster_id clusterId,
                    b.cluster_user clusterUser,
                    GROUP_CONCAT(b.job) jobs,
                    b.start_time date_time,
                    b.create_at createAt
            FROM
                    cluster_user a
                    LEFT JOIN warning_info_item b ON a.cluster_id = b.cluster_id AND a.`user` = b.cluster_user
                    LEFT JOIN user_info c ON a.pay_user_id = c.id
                    LEFT JOIN account_group d ON c.group_id = d.id
            WHERE
                    b.type_id = 2
            GROUP BY d.pay_user_id
            ) e
            GROUP BY e.user_id""");
    return new ListItemReader<>(lstWarning);
}
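
ListItemReader holds the whole result set in memory, which is fine for alert volumes but worth knowing about. A sketch of a streaming alternative with JdbcCursorItemReader (the aggregation SQL itself is elided here):

// Streaming alternative (sketch): rows are fetched from a cursor as the step asks for them
@Bean
@StepScope
public JdbcCursorItemReader<Map<String, Object>> runningOverCursorReader(DataSource dataSource) {
    JdbcCursorItemReader<Map<String, Object>> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setSql("/* same aggregation query as above */");
    reader.setRowMapper(new ColumnMapRowMapper());  // each row becomes a Map<String, Object>
    reader.setName("runningOverCursorReader");
    return reader;
}

Note that the SET SESSION tweak would still have to run on the same connection, which is one reason the ListItemReader approach is simpler here.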
Process

A second cleaning pass over the data, again business-specific. The Processor stage is optional anyway; it is mainly there to support the pipeline-style flow.

@Bean("runningOverSecondProcess")
public ItemProcessor<Map<String, Object>, Map<String, Object>> runningOverProcessSecond() {
    Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    ObjectMapper mapper = new ObjectMapper();
    ItemProcessor<Map<String, Object>, Map<String, Object>> itemProcessor = ehrOrg -> {
        // Convert the LocalDateTime columns to formatted strings (UTC+8)
        ehrOrg.put("date_time", ehrOrg.get("date_time") != null ? format.format(Date.from(((LocalDateTime) ehrOrg.get("date_time")).toInstant(ZoneOffset.of("+8")))) : "");
        ehrOrg.put("createAt", ehrOrg.get("createAt") != null ? format.format(Date.from(((LocalDateTime) ehrOrg.get("createAt")).toInstant(ZoneOffset.of("+8")))) : "");
        // Assemble the alert message from the first job in the list
        String jobStr = (String) ehrOrg.get("jobs");
        if (StringUtils.hasText(jobStr)) {
            List<Map<String, Object>> list = mapper.readValue(jobStr, List.class);
            for (Map<String, Object> map : list) {
                String msg = "Your job " + map.get("job_id") + " (and possibly others) has been running for over 1 day. Please check on it to avoid wasting core hours. If you have any questions, contact an online support engineer or your account manager. Thank you for your support.";
                ehrOrg.put("msg", msg);
                break;
            }
        }
        return ehrOrg;
    };
    return itemProcessor;
}
Writer

A custom Writer can carry out several pieces of business at once:

@Component
@Slf4j
public class RunningOverSecondWriter implements ItemWriter<Map<String, Object>> {

    private static final String QUEUE_CONSOLE_NOTICE = "***";

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private WarningMapper warningMapper;

    @Value("${console.aliyun.ak}")
    private String ossAK;

    @Value("${console.aliyun.sk}")
    private String ossSK;

    @Value("${console.aliyun.endpoint}")
    private String endpoint;

    @Value("${console.warning.bucket.name}")
    private String bucketName;

    public RunningOverSecondWriter(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void write(List<? extends Map<String, Object>> list) throws Exception {
        OSS ossClient = new OSSClientBuilder().build(endpoint, ossAK, ossSK);
        ObjectMapper mapper = new ObjectMapper();
        List<WarningInfo> lstWarning = new ArrayList<>();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (Map<String, Object> m : list) {
            String jobs = (String) m.get("jobs");
            String isOss = "0";
            StringBuffer remark = new StringBuffer();
            // Generate the alert id from the current nanosecond timestamp
            long id = System.nanoTime();
            m.put("id", id);
            if (StringUtils.isNotEmpty(jobs)) {
                List<Map<String, Object>> lstMap = mapper.readValue(jobs, List.class);
                if (!CollectionUtils.isEmpty(lstMap) && lstMap.size() > 5) {
                    // More than 5 jobs: offload the JSON to an OSS file and set the flag (used later)
                    isOss = "1";
                    // Upload to OSS and replace jobs with the signed file URL
                    jobs = uploadJobs2Oss(ossClient, jobs, id).toString();
                }
                // Render the job list as an HTML table for the alert detail
                remark.append("<table>\n" +
                        "  <tr>\n" +
                        "    <th>Job ID</th>\n" +
                        "    <th>Job name</th>\n" +
                        "    <th>Queue</th>\n" +
                        "    <th>Node count</th>\n" +
                        "    <th>Cores used</th>\n" +
                        "    <th>Days running</th>\n" +
                        "  </tr>\n");
                for (Map<String, Object> map : lstMap) {
                    remark.append("  <tr>\n");
                    remark.append("    <td>").append(map.get("job_id")).append("</td>\n");
                    remark.append("    <td>").append(map.get("job_name")).append("</td>\n");
                    remark.append("    <td>").append(map.get("partition")).append("</td>\n");
                    remark.append("    <td>").append(map.get("node_num")).append("</td>\n");
                    remark.append("    <td>").append(map.get("cores_num")).append("</td>\n");
                    remark.append("    <td>").append(map.get("run_time")).append("</td>\n");
                    remark.append("  </tr>\n");
                }
                remark.append("</table>");
                // Detail view content
                m.put("remark", remark.toString());
            }
            WarningInfo warningInfo = settingWarningInfo(format, m, jobs, isOss);
            // Publish the alert to MQ
            rabbitTemplate.convertAndSend(QUEUE_CONSOLE_NOTICE, mapper.writeValueAsString(warningInfo));
            lstWarning.add(warningInfo);
        }
        // Close the OSS client
        ossClient.shutdown();
        // Batch-insert into warning_info
        warningMapper.batchInsert(lstWarning);
        // Purge the intermediate table
        jdbcTemplate.execute("delete from warning_info_item");
    }

    private WarningInfo settingWarningInfo(DateFormat format, Map<String, Object> m, String jobs, String isOss) throws ParseException {
        WarningInfo warningInfo = new WarningInfo();
        warningInfo.setId(BigInteger.valueOf((Long) m.get("id")));
        warningInfo.setUserId((String) m.get("user_id"));
        warningInfo.setTypeId((Integer) m.get("type_id"));
        warningInfo.setMsg((String) m.get("msg"));
        warningInfo.setIsOss(isOss);
        warningInfo.setJobs(jobs);
        warningInfo.setIsDetailInfo(true);
        warningInfo.setRemark((String) m.get("remark"));
        warningInfo.setDatetime(format.parse((String) m.get("date_time")));
        warningInfo.setCreateAt(format.parse((String) m.get("createAt")));
        return warningInfo;
    }

    private URL uploadJobs2Oss(OSS ossClient, String content, long fileName) {
        PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, "warning/" + fileName + ".json", new ByteArrayInputStream(content.getBytes()));
        // Upload the string content
        ossClient.putObject(putObjectRequest);
        // Signed URL expiry: 30 days from now (3600 s * 24 * 30). It must be added,
        // not subtracted, or the URL is born expired; the long literal avoids int overflow
        Date expiration = new Date(System.currentTimeMillis() + 3600L * 1000 * 24 * 30);
        // Generate a GET-signed URL that can be opened directly in a browser;
        // it is stored in the warning_info table
        URL url = ossClient.generatePresignedUrl(bucketName, "warning/" + fileName + ".json", expiration);
        return url;
    }
}
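
If you would rather keep one responsibility per writer (MQ publishing, OSS upload, DB insert), Spring Batch's CompositeItemWriter can chain several single-purpose writers over the same chunk. A sketch, where mqNotifyWriter and warningDbWriter are hypothetical writers split out of the class above:

// Sketch: run several delegate writers against each chunk, in order
@Bean
public CompositeItemWriter<Map<String, Object>> runningOverCompositeWriter(
        ItemWriter<Map<String, Object>> mqNotifyWriter,
        ItemWriter<Map<String, Object>> warningDbWriter) {
    CompositeItemWriter<Map<String, Object>> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(mqNotifyWriter, warningDbWriter));
    return writer;
}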
Step
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("runningOverFirstReader")
private JsonItemReader<RunningOverVO> runningOverFirstReader;

@Autowired
@Qualifier("runningOverFirstProcess")
private ItemProcessor<RunningOverVO, WarningInfoItem> runningOverFirstProcess;

@Autowired
private RunningOverFirstWriter runningOverFirstWriter;

@Autowired
@Qualifier("runningOverSecondReader")
private ListItemReader<Map<String, Object>> runningOverSecondReader;

@Autowired
@Qualifier("runningOverSecondProcess")
private ItemProcessor<Map<String, Object>, Map<String, Object>> runningOverSecondProcess;

@Autowired
private RunningOverSecondWriter runningOverSecondWriter;


public Step runningOverFirstStep() {
    return stepBuilderFactory.get("runningOverFirstStep")
            // Number of items processed before each transaction commit; think of it
            // as "paging". In theory the bigger the better if resources allow, but
            // if they don't, you pay extra preparation cost (much like an i++ loop)
            .<RunningOverVO, WarningInfoItem>chunk(100)
            .reader(runningOverFirstReader)
            .processor(runningOverFirstProcess)
            .writer(runningOverFirstWriter)
            .build();
}


public Step runningOverSecondStep() {
    return stepBuilderFactory.get("runningOverSecondStep")
            .<Map<String, Object>, Map<String, Object>>chunk(50)
            .reader(runningOverSecondReader)
            .processor(runningOverSecondProcess)
            .writer(runningOverSecondWriter)
            .build();
}
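
Chunk steps can also be made fault tolerant, so one bad record does not fail the whole step. A sketch of what that could look like on the first step (the exception types and limits are illustrative, not from the original):

public Step runningOverFirstStepFaultTolerant() {
    return stepBuilderFactory.get("runningOverFirstStepFaultTolerant")
            .<RunningOverVO, WarningInfoItem>chunk(100)
            .reader(runningOverFirstReader)
            .processor(runningOverFirstProcess)
            .writer(runningOverFirstWriter)
            .faultTolerant()
            .retry(TransientDataAccessException.class).retryLimit(3)  // retry flaky DB writes
            .skip(IllegalArgumentException.class).skipLimit(10)       // drop malformed records
            .build();
}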
Flow

This is very much like a workflow engine; I would even say you could use it as one!

@Bean
public Flow runningOverFlow() {
    return new FlowBuilder<SimpleFlow>("runningOverFlow")
            .start(runningOverFirstStep())
            .next(runningOverSecondStep())
            .build();
}
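
Flows can also branch on a step's exit status, which is where the workflow feel really shows. A sketch, where alarmFailureStep is a hypothetical recovery step:

@Bean
public Flow runningOverBranchingFlow() {
    return new FlowBuilder<SimpleFlow>("runningOverBranchingFlow")
            .start(runningOverFirstStep())
            .on("FAILED").to(alarmFailureStep())       // branch taken when step 1 fails
            .from(runningOverFirstStep())
            .on("*").to(runningOverSecondStep())       // any other exit status continues
            .end();
}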
Job

The smallest unit the launcher can execute:

@Bean
public Job runningOverJob() {
    return jobBuilderFactory.get("runningOverJob")
            .start(runningOverFlow())
            .build()  //builds FlowJobBuilder instance
            .build(); //builds Job instance
}
Test Execution

Run the Job through the JobLauncher:

// The parameter here exists purely so each Step's execution time gets logged per run;
// you could just as well pass an empty JobParameters object
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
        .toJobParameters();
try {
    jobLauncher.run(runningOverJob, jobParameters);
} catch (Exception e) {
    e.printStackTrace();
}
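
Since spring.batch.job.enabled is false, nothing launches the job by itself in production. A sketch of a scheduled trigger (the class name and cron expression are illustrative, and @EnableScheduling must be enabled somewhere):

@Component
public class RunningOverJobScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job runningOverJob;

    // Fire every day at 08:00; the fresh timestamp makes each run a new JobInstance
    @Scheduled(cron = "0 0 8 * * ?")
    public void launchRunningOverJob() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(runningOverJob, params);
    }
}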
