[PYTHON] 여러 프로세스에서 단일 파일 처리
PYTHON여러 프로세스에서 단일 파일 처리
하나의 큰 텍스트 파일에 각 행을 처리하고 (일부 작업 수행) 데이터베이스에 저장합니다. 단일 프로그램이 너무 오래 걸리므로 여러 프로세스 나 스레드를 통해 수행되기를 바랍니다. 각 스레드 / 프로세스는 단일 파일에서 다른 데이터 (다른 라인)를 읽고 데이터 조각 (line)에 몇 가지 작업을 수행하여 결국 데이터베이스에 저장하므로 결국 전체 데이터가 처리되고 데이터베이스가 필요한 데이터와 함께 덤프됩니다.
그러나 나는 이것에 접근하는 방법을 이해할 수 없다.
해결법
-
==============================
1.당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.
당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.
기본 스레딩 예제
다음은 다중 처리 대신 스레딩 모듈을 사용하는 기본 예제입니다.
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()
스레드와 파일 객체를 공유하지 않을 것입니다. 대기열에 데이터 행을 제공하여 작업을 수행 할 수 있습니다. 그런 다음 각 스레드는 줄을 선택하여 처리 한 다음 대기열에서 반환합니다.
목록 및 특별한 종류의 대기열과 같은 데이터를 공유하기 위해 다중 처리 모듈에 내장 된 몇 가지 고급 기능이 있습니다. 멀티 프로세싱과 스레드를 사용하는 것에는 상반 관계가 있으며 작업이 CPU 바인딩인지 IO 바인딩인지에 따라 다릅니다.
기본 다중 처리 .Pool 예제
다음은 다중 처리 풀의 기본적인 예입니다.
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results
풀은 자체 프로세스를 관리하는 편리한 개체입니다. 열린 파일이 그 행을 반복 할 수 있기 때문에 풀 파일을 풀.map ()에 넘겨 줄 수 있습니다.이 풀은 루프를 반복하여 작업자 함수에 전달합니다. 블록을 매핑하고 완료되면 전체 결과를 반환합니다. 이것은 지나치게 단순화 된 예제이며 pool.map ()은 전체 파일을 한번에 메모리로 읽어 들이기 전에 한 번에 작업을 수행합니다. 대용량 파일이있을 것으로 예상되는 경우이를 염두에 두십시오. 제작자 / 소비자 설정을 디자인하는 고급 방법이 있습니다.
제한 및 줄 재 분류 기능이있는 수동 "풀"
이것은 Pool.map의 수동 예제이지만 한 번에 전체 iterable을 사용하는 대신 큐 크기를 설정하여 처리 할 수있는 한 빨리 조각 단위로만 공급할 수 있습니다. 또한 줄 번호를 추가하여 나중에 추적하고 원하는 경우 참조 할 수 있습니다.
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)
-
==============================
2.여기 정말 바보 같은 예를 들어 보았습니다.
여기 정말 바보 같은 예를 들어 보았습니다.
import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ...
여기 까다로운 부분은 파일을 줄 바꿈 문자로 분리하여 줄을 놓치지 않거나 부분 줄만 읽도록하는 것입니다. 그런 다음 각 프로세스는 파일의 일부를 읽고 주 스레드가 데이터베이스에 넣을 수있는 개체를 반환합니다. 물론이 부분을 청크로 처리해야 할 수도 있으므로 모든 정보를 한 번에 메모리에 보관할 필요가 없습니다. (이것은 매우 쉽게 수행됩니다 - "args"리스트를 X 청크로 분할하고 pool.map (wrapper, chunk)를 호출하십시오 - 여기를보십시오)
-
==============================
3.단일 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드로 처리하도록하십시오.
단일 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드로 처리하도록하십시오.
from https://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes by cc-by-sa and MIT license
'PYTHON' 카테고리의 다른 글
[PYTHON] 팬더를 사용하여 그룹별로 시간차를 계산하는 방법은 무엇입니까? (0) | 2018.11.27 |
---|---|
[PYTHON] Google Cloud SQL에 대해 Django 관리 명령을 실행하는 방법 (0) | 2018.11.27 |
[PYTHON] 다중 처리 : 여러 프로세스간에 사전을 공유하려면 어떻게해야합니까? (0) | 2018.11.27 |
[PYTHON] 공분산 행렬에서 퇴화 된 행 / 열을 찾는 법 (0) | 2018.11.27 |
[PYTHON] Python으로 원격 PostgreSQL 데이터베이스에 연결하는 방법 (0) | 2018.11.27 |