[PYTHON] 파이썬에서 다중 처리 대기열
PYTHON파이썬에서 다중 처리 대기열
파이썬에서 멀티 프로세싱 라이브러리와 함께 큐를 사용하려고합니다. 아래의 코드를 실행하면 (print 문이 작동 함) Queue에 대한 join을 호출 한 후에 프로세스가 종료되지 않고 여전히 살아있다. 나머지 프로세스는 어떻게 종료합니까?
감사!
def MultiprocessTest(self):
print "Starting multiprocess."
print "Number of CPUs",multiprocessing.cpu_count()
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = multiprocessing.JoinableQueue()
for i in range(num_procs):
p = multiprocessing.Process(target=worker)
p.daemon = True
p.start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
print "q close"
q.join()
#q.close()
print "Finished everything...."
print "num active children:",multiprocessing.active_children()
해결법
-
==============================
1.이 시도:
이 시도:
import multiprocessing num_procs = 4 def do_work(message): print "work",message ,"completed" def worker(): for item in iter( q.get, None ): do_work(item) q.task_done() q.task_done() q = multiprocessing.JoinableQueue() procs = [] for i in range(num_procs): procs.append( multiprocessing.Process(target=worker) ) procs[-1].daemon = True procs[-1].start() source = ['hi','there','how','are','you','doing'] for item in source: q.put(item) q.join() for p in procs: q.put( None ) q.join() for p in procs: p.join() print "Finished everything...." print "num active children:", multiprocessing.active_children()
-
==============================
2.당신의 노동자들은 종말을 지키기 위해 감시병이 필요합니다. 그렇지 않으면 그들은 막연한 읽기에 앉아있을 것입니다. P에 대한 조인 대신 Q에서 절전 모드를 사용하면 상태 정보 등을 표시 할 수 있습니다. 선호하는 템플릿은 다음과 같습니다.
당신의 노동자들은 종말을 지키기 위해 감시병이 필요합니다. 그렇지 않으면 그들은 막연한 읽기에 앉아있을 것입니다. P에 대한 조인 대신 Q에서 절전 모드를 사용하면 상태 정보 등을 표시 할 수 있습니다. 선호하는 템플릿은 다음과 같습니다.
def worker(q,nameStr): print 'Worker %s started' %nameStr while True: item = q.get() if item is None: # detect sentinel break print '%s processed %s' % (nameStr,item) # do something useful q.task_done() print 'Worker %s Finished' % nameStr q.task_done() q = multiprocessing.JoinableQueue() procs = [] for i in range(num_procs): nameStr = 'Worker_'+str(i) p = multiprocessing.Process(target=worker, args=(q,nameStr)) p.daemon = True p.start() procs.append(p) source = ['hi','there','how','are','you','doing'] for item in source: q.put(item) for i in range(num_procs): q.put(None) # send termination sentinel, one for each process while not q.empty(): # wait for processing to finish sleep(1) # manage timeouts and status updates etc.
-
==============================
3.프로세스에 참여하기 전에 대기열을 지워야하지만 q.empty ()는 신뢰할 수 없습니다.
프로세스에 참여하기 전에 대기열을 지워야하지만 q.empty ()는 신뢰할 수 없습니다.
대기열을 지우는 가장 좋은 방법은 신뢰할 수있는 네트워크가있는 소켓처럼 센티널 값을받을 때까지 성공적인 도착 또는 루프 수를 계산하는 것입니다.
-
==============================
4.아래의 코드는 그다지 관련성이 없을 수도 있지만 귀하의 의견 / 피드백을 위해 게시하여 함께 배울 수 있습니다. 고맙습니다!
아래의 코드는 그다지 관련성이 없을 수도 있지만 귀하의 의견 / 피드백을 위해 게시하여 함께 배울 수 있습니다. 고맙습니다!
import multiprocessing def boss(q,nameStr): source = range(1024) for item in source: q.put(nameStr+' '+str(item)) q.put(None) # send termination sentinel, one for each process def worker(q,nameStr): while True: item = q.get() if item is None: # detect sentinel break print '%s processed %s' % (nameStr,item) # do something useful q = multiprocessing.Queue() procs = [] num_procs = 4 for i in range(num_procs): nameStr = 'ID_'+str(i) p = multiprocessing.Process(target=worker, args=(q,nameStr)) procs.append(p) p = multiprocessing.Process(target=boss, args=(q,nameStr)) procs.append(p) for j in procs: j.start() for j in procs: j.join()
-
==============================
5.JoinableQueue에 많은 작업을 넣은 다음 작업을 소비하는 작업자 프로세스를 시작하고 대기열을 읽은 후 종료하는 상대적으로 단순한 경우에 대한 무신론적인 방법입니다. 트릭은 get () 대신 JoinableQueue.get_nowait ()를 사용하는 것입니다. 이름에서 알 수 있듯이 get_nowait ()은 비 블로킹 방식으로 큐에서 값을 가져 오려고 시도하지만 아무 것도 얻지 못하면 queue.Empty 예외가 발생합니다. 작업자가 종료하여이 예외를 처리합니다.
JoinableQueue에 많은 작업을 넣은 다음 작업을 소비하는 작업자 프로세스를 시작하고 대기열을 읽은 후 종료하는 상대적으로 단순한 경우에 대한 무신론적인 방법입니다. 트릭은 get () 대신 JoinableQueue.get_nowait ()를 사용하는 것입니다. 이름에서 알 수 있듯이 get_nowait ()은 비 블로킹 방식으로 큐에서 값을 가져 오려고 시도하지만 아무 것도 얻지 못하면 queue.Empty 예외가 발생합니다. 작업자가 종료하여이 예외를 처리합니다.
원칙을 설명하기위한 초보자 용 코드 :
import multiprocessing as mp from queue import Empty def worker(q): while True: try: work = q.get_nowait() # ... do something with `work` q.task_done() except Empty: break # completely done # main worknum = 4 jq = mp.JoinableQueue() # fill up the task queue # let's assume `tasks` contains some sort of data # that your workers know how to process for task in tasks: jq.put(task) procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ] for p in procs: p.start() for p in procs: p.join()
장점은 "poison pills"를 큐에 넣을 필요가 없으므로 코드가 조금 짧아집니다.
중요 : 생산자와 소비자가 동일한 대기열을 "인터리브 방식"으로 사용하고 작업자가 새로운 작업을 기다려야하는보다 복잡한 상황에서는 "독극물"접근 방식을 사용해야합니다. 위의 제안은 작업 큐가 비어 있으면 더 이상 주위에 매달리는 지점이 없다는 것을 "알 수있는"간단한 경우입니다.
from https://stackoverflow.com/questions/6672525/multiprocessing-queue-in-python by cc-by-sa and MIT license
'PYTHON' 카테고리의 다른 글
[PYTHON] 3 세트 이상의 비례 다이어그램 (0) | 2018.10.18 |
---|---|
[PYTHON] 파이썬을 사용하여 명령을 실행하기 위해 원격 Windows 컴퓨터에 연결하는 방법은 무엇입니까? (0) | 2018.10.18 |
[PYTHON] BottlePy 템플릿에 자바 스크립트 또는 CSS 파일을로드하는 방법은 무엇입니까? (0) | 2018.10.18 |
[PYTHON] .txt 파일에서 가장 빈번한 단어를 찾는 Python 프로그램, 단어와 그 수를 인쇄해야 함 (0) | 2018.10.18 |
[PYTHON] .txt 파일에서 가장 빈번한 단어를 찾는 Python 프로그램, 단어와 그 수를 인쇄해야 함 (0) | 2018.10.18 |