복붙노트

[PYTHON] 다중 처리로 전달 된 함수의 반환 값을 복구하는 방법은?

PYTHON

다중 처리로 전달 된 함수의 반환 값을 복구하는 방법은?

아래 예제 코드에서 함수 작업자의 반환 값을 복구하고 싶습니다. 어떻게이 일을 할 수 있습니까? 이 값은 어디에 저장됩니까?

예제 코드 :

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

산출:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

작업에 저장된 객체에서 관련 속성을 찾을 수없는 것 같습니다.

미리 감사드립니다. 블리즈

해결법

  1. ==============================

    1.공유 변수를 사용하여 통신하십시오. 예를 들면 다음과 같습니다.

    공유 변수를 사용하여 통신하십시오. 예를 들면 다음과 같습니다.

    import multiprocessing
    
    def worker(procnum, return_dict):
        '''worker function'''
        print str(procnum) + ' represent!'
        return_dict[procnum] = procnum
    
    
    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        return_dict = manager.dict()
        jobs = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,return_dict))
            jobs.append(p)
            p.start()
    
        for proc in jobs:
            proc.join()
        print return_dict.values()
    
  2. ==============================

    2.@sega_sai가 제안한 접근 방식이 더 좋은 방법이라고 생각합니다. 하지만 실제로 코드 예제가 필요하므로 여기로갑니다.

    @sega_sai가 제안한 접근 방식이 더 좋은 방법이라고 생각합니다. 하지만 실제로 코드 예제가 필요하므로 여기로갑니다.

    import multiprocessing
    from os import getpid
    
    def worker(procnum):
        print 'I am number %d in process %d' % (procnum, getpid())
        return getpid()
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes = 3)
        print pool.map(worker, range(5))
    

    어느 것이 반환 값을 출력 할 것인가?

    I am number 0 in process 19139
    I am number 1 in process 19138
    I am number 2 in process 19140
    I am number 3 in process 19139
    I am number 4 in process 19140
    [19139, 19138, 19140, 19139, 19140]
    

    지도 (Python 2가 내장되어 있음)에 익숙하다면 너무 어렵지 않을 것입니다. 그렇지 않으면 sega_Sai의 링크를 살펴보십시오.

    얼마나 작은 코드가 필요한지 유의하십시오. (또한 프로세스가 재사용되는 방법에 유의하십시오.)

  3. ==============================

    3.다음 예제는 다중 처리. 파이프 인스턴스 목록을 사용하여 임의의 수의 프로세스에서 문자열을 반환하는 방법을 보여줍니다.

    다음 예제는 다중 처리. 파이프 인스턴스 목록을 사용하여 임의의 수의 프로세스에서 문자열을 반환하는 방법을 보여줍니다.

    import multiprocessing
    
    def worker(procnum, send_end):
        '''worker function'''
        result = str(procnum) + ' represent!'
        print result
        send_end.send(result)
    
    def main():
        jobs = []
        pipe_list = []
        for i in range(5):
            recv_end, send_end = multiprocessing.Pipe(False)
            p = multiprocessing.Process(target=worker, args=(i, send_end))
            jobs.append(p)
            pipe_list.append(recv_end)
            p.start()
    
        for proc in jobs:
            proc.join()
        result_list = [x.recv() for x in pipe_list]
        print result_list
    
    if __name__ == '__main__':
        main()
    

    산출:

    0 represent!
    1 represent!
    2 represent!
    3 represent!
    4 represent!
    ['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
    

    이 솔루션은 다중 처리보다 적은 리소스를 사용합니다.

    또는 다중 처리를 사용합니다.

    이러한 각 유형의 출처를 살펴 보는 것은 매우 유익합니다.

  4. ==============================

    4.대신 multiprocessing.Pool 클래스를 사용하고 .apply () .apply_async (), map ()

    대신 multiprocessing.Pool 클래스를 사용하고 .apply () .apply_async (), map ()

    http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

  5. ==============================

    5.내장 된 exit를 사용하여 프로세스의 종료 코드를 설정할 수 있습니다. 프로세스의 exitcode 속성에서 얻을 수 있습니다.

    내장 된 exit를 사용하여 프로세스의 종료 코드를 설정할 수 있습니다. 프로세스의 exitcode 속성에서 얻을 수 있습니다.

    import multiprocessing
    
    def worker(procnum):
        print str(procnum) + ' represent!'
        exit(procnum)
    
    if __name__ == '__main__':
        jobs = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,))
            jobs.append(p)
            p.start()
    
        result = []
        for proc in jobs:
            proc.join()
            result.append(proc.exitcode)
        print result
    

    산출:

    0 represent!
    1 represent!
    2 represent!
    3 represent!
    4 represent!
    [0, 1, 2, 3, 4]
    
  6. ==============================

    6.대기열을 사용하는 프로세스에서 가치를 얻는 방법을 모색하는 다른 모든 사람들 :

    대기열을 사용하는 프로세스에서 가치를 얻는 방법을 모색하는 다른 모든 사람들 :

    import multiprocessing
    
    ret = {'foo': False}
    
    def worker(queue):
        ret = queue.get()
        ret['foo'] = True
        queue.put(ret)
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        queue.put(ret)
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()
        print queue.get()  # Prints {"foo": True}
        p.join()
    
  7. ==============================

    7.웬일인지, 큐 어디서나이 작업을 수행 할 수있는 일반적인 예를 찾을 수 없었습니다 (파이썬의 doc 예제도 여러 프로세스를 생성하지 않습니다). 그래서 여기에 10 번 시도 후에 작동합니다.

    웬일인지, 큐 어디서나이 작업을 수행 할 수있는 일반적인 예를 찾을 수 없었습니다 (파이썬의 doc 예제도 여러 프로세스를 생성하지 않습니다). 그래서 여기에 10 번 시도 후에 작동합니다.

    def add_helper(queue, arg1, arg2): # the func called in child processes
        ret = arg1 + arg2
        queue.put(ret)
    
    def multi_add(): # spawns child processes
        q = Queue()
        processes = []
        rets = []
        for _ in range(0, 100):
            p = Process(target=add_helper, args=(q, 1, 2))
            processes.append(p)
            p.start()
        for p in processes:
            ret = q.get() # will block
            rets.append(ret)
        for p in processes:
            p.join()
        return rets
    

    대기열은 하위 프로세스의 반환 값을 저장하는 데 사용할 수있는 차단 가능한 스레드 안전 큐입니다. 따라서 각 프로세스에 대기열을 전달해야합니다. 덜 분명한 점은 프로세스에 참여하기 전에 큐에서 ()을 가져와야한다는 것입니다. 그렇지 않으면 큐가 가득 차고 모든 것이 차단됩니다.

    객체 지향적 인 사람들을위한 업데이트 (파이썬 3.4에서 테스트 됨) :

    from multiprocessing import Process, Queue
    
    class Multiprocessor():
    
        def __init__(self):
            self.processes = []
            self.queue = Queue()
    
        @staticmethod
        def _wrapper(func, queue, args, kwargs):
            ret = func(*args, **kwargs)
            queue.put(ret)
    
        def run(self, func, *args, **kwargs):
            args2 = [func, self.queue, args, kwargs]
            p = Process(target=self._wrapper, args=args2)
            self.processes.append(p)
            p.start()
    
        def wait(self):
            rets = []
            for p in self.processes:
                ret = self.queue.get()
                rets.append(ret)
            for p in self.processes:
                p.join()
            return rets
    
    # tester
    if __name__ == "__main__":
        mp = Multiprocessor()
        num_proc = 64
        for _ in range(num_proc): # queue up multiple tasks running `sum`
            mp.run(sum, [1, 2, 3, 4, 5])
        ret = mp.wait() # get all results
        print(ret)
        assert len(ret) == num_proc and all(r == 15 for r in ret)
    
  8. ==============================

    8.나는 함수에서 에러 코드를 얻을 필요가 있었기 때문에 vartec의 대답을 약간 수정했다. (고맙다 vertec !!! 그 굉장한 트릭)

    나는 함수에서 에러 코드를 얻을 필요가 있었기 때문에 vartec의 대답을 약간 수정했다. (고맙다 vertec !!! 그 굉장한 트릭)

    이것은 manager.list를 사용하여 수행 할 수도 있지만, 그 내용을 dict에 저장하고 그 안에 목록을 저장하는 것이 더 좋습니다. 그렇게하면 목록이 채워지는 순서를 확신 할 수 없으므로 함수와 결과를 유지합니다.

    from multiprocessing import Process
    import time
    import datetime
    import multiprocessing
    
    
    def func1(fn, m_list):
        print 'func1: starting'
        time.sleep(1)
        m_list[fn] = "this is the first function"
        print 'func1: finishing'
        # return "func1"  # no need for return since Multiprocess doesnt return it =(
    
    def func2(fn, m_list):
        print 'func2: starting'
        time.sleep(3)
        m_list[fn] = "this is function 2"
        print 'func2: finishing'
        # return "func2"
    
    def func3(fn, m_list):
        print 'func3: starting'
        time.sleep(9)
        # if fail wont join the rest because it never populate the dict
        # or do a try/except to get something in return.
        raise ValueError("failed here")
        # if we want to get the error in the manager dict we can catch the error
        try:
            raise ValueError("failed here")
            m_list[fn] = "this is third"
        except:
            m_list[fn] = "this is third and it fail horrible"
            # print 'func3: finishing'
            # return "func3"
    
    
    def runInParallel(*fns):  # * is to accept any input in list
        start_time = datetime.datetime.now()
        proc = []
        manager = multiprocessing.Manager()
        m_list = manager.dict()
        for fn in fns:
            # print fn
            # print dir(fn)
            p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
            p.start()
            proc.append(p)
        for p in proc:
            p.join()  # 5 is the time out
    
        print datetime.datetime.now() - start_time
        return m_list, proc
    
    if __name__ == '__main__':
        manager, proc = runInParallel(func1, func2, func3)
        # print dir(proc[0])
        # print proc[0]._name
        # print proc[0].name
        # print proc[0].exitcode
    
        # here you can check what did fail
        for i in proc:
            print i.name, i.exitcode  # name was set up in the Process line 53
    
        # here will only show the function that worked and where able to populate the 
        # manager dict
        for i, j in manager.items():
            print dir(i)  # things you can do to the function
            print i, j
    
  9. from https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce by cc-by-sa and MIT license