Python之多线程、多进程

2020-03-02  本文已影响0人  天命_风流

多线程

多线程的含义

一个进程可以创建多个线程进行并发计算,从而让进程同时处理多个计算逻辑,或者将一个大的计算任务拆分成多个小的计算任务,然后同时计算这些子任务。
通过查阅相关资料,其它某些语言的多线程可以调用多个CPU同时进行运算(并行),但是对于Python来说,它的C解释器设置了一个GIL(全局解释器锁:Global Interpreter Lock),使得同一时刻只有一个线程执行,也就是说,无论你怎么设置你的多线程,CPython的都只能利用一个CPU,这也就无法发挥计算机多核心的优势了。
由于我使用的是CPython,所以下面的Python都是指在C解释器下实现的python。

Python下多线程的作用

由于GIL带来的影响,计算密集型的程序中使用多线程不会带来性能提升,甚至由于线程切换需要耗费资源,它的性能还会下降。
但是Python的多线程依然有它的用武之地,在处理需要等待、需要大量使用慢速设备(内存和硬盘的性能相对于CPU来说非常慢,因此我将这种设备定义为慢速设备)等情况下,多线程仍然有不俗的表现。

如何规避GIL带来的影响呢?有以下几点建议:使用非C的解释器、使用C扩展进行数据运算、使用多进程 ...

编写多线程需要注意的问题

Python的多线程

线程的建立和启动

我们有两种方式建立多线程:直接构建线程,使用类继承构件线程
直接构建线程:

import time
from threading import Thread    #  多线程最常使用的库


def t(n):
    while n > 0:
        print(n)
        n -= 1
        time.sleep(0.5)


if __name__ == '__main__':
    a = Thread(target=t, args=(3,))  # 实例化一个线程
    b = Thread(target=t, args=(3,))
    c = Thread(target=t, args=(3,), daemon=True)  # 实例一个守护线程,守护线程如果不自动退出,只会在程序结束时结束,因此它不可以使用jion
    a.start()  # 开启线程
    b.start()
    c.start()
    a.join()  # 等待线程执行完成,然后才能执行下面的代码
    if a.is_alive():  # 判断线程是否在执行
        print('a仍然执行')
    if b.is_alive():
        print('b仍然执行')

用类继承:

from threading import Thread
import time


class CountdownThread(Thread):  # 继承
    def __init__(self, n):
        super().__init__()
        self.n = n

    def run(self):
        while self.n > 0:
            print(self.n)
            self.n -= 1
            time.sleep(0.3)


c = CountdownThread(3)
c.start()  # 会执行实例的run方法
d = CountdownThread(3)
d.start()
进程间的信号传递

Event:全局信号

from threading import Thread, Event
import time


def t(x, e):
    e.wait()  # 等待信号
    print(x)


if __name__ == '__main__':
    event = Event()  # 设置一个信号
    t1 = Thread(target=t, args=('函数运行', event))
    t1.start()
    time.sleep(2)
    event.set()  # 传递出信号,所有event.wait()后面的代码开始执行

Condition:条件变量
https://www.jianshu.com/p/5d2579938517

进程间的数据共享

使用队列:

import threading
import time
import queue


def product():
    while True:
        if que.qsize() == 10:
            print('现在有10个包子,我决定休息一会')
            time.sleep(6)  # 够10个包子就休息6秒
        else:
            que.put(1)
            print('造包子,现在有{}个'.format(que.qsize()))
            time.sleep(0.2)  # 每0.2秒造一个包子


def consumer(name):
    while True:
        que.get()
        print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
        time.sleep(1.2)  # 每1.2秒吃一个包子


que = queue.Queue()  # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))

p.start()
c1.start()
c2.start()

下面用一个实例解释一下队列中的 task_done 和 join:

import threading
import time
import queue


def product():
    while True:
        if que.qsize() == 10:  # 队列中有10个包子的时候生产者退出生产
            print('现在有10个包子,我回家了')
            break
        else:
            que.put(1)
            print('造包子,现在有{}个'.format(que.qsize()))
            time.sleep(0.1)  # 每0.2秒造一个包子


def consumer(name):
    while True:
        que.get()
        print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
        time.sleep(0.5)  # 每1.2秒吃一个包子
        que.task_done()  # task_done 要和get配合使用,每进行一次get,就要用一次task_done
        if que.qsize() == 0:  # 队列为空时消费者推出消费
            break


que = queue.Queue()  # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))

p.start()
c1.start()
c2.start()
que.join()  # 保持阻塞,直到队列内部的计数器认为队列为空为止,注意队列计数器的规则:遇到put就+1,遇到task_done就-1
print('over')

多线程的一大问题是多个线程争抢同一个资源,这样争抢的结果可能会引发错误:A线程正在使用资源S,S状态为S1,这时B线程修改了S,状态改为S2,如此一来,A线程的计算结果和S(修改后的状态为S2)不符。
为了解决这种问题,我们需要对临界资源上锁:保证A线程使用完S之后再允许其它线程修改S。

import threading
import time

n = 0#设置n为一个临界资源
n_lock = threading.Lock()#为n设置一把锁

def a(x):
    n_lock.acquire()#上锁,上锁后,所有n_lock.acquire()都会被阻塞
    time.sleep(3)#假设一个程序需要花3秒进行运算
    global n
    n += x
    print(n)
    n_lock.release()#开锁,释放资源

t1 = threading.Thread(target=a,args=(3,))
t2 = threading.Thread(target=a,args=(5,))
t1.start()
t2.start()

当然,伟大的同行已经为我们写好了Lock的上下文环境,所以你可以使用with达到与上面相同的结果:

import threading
import time

n = 0  # 设置n为一个临界资源
n_lock = threading.Lock()  # 为n设置一把锁


def a(x):
    with n_lock:  # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
        time.sleep(3)  # 假设一个程序需要花3秒进行运算
        global n
        n += x
        print(n)


t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t1.start()
t2.start()

Lock是一把只能一个请求者使用的锁,我们可以使用Semaphore(信号量)设置使用数量:

from threading import Semaphore
import threading
import time

n_lock = Semaphore(2)  # 允许两个人同时请求n_lock.acquire()
n = 0


def a(x):
    with n_lock:  # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
        time.sleep(3)  # 假设一个程序需要花3秒进行运算
        global n
        n += x
        print(n)


t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t3 = threading.Thread(target=a, args=(10,))
t1.start()
t2.start()
t3.start()

其实锁和信号量的作用远远不止于对临界资源的保护,他们还可以同步各个进程(线程)之间的进度。具体可以看一看操作系统的相关内容。

保存线程专有状态

如果你需要保存当前运行线程的专有状态,且这个状态对其它线程是不可见的,那就需要使用这个技巧。

import threading

t = threading.local()

def prin():
    t.inner_id = threading.get_ident()
    print(t.inner_id)


t1=threading.Thread(target=prin)
t2=threading.Thread(target=prin)
t3=threading.Thread(target=prin)
prin()
t1.start()
t2.start()
t3.start()
创建线程池

可以使用threadin库提供的线程池,但是这里我选了另一个库的线程池,下面的代码是自动调用线程池,你可以使用pool.submit()手动为线程池调用代码。

from concurrent.futures import ThreadPoolExecutor  # 引入线程池
import time


def t(n):
    return n ** n ** n


if __name__ == '__main__':
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4)  # 实例化一个线程池,最多使用4个线程
    # with ThreadPoolExecutor(max_workers=4) as pool:#这行代码等价于上一行代码
    r = pool.map(t, [6 for i in range(20000)])  # 使用这个线程池调用 t(6) 函数 20000次
    pool.shutdown()  # 等待所有线程执行完毕
    print(time.time() - start)  # 一个简单的计算

    start = time.time()
    for i in range(20000):  # 这是直接使用单线程进行20000次 t(6)的计算
        t(6)
    print(time.time() - start)
###############  执行结果 ################
19.726083517074585
17.407222509384155

WTF???为什么线程并发耗费的时间反而比单线程多?
这就是最开始所讲到的,由于GIL的限制,Python只能发挥一个核心的性能,多线程只能发挥一个核心的性能,对于计算密集的程序,反而会因为线程切换增加计算成本。
那么如何破除这种困境呢?解决方案之一就是使用多进程

多进程

多进程的原理和多线程类似,python通过创建多个进程进行计算,由于一个python进程有一个GIL锁,多进程就可以解除这样的限制。这使得Python可以利用到CPU的多个核心。
依然以上一个例子作为对比:

from concurrent.futures import ProcessPoolExecutor  # 引入进程池
import time


def t(n):
    return n ** n ** n


if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=8)  # 实例化一个进程池
    # with ProcessPoolExecutor(4) as pool:#这行代码等价于上一行代码
    r = pool.map(t, [6 for i in range(20000)])  # 使用这个进程池调用 t(6) 函数 20000次
    pool.shutdown()  # 等待所有进程执行完毕
    print(time.time() - start)  # 一个简单的计算

    start = time.time()
    for i in range(20000):  # 这是直接使用单线程进行20000次 t(6)的计算
        t(6)
    print(time.time() - start)
###############  执行结果 ################
10.398910760879517
17.622284173965454

我的计算机是8核心,所以我将进程池上限设置为8,你会发现,使用进程池的运行速度比线程池快了将近一倍,这比你想象中的要慢?有以下几点原因:

多进程的编码工具和多线程的工具十分相似,我看到有个小伙伴写的还不错,点这里即可查看。(实际上是我懒)

最后附一份思维导图

并发
上一篇 下一篇

猜你喜欢

热点阅读