복붙노트

[PYTHON] 여러 프로세스에서 단일 파일 처리

PYTHON

여러 프로세스에서 단일 파일 처리

하나의 큰 텍스트 파일에 각 행을 처리하고 (일부 작업 수행) 데이터베이스에 저장합니다. 단일 프로그램이 너무 오래 걸리므로 여러 프로세스 나 스레드를 통해 수행되기를 바랍니다. 각 스레드 / 프로세스는 단일 파일에서 다른 데이터 (다른 라인)를 읽고 데이터 조각 (line)에 몇 가지 작업을 수행하여 결국 데이터베이스에 저장하므로 결국 전체 데이터가 처리되고 데이터베이스가 필요한 데이터와 함께 덤프됩니다.

그러나 나는 이것에 접근하는 방법을 이해할 수 없다.

해결법

  1. ==============================

    1.당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.

    당신이 찾고있는 것은 생산자 / 소비자 패턴입니다.

    기본 스레딩 예제

    다음은 다중 처리 대신 스레딩 모듈을 사용하는 기본 예제입니다.

    import threading
    import Queue
    import sys
    
    def do_work(in_queue, out_queue):
        while True:
            item = in_queue.get()
            # process
            result = item
            out_queue.put(result)
            in_queue.task_done()
    
    if __name__ == "__main__":
        work = Queue.Queue()
        results = Queue.Queue()
        total = 20
    
        # start for workers
        for i in xrange(4):
            t = threading.Thread(target=do_work, args=(work, results))
            t.daemon = True
            t.start()
    
        # produce data
        for i in xrange(total):
            work.put(i)
    
        work.join()
    
        # get the results
        for i in xrange(total):
            print results.get()
    
        sys.exit()
    

    스레드와 파일 객체를 공유하지 않을 것입니다. 대기열에 데이터 행을 제공하여 작업을 수행 할 수 있습니다. 그런 다음 각 스레드는 줄을 선택하여 처리 한 다음 대기열에서 반환합니다.

    목록 및 특별한 종류의 대기열과 같은 데이터를 공유하기 위해 다중 처리 모듈에 내장 된 몇 가지 고급 기능이 있습니다. 멀티 프로세싱과 스레드를 사용하는 것에는 상반 관계가 있으며 작업이 CPU 바인딩인지 IO 바인딩인지에 따라 다릅니다.

    기본 다중 처리 .Pool 예제

    다음은 다중 처리 풀의 기본적인 예입니다.

    from multiprocessing import Pool
    
    def process_line(line):
        return "FOO: %s" % line
    
    if __name__ == "__main__":
        pool = Pool(4)
        with open('file.txt') as source_file:
            # chunk the work into batches of 4 lines at a time
            results = pool.map(process_line, source_file, 4)
    
        print results
    

    풀은 자체 프로세스를 관리하는 편리한 개체입니다. 열린 파일이 그 행을 반복 할 수 있기 때문에 풀 파일을 풀.map ()에 넘겨 줄 수 있습니다.이 풀은 루프를 반복하여 작업자 함수에 전달합니다. 블록을 매핑하고 완료되면 전체 결과를 반환합니다. 이것은 지나치게 단순화 된 예제이며 pool.map ()은 전체 파일을 한번에 메모리로 읽어 들이기 전에 한 번에 작업을 수행합니다. 대용량 파일이있을 것으로 예상되는 경우이를 염두에 두십시오. 제작자 / 소비자 설정을 디자인하는 고급 방법이 있습니다.

    제한 및 줄 재 분류 기능이있는 수동 "풀"

    이것은 Pool.map의 수동 예제이지만 한 번에 전체 iterable을 사용하는 대신 큐 크기를 설정하여 처리 할 수있는 한 빨리 조각 단위로만 공급할 수 있습니다. 또한 줄 번호를 추가하여 나중에 추적하고 원하는 경우 참조 할 수 있습니다.

    from multiprocessing import Process, Manager
    import time
    import itertools 
    
    def do_work(in_queue, out_list):
        while True:
            item = in_queue.get()
            line_no, line = item
    
            # exit signal 
            if line == None:
                return
    
            # fake work
            time.sleep(.5)
            result = (line_no, line)
    
            out_list.append(result)
    
    
    if __name__ == "__main__":
        num_workers = 4
    
        manager = Manager()
        results = manager.list()
        work = manager.Queue(num_workers)
    
        # start for workers    
        pool = []
        for i in xrange(num_workers):
            p = Process(target=do_work, args=(work, results))
            p.start()
            pool.append(p)
    
        # produce data
        with open("source.txt") as f:
            iters = itertools.chain(f, (None,)*num_workers)
            for num_and_line in enumerate(iters):
                work.put(num_and_line)
    
        for p in pool:
            p.join()
    
        # get the results
        # example:  [(1, "foo"), (10, "bar"), (0, "start")]
        print sorted(results)
    
  2. ==============================

    2.여기 정말 바보 같은 예를 들어 보았습니다.

    여기 정말 바보 같은 예를 들어 보았습니다.

    import os.path
    import multiprocessing
    
    def newlinebefore(f,n):
        f.seek(n)
        c=f.read(1)
        while c!='\n' and n > 0:
            n-=1
            f.seek(n)
            c=f.read(1)
    
        f.seek(n)
        return n
    
    filename='gpdata.dat'  #your filename goes here.
    fsize=os.path.getsize(filename) #size of file (in bytes)
    
    #break the file into 20 chunks for processing.
    nchunks=20
    initial_chunks=range(1,fsize,fsize/nchunks)
    
    #You could also do something like:
    #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.
    
    
    with open(filename,'r') as f:
        start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))
    
    end_byte=[i-1 for i in start_byte] [1:] + [None]
    
    def process_piece(filename,start,end):
        with open(filename,'r') as f:
            f.seek(start+1)
            if(end is None):
                text=f.read()
            else: 
                nbytes=end-start+1
                text=f.read(nbytes)
    
        # process text here. createing some object to be returned
        # You could wrap text into a StringIO object if you want to be able to
        # read from it the way you would a file.
    
        returnobj=text
        return returnobj
    
    def wrapper(args):
        return process_piece(*args)
    
    filename_repeated=[filename]*len(start_byte)
    args=zip(filename_repeated,start_byte,end_byte)
    
    pool=multiprocessing.Pool(4)
    result=pool.map(wrapper,args)
    
    #Now take your results and write them to the database.
    print "".join(result)  #I just print it to make sure I get my file back ...
    

    여기 까다로운 부분은 파일을 줄 바꿈 문자로 분리하여 줄을 놓치지 않거나 부분 줄만 읽도록하는 것입니다. 그런 다음 각 프로세스는 파일의 일부를 읽고 주 스레드가 데이터베이스에 넣을 수있는 개체를 반환합니다. 물론이 부분을 청크로 처리해야 할 수도 있으므로 모든 정보를 한 번에 메모리에 보관할 필요가 없습니다. (이것은 매우 쉽게 수행됩니다 - "args"리스트를 X 청크로 분할하고 pool.map (wrapper, chunk)를 호출하십시오 - 여기를보십시오)

  3. ==============================

    3.단일 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드로 처리하도록하십시오.

    단일 큰 파일을 여러 개의 작은 파일로 나누고 각 파일을 별도의 스레드로 처리하도록하십시오.

  4. from https://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes by cc-by-sa and MIT license