복붙노트

[SPRING] 병렬 작업을 실행할 때 Tasklet에서 params를 안전하게 단계별로 전달하는 방법

SPRING

병렬 작업을 실행할 때 Tasklet에서 params를 안전하게 단계별로 전달하는 방법

나는 tasklet에서 안전하게 동일한 매개 변수를 동일한 작업의 단계로 전달하려고합니다.

내 직업은 하나씩 3 단계의 작업 표 (step1, step2, step3)로 구성되며 결국에는 step4 (프로세서, 리더, 작가)

이 작업은 여러 번 병렬로 실행됩니다.

steplet 1 단계에서 나는 웹 서비스를 통해 param (hashId)을 평가하고있다. 내 리더 (4 단계)

3 단계에서 나는 새로운 매개 변수를 만들었습니다 : filePath는 hashid를 기반으로하며 파일 리소스 위치로 step4 (판독기)로 보냅니다.

이 param (hashId 및 filePath)을 전달하려면 stepExecution을 사용하고 있습니다.

나는 tasklet을 통해 그것을하는 3 가지 방법을 시도했다 :

param (step1의 step1에서 hashId를 step2로, step2에서 step3으로 hashId)를 전달하려면 다음과 같이하십시오.

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

4 단계에서 해시 ID를 기반으로 파일 경로를 채우고 내 마지막 단계 (리더 프로세서 및 작성기)로이 방법을 전달합니다.

public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..

    @Override
     public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext    
     executionContext) throws IOException {

    String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");

          ...

filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys

        chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("filePath", filePath);
    } 

logger.info("filePath + "for hashId=" + hashId);

}
@Override
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

내가 그 단계 (step3)를 끝내기 직전에 hashId와 filePath 값을 출력하고 있다는 것에주의를 기울이십시오. 로그에 의해 예상대로 일관되고 채워집니다.

또한 독자가 로그를 추가하여 내가 얻은 매개 변수를 로그에 기록했습니다.

@Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
              logger.info("test filePath="+filePath+");

        return itemReader;
    }

이 작업을 ~ 10 번 실행하면 병렬로 실행할 때 param filePath 값이 다른 작업 filePath 값으로 채워진다는 것을 알 수 있습니다.

이것은 executeContextPromotionListener로 작업 키를 승격시키는 방법입니다.

직업 정의 :

 @Bean
    public Job processFileJob() throws Exception {
        return this.jobs.get("processFileJob").
                start.(step1).
                next(step2)
                next(downloadFileTaskletStep()). //step3
                next(processSnidFileStep()).build();  //step4

    }

3 단계 정의

  public Step downloadFileTaskletStep() {
        return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
    }


  @Bean
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
        executionContextPromotionListener.setKeys(new String[]{"filePath"});
        return executionContextPromotionListener;
    }

같은 결과가 매개 변수를 엉망으로 만든다.

스프링 배치 데이터베이스 테이블을 통해 결과를 추적 할 수 있습니다 : batch_job_execution_context.short_context :

여기서 당신은 hashid에 의해 생성 된 filePatch가 원래의 hashId와 동일하지 않다는 것을 볼 수 있습니다 // 잘못된 레코드 ///

{ "string": "segmentId", "long": 13}, { "string": [{ "string": " "filePath", "/ etc / mydir / services / notification_processor / files / 2015_04_22 / f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]}, { "string": [ "hashId", "20df39d201fffc7444423cfdf2f43789"]}]]}}

이제 우리가 다른 기록을 확인하면 좋을 것 같습니다. 그러나 항상 하나 또는 두 개의 엉망이되었습니다.

// 정확한 레코드

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

내가 스레딩 문제를 가지고있는 이유는 무엇입니까?

해결법

  1. ==============================

    1.시도한 방법을 검토하려면 다음을 수행하십시오.

    시도한 방법을 검토하려면 다음을 수행하십시오.

    간단히 말해서 메소드 3이 가장 정확하지만, 공유하려는 특성을 단계의 ExecutionContext에 추가 한 다음 ExecutionContextPromotionListener를 구성하여 해당 키를 Job의 ExecutionContext로 승격시켜야합니다.

    코드는 다음과 같이 업데이트됩니다.

    chunkContext.getStepContext()
                .getStepExecution()
                .getExecutionContext()
                .put("filePath", filePath);
    
  2. from https://stackoverflow.com/questions/29776901/how-to-safely-pass-params-from-tasklet-to-step-when-running-parallel-jobs by cc-by-sa and MIT license