복붙노트

[SPRING] 스프링 배치 멀티 스레드 파일 읽기

SPRING

스프링 배치 멀티 스레드 파일 읽기

Spring 배치에서 CSV 파일을 읽으려고하고 각 행을 별도의 스레드에 할당하고 처리하려고합니다. 작업 실행기를 사용하여이 작업을 수행하려고 시도했지만 작업 매개 변수를 사용하여 파일 이름을 가져 오지 않으면 작업 중입니다. scope = "step"이래로 작업 매개 변수를 통과하면 모든 스레드가 파일에서 동일한 행을 읽습니다. 내가 범위를 변경하면 해결 될지 여부 = "직업"예라면 길을 제안하십시오. 현재 아래와 같은 오류가 발생합니다.

친절하게 도와주세요 ...

아래 Job.xml 찾기

<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch"        restartable="true">
    <step id="step" allow-start-if-complete="true">
        <partition step="step2" partitioner="partitioner">
            <handler grid-size="3" task-executor="taskExecutor" />
        </partition>
    </step>
</job>

    <bean id="partitioner" class="com.range.part.RangePartitioner">
</bean>

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<step id="step2" xmlns="http://www.springframework.org/schema/batch">
    <tasklet transaction-manager="transactionManager">
        <chunk  reader="itemReader" writer="cutomitemWriter" processor="itemProcessor" commit-interval="100" />
    </tasklet>
</step>
<bean id="itemProcessor" class="com.range.processor.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}"/>
</bean>

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="job">
 <property name="resource" value="file:#{jobParameters[file]}"> 
 </property>    
  <!-- <property name="linesToSkip" value="1"/> -->
<property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="," />
                    <!--  <property name="names" value="transactionBranch,batchEntryDate,batchNo,channelID,CountryCode" />-->
        </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.fieldset.FieldsetMapper">

                </bean>
            </property>
        </bean>
    </property>
    </bean>

<bean id="cutomitemWriter" class="com.range.processor.customitemWritter">
</bean>

해결법

  1. ==============================

    1.Partitioner를 사용할 수있는 방법을 생각하고 있습니다. 파 티셔 터 수준에서 CSV 판독기 나 Spring Reader를 사용하여 파일을 읽은 다음 각 행을 처리 할 수 ​​있습니다.

    Partitioner를 사용할 수있는 방법을 생각하고 있습니다. 파 티셔 터 수준에서 CSV 판독기 나 Spring Reader를 사용하여 파일을 읽은 다음 각 행을 처리 할 수 ​​있습니다.

    모든 라인이 파티셔 큐 (Map)에 추가되어 요구 사항을 충족시킵니다.

    참조 용으로 여기에 코드를 게시했습니다.

    공용 클래스 LinePartitioner는 Partitioner {

    @Value("#{jobParameters['fileName']}")
    private String fileName;
    
    Map<String, ExecutionContext> queue = new HashMap<>();
    
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
    
        BufferedReader reader = new BufferedReader(new FileReader(this.fileName));
        List<String> lines = new ArrayList<>();
        int count = 0;
        while ((line = reader.readLine()) != null) {
    
            ExecutionContext value = new ExecutionContext();
            value.put("lineContent", line);
            value.put("lineCount", count+1);
    
            queue.put(++count, value);
        }
    
        return queue;
    }
    

    }

    위의 코드와 같이 CSV 리더 또는 Spring Reader로 Reader를 대체하여 Pojo 객체로 매핑 필드를 단순화 할 수 있습니다.

    전체 프로그램이 필요한 경우 알려 주시면 쓰고 업로드 해 드리겠습니다.

    고마워, Nghia

    - Reader 용 1000 개 항목 판독기가있는 Partitioner를 작성하는 예제로 업데이트

    @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            try {
                Map<String, ExecutionContext> queue = new HashMap<>();
    
                List<List<String>> trunks = new ArrayList<>();
    
                // read and store data to a list of trunk
                int chunkSize = 1000;
                int count = 1;
                try (BufferedReader br = new BufferedReader(new FileReader("your file"))) {
                    String line;
                    List items = null;
                    while ((line = br.readLine()) != null) {
                        if (count % chunkSize == 0) {
                            items = new ArrayList();
                            trunks.add(items);
                        }
    
                        items.add(line);
                    }
                }
    
                // add to queue to start prorcessing
                for (int i=0; i<trunks.size(); i++) {
                    ExecutionContext value = new ExecutionContext();
                    value.put("items", trunks.get(i));
                    queue.put("trunk"+i, value);
                }
    
                return queue;
            }
    
            catch (Exception e) {
                // handle exception
            }
    }
    
  2. ==============================

    2.큰 CSV 파일 (200,000 줄 등)을 DB로 가져 와서 DB에서 JSON 파일 (FileReader 및 FileWriter에 No-Thread Safe가 있음)로 내보내는 멀티 스레딩 작업을 통해이 예제 (Github)를 볼 수 있습니다.

    큰 CSV 파일 (200,000 줄 등)을 DB로 가져 와서 DB에서 JSON 파일 (FileReader 및 FileWriter에 No-Thread Safe가 있음)로 내보내는 멀티 스레딩 작업을 통해이 예제 (Github)를 볼 수 있습니다.

    <batch:job id="transformJob">
        <batch:step id="deleteDir" next="cleanDB">
            <batch:tasklet ref="fileDeletingTasklet" />
        </batch:step>
        <batch:step id="cleanDB" next="countThread">
            <batch:tasklet ref="cleanDBTasklet" />
        </batch:step>
        <batch:step id="countThread" next="split">
            <batch:tasklet ref="countThreadTasklet" />
        </batch:step>
        <batch:step id="split" next="partitionerMasterImporter">
            <batch:tasklet>
                <batch:chunk reader="largeCSVReader" writer="smallCSVWriter"
                    commit-interval="#{jobExecutionContext['chunk.count']}" />
            </batch:tasklet>
        </batch:step>
        <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
            <partition step="importChunked" partitioner="filePartitioner">
                <handler grid-size="10" task-executor="taskExecutor" />
            </partition>
        </batch:step>
        <batch:step id="partitionerMasterExporter" next="concat">
            <partition step="exportChunked" partitioner="dbPartitioner">
                <handler grid-size="10" task-executor="taskExecutor" />
            </partition>
        </batch:step>
        <batch:step id="concat">
            <batch:tasklet ref="concatFileTasklet" />
        </batch:step>
    </batch:job>
    
    <batch:step id="importChunked">
        <batch:tasklet>
            <batch:chunk reader="smallCSVFileReader" writer="dbWriter"
                processor="importProcessor" commit-interval="500">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    
    <batch:step id="exportChunked">
        <batch:tasklet>
            <batch:chunk reader="dbReader" writer="jsonFileWriter"
                processor="exportProcessor" commit-interval="#{jobExecutionContext['chunk.count']}">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    
    <bean id="jsonFileWriter" class="com.batch.writer.PersonWriterToFile"
        scope="step">
        <property name="outputPath" value="csv/chunked/paged-#{stepExecutionContext[page]}.json" />
    </bean>
    
    <bean id="dbReader" class="com.batch.reader.PersonReaderFromDataBase" scope="step">
        <property name="iPersonRepository" ref="IPersonRepository" />
        <property name="page" value="#{stepExecutionContext[page]}"/>
        <property name="size" value="#{stepExecutionContext[size]}"/>
    </bean>
    
    <bean id="countThreadTasklet" class="com.batch.tasklet.CountingTasklet"
        scope="step">
        <property name="input" value="file:csv/input/#{jobParameters[filename]}" />
    </bean>
    
    <bean id="cleanDBTasklet" class="com.batch.tasklet.CleanDBTasklet" />
    
    <bean id="fileDeletingTasklet" class="com.batch.tasklet.FileDeletingTasklet">
        <property name="directory" value="file:csv/chunked/" />
    </bean>
    
    <bean id="concatFileTasklet" class="com.batch.tasklet.FileConcatTasklet">
        <property name="directory" value="file:csv/chunked/" />
        <property name="outputFilename" value="csv/output/export.json" />
    </bean>
    
    <bean id="filePartitioner" class="com.batch.partitioner.FilePartitioner">
        <property name="outputPath" value="csv/chunked/" />
    </bean>
    
    <bean id="dbPartitioner" class="com.batch.partitioner.DBPartitioner" scope="step">
        <property name="pageSize" value="#{jobExecutionContext['chunk.count']}" />
    </bean>
    
    <bean id="largeCSVReader" class="com.batch.reader.LineReaderFromFile"
        scope="step">
        <property name="inputPath" value="csv/input/#{jobParameters[filename]}" />
    </bean>
    
    <bean id="smallCSVWriter" class="com.batch.writer.LineWriterToFile"
        scope="step">
        <property name="outputPath" value="csv/chunked/"></property>
    </bean>
    
    <bean id="smallCSVFileReader" class="com.batch.reader.PersonReaderFromFile"
        scope="step">
        <constructor-arg value="csv/chunked/#{stepExecutionContext[file]}" />
    </bean>
    
    <bean id="importProcessor" class="com.batch.processor.ImportPersonItemProcessor" />
    
    <bean id="exportProcessor" class="com.batch.processor.ExportPersonItemProcessor" />
    
    <bean id="dbWriter" class="com.batch.writer.PersonWriterToDataBase">
        <property name="iPersonRepository" ref="IPersonRepository" />
    </bean>
    

    두 경우 모두 partionner를 사용하여 10 개의 파일 (스레드 당 하나의 파일)을 가져오고 10 개의 파일 (스레드 당 하나의 파일)로 내보내고 모두 연결하여 하나의 파일을 만듭니다.

    희망이 도움이됩니다.

  3. from https://stackoverflow.com/questions/39092968/spring-batch-multi-thread-file-reading by cc-by-sa and MIT license