[SPRING] Spring Batch - 리더 / 프로세서 / 라이터 단계 루핑
SPRINGSpring Batch - 리더 / 프로세서 / 라이터 단계 루핑
허용 된 응답 코드를 기반으로 다음 코드를 조정했습니다.
// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(steps.size());
Flow[] flows = new Flow[steps.size()];
for (int i = 0; i < steps.size(); i++) {
flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
}
return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
.split(taskExecutor)
.add(flows)
.build();
}
나는 올바르게 반복되는 버전으로 질문을 업데이트했지만 응용 프로그램이 확장 될 때 병렬 처리가 중요하므로 런타임시 동적으로 javaconfig를 사용하여 수행하는 방법을 알지 못한다 ...
세련된 질문 : 5 가지 상황에 대해 런타임에 동적으로 리더 프로세서 작성자를 만들려면 어떻게해야합니까? (5 개의 쿼리는 지금 구성된대로 5의 루프를 의미합니다)?
My LoopDecider는 다음과 같습니다.
public class LoopDecider implements JobExecutionDecider {
private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
private static final String COMPLETED = "COMPLETED";
private static final String CONTINUE = "CONTINUE";
private static final String ALL = "queries";
private static final String COUNT = "count";
private int currentQuery;
private int limit;
@SuppressWarnings("unchecked")
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
this.limit = allQueries.size();
jobExecution.getExecutionContext().put(COUNT, currentQuery);
if (++currentQuery >= limit) {
return new FlowExecutionStatus(COMPLETED);
} else {
LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
return new FlowExecutionStatus(CONTINUE);
}
}
}
쿼리 목록 (HQL 쿼리)을 기반으로 각 쿼리에 대해 리더 - 프로세서 - 작성자를 원합니다. 현재 구성은 다음과 같습니다.
일
@Bean
public Job subsetJob() throws Exception {
LoopDecider loopDecider = new LoopDecider();
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
Flow flow = flowBuilder
.start(createHQL())
.next(extractData())
.next(loopDecider)
.on("CONTINUE")
.to(extractData())
.from(loopDecider)
.on("COMPLETED")
.end()
.build();
return jobBuilderFactory.get("subsetJob")
.start(flow)
.end()
.build();
}
단계
public Step extractData(){
return stepBuilderFactory.get("extractData")
.chunk(100_000)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
리더
public HibernateCursorItemReader reader(){
CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
reader.setSessionFactory(HibernateUtil.getSessionFactory());
reader.setUseStatelessSession(false);
return reader;
}
프로세서
public DynamicRecordProcessor processor(){
return new DynamicRecordProcessor();
}
작가
public FlatFileItemWriter writer(){
CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();
writer.setLineAggregator(new DelimitedLineAggregator(){{
setDelimiter(TARGET_DELIMITER);
setFieldExtractor(new PassThroughFieldExtractor());
}}
);
return writer;
}
현재이 프로세스는 단일 쿼리에 적합합니다. 그러나 실제로 쿼리 목록이 있습니다.
필자의 초기 아이디어는 단계를 반복하고 쿼리 목록과 각 쿼리에 대해 읽기 - 프로세스 - 작성 단계를 전달하는 것입니다. 이것은 또한 병렬 청킹에 이상적입니다.
그러나 extractData 단계에 매개 변수로 쿼리 목록을 추가하고 각 쿼리에 대해 단계를 생성 할 때 예상되는 단일 단계 대신 단계 목록이 반환됩니다. 작업은 단계 목록 대신 단일 단계를 기대한다고 불평하기 시작합니다.
또 다른 아이디어는 Multi Item ResourceReader와 같은 생각으로 사용자 정의 Multi HibernateCursorItemReader를 작성하는 것이 었습니다. 그러나 나는 더 이상 쓸모없는 솔루션을 찾고있었습니다.
@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
List<Step> steps = new ArrayList<Step>();
for (String query : queries) {
steps.add(stepBuilderFactory.get("extractData")
.chunk(100_000)
.reader(reader(query))
.processor(processor())
.writer(writer(query))
.build());
}
return steps;
}
의문 단계를 반복하고이를 작업에 통합하려면 어떻게합니까?
해결법
-
==============================
1.단계, 독자, 프로세서 및 작성자를 스프링 빈으로 인스턴스화하지 마십시오. 그것을 할 필요가 없습니다. 작업 인스턴스 만 Spring Bean이어야합니다.
단계, 독자, 프로세서 및 작성자를 스프링 빈으로 인스턴스화하지 마십시오. 그것을 할 필요가 없습니다. 작업 인스턴스 만 Spring Bean이어야합니다.
따라서 단계, 리더, 라이터 및 프로세서 크리에이터 메소드에서 @Bean 및 @StepScope 구성을 제거하고 필요한 경우 인스턴스화하십시오.
하나의 catch가 있으므로 afterPropertiesSet ()을 수동으로 호출해야합니다. 예 :
// @Bean -> delete // @StepScope -> delete public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){ FlatFileItemWriter writer = new FlatFileItemWriter(); writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION))); writer.setLineAggregator(new DelimitedLineAggregator(){{ setDelimiter(TARGET_DELIMITER); setFieldExtractor(new PassThroughFieldExtractor()); }} ); // ------- ADD!! writer.afterPropertiesSet(); return writer; }
이렇게하면 단계, 판독기, 작성기 인스턴스가 모든 단계에 대해 명시 적으로 인스턴스화하므로 자동으로 "단계 범위 지정"됩니다.
내 대답이 충분히 명확하지 않으면 알려주세요. 그런 다음 더 자세한 예제를 추가 할 것입니다.
간단한 예 :
@Configuration public class MyJobConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; List<String> filenames = Arrays.asList("file1.txt", "file2.txt"); @Bean public Job myJob() { List<Step> steps = filenames.stream().map(name -> createStep(filename)); return jobBuilderFactory.get("subsetJob") .start(createParallelFlow(steps)); .end() .build(); } // helper method to create a step private Step createStep(String filename) { { return stepBuilderFactory.get("convertStepFor" + filename); // !!! Stepname has to be unique .chunk(100_000) .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper())); .processor(new YourConversionProcessor()); .writer(createFileWriter(new FileSystemResource(new File("converted_"+filename)), new YourOutputLineAggregator())); .build(); } // helper method to create a split flow out of a List of steps private static Flow createParallelFlow(List<Step> steps) { SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); taskExecutor.setConcurrencyLimit(steps.size()); List<Flow> flows = steps.stream() // we have to convert the steps to a flows .map(step -> // new FlowBuilder<Flow>("flow_" + step.getName()) // .start(step) // .build()) // .collect(Collectors.toList()); return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) // .add(flows.toArray(new Flow[flows.size()])) // .build(); } // helper methods to create filereader and filewriters public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception { FlatFileItemReader<T> reader = new FlatFileItemReader<>(); reader.setEncoding("UTF-8"); reader.setResource(source); reader.setLineMapper(lineMapper); reader.afterPropertiesSet(); return reader; } public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception { FlatFileItemWriter<T> writer = new FlatFileItemWriter<>(); writer.setEncoding("UTF-8"); writer.setResource(target); writer.setLineAggregator(aggregator); writer.afterPropertiesSet(); return writer; } }
from https://stackoverflow.com/questions/37238813/spring-batch-looping-a-reader-processor-writer-step by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] Java 11과 호환되는 최소 스프링 버전 (0) | 2018.12.27 |
---|---|
[SPRING] Spring restTemplate을 사용하여 302 리디렉션을 따릅니 까? (0) | 2018.12.27 |
[SPRING] 스프링 보안 3 Active Directory 인증, 데이터베이스 인증 (0) | 2018.12.26 |
[SPRING] @Scope ( "request")가 작동하지 않습니다. (0) | 2018.12.26 |
[SPRING] 칠면조 자바 표준시 (일광 절약 시간제) (0) | 2018.12.26 |