[SPRING] 스프링 배치 멀티 스레드 파일 읽기
Spring 배치에서 CSV 파일을 읽으려고하고 각 행을 별도의 스레드에 할당하고 처리하려고합니다. 작업 실행기를 사용하여이 작업을 수행하려고 시도했지만 작업 매개 변수를 사용하여 파일 이름을 가져 오지 않으면 작업 중입니다. scope = "step"이래로 작업 매개 변수를 통과하면 모든 스레드가 파일에서 동일한 행을 읽습니다. 내가 범위를 변경하면 해결 될지 여부 = "직업"예라면 길을 제안하십시오. 현재 아래와 같은 오류가 발생합니다.
친절하게 도와주세요 ...
아래 Job.xml 찾기
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch" restartable="true">
<step id="step" allow-start-if-complete="true">
<partition step="step2" partitioner="partitioner">
<handler grid-size="3" task-executor="taskExecutor" />
<bean id="partitioner" class="com.range.part.RangePartitioner">
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<step id="step2" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="cutomitemWriter" processor="itemProcessor" commit-interval="100" />
<bean id="itemProcessor" class="com.range.processor.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}"/>
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="job">
<property name="resource" value="file:#{jobParameters[file]}">
<!-- <property name="linesToSkip" value="1"/> -->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="," />
<!-- <property name="names" value="transactionBranch,batchEntryDate,batchNo,channelID,CountryCode" />-->
<property name="fieldSetMapper">
<bean class="com.fieldset.FieldsetMapper">
<bean id="cutomitemWriter" class="com.range.processor.customitemWritter">
1.Partitioner를 사용할 수있는 방법을 생각하고 있습니다. 파 티셔 터 수준에서 CSV 판독기 나 Spring Reader를 사용하여 파일을 읽은 다음 각 행을 처리 할 수 있습니다.
모든 라인이 파티셔 큐 (Map)에 추가되어 요구 사항을 충족시킵니다.
참조 용으로 여기에 코드를 게시했습니다.
공용 클래스 LinePartitioner는 Partitioner {
@Value("#{jobParameters['fileName']}") private String fileName; Map<String, ExecutionContext> queue = new HashMap<>(); @Override public Map<String, ExecutionContext> partition(int gridSize) { BufferedReader reader = new BufferedReader(new FileReader(this.fileName)); List<String> lines = new ArrayList<>(); int count = 0; while ((line = reader.readLine()) != null) { ExecutionContext value = new ExecutionContext(); value.put("lineContent", line); value.put("lineCount", count+1); queue.put(++count, value); } return queue; }
위의 코드와 같이 CSV 리더 또는 Spring Reader로 Reader를 대체하여 Pojo 객체로 매핑 필드를 단순화 할 수 있습니다.
전체 프로그램이 필요한 경우 알려 주시면 쓰고 업로드 해 드리겠습니다.
고마워, Nghia
- Reader 용 1000 개 항목 판독기가있는 Partitioner를 작성하는 예제로 업데이트
@Override public Map<String, ExecutionContext> partition(int gridSize) { try { Map<String, ExecutionContext> queue = new HashMap<>(); List<List<String>> trunks = new ArrayList<>(); // read and store data to a list of trunk int chunkSize = 1000; int count = 1; try (BufferedReader br = new BufferedReader(new FileReader("your file"))) { String line; List items = null; while ((line = br.readLine()) != null) { if (count % chunkSize == 0) { items = new ArrayList(); trunks.add(items); } items.add(line); } } // add to queue to start prorcessing for (int i=0; i<trunks.size(); i++) { ExecutionContext value = new ExecutionContext(); value.put("items", trunks.get(i)); queue.put("trunk"+i, value); } return queue; } catch (Exception e) { // handle exception } }
2.큰 CSV 파일 (200,000 줄 등)을 DB로 가져 와서 DB에서 JSON 파일 (FileReader 및 FileWriter에 No-Thread Safe가 있음)로 내보내는 멀티 스레딩 작업을 통해이 예제 (Github)를 볼 수 있습니다.
<batch:job id="transformJob"> <batch:step id="deleteDir" next="cleanDB"> <batch:tasklet ref="fileDeletingTasklet" /> </batch:step> <batch:step id="cleanDB" next="countThread"> <batch:tasklet ref="cleanDBTasklet" /> </batch:step> <batch:step id="countThread" next="split"> <batch:tasklet ref="countThreadTasklet" /> </batch:step> <batch:step id="split" next="partitionerMasterImporter"> <batch:tasklet> <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" /> </batch:tasklet> </batch:step> <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter"> <partition step="importChunked" partitioner="filePartitioner"> <handler grid-size="10" task-executor="taskExecutor" /> </partition> </batch:step> <batch:step id="partitionerMasterExporter" next="concat"> <partition step="exportChunked" partitioner="dbPartitioner"> <handler grid-size="10" task-executor="taskExecutor" /> </partition> </batch:step> <batch:step id="concat"> <batch:tasklet ref="concatFileTasklet" /> </batch:step> </batch:job> <batch:step id="importChunked"> <batch:tasklet> <batch:chunk reader="smallCSVFileReader" writer="dbWriter" processor="importProcessor" commit-interval="500"> </batch:chunk> </batch:tasklet> </batch:step> <batch:step id="exportChunked"> <batch:tasklet> <batch:chunk reader="dbReader" writer="jsonFileWriter" processor="exportProcessor" commit-interval="#{jobExecutionContext['chunk.count']}"> </batch:chunk> </batch:tasklet> </batch:step> <bean id="jsonFileWriter" class="com.batch.writer.PersonWriterToFile" scope="step"> <property name="outputPath" value="csv/chunked/paged-#{stepExecutionContext[page]}.json" /> </bean> <bean id="dbReader" class="com.batch.reader.PersonReaderFromDataBase" scope="step"> <property name="iPersonRepository" ref="IPersonRepository" /> <property name="page" value="#{stepExecutionContext[page]}"/> <property name="size" value="#{stepExecutionContext[size]}"/> </bean> <bean id="countThreadTasklet" class="com.batch.tasklet.CountingTasklet" scope="step"> <property name="input" value="file:csv/input/#{jobParameters[filename]}" /> </bean> <bean id="cleanDBTasklet" class="com.batch.tasklet.CleanDBTasklet" /> <bean id="fileDeletingTasklet" class="com.batch.tasklet.FileDeletingTasklet"> <property name="directory" value="file:csv/chunked/" /> </bean> <bean id="concatFileTasklet" class="com.batch.tasklet.FileConcatTasklet"> <property name="directory" value="file:csv/chunked/" /> <property name="outputFilename" value="csv/output/export.json" /> </bean> <bean id="filePartitioner" class="com.batch.partitioner.FilePartitioner"> <property name="outputPath" value="csv/chunked/" /> </bean> <bean id="dbPartitioner" class="com.batch.partitioner.DBPartitioner" scope="step"> <property name="pageSize" value="#{jobExecutionContext['chunk.count']}" /> </bean> <bean id="largeCSVReader" class="com.batch.reader.LineReaderFromFile" scope="step"> <property name="inputPath" value="csv/input/#{jobParameters[filename]}" /> </bean> <bean id="smallCSVWriter" class="com.batch.writer.LineWriterToFile" scope="step"> <property name="outputPath" value="csv/chunked/"></property> </bean> <bean id="smallCSVFileReader" class="com.batch.reader.PersonReaderFromFile" scope="step"> <constructor-arg value="csv/chunked/#{stepExecutionContext[file]}" /> </bean> <bean id="importProcessor" class="com.batch.processor.ImportPersonItemProcessor" /> <bean id="exportProcessor" class="com.batch.processor.ExportPersonItemProcessor" /> <bean id="dbWriter" class="com.batch.writer.PersonWriterToDataBase"> <property name="iPersonRepository" ref="IPersonRepository" /> </bean>
두 경우 모두 partionner를 사용하여 10 개의 파일 (스레드 당 하나의 파일)을 가져오고 10 개의 파일 (스레드 당 하나의 파일)로 내보내고 모두 연결하여 하나의 파일을 만듭니다.
희망이 도움이됩니다.
