Spring Batch-循环读取器处理器写入器步骤

Spring Batch-循环读取器处理器写入器步骤,第1张

Spring Batch-循环读取器/处理器/写入器步骤

不要将步骤,读取器,处理程序和编写器实例化为Spring-Bean。不需要这样做。只有您的工作实例必须是Spring Bean。

因此,只需从您的步骤,读取器,写入器和处理器创建器方法中删除@Bean和@StepScope配置,然后在需要时实例化它们即可。

只有一个陷阱,您必须手动调用afterPropertiesSet()。例如:

// @Bean -> delete// @StepScope -> deletepublic FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){    FlatFileItemWriter writer = new FlatFileItemWriter();    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));writer.setLineAggregator(new DelimitedLineAggregator(){{        setDelimiter(TARGET_DELIMITER);        setFieldExtractor(new PassThroughFieldExtractor());        }}    );    // ------- ADD!!    writer.afterPropertiesSet();    return writer;}

这样,您的步骤,读取器,编写器实例将自动被“作用域限定”,因为您明确地为每个步骤实例化了它们。

如果我的回答不够清楚,请告诉我。然后,我将添加一个更详细的示例。

编辑

一个简单的例子:

@Configurationpublic class MyJobConfiguration {    @Autowired    private JobBuilderFactory jobBuilderFactory;    @Autowired    private StepBuilderFactory stepBuilderFactory;    List<String> filenames = Arrays.asList("file1.txt", "file2.txt");    @Bean    public Job myJob() {       List<Step> steps = filenames.stream().map(name -> createStep(filename));       return jobBuilderFactory.get("subsetJob")     .start(createParallelFlow(steps));      .end() .build();    }    // helper method to create a step    private Step createStep(String filename) {    {        return stepBuilderFactory.get("convertStepFor" + filename); // !!! Stepname has to be unique .chunk(100_000) .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper())); .processor(new YourConversionProcessor()); .writer(createFileWriter(new FileSystemResource(new File("converted_"+filename)), new YourOutputLineAggregator())); .build();    }    // helper method to create a split flow out of a List of steps    private static Flow createParallelFlow(List<Step> steps) {        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();        taskExecutor.setConcurrencyLimit(steps.size());        List<Flow> flows = steps.stream() // we have to convert the steps to a flows .map(step -> //         new FlowBuilder<Flow>("flow_" + step.getName()) //         .start(step) //         .build()) // .collect(Collectors.toList());        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) // .add(flows.toArray(new Flow[flows.size()])) // .build();    }    // helper methods to create filereader and filewriters    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {        FlatFileItemReader<T> reader = new FlatFileItemReader<>();        reader.setEncoding("UTF-8");        reader.setResource(source);        reader.setLineMapper(lineMapper);        reader.afterPropertiesSet();        return reader;    }    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();        writer.setEncoding("UTF-8");        writer.setResource(target);        writer.setLineAggregator(aggregator);        writer.afterPropertiesSet();        return writer;    }}


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5479130.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存