[PYTHON] 다중 처리 : 여러 프로세스간에 사전을 공유하려면 어떻게해야합니까?
PYTHON다중 처리 : 여러 프로세스간에 사전을 공유하려면 어떻게해야합니까?
결합 가능한 대기열 Q에서 작동하는 여러 프로세스를 작성하고 결국 전역 사전 D를 조작하여 결과를 저장할 수있는 프로그램. (그래서 각 자식 프로세스는 D를 사용하여 그 결과를 저장하고 다른 자식 프로세스가 어떤 결과를 산출하는지 볼 수 있습니다)
자식 프로세스에서 사전 D를 인쇄하면 해당 프로세스에서 수행 된 수정 (예 : D)이 표시됩니다. 하지만 메인 프로세스가 Q에 합류 한 후에 D를 프린트하면 빈 dict입니다!
동기화 / 잠금 문제라는 것을 알고 있습니다. 누군가 여기서 일어나는 일을 말하고, D에 대한 액세스를 어떻게 동기화 할 수 있습니까?
해결법
-
==============================
1.일반적인 대답은 Manager 객체 사용과 관련됩니다. 문서에서 수정 :
일반적인 대답은 Manager 객체 사용과 관련됩니다. 문서에서 수정 :
from multiprocessing import Process, Manager def f(d): d[1] += '1' d['2'] += 2 if __name__ == '__main__': manager = Manager() d = manager.dict() d[1] = '1' d['2'] = 2 p1 = Process(target=f, args=(d,)) p2 = Process(target=f, args=(d,)) p1.start() p2.start() p1.join() p2.join() print d
산출:
$ python mul.py {1: '111', '2': 6}
-
==============================
2.멀티 프로세싱은 스레딩과는 다릅니다. 각 하위 프로세스는 주 프로세스의 메모리 복사본을 가져옵니다. 일반적으로 상태는 통신 (파이프 / 소켓), 신호 또는 공유 메모리를 통해 공유됩니다.
멀티 프로세싱은 스레딩과는 다릅니다. 각 하위 프로세스는 주 프로세스의 메모리 복사본을 가져옵니다. 일반적으로 상태는 통신 (파이프 / 소켓), 신호 또는 공유 메모리를 통해 공유됩니다.
다중 처리는 프록시 또는 공유 메모리를 사용하여 로컬로 처리되는 공유 상태 인 사용 사례에 대해 일부 추상화를 사용할 수있게합니다. http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes
관련 섹션 :
-
==============================
3.나는 Manager의 dict보다 더 빠른 내 자신의 작업을 공유하고 싶습니다. 그리고 Mac OS에서 작동하지 않는 많은 메모리를 사용하는 pyshmht 라이브러리보다 더 간단하고 안정적입니다. 내 딕트는 일반 문자열에 대해서만 작동하지만 현재는 변경할 수 없습니다. 선형 프로빙 구현을 사용하고 키와 값 쌍을 테이블 뒤에 별도의 메모리 블록에 저장합니다.
나는 Manager의 dict보다 더 빠른 내 자신의 작업을 공유하고 싶습니다. 그리고 Mac OS에서 작동하지 않는 많은 메모리를 사용하는 pyshmht 라이브러리보다 더 간단하고 안정적입니다. 내 딕트는 일반 문자열에 대해서만 작동하지만 현재는 변경할 수 없습니다. 선형 프로빙 구현을 사용하고 키와 값 쌍을 테이블 뒤에 별도의 메모리 블록에 저장합니다.
from mmap import mmap import struct from timeit import default_timer from multiprocessing import Manager from pyshmht import HashTable class shared_immutable_dict: def __init__(self, a): self.hs = 1 << (len(a) * 3).bit_length() kvp = self.hs * 4 ht = [0xffffffff] * self.hs kvl = [] for k, v in a.iteritems(): h = self.hash(k) while ht[h] != 0xffffffff: h = (h + 1) & (self.hs - 1) ht[h] = kvp kvp += self.kvlen(k) + self.kvlen(v) kvl.append(k) kvl.append(v) self.m = mmap(-1, kvp) for p in ht: self.m.write(uint_format.pack(p)) for x in kvl: if len(x) <= 0x7f: self.m.write_byte(chr(len(x))) else: self.m.write(uint_format.pack(0x80000000 + len(x))) self.m.write(x) def hash(self, k): h = hash(k) h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) return h def get(self, k, d=None): h = self.hash(k) while True: x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] if x == 0xffffffff: return d self.m.seek(x) if k == self.read_kv(): return self.read_kv() h = (h + 1) & (self.hs - 1) def read_kv(self): sz = ord(self.m.read_byte()) if sz & 0x80: sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 return self.m.read(sz) def kvlen(self, k): return len(k) + (1 if len(k) <= 0x7f else 4) def __contains__(self, k): return self.get(k, None) is not None def close(self): self.m.close() uint_format = struct.Struct('>I') def uget(a, k, d=None): return to_unicode(a.get(to_str(k), d)) def uin(a, k): return to_str(k) in a def to_unicode(s): return s.decode('utf-8') if isinstance(s, str) else s def to_str(s): return s.encode('utf-8') if isinstance(s, unicode) else s def mmap_test(): n = 1000000 d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time)) def manager_test(): n = 100000 d = Manager().dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time)) def shm_test(): n = 1000000 d = HashTable('tmp', n) d.update({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time)) if __name__ == '__main__': mmap_test() manager_test() shm_test()
내 랩탑 성능 결과는 다음과 같습니다.
mmap speed: 247288 gets per sec manager speed: 33792 gets per sec shm speed: 691332 gets per sec
간단한 사용 예 :
ht = shared_immutable_dict({'a': '1', 'b': '2'}) print ht.get('a')
-
==============================
4.파이썬 용 메모리 기반 해시 테이블 확장을 공유하는 pyshmht를 시도해 볼 수 있습니다.
파이썬 용 메모리 기반 해시 테이블 확장을 공유하는 pyshmht를 시도해 볼 수 있습니다.
주의
from https://stackoverflow.com/questions/6832554/multiprocessing-how-do-i-share-a-dict-among-multiple-processes by cc-by-sa and MIT license
'PYTHON' 카테고리의 다른 글
[PYTHON] Google Cloud SQL에 대해 Django 관리 명령을 실행하는 방법 (0) | 2018.11.27 |
---|---|
[PYTHON] 여러 프로세스에서 단일 파일 처리 (0) | 2018.11.27 |
[PYTHON] 공분산 행렬에서 퇴화 된 행 / 열을 찾는 법 (0) | 2018.11.27 |
[PYTHON] Python으로 원격 PostgreSQL 데이터베이스에 연결하는 방법 (0) | 2018.11.27 |
[PYTHON] Python에서 C로 코드 변환기를 작성 하시겠습니까? [닫은] (0) | 2018.11.27 |