복붙노트

[SPRING] Flux <DataBuffer>를 올바르게 읽고 단일 inputStream으로 변환하는 방법

SPRING

Flux 를 올바르게 읽고 단일 inputStream으로 변환하는 방법

스프링 부트 애플리케이션에 WebClient 및 사용자 정의 BodyExtractorclass를 사용하고 있습니다.

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

위의 코드는 작은 페이로드에서는 작동하지만 큰 페이로드에서는 작동하지 않습니다. 다음 플럭스 값을 읽는 중이므로 모든 데이터 버퍼를 결합하고 읽는 방법을 모르겠습니다.

나는 반응기를 처음 사용하기 때문에 flux / mono로 많은 트릭을 알지 못합니다.

해결법

  1. ==============================

    1.InputStream 재구성은 수집 작업이 완료 될 때까지 아무 것도 출력되지 않으므로 WebClient를 처음 사용하는 목적을 상쇄합니다. 큰 물줄기의 경우 매우 오랜 시간이 걸릴 수 있습니다. 반응 모델은 개별 바이트를 처리하지 않고 Spring DataBuffer와 같은 바이트 블록을 처리합니다. 좀 더 우아한 해결책을 찾으려면 내 대답을 참조하십시오. https://stackoverflow.com/a/48054615/839733

    InputStream 재구성은 수집 작업이 완료 될 때까지 아무 것도 출력되지 않으므로 WebClient를 처음 사용하는 목적을 상쇄합니다. 큰 물줄기의 경우 매우 오랜 시간이 걸릴 수 있습니다. 반응 모델은 개별 바이트를 처리하지 않고 Spring DataBuffer와 같은 바이트 블록을 처리합니다. 좀 더 우아한 해결책을 찾으려면 내 대답을 참조하십시오. https://stackoverflow.com/a/48054615/839733

  2. ==============================

    2.Flux # collect 및 SequenceInputStream을 사용하여 작업을 수행 할 수있었습니다.

    Flux # collect 및 SequenceInputStream을 사용하여 작업을 수행 할 수있었습니다.

    @Override
    public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
      Flux<DataBuffer> body = response.getBody();
      return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
        .map(inputStream -> {
          try {
            JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
            Unmarshaller unmarshaller = jc.createUnmarshaller();
    
            return (T) unmarshaller.unmarshal(inputStream);
          } catch(Exception e){
            return null;
          }
      }).next();
    }
    

    InputStreamCollector.java

    public class InputStreamCollector {
      private InputStream is;
    
      public void collectInputStream(InputStream is) {
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
      }
    
      public InputStream getInputStream() {
        return this.is;
      }
    }
    
  3. ==============================

    3.Bk Santiago의 답변을 약간 수정 한 버전은 collect () 대신 reduce ()를 사용합니다. 매우 유사하지만 추가 수업이 필요하지 않습니다.

    Bk Santiago의 답변을 약간 수정 한 버전은 collect () 대신 reduce ()를 사용합니다. 매우 유사하지만 추가 수업이 필요하지 않습니다.

    자바:

    body.reduce(new InputStream() {
        public int read() { return -1; }
      }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
    ).flatMap(inputStream -> /* do something with single InputStream */
    

    또는 코 틀린 :

    body.reduce(object : InputStream() {
      override fun read() = -1
    }) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
      .flatMap { inputStream -> /* do something with single InputStream */ }
    

    collect ()를 사용하는 것보다이 접근법의 이점은 일을 수집하기 위해 다른 클래스를 가질 필요가 없다는 것입니다.

    새로운 빈 InputStream ()을 만들었지 만 구문이 혼란 스럽다면 대신 ByteArrayInputStream ( "". toByteArray ())으로 대체하여 대신 빈 값 ByteArrayInputStream을 초기 값으로 만들 수 있습니다.

  4. from https://stackoverflow.com/questions/46460599/how-to-correctly-read-fluxdatabuffer-and-convert-it-to-a-single-inputstream by cc-by-sa and MIT license