복붙노트

[SPRING] 스프링 배치 프로세서에서 비동기식 REST API 호출하기

SPRING

스프링 배치 프로세서에서 비동기식 REST API 호출하기

목록 목록을 처리하는 봄 배치 작업을 작성했습니다.

Reader는 목록 목록을 반환합니다. 프로세서는 각 ListItem에서 작동하고 처리 된 List를 반환합니다. Writer는 DB에 항목을 쓰고 List of List에서 sftp를 작성합니다.

Spring 배치 프로세서에서 Async REST API를 호출하는 유스 케이스가있다. ListenableFutureCommand에서 ListenableFutureCallback을 구현하여 예상대로 작동하는 성공 및 실패를 처리했지만 비동기 호출이 무언가를 반환하기 전에 ItemProcessor는 비동기 api에서 콜백을 기다리고 객체 (List)를 작성자에게 반환합니다.

ItemProcessor에서 비동기 호출을 구현하고 처리하는 방법을 모르겠습니다.

AsyncItemProcessor 및 AsyncItemWriter에 대해 읽었지만이 시나리오에서 사용해야하는 것인지 확실하지 않습니다.

또한 AsyncRestTemplate에서 ListenableFuture 응답에 대해 get ()을 호출 할 것을 고려했지만 응답마다 응답하지 않을 때까지 문서별로 현재 스레드를 차단합니다.

나는 이것을 어떻게 구현해야하는지에 대한 도움을 구하고있다. 아래의 코드 스 니펫 :

프로세서 :

public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {

... Initialization code

@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
    logger.info("Entering MailingDocsEntity processor");


    List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);


    for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
        System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());

       ..code to get the file

         //If the file is not a pdf convert it
         String fileExtension = readFromSpResponse.getFileExtension();
         String fileName = readFromSpResponse.getFileName();
         byte[] fileBytes = readFromSpResponse.getByteArray();

         try {

             //Do checks to make sure PDF file is being sent
             if (!"pdf".equalsIgnoreCase(fileExtension)) {
                 //Only doc, docx and xlsx conversions are supported

                     ...Building REquest object
                     //make async call to pdf conversion service
            pdfService.convertDocxToPdf(request, mailingDocsEntity);

                 } else {
                     logger.error("The file cannot be converted to a pdf.\n"
                        );

                 }
             }


         } catch (Exception ex){
             logger.error("There has been an exception while processing data", ex);

         }

    }
    return synchronizedList;
}

}

비동기 PdfConversion 서비스 클래스 :

@Service
public class PdfService{


   @Autowired
   @Qualifier("MicroServiceAsyncRestTemplate")
   AsyncRestTemplate microServiceAsyncRestTemplate;

   public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){

        ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();


            try {

                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);

                HttpEntity<?> entity = new HttpEntity<>(request, headers);



                ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);

                ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
                microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>()  {

                    @Override
                    public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
                        ...code to do stuff on success


                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        pdfResponse.setMessage("Exception while retrieving response");

                    }
                });

            } catch (Exception e) {
                String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
                pdfResponse.setMessage(message);
                logger.error(message, e);
            }


        return pdfResponse;
    }

}

내 원래의 일괄 작업은 동기식이었고, 비동기식으로 변환하여 처리 속도가 빨라졌습니다. 비슷한 질문을 찾으려고했으나 충분한 정보를 찾을 수 없었습니다. 모든 포인터 또는 도움이 매우 감사하겠습니다.

고맙습니다!!

해결법

  1. ==============================

    1.예, AsyncItemProcessor 및 AsyncItemWriter는 사용 사례에 적합합니다. AsyncItemProcessor는 새 스레드의 항목에 대한 대리자 ItemProcessor의 논리 (나머지 호출)를 실행합니다. 항목이 완료되면 결과의 Future가 작성 될 AsynchItemWriter에 전달됩니다. 그러면 AsynchItemWriter가 Future를 언랩하고 항목을 씁니다. 이러한 구성 요소의 장점은 선물 포장, 포장 해제 등을 직접 처리 할 필요가 없다는 것입니다.

    예, AsyncItemProcessor 및 AsyncItemWriter는 사용 사례에 적합합니다. AsyncItemProcessor는 새 스레드의 항목에 대한 대리자 ItemProcessor의 논리 (나머지 호출)를 실행합니다. 항목이 완료되면 결과의 Future가 작성 될 AsynchItemWriter에 전달됩니다. 그러면 AsynchItemWriter가 Future를 언랩하고 항목을 씁니다. 이러한 구성 요소의 장점은 선물 포장, 포장 해제 등을 직접 처리 할 필요가 없다는 것입니다.

    다음을 찾을 수 있습니다.

    희망이 도움이됩니다.

  2. from https://stackoverflow.com/questions/52302649/calling-async-rest-api-from-spring-batch-processor by cc-by-sa and MIT license