복붙노트

[SPRING] 스프링 배치 프로세서

SPRING

스프링 배치 프로세서

Spring Batch에는 정렬 된 순서로 수천 개의 레코드가있는 파일이 있습니다. 주요 필드는 제품 코드입니다. 파일에 동일한 제품 코드의 여러 레코드가있을 수 있습니다. 요구 사항은 동일한 레코드를 그룹화해야한다는 것입니다 제품 코드를 수집 (예 : List) 한 다음 메소드 (예 : validateProductCodes (List prodCodeList))로 보냅니다. 나는 이것을 할 수있는 최선의 방법을 찾고있다. 내가 생각한 접근법은 프로세서의 모든 레코드를 읽은 다음 컬렉션을 작성하는 것이었다. 프로세서의 동일한 지점에있는 제품 코드가 레코드의 제품 코드와 다른 경우 productCode 그룹화가 완료되고 validateProductCodes ()가 동일한 제품 코드가있는 해당 레코드 그룹에 대해 호출 될 수 있습니다. 또한 Step.So를 사용하고 있습니다. 의미는 자동으로 프로세스가 다중 스레드임을 의미합니까? 의미 동일한 productCode를 가진 레코드 그룹은 다중 스레드 방식으로 처리됩니다.

감사

해결법

  1. ==============================

    1.질문에는 두 가지 질문이 있습니다. 첫째, 항목을 함께 그룹화하는 방법과 두 번째로 처리하는 방법을 알고 싶습니다.

    질문에는 두 가지 질문이 있습니다. 첫째, 항목을 함께 그룹화하는 방법과 두 번째로 처리하는 방법을 알고 싶습니다.

    그들을 그룹화하기 위해 Luca가 제안한 것과 같은 그룹 리더를 만들 수 있습니다 :

    public class GroupReader<I> implements ItemReader<List<I>>{
      private SingleItemPeekableItemReader<I> reader;
      private ItemReader<I> peekReaderDelegate;
    
      public void setReader(ItemReader<I> reader) {
          peekReaderDelegate = reader;
      }
    
      @Override
      public void afterPropertiesSet() throws Exception {
        Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
        this.reader= new SingleItemPeekableItemReader<I>();
        this.reader.setDelegate(delegateReader);
      }
    
      @Override
      public List<I> read() throws Exception {
        State state = State.NEW;
        List<I> group = null;
        I item = null;
    
        while (state != State.COMPLETE) {
          item = reader.read();
    
          switch (state) {
            case NEW: {
              if (item == null) {
                // end reached
                state = State.COMPLETE;
                break;
              }
    
              group = new ArrayList<I>();
              group.add(item);
              state = State.READING;
              I nextItem = reader.peek();
              if (isItAKeyChange(item, nextItem)) {
                state = State.COMPLETE;
              }
              break;
            }
            case READING: {
              group.add(item);
    
              // peek and check if there the peeked entry has a new date
              I nextItem = peekEntry();
              if (isItAKeyChange(item, nextItem)) {
                state = State.COMPLETE;
              }
              break;
            }
            default: {
              throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state");
            }
          }
        }
    
        return group;
      }
    }
    

    모든 키에 대해이 리더는이 키와 일치하는 모든 요소가 포함 된 목록을 반환합니다. 따라서 그룹화는 독자가 직접 수행합니다. 당신이 설명했듯이 프로세서로는 그렇게 할 수 없습니다.

    멀티 스레딩에 대한 두 번째 질문. 이제 단계를 사용한다고해서 반드시 단계가 여러 스레드로 처리된다는 것을 의미하지는 않습니다.

    이를 위해서는 AsyncTaskExecutor를 설정해야하며 스로틀 제한을 설정해야합니다.

    하지만 그렇게한다면 독자는 스레드 안전해야합니다. 그렇지 않으면 그룹화가 작동하지 않습니다. 위의 read 메소드를 단순히 synchronized로 정의하면됩니다.

    또 다른 방법은이 질문에 제안 된 것처럼 작은 Synchronized Wrapper Reader를 작성하는 것입니다. 병렬 처리 Spring Batch StaxEventItemReader

    쓰고있는 대상에 따라 작성자를 동기화하고, 필요한 경우 결과를 재정렬해야 할 수도 있습니다.

  2. from https://stackoverflow.com/questions/31930775/spring-batch-processor by cc-by-sa and MIT license