복붙노트

[PYTHON] 파이썬에서 다중 처리 대기열

PYTHON

파이썬에서 다중 처리 대기열

파이썬에서 멀티 프로세싱 라이브러리와 함께 큐를 사용하려고합니다. 아래의 코드를 실행하면 (print 문이 작동 함) Queue에 대한 join을 호출 한 후에 프로세스가 종료되지 않고 여전히 살아있다. 나머지 프로세스는 어떻게 종료합니까?

감사!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()

해결법

  1. ==============================

    1.이 시도:

    이 시도:

    import multiprocessing
    
    num_procs = 4
    def do_work(message):
      print "work",message ,"completed"
    
    def worker():
      for item in iter( q.get, None ):
        do_work(item)
        q.task_done()
      q.task_done()
    
    q = multiprocessing.JoinableQueue()
    procs = []
    for i in range(num_procs):
      procs.append( multiprocessing.Process(target=worker) )
      procs[-1].daemon = True
      procs[-1].start()
    
    source = ['hi','there','how','are','you','doing']
    for item in source:
      q.put(item)
    
    q.join()
    
    for p in procs:
      q.put( None )
    
    q.join()
    
    for p in procs:
      p.join()
    
    print "Finished everything...."
    print "num active children:", multiprocessing.active_children()
    
  2. ==============================

    2.당신의 노동자들은 종말을 지키기 위해 감시병이 필요합니다. 그렇지 않으면 그들은 막연한 읽기에 앉아있을 것입니다. P에 대한 조인 대신 Q에서 절전 모드를 사용하면 상태 정보 등을 표시 할 수 있습니다. 선호하는 템플릿은 다음과 같습니다.

    당신의 노동자들은 종말을 지키기 위해 감시병이 필요합니다. 그렇지 않으면 그들은 막연한 읽기에 앉아있을 것입니다. P에 대한 조인 대신 Q에서 절전 모드를 사용하면 상태 정보 등을 표시 할 수 있습니다. 선호하는 템플릿은 다음과 같습니다.

    def worker(q,nameStr):
      print 'Worker %s started' %nameStr
      while True:
         item = q.get()
         if item is None: # detect sentinel
           break
         print '%s processed %s' % (nameStr,item) # do something useful
         q.task_done()
      print 'Worker %s Finished' % nameStr
      q.task_done()
    
    q = multiprocessing.JoinableQueue()
    procs = []
    for i in range(num_procs):
      nameStr = 'Worker_'+str(i)
      p = multiprocessing.Process(target=worker, args=(q,nameStr))
      p.daemon = True
      p.start()
      procs.append(p)
    
    source = ['hi','there','how','are','you','doing']
    for item in source:
      q.put(item)
    
    for i in range(num_procs):
      q.put(None) # send termination sentinel, one for each process
    
    while not q.empty(): # wait for processing to finish
      sleep(1)   # manage timeouts and status updates etc.
    
  3. ==============================

    3.프로세스에 참여하기 전에 대기열을 지워야하지만 q.empty ()는 신뢰할 수 없습니다.

    프로세스에 참여하기 전에 대기열을 지워야하지만 q.empty ()는 신뢰할 수 없습니다.

    대기열을 지우는 가장 좋은 방법은 신뢰할 수있는 네트워크가있는 소켓처럼 센티널 값을받을 때까지 성공적인 도착 또는 루프 수를 계산하는 것입니다.

  4. ==============================

    4.아래의 코드는 그다지 관련성이 없을 수도 있지만 귀하의 의견 / 피드백을 위해 게시하여 함께 배울 수 있습니다. 고맙습니다!

    아래의 코드는 그다지 관련성이 없을 수도 있지만 귀하의 의견 / 피드백을 위해 게시하여 함께 배울 수 있습니다. 고맙습니다!

    import multiprocessing
    
    def boss(q,nameStr):
      source = range(1024)
      for item in source:
        q.put(nameStr+' '+str(item))
      q.put(None) # send termination sentinel, one for each process
    
    def worker(q,nameStr):
      while True:
         item = q.get()
         if item is None: # detect sentinel
           break
         print '%s processed %s' % (nameStr,item) # do something useful
    
    q = multiprocessing.Queue()
    
    procs = []
    
    num_procs = 4
    for i in range(num_procs):
      nameStr = 'ID_'+str(i)
      p = multiprocessing.Process(target=worker, args=(q,nameStr))
      procs.append(p)
      p = multiprocessing.Process(target=boss,   args=(q,nameStr))
      procs.append(p)
    
    for j in procs:
      j.start()
    for j in procs:
      j.join()
    
  5. ==============================

    5.JoinableQueue에 많은 작업을 넣은 다음 작업을 소비하는 작업자 프로세스를 시작하고 대기열을 읽은 후 종료하는 상대적으로 단순한 경우에 대한 무신론적인 방법입니다. 트릭은 get () 대신 JoinableQueue.get_nowait ()를 사용하는 것입니다. 이름에서 알 수 있듯이 get_nowait ()은 비 블로킹 방식으로 큐에서 값을 가져 오려고 시도하지만 아무 것도 얻지 못하면 queue.Empty 예외가 발생합니다. 작업자가 종료하여이 예외를 처리합니다.

    JoinableQueue에 많은 작업을 넣은 다음 작업을 소비하는 작업자 프로세스를 시작하고 대기열을 읽은 후 종료하는 상대적으로 단순한 경우에 대한 무신론적인 방법입니다. 트릭은 get () 대신 JoinableQueue.get_nowait ()를 사용하는 것입니다. 이름에서 알 수 있듯이 get_nowait ()은 비 블로킹 방식으로 큐에서 값을 가져 오려고 시도하지만 아무 것도 얻지 못하면 queue.Empty 예외가 발생합니다. 작업자가 종료하여이 예외를 처리합니다.

    원칙을 설명하기위한 초보자 용 코드 :

    import multiprocessing as mp
    from queue import Empty
    
    def worker(q):
      while True:
        try:
          work = q.get_nowait()
          # ... do something with `work`
          q.task_done()
        except Empty:
          break # completely done
    
    # main
    worknum = 4
    jq = mp.JoinableQueue()
    
    # fill up the task queue
    # let's assume `tasks` contains some sort of data
    # that your workers know how to process
    for task in tasks:
      jq.put(task)
    
    procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
    for p in procs:
      p.start()
    
    for p in procs:
      p.join()
    

    장점은 "poison pills"를 큐에 넣을 필요가 없으므로 코드가 조금 짧아집니다.

    중요 : 생산자와 소비자가 동일한 대기열을 "인터리브 방식"으로 사용하고 작업자가 새로운 작업을 기다려야하는보다 복잡한 상황에서는 "독극물"접근 방식을 사용해야합니다. 위의 제안은 작업 큐가 비어 있으면 더 이상 주위에 매달리는 지점이 없다는 것을 "알 수있는"간단한 경우입니다.

  6. from https://stackoverflow.com/questions/6672525/multiprocessing-queue-in-python by cc-by-sa and MIT license