Python: returning values from a child process to the parent process

2021-02-19  汉江岳

Requirement

Serial: preprocessing --> func_1 --> func_2 --> func_3 --> postprocessing
Parallel: preprocessing --> (func_1, func_2, func_3 run in parallel) --> postprocessing
Implementation 10 below is the most efficient approach; a minimal sketch of the target pipeline follows.
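
As a concrete reading of this requirement, here is a minimal sketch of the parallel variant built on a process pool, assuming placeholder preprocessing/postprocessing steps and trivial stub versions of func_1, func_2 and func_3 (these stubs are illustrative, not from the implementations below):

from multiprocessing import Pool

# placeholder stages -- swap in the real logic
def preprocessing():
    return 10

def func_1(x):
    return x + 1

def func_2(x):
    return x * 2

def func_3(x):
    return x ** 2

def postprocessing(results):
    return sum(results)

if __name__ == '__main__':
    data = preprocessing()                 # serial
    with Pool(3) as pool:
        # func_1, func_2 and func_3 run in parallel on the preprocessed data
        async_results = [pool.apply_async(f, (data,)) for f in (func_1, func_2, func_3)]
        results = [r.get() for r in async_results]
    print(postprocessing(results))         # serial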

Implementation 1

This meets the requirement, but it is not very efficient: child processes have to be created and destroyed over and over again. Each worker writes its result into a shared Manager dict.

import multiprocessing
from multiprocessing import Manager
def worker(procnum, return_dict):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = Manager()
    # return_list = manager.list()  # a managed list can be used instead of a dict
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

Implementation 2

Use a multiprocessing.Queue with its put() and get() methods.
The child process does not return a value; instead it puts its result into the queue,
and the parent process collects the results with get().

import random
import time
import multiprocessing


def worker_1(k,q):
    t = 0
    print("process-", k)
    for i in range(10):
        x = random.randint(1, 3)
        t += x
    q.put(t)

def worker_2(k,q):
    t = 0
    print("process-", k)
    for i in range(5):
        x = random.randint(1, 3)
        t += x
    q.put(t)

if __name__ == '__main__': 
    q = multiprocessing.Queue()
    jobs = []
    
    p = multiprocessing.Process(target=worker_1, args=('1', q))
    jobs.append(p)
    p.start()

    p = multiprocessing.Process(target=worker_2, args=('2', q))
    jobs.append(p)
    p.start()
    
    for p in jobs:
        p.join()
    
    results = [q.get() for j in jobs]
    print(results)
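
Because get() returns items in completion order, the parent cannot tell which worker produced which value. A small variation on the same queue approach, assuming it is enough to tag each result with the worker's id:

import random
import multiprocessing

def worker(k, q):
    t = sum(random.randint(1, 3) for _ in range(10))
    q.put((k, t))  # tag the result with the worker id

if __name__ == '__main__':
    q = multiprocessing.Queue()
    jobs = [multiprocessing.Process(target=worker, args=(k, q)) for k in ('1', '2')]
    for p in jobs:
        p.start()
    # drain the queue before joining, so a worker never blocks on a full pipe
    results = dict(q.get() for _ in jobs)
    for p in jobs:
        p.join()
    print(results)  # e.g. {'1': 20, '2': 19}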

Implementation 3

Use a process pool (multiprocessing.Pool).

from multiprocessing import Pool
import time
 
def func_1(i):
    time.sleep(1)
    return  i*i

 
def func_2(i):
    time.sleep(2)
    return  i*i*i
 
if __name__ == '__main__':
    p1 = Pool(5)
    p2 = Pool(5)
    ret_1 = p1.map(func_1, range(10))  # pool.map() is synchronous: it blocks until all results are ready
    ret_2 = p2.map(func_2, range(10))  # so the second map() starts only after the first has finished
    print(ret_1)
    print(ret_2)

Implementation 4

Use pool.apply_async() for asynchronous execution, or pool.map_async() for asynchronous execution over a list of inputs.

from multiprocessing import Pool
import time

def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]  # apply_async unpacks the argument tuple, so args == ('lim', 2020, 65.8)

def func_2(i):
    time.sleep(3)
    return {'ans': i*i*i}

# Create the process pools first, at module level.
# Note: with the "spawn" start method (e.g. on Windows) they should be created
# under the __main__ guard instead, so importing the module stays side-effect free.
p1 = Pool(5)
p2 = Pool(5)

if __name__ == '__main__':

    start = time.time()
    res = p1.apply_async(func_1, ('lim', 2020, 65.8))  # returns immediately
    ret_2 = p2.map_async(func_2, range(5))             # also returns immediately
    print(res.get())    # .get() blocks until the result is ready
    print(ret_2.get())
    print(time.time() - start)

Implementation 5

Run different functions in parallel from a single pool by dispatching them through one wrapper function.

from multiprocessing import Pool
import time

def func_1(*args):
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]  # called directly as func_1('lim', 2020, 65.8)


def func_2(i):
    time.sleep(3)
    return {'ans': i*i*i}

def func3():
    print(3)

def funMain(i):
    # dispatcher: each pool task decides which sub-function to run
    if i == 0:
        return func_1('lim', 2020, 65.8)
    else:
        return func3()

p1 = Pool(2)
p2 = Pool(5)

if __name__ == '__main__':

    start = time.time()
    # If several pools are used, the calls between them have to be asynchronous, e.g.:
    # res = p1.apply_async(func_1, ('lim', 2020, 65.8))
    # ret_2 = p2.map_async(func_2, range(5))
    # print(res.get())
    # print(ret_2.get())
    ret_2 = p2.map(funMain, range(2))  # one pool runs different functions in parallel; a single pool has less overhead
    print(ret_2)
    print(time.time() - start)

Implementation 6

Use a single process pool to run several different functions in parallel and collect their results.

from multiprocessing import Pool  # process pool
import time  # timing


def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]  # three values

def func_2(i):
    time.sleep(3)
    return  {'ans': i*i*i}

def func3():
    print(5)
    time.sleep(5)
    return 5

 
p1 = Pool(2)


if __name__ == '__main__':
    start = time.time()
    res1 = p1.apply_async(func_1, ('lim', 2020, 65.8))
    res2 = p1.apply_async(func_2, [2])
    res3 = p1.apply_async(func3,)
    print(res1.get())
    print(res2.get())
    print(res3.get())
    print(time.time() - start)

Implementation 7

A thread pool on top of process pools: each thread submits work to its own process pool.

#coding=utf-8
# thread pool nested over process pools

from multiprocessing import Pool  # process pool
import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    print(args)
    return args[0], args[1], args[2]  # three values

def func_2(i):
    time.sleep(3)
    return  {'ans': i*i*i}

def func3():
    print(5)
    time.sleep(5)
    return 5

def f(txt):
    start = time.time()
    pool = Pool(3)
    res1 = pool.apply_async(func_1, ('lim', 2020, 65.8))
    res2 = pool.apply_async(func_2, [2])
    res3 = pool.apply_async(func3,)
    print(res1.get())
    print(res2.get())
    print(res3.get())
    print(time.time() - start)
    return txt



if __name__ == '__main__':
    start = time.time()
    
    executor = ThreadPoolExecutor(max_workers=2)
    # submit() schedules the function on the thread pool and returns immediately, without blocking
    task1 = executor.submit(f, 'hello')
    task2 = executor.submit(f, 'world')
    # done() reports whether a task has finished
    # print(task1.done())
    # cancel() cancels a task; it only succeeds if the task has not started running yet
    # print(task2.cancel())
    # time.sleep(4)
    # print(task1.done())
    # result() blocks until the task finishes and returns its result
    print(task1.result())  # collect the thread's return value
    print(task2.result())
    print(time.time() - start)

Implementation 8

A thread pool nested inside a thread pool.

#coding=utf-8
# thread pool nested inside a thread pool (multithread-multithread)
# Because of the GIL, Python threads do not run CPU-bound code in parallel,
# so compute-heavy work may see little or no speedup

import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    print(args)
    args = args[0]  # submit() passes the tuple as one argument, so unpack it first
    return args[0], args[1], args[2]  # three values

def func_2(i):
    time.sleep(3)
    return  {'ans': i*i*i}

def func3():
    print(5)
    time.sleep(5)
    return 5

def f(txt):
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=3)
    # submit() schedules the function on the thread pool and returns immediately, without blocking
    task1 = executor.submit(func_1, ('lim', 2020, 65.8))
    task2 = executor.submit(func_2, 2)
    task3 = executor.submit(func3,)
    print(task1.result())  # collect the thread's return value
    print(task2.result())
    print(task3.result())
    print(time.time() - start)
    return txt



if __name__ == '__main__':
    start = time.time()
    
    executor = ThreadPoolExecutor(max_workers=2)
    # submit() schedules the function on the thread pool and returns immediately, without blocking
    task1 = executor.submit(f, 'hello')
    task2 = executor.submit(f, 'world')
    # done() reports whether a task has finished
    # print(task1.done())
    # cancel() cancels a task; it only succeeds if the task has not started running yet
    # print(task2.cancel())
    # time.sleep(4)
    # print(task1.done())
    # result() blocks until the task finishes and returns its result
    print(task1.result())  # collect the thread's return value
    print(task2.result())
    print(time.time() - start)

Implementation 9

A process pool on top of thread pools.
Simple sub-functions can be computed with threads, while the heavier wrapper functions each get their own process.

#coding=utf-8
# process pool nested over thread pools

from multiprocessing import Pool  # process pool
import time  # timing
from concurrent.futures import ThreadPoolExecutor


def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    print(args)
    args = args[0]  # submit() passes the tuple as one argument, so unpack it first
    return args[0], args[1], args[2]  # three values

def func_2(i):
    time.sleep(3)
    return  {'ans': i*i*i}

def func3():
    print(5)
    time.sleep(5)
    return 5

def f(txt):
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=3)
    # submit() schedules the function on the thread pool and returns immediately, without blocking
    task1 = executor.submit(func_1, ('lim', 2020, 65.8))
    task2 = executor.submit(func_2, 2)
    task3 = executor.submit(func3,)
    print(task1.result())  # collect the thread's return value
    print(task2.result())
    print(task3.result())
    print(time.time() - start)
    return txt



if __name__ == '__main__':
    start = time.time()
    pool = Pool(2)
    res1 = pool.apply_async(f, ('hello',))
    res2 = pool.apply_async(f, ('world',))
    print(res1.get())
    print(res2.get())
    print(time.time() - start)

Implementation 10

Increase the number of worker processes in a single pool and avoid threads altogether: because of the GIL, Python threads are not truly parallel and give no speedup for CPU-bound work, so multiprocessing is the more reliable choice.

#coding=utf-8
# use only a process pool, with more worker processes
from multiprocessing import Pool  # process pool
import time  # timing

# sub-function 1
def func_1(*args):  # *args collects the positional arguments
    time.sleep(2)
    # print(args)
    return args[0], args[1], args[2]  # three values

# sub-function 2
def func_2(i):
    time.sleep(3)
    return {'ans': i*i*i}

# sub-function 3
def func_3():
    print(5)
    time.sleep(5)
    return 5

# aggregation / postprocessing step
def f(a, b, c):
    return a, b, c


if __name__ == '__main__':
    start = time.time()
    pool = Pool(6)  # size the pool as needed

    # input lists for the different sub-functions
    input1 = [('lim', 2020, 65.8), ('liuts', 2020, 65.8)]
    input2 = [(2,), (3,)]
    input3 = [(), ()]

    # run the sub-functions asynchronously so the input lists are processed in parallel
    res1 = [pool.apply_async(func_1, e) for e in input1]
    res2 = [pool.apply_async(func_2, e) for e in input2]
    res3 = [pool.apply_async(func_3, e) for e in input3]

    # collect the results
    res1 = [e.get() for e in res1]
    res2 = [e.get() for e in res2]
    res3 = [e.get() for e in res3]

    # # serial aggregation
    # for a, b, c in zip(res1, res2, res3):
    #     print(f(a, b, c))  # one record at a time

    # aggregate in parallel as well
    final_res = [pool.apply_async(f, (a,b,c)) for a,b,c in zip(res1, res2, res3)]
    final_res = [e.get() for e in final_res]
    print(final_res)
    
    print(time.time() - start)
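
For comparison, the same single-pool pattern can also be written with concurrent.futures.ProcessPoolExecutor, the process-based counterpart of the ThreadPoolExecutor used above. This is only a sketch of an equivalent formulation, not a claim that it is faster:

from concurrent.futures import ProcessPoolExecutor
import time

def func_1(name, year, score):
    time.sleep(2)
    return name, year, score

def func_2(i):
    time.sleep(3)
    return {'ans': i * i * i}

def func_3():
    time.sleep(5)
    return 5

if __name__ == '__main__':
    start = time.time()
    with ProcessPoolExecutor(max_workers=6) as ex:
        futures = [ex.submit(func_1, 'lim', 2020, 65.8),
                   ex.submit(func_2, 2),
                   ex.submit(func_3)]
        print([f.result() for f in futures])  # collect each future's return value
    print(time.time() - start)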

References

  1. https://docs.python.org/3/library/multiprocessing.html
  2. https://blog.csdn.net/huangpo001/article/details/106254480
  3. ThreadPoolExecutor: https://www.jianshu.com/p/b9b3d66aa0be