python多进程——进程间的通信

2021-07-19  本文已影响0人  修行的修行

前言

日常工作中,会遇到一些测试数据库抗压能力的需求,这时候就需要使用到python的多进程模块,能够并行的执行任务。我们先来看看并发和并行的区别。

并发(多线程)

并发(Concurrent),在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。

并行(多进程)

并行(Parallel),当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。

这里面有一个很重要的点,那就是系统要有多个CPU才会出现并行。在有多个CPU的情况下,才会出现真正意义上的『同时进行』。

一个进程之下,可以启动多个线程,线程间共享内存,可以使用全局变量。

image

无通信的进程

下面是进程间无通信代码,会发现一个很难受的问题,无法用参数接收到worker函数的返回值和报错信息。当然如果不需要接收子进程的返回值这样用就可以了。

import random
import multiprocessing

def worker(id):
    try:
        x = id/random.randint(-1, 3)
        return {"result": int(x)}
    except Exception as e:
        return {"error": str(e)}

if __name__ == "__main__":
    jobs = []
    for i in range(1,11):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

有通信的进程

进程之间所使用的是不同的内存空间,所以不能使用全局变量。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

这样就可以通过主进程来管理子进程和获取返回值了,子进程也可以通过Queue中的数据,来决定自身的运行。

1.主进程和子进程间的通信

可以方便的汇总子进程的返回值和报错

import random
import multiprocessing

def worker(id, q):
    try:
        print('process {} start'.format(id))
        x = id / random.randint(-1, 3)
        q.put({id: {"result": int(x)}})
    except Exception as e:
        q.put({id: {"error": str(e)}})

if __name__ == "__main__":
    q = multiprocessing.Queue()
    jobs = []
    for i in range(1, 11):
        p = multiprocessing.Process(target=worker, args=(i, q))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()
    result_list = []
    while not q.empty():
        result_list.append(q.get())
    print(result_list)

2.子进程间的通信

实现多进程查找乱序数组中某个数的位置,并在某个子进程查找到时,结束其它子进程

import random
import multiprocessing

def worker(id, temp_list, select_number, q):
    print('process {} start'.format(id), len(temp_list))
    for index, number in enumerate(temp_list):
        if q.empty():
            if number == select_number:
                q.put({'id': id, 'index': index})
        else:
            break

if __name__ == "__main__":
    select_number = 999
    list_length = 10000
    q = multiprocessing.Queue()
    shuffle_number_list = [i for i in range(list_length)]
    random.shuffle(shuffle_number_list)
    start = 0
    jobs = []
    for i in range(1, 11):
        temp_list = shuffle_number_list[start:i * 1000]
        start += 1000
        p = multiprocessing.Process(target=worker, args=(i, temp_list, select_number, q))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()
    result = q.get()
    print(shuffle_number_list[result['index'] + (result['id'] - 1) * 1000] == select_number)

上一篇 下一篇

猜你喜欢

热点阅读