不要将步骤,读取器,处理程序和编写器实例化为Spring-Bean。不需要这样做。只有您的工作实例必须是Spring Bean。
因此,只需从您的步骤,读取器,写入器和处理器创建器方法中删除@Bean和@StepScope配置,然后在需要时实例化它们即可。
只有一个陷阱,您必须手动调用afterPropertiesSet()。例如:
// @Bean -> delete// @StepScope -> deletepublic FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){ FlatFileItemWriter writer = new FlatFileItemWriter(); writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));writer.setLineAggregator(new DelimitedLineAggregator(){{ setDelimiter(TARGET_DELIMITER); setFieldExtractor(new PassThroughFieldExtractor()); }} ); // ------- ADD!! writer.afterPropertiesSet(); return writer;}
这样,您的步骤,读取器,编写器实例将自动被“作用域限定”,因为您明确地为每个步骤实例化了它们。
如果我的回答不够清楚,请告诉我。然后,我将添加一个更详细的示例。
编辑一个简单的例子:
@Configurationpublic class MyJobConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; List<String> filenames = Arrays.asList("file1.txt", "file2.txt"); @Bean public Job myJob() { List<Step> steps = filenames.stream().map(name -> createStep(filename)); return jobBuilderFactory.get("subsetJob") .start(createParallelFlow(steps)); .end() .build(); } // helper method to create a step private Step createStep(String filename) { { return stepBuilderFactory.get("convertStepFor" + filename); // !!! Stepname has to be unique .chunk(100_000) .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper())); .processor(new YourConversionProcessor()); .writer(createFileWriter(new FileSystemResource(new File("converted_"+filename)), new YourOutputLineAggregator())); .build(); } // helper method to create a split flow out of a List of steps private static Flow createParallelFlow(List<Step> steps) { SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); taskExecutor.setConcurrencyLimit(steps.size()); List<Flow> flows = steps.stream() // we have to convert the steps to a flows .map(step -> // new FlowBuilder<Flow>("flow_" + step.getName()) // .start(step) // .build()) // .collect(Collectors.toList()); return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) // .add(flows.toArray(new Flow[flows.size()])) // .build(); } // helper methods to create filereader and filewriters public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception { FlatFileItemReader<T> reader = new FlatFileItemReader<>(); reader.setEncoding("UTF-8"); reader.setResource(source); reader.setLineMapper(lineMapper); reader.afterPropertiesSet(); return reader; } public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception { FlatFileItemWriter<T> writer = new FlatFileItemWriter<>(); writer.setEncoding("UTF-8"); writer.setResource(target); writer.setLineAggregator(aggregator); writer.afterPropertiesSet(); return writer; }}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)