복붙노트

[SPRING] Spring Batch - 리더 / 프로세서 / 라이터 단계 루핑

SPRING

Spring Batch - 리더 / 프로세서 / 라이터 단계 루핑

허용 된 응답 코드를 기반으로 다음 코드를 조정했습니다.

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());         

    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }           

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
        .split(taskExecutor)                
        .add(flows)
        .build();
}

나는 올바르게 반복되는 버전으로 질문을 업데이트했지만 응용 프로그램이 확장 될 때 병렬 처리가 중요하므로 런타임시 동적으로 javaconfig를 사용하여 수행하는 방법을 알지 못한다 ...

세련된 질문 : 5 가지 상황에 대해 런타임에 동적으로 리더 프로세서 작성자를 만들려면 어떻게해야합니까? (5 개의 쿼리는 지금 구성된대로 5의 루프를 의미합니다)?

My LoopDecider는 다음과 같습니다.

public class LoopDecider implements JobExecutionDecider {

    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";

    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }       
    }

}

쿼리 목록 (HQL 쿼리)을 기반으로 각 쿼리에 대해 리더 - 프로세서 - 작성자를 원합니다. 현재 구성은 다음과 같습니다.

@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();        
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")                
            .end()
            .build();       

    return jobBuilderFactory.get("subsetJob")               
            .start(flow)                
            .end()
            .build();
}

단계

public Step extractData(){
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

리더

public HibernateCursorItemReader reader(){      
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());        
    reader.setUseStatelessSession(false);
    return reader;
}

프로세서

public DynamicRecordProcessor processor(){
    return new DynamicRecordProcessor();
}

작가

public FlatFileItemWriter writer(){
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();               
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );
    return writer;
}

현재이 프로세스는 단일 쿼리에 적합합니다. 그러나 실제로 쿼리 목록이 있습니다.

필자의 초기 아이디어는 단계를 반복하고 쿼리 목록과 각 쿼리에 대해 읽기 - 프로세스 - 작성 단계를 전달하는 것입니다. 이것은 또한 병렬 청킹에 이상적입니다.

그러나 extractData 단계에 매개 변수로 쿼리 목록을 추가하고 각 쿼리에 대해 단계를 생성 할 때 예상되는 단일 단계 대신 단계 목록이 반환됩니다. 작업은 단계 목록 대신 단일 단계를 기대한다고 불평하기 시작합니다.

또 다른 아이디어는 Multi Item ResourceReader와 같은 생각으로 사용자 정의 Multi HibernateCursorItemReader를 작성하는 것이 었습니다. 그러나 나는 더 이상 쓸모없는 솔루션을 찾고있었습니다.

@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build());
    }
    return steps;
}

의문 단계를 반복하고이를 작업에 통합하려면 어떻게합니까?

해결법

  1. ==============================

    1.단계, 독자, 프로세서 및 작성자를 스프링 빈으로 인스턴스화하지 마십시오. 그것을 할 필요가 없습니다. 작업 인스턴스 만 Spring Bean이어야합니다.

    단계, 독자, 프로세서 및 작성자를 스프링 빈으로 인스턴스화하지 마십시오. 그것을 할 필요가 없습니다. 작업 인스턴스 만 Spring Bean이어야합니다.

    따라서 단계, 리더, 라이터 및 프로세서 크리에이터 메소드에서 @Bean 및 @StepScope 구성을 제거하고 필요한 경우 인스턴스화하십시오.

    하나의 catch가 있으므로 afterPropertiesSet ()을 수동으로 호출해야합니다. 예 :

    // @Bean -> delete
    // @StepScope -> delete
    public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){
        FlatFileItemWriter writer = new FlatFileItemWriter();
        writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));       
        writer.setLineAggregator(new DelimitedLineAggregator(){{
            setDelimiter(TARGET_DELIMITER);
            setFieldExtractor(new PassThroughFieldExtractor());
            }}
        );
    
        // ------- ADD!!
        writer.afterPropertiesSet();
    
        return writer;
    }
    

    이렇게하면 단계, 판독기, 작성기 인스턴스가 모든 단계에 대해 명시 적으로 인스턴스화하므로 자동으로 "단계 범위 지정"됩니다.

    내 대답이 충분히 명확하지 않으면 알려주세요. 그런 다음 더 자세한 예제를 추가 할 것입니다.

    간단한 예 :

    @Configuration
    public class MyJobConfiguration {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
    
        List<String> filenames = Arrays.asList("file1.txt", "file2.txt");
    
        @Bean
        public Job myJob() {
    
           List<Step> steps = filenames.stream().map(name -> createStep(filename));
    
           return jobBuilderFactory.get("subsetJob")               
                .start(createParallelFlow(steps));                
                .end()
                .build();
        }
    
    
        // helper method to create a step
        private Step createStep(String filename) {
        {
            return stepBuilderFactory.get("convertStepFor" + filename); // !!! Stepname has to be unique
                .chunk(100_000)
                .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()));
                .processor(new YourConversionProcessor());
                .writer(createFileWriter(new FileSystemResource(new File("converted_"+filename)), new YourOutputLineAggregator()));
                .build();
        }
    
    
        // helper method to create a split flow out of a List of steps
        private static Flow createParallelFlow(List<Step> steps) {
            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
            taskExecutor.setConcurrencyLimit(steps.size());
    
            List<Flow> flows = steps.stream() // we have to convert the steps to a flows
                .map(step -> //
                        new FlowBuilder<Flow>("flow_" + step.getName()) //
                        .start(step) //
                        .build()) //
                .collect(Collectors.toList());
    
            return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
                .add(flows.toArray(new Flow[flows.size()])) //
                .build();
        }
    
    
        // helper methods to create filereader and filewriters
        public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
            FlatFileItemReader<T> reader = new FlatFileItemReader<>();
    
            reader.setEncoding("UTF-8");
            reader.setResource(source);
            reader.setLineMapper(lineMapper);
            reader.afterPropertiesSet();
    
            return reader;
        }
    
        public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
            FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();
    
            writer.setEncoding("UTF-8");
            writer.setResource(target);
            writer.setLineAggregator(aggregator);
    
            writer.afterPropertiesSet();
            return writer;
        }
    }
    
  2. from https://stackoverflow.com/questions/37238813/spring-batch-looping-a-reader-processor-writer-step by cc-by-sa and MIT license