pythonPython语言

python多进程

2018-10-19  本文已影响16人  井底蛙蛙呱呱呱

当有多个非相关任务需要处理时,并行能大大提高处理速度。这里简要介绍python的multiprocessing模块。


简单多进程编写

当我们任务数量确定而且比较少的时候,可以手动为每个任务指定一个进程来运行。

import multiprocessing as mp

def f(a):
    print(a)

if __name__ == '__main__':
    # 这里有三个任务,手动指定3个进程
    p1 = mp.Process(target=f, args=(1,))
    p2 = mp.Process(target=f, args=(2,))
    p3 = mp.Process(target=f, args=(3,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()

# 输出
1
2
3

使用进程池来处理多任务

当我们任务比较多而且不确定数量(又或者想使得代码更简洁)的时候可以使用进程池Pool来编写多进程。

import multiprocessing as mp

def f(a):
    return a

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.map(f, (1, 2, 3, 4))
    print(res)
# 输出
[1, 2, 3, 4]

可以看到,进程池不仅可以方便的处理多进程,还将各个进程的的处理结果储存了在一个列表中。Pool默认使用计算机所有cpu核来进行运算,也可使用Pool(process=4)来指定并行的进程数。
另一个可以储存结果的函数是apply_async(),但是这个函数只支持传入一个参数,也即运行一个任务,运行多个任务需要多次指定。此外,他返回的结果是一个类似生成器的东西,需要通过get函数取出来。

import multiprocessing as mp

def f(a):
    return a

if __name__ == '__main__':
    pool = mp.Pool()
    res = [pool.apply_async(f, (i,)) for i in range(4)]
    print([r.get() for r in res])

# 输出
[0, 1, 2, 3]

多参数函数的多进程

上面处理任务的函数都只有一个参数,在实际情况这种情况是很少的,一般我们的函数都需要传入多个参数。python2中未能支持传多个参数,python3.3后则有starmap来支持传入多参数。

import multiprocessing as mp

def f(a, b):
    return (a, b)

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.starmap(f, ((1, 2), ('a', 'b')))
    print(res)

# 输出
[(1, 2), ('a', 'b')]

另一个传多个参数的方法是通过python中的魔术方法*args:

import multiprocessing as mp

def f(*l):
    a, b = l[0]  # 为什么这里是l[0]而不是l?因为这里传入的l是一个嵌套元组((a,b),)。
    return (a, b)

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.map(f, ((1, 2), ('a', 'b')))
    print(res)

# 输出
[(1, 2), ('a', 'b')]

亦或使用**kwargs:

import multiprocessing as mp

def f(*l):
    d = l[0]    # 这里l为({'a': 1, 'b': 2},)
    return (d['a'], d['b'])

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.map(f, ({'a': 1, 'b': 2}, {'a': 3, 'b': 4}))
    print(res)

# 输出
[(1, 2), (3, 4)]

共享内存和进程锁

一般情况下,各个进程中的数据变量是无法发生交流的,但我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp
import time


def job(v, num):
    for _ in range(5):
        time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)


def multicore():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 3))  # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

# 输出
1
5
9
13
17
4
8
12
16
20

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。很明显可以看到他们发生了冲突。

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

import multiprocessing as mp


def job(v, num, l):
    l.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num  # 获取共享内存
        print(v.value)
    l.release()  # 释放


def multicore():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存
    p1 = mp.Process(target=job, args=(v, 1, l))  # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 3, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

# 输出
3
6
9
12
15
16
17
18
19
20

需要主义的是上面可能仍然会发生冲突——p1先执行还是p2先执行的问题。为了解决这个问题我们可以在start,join中决定他们的顺序。

import multiprocessing as mp

def job(v, num, l):
    l.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num  # 获取共享内存
        print(v.value)
    l.release()  # 释放

def multicore():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存
    p1 = mp.Process(target=job, args=(v, 1, l))  # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 3, l))
    p1.start()
    p1.join()
    p2.start()
    p2.join()

if __name__ == '__main__':
    multicore()

# 输出
1
2
3
4
5
8
11
14
17
20
上一篇下一篇

猜你喜欢

热点阅读