복붙노트

[PYTHON] Python 다중 처리에서 Pool.map을 Array (공유 메모리)와 결합하는 방법은 무엇입니까?

PYTHON

Python 다중 처리에서 Pool.map을 Array (공유 메모리)와 결합하는 방법은 무엇입니까?

나는 병렬로 여러 프로세스에 의해 처리되기를 원하는 매우 큰 (읽기 전용) 데이터 배열을 가지고있다.

나는 Pool.map 함수를 좋아하고 그것을 병렬로 그 데이터의 함수를 계산하기 위해 사용하고 싶다.

Value 나 Array 클래스를 사용하여 프로세스간에 공유 메모리 데이터를 사용할 수 있음을 알았습니다. 그러나 이것을 사용하려고하면 RuntimeError가 발생합니다. 'SynchronizedString 객체는 Pool.map 함수를 사용할 때 상속을 통해 프로세스간에 만 공유되어야합니다.

다음은 내가하려는 일의 단순화 된 예입니다.

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

아무도 내가 여기서 뭘 잘못하고 있다고 말할 수 있습니까?

그래서 내가하고 싶은 것은 새로 생성 된 공유 메모리에 대한 정보를 프로세스 풀에 생성 된 후에 프로세스에 전달하는 것입니다.

해결법

  1. ==============================

    1.내가 방금 현상금을 보았던 것에 따라 다시 시험하고있다)

    내가 방금 현상금을 보았던 것에 따라 다시 시험하고있다)

    기본적으로 오류 메시지는 그것이 말한 것을 의미한다고 생각합니다 - 다중 처리 공유 메모리 배열은 인수로 전달 될 수 없습니다 (절편에 의해). 데이터를 직렬화하는 것은 이치에 맞지 않습니다. 요점은 데이터가 공유 메모리라는 것입니다. 따라서 공유 배열을 전역으로 만들어야합니다. 내 첫 번째 대답처럼 모듈의 속성으로 넣는 것이 깔끔한 것 같지만 예제의 전역 변수로 남겨 두는 것이 좋습니다. 포크 이전에 데이터를 설정하고 싶지 않은 점을 생각해 보면, 여기에 수정 된 예제가 있습니다. 공유 배열을 하나 이상 가질 수 있기를 원한다면 (즉, toShare를 인자로 전달하기를 원할 때) 마찬가지로 공유 배열의 전역 목록을 만들고 count_it에 색인을 전달하면됩니다 (toShare의 c가됩니다). [나는]:).

    from sys import stdin
    from multiprocessing import Pool, Array, Process
    
    def count_it( key ):
      count = 0
      for c in toShare:
        if c == key:
          count += 1
      return count
    
    if __name__ == '__main__':
      # allocate shared array - want lock=False in this case since we 
      # aren't writing to it and want to allow multiple processes to access
      # at the same time - I think with lock=True there would be little or 
      # no speedup
      maxLength = 50
      toShare = Array('c', maxLength, lock=False)
    
      # fork
      pool = Pool()
    
      # can set data after fork
      testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
      if len(testData) > maxLength:
          raise ValueError, "Shared array too small to hold data"
      toShare[:len(testData)] = testData
    
      print pool.map( count_it, ["a", "b", "s", "d"] )
    

    [편집 : 위의 포크를 사용하지 않기 때문에 Windows에서 작동하지 않습니다. 그러나, 아래는 Windows에서 여전히 풀 (Pool)을 사용하므로, 이것이 당신이 원하는 것과 가장 가까운 것이라고 생각합니다 :

    from sys import stdin
    from multiprocessing import Pool, Array, Process
    import mymodule
    
    def count_it( key ):
      count = 0
      for c in mymodule.toShare:
        if c == key:
          count += 1
      return count
    
    def initProcess(share):
      mymodule.toShare = share
    
    if __name__ == '__main__':
      # allocate shared array - want lock=False in this case since we 
      # aren't writing to it and want to allow multiple processes to access
      # at the same time - I think with lock=True there would be little or 
      # no speedup
      maxLength = 50
      toShare = Array('c', maxLength, lock=False)
    
      # fork
      pool = Pool(initializer=initProcess,initargs=(toShare,))
    
      # can set data after fork
      testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
      if len(testData) > maxLength:
          raise ValueError, "Shared array too small to hold data"
      toShare[:len(testData)] = testData
    
      print pool.map( count_it, ["a", "b", "s", "d"] )
    

    왜지도가 배열을 Pickle하지 않고 Process and Pool가 될지 확신 할 수 없습니다. 아마도 윈도우의 서브 프로세스 초기화 지점에서 전송되었을 것입니다. 데이터는 여전히 포크 후에 설정됩니다.

  2. ==============================

    2.내가 보는 문제는 Pool이 인수 목록을 통해 공유 된 데이터를 pickling하는 것을 지원하지 않는다는 것이다. 그것은 "객체는 상속을 통해 프로세스간에 만 공유되어야합니다"라는 오류 메시지의 의미입니다. Pool 클래스를 사용하여 공유하려는 경우 공유 데이터를 상속해야합니다 (예 : global).

    내가 보는 문제는 Pool이 인수 목록을 통해 공유 된 데이터를 pickling하는 것을 지원하지 않는다는 것이다. 그것은 "객체는 상속을 통해 프로세스간에 만 공유되어야합니다"라는 오류 메시지의 의미입니다. Pool 클래스를 사용하여 공유하려는 경우 공유 데이터를 상속해야합니다 (예 : global).

    명시 적으로 전달해야하는 경우 다중 처리를 사용해야 할 수도 있습니다. 프로세스. 다음은 수정 된 예제입니다.

    from multiprocessing import Process, Array, Queue
    
    def count_it( q, arr, key ):
      count = 0
      for c in arr:
        if c == key:
          count += 1
      q.put((key, count))
    
    if __name__ == '__main__':
      testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
      # want to share it using shared memory
      toShare = Array('c', testData)
    
      q = Queue()
      keys = ['a', 'b', 's', 'd']
      workers = [Process(target=count_it, args = (q, toShare, key))
        for key in keys]
    
      for p in workers:
        p.start()
      for p in workers:
        p.join()
      while not q.empty():
        print q.get(),
    

    대기열의 요소 순서는 다를 수 있습니다.

    좀 더 일반적이고 풀과 비슷하게 만들려면 고정 된 N 개의 프로세스를 생성하고 키 목록을 N 개로 분할 한 다음 프로세스 대상으로 래퍼 함수를 ​​사용하면 목록의 각 키에 대해 count_it가 호출됩니다 다음과 같이 전달됩니다.

    def wrapper( q, arr, keys ):
      for k in keys:
        count_it(q, arr, k)
    
  3. ==============================

    3.데이터가 읽기 전용이라면 풀에서 포크 (fork)가 나오기 전에 모듈에서 변수를 변수로 만듭니다. 그런 다음 모든 하위 프로세스는 해당 프로세스에 액세스 할 수 있어야하며 작성하지 않은 경우 복사되지 않습니다.

    데이터가 읽기 전용이라면 풀에서 포크 (fork)가 나오기 전에 모듈에서 변수를 변수로 만듭니다. 그런 다음 모든 하위 프로세스는 해당 프로세스에 액세스 할 수 있어야하며 작성하지 않은 경우 복사되지 않습니다.

    import myglobals # anything (empty .py file)
    myglobals.data = []
    
    def count_it( key ):
        count = 0
        for c in myglobals.data:
            if c == key:
                count += 1
        return count
    
    if __name__ == '__main__':
    myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    
    pool = Pool()
    print pool.map( count_it, ["a", "b", "s", "d"] )
    

    lock = False 키워드 인수로 시도 할 수 있지만 Array를 사용하려고 할 경우 (기본적으로 true)

  4. ==============================

    4.따라서 공유 유형 사용이 잘못되었습니다. 부모 프로세스에서이 배열을 상속 받고 싶습니까? 명시 적으로 전달하는 것을 선호합니까? 전자의 경우에는 다른 답변이 제시하는 것처럼 전역 변수를 만들어야합니다. 그러나 sharedctype을 사용하여 명시 적으로 전달할 필요가 없으며 원본 testData를 전달하면됩니다.

    따라서 공유 유형 사용이 잘못되었습니다. 부모 프로세스에서이 배열을 상속 받고 싶습니까? 명시 적으로 전달하는 것을 선호합니까? 전자의 경우에는 다른 답변이 제시하는 것처럼 전역 변수를 만들어야합니다. 그러나 sharedctype을 사용하여 명시 적으로 전달할 필요가 없으며 원본 testData를 전달하면됩니다.

    BTW, Pool.map ()의 사용법이 잘못되었습니다. Map () 함수와 같은 인터페이스를 가지고 있습니다 (starmap ()로 처리 했습니까?). 다음은 배열을 명시 적으로 전달하는 작업 예제입니다.

    from multiprocessing import Pool
    
    def count_it( (arr, key) ):
        count = 0
        for c in arr:
            if c == key:
                count += 1
        return count
    
    if __name__ == '__main__':
        testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
        pool = Pool()
        print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
    
  5. from https://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with-array-shared-memory-in-python-multiprocessing by cc-by-sa and MIT license