PythonPython学习

Python并发编程——多线程

2021-05-13  本文已影响0人  xiaogp

摘要:Python多线程线程同步线程池GIL

线程概述

当一个进程里面只有一个线程时,叫做单线程,超过一个线程就叫做多线程,在多线程中会有一个主线程来完成整个进程从开始到结束的全部操作,而其他的线程会在主线程的运行过程中被创建或退出。


线程的创建和原理

(1)线程的模块

Python的线程模块主要是threading模块

(2)主线程的产生

一个Python程序就是一个进程,每个进程会默认启动一个线程,即主线程,可以通过threading模块中的current_thread函数查看

import threading
print(threading.current_thread())
<_MainThread(MainThread, started 140486979041088)>

current_thread返回的是当前线程的信息,默认是进程下的主线程,结果尖括号的第一个显示他是主线程_MainThread,圆括号中的MainThread是线程名称,started后面的是线程号,在操作系统中每一个线程都会有一个ID号,用为唯一标识。
threading模块中还有两个常用的函数,分别是

print(threading.enumerate())
print(threading.active_count())

在Python console中存在4个线程,分别打印出线程列表和线程数如下

[<_MainThread(MainThread, started 140486979041088)>, <Thread(Thread-2, started daemon 140486915045120)>, <HistorySavingThread(IPythonHistorySavingThread, started 140486836745984)>, <Thread(Thread-3, started daemon 140486828353280)>]
4

Python中所有进程的主线程,名称都是一样的叫做MainThread,而子线程的名字需要在创建时指定,如果不指定Python会默认起名字。

(3)创建子线程

创建子线程有两种方法,都是通过threading.Thread类来实现

先来看第一种方法直接实例化

import threading


def handle(sid):
    print("Thread {} run, info: {}".format(sid, threading.current_thread()))


for i in range(10):
    t = threading.Thread(target=handle, args=(i, ))
    t.start()  # 这个地方加t.join()是一样的,默认不守护进程,则主线程会等待子线程执行完毕再关闭

print(threading.current_thread())

执行效果如下,此时线程类型变为Thread,并且分别以数字ID命名,和主线程MainThread不一样,在一个新的线程是执行handle函数,此时函数内部的threading.current_thread()返回的就是当前线程的信息

Thread 0 run, info: <Thread(Thread-1, started 140591162775296)>
Thread 1 run, info: <Thread(Thread-2, started 140591154382592)>
Thread 2 run, info: <Thread(Thread-3, started 140591145989888)>
Thread 3 run, info: <Thread(Thread-4, started 140591145989888)>
Thread 4 run, info: <Thread(Thread-5, started 140591145989888)>
Thread 5 run, info: <Thread(Thread-6, started 140591145989888)>
Thread 6 run, info: <Thread(Thread-7, started 140591145989888)>
Thread 7 run, info: <Thread(Thread-8, started 140591145989888)>
Thread 8 run, info: <Thread(Thread-9, started 140591145989888)>
Thread 9 run, info: <Thread(Thread-10, started 140591145989888)>
<_MainThread(MainThread, started 140591187232576)>

Process finished with exit code 0

可以改变线程的名称,比如修改这一行代码

t = threading.Thread(target=handle, name="a" + str(i), args=(i, ))

输出如下

Thread 0 run, info: <Thread(a0, started 139623398315776)>
Thread 1 run, info: <Thread(a1, started 139623389923072)>
...

再看一下使用线程类,新建一个类对象继承threading.Thread,然后重写run方法,在调用start的时候线程对象会调用run方法

import threading


def handle(sid):
    print("Thread {} run, info: {}".format(sid, threading.current_thread()))


class MyClass(threading.Thread):
    def __init__(self, sid):
        threading.Thread.__init__(self)  # 重写__init__方法,等同于super().__init__()
        self.sid = sid

    def run(self):  # 在子类中如果方法与父类相同,父类的方法被覆盖失效
        handle(self.sid)


for i in range(10):
    t = MyClass(i)
    t.start() 

查看原类的run方法,可见如果不重写run函数,默认会执行传入的target参数

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

返回输出如下

Thread 0 run, info: <MyClass(Thread-1, started 139963900937984)>
Thread 1 run, info: <MyClass(Thread-2, started 139963892545280)>
Thread 2 run, info: <MyClass(Thread-3, started 139963892545280)>
Thread 3 run, info: <MyClass(Thread-4, started 139963892545280)>
Thread 4 run, info: <MyClass(Thread-5, started 139963892545280)>
Thread 5 run, info: <MyClass(Thread-6, started 139963892545280)>
Thread 6 run, info: <MyClass(Thread-7, started 139963892545280)>
Thread 7 run, info: <MyClass(Thread-8, started 139963892545280)>
Thread 8 run, info: <MyClass(Thread-9, started 139963892545280)>
Thread 9 run, info: <MyClass(Thread-10, started 139963892545280)>

Process finished with exit code 0

除此之外在实例化类对象的时候还有一个参数daemon,默认是False,每个线程的守护进程参数和主线程一致,默认是False就是说进程退出时必须等待这个线程也退出,看一下源码

        if daemon is not None:
            self._daemonic = daemon
        else:
            self._daemonic = current_thread().daemon  # False

总结一下线程的创建,目的就是创建线程并且将执行函数绑定到线程上,有两种方法

(4)threadingThread类的方法
(5)线程内部状态及原理

线程状态分为5种:创建,就绪,运行,阻塞,退出,过程如下


互斥锁

多线程的优势在于并发,即可以同时运行多个任务,但是当多线程需要共享数据时,也会带来数据不同步的问题,互斥锁就是解决数据不同步的问题。

(1)多线程的问题

以一个例子来看,所有线程共享一个全局变量,并且在执行函数之后修改这个全局变量,但是执行时间不同

import time
import threading

a = 1


def handle(sid):
    global a
    a = a * 2
    time.sleep(sid % 2)
    print(sid, a)


class MyClass(threading.Thread):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        handle(self.sid)


threads = []
for i in range(10):
    t = MyClass(i)
    t.start()

for t in threads:
    t.join() # 主线程等待所有其他子线程执行完毕

输出结果如下,可见单数的id由于需要等待1s导致在sleep的时候其他线程还在更改共享数据,1,3,5,7四个线程输出的值都市9号线程的结果

0 2
2 8
4 32
6 128
8 512
1 1024
3 1024
7 1024
5 1024
9 1024

上述代码使用了threads列表join使得主线程必须等待子线程全部执行完毕再退出,如果在每一个start后面直接执行join,输出结果完全不一样

import time
import threading

a = 1


def handle(sid):
    global a
    a = a * 2
    time.sleep(sid % 2)
    print(sid, a)


class MyClass(threading.Thread):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        handle(self.sid)


for i in range(10):
    t = MyClass(i)
    t.start()
    t.join()

原因是如果在每个线程start后直接join,则主线程被每个子线程的start后阻塞,无法启动循环列表后面的子线程,相当于单线程

0 2
1 4
2 8
3 16
4 32
5 64
6 128
7 256
8 512
9 1024
(2)互斥所

锁的出现就是解决多线程之间的同步问题,其核心在于将执行程序中的某段代码保护起来(相当于锁起来),被锁起来的代码一次只能允许一个线程执行。在Python中使用threading.RLock类来创建锁,他有两个方法acquirerelease

import time
import threading
lock = threading.RLock()
a = 1


def handle(sid):
    lock.acquire()
    global a
    a = a * 2
    time.sleep(sid % 2)
    print(sid, a)
    lock.release()


class MyClass(threading.Thread):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        handle(self.sid)


threads = []
for i in range(10):
    t = MyClass(i)
    t.start()

for t in threads:
    t.join()

输出结果如下,此时线程序号和乘数的值顺序对应上了

0 2
1 4
2 8
3 16
4 32
5 64
6 128
7 256
8 512
9 1024

锁的注意事项:


信号量

信号量(semaphore)是一种带计数的线程同步机制,当调用release时,增加计数,当acquire时,减少计数,当计数为0时,自动阻塞,等待release被调用,可以实现并发限制,分为纯粹的信号量(Semaphore)和带有不按揭的信号量(BoundedSemaphore),区别如下

import threading
import datetime
import time

semaphore = threading.Semaphore(3)


def foo():
    semaphore.acquire()
    time.sleep(5)
    print("当前时间:", datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"))
    semaphore.release()


class MyClass(threading.Thread):
    def __init__(self):
        super(MyClass, self).__init__()

    def run(self):
        foo()


threads = []
for i in range(10):
    t = MyClass()
    t.start()
    threads.append(t)

for t in threads:
    t.join()

上述代码设置信号量为3,开启10个线程,执行打印时间的函数,10个线程超出3个信号量限制,因此每当线程数量达到3个主线程阻塞,在循环内部卡住使得下面的子线程无法启动,最后的记过是没3个线程一批打印输出时间,一批和一批时间之间间隔5秒

当前时间: 2021-05-14 11:18:52
当前时间: 2021-05-14 11:18:52
当前时间: 2021-05-14 11:18:52
当前时间: 2021-05-14 11:18:57
当前时间: 2021-05-14 11:18:57
当前时间: 2021-05-14 11:18:57
当前时间: 2021-05-14 11:19:02
当前时间: 2021-05-14 11:19:02
当前时间: 2021-05-14 11:19:02
当前时间: 2021-05-14 11:19:07

再看BoundedSemaphore,如果调用多次release超出信号量上限就会报错,但是Semaphore不会报错,修改代码如下

semaphore = threading.BoundedSemaphore(3)


def foo():
    semaphore.acquire()
    time.sleep(5)
    print("当前时间:", datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"))
    semaphore.release()
    semaphore.release()

输出报错如下,显示信号释放太多次

ValueError: Semaphore released too many times

使用线程池提升运行效率

线程池是一种多线程处理方式,是在正常的多线程处理方式上的一种优化

在需要频繁创建线程的系统中,一般都会使用线程池技术,原因是

(1)实现线程池

Python中使用concurrent.features模块下的ThreadPoolExecutor来实现线程池,只需要传入线程个数系统就能为该线程池初始化相应个数的线程,线程的使用有两种

从使用角度来看,抢占式更灵活,非抢占式更严格。

(2)单线程和多线程处理时间比较

先写一个简单的程序,使用单线程循环遍历执行一个函数

import time

person = ["Anna", "Gary", "All"]


def print_person(p):
    print(p)
    time.sleep(2)


t1 = time.time()
for p in person:
    print_person(p)
t2 = time.time()
print("耗时:", t2 - t1)

输出如下,耗时6s

Anna
Gary
All
耗时: 6.005347490310669

下一步实现抢占式线程池,使用with关键字创建线程池,将列表元素一个一个传入执行函数,调用实例化对象的submit方法将线程启动,代码如下

import time
from concurrent.futures import ThreadPoolExecutor

person = ["Anna", "Gary", "All"]


def print_person(p):
    print(p)
    time.sleep(2)


t1 = time.time()
with ThreadPoolExecutor(3) as executor:  # 使用with上下文
    for p in person:
        executor.submit(print_person, p)
t2 = time.time()
print("耗时:", t2 - t1)

输出如下,可见多线程的并发缩短了程序的运行时间

Anna
Gary
All
耗时: 2.002558708190918

进一步实现非抢占式线程池,也是使用with关键字,他是使用实例化线程池的map方法启动线程,并且传入函数参数时直接传入列表

t1 = time.time()
with ThreadPoolExecutor(3) as executor:
    executor.map(print_person, person)
t2 = time.time()
print("耗时:", t2 - t1)

输出如下,也是2s,和抢占式效率差不多

Anna
Gary
All
耗时: 2.0014290809631348

多线程和GIL

(1)知识准备

GIL 是Python的全局解释器锁,同一进程中假如有多个线程运行,一个线程在运行Python程序的时候会霸占Python解释器,使该进程内的其他线程无法运行。在GIL中,全局锁并不是一直锁定的,比如当线程遇到IO等待或ticks计数(Python3改为计时器,执行时间达到阈值后,当前线程释放GIL)达到100,cpu会做切换,把cpu的时间片让给其他线程执行,此时GIL释放,释放时候所有线程继续进行锁竞争,Python里一个进程永远只能同时执行一个线程

(2)IO密集型和CPU密集型多线程测试

以爬取网页进行解析进行测试,线程池最大线程数8个,爬去50次,总共需要1.4秒

import re
import time
import requests
from concurrent.futures import ThreadPoolExecutor

headers = {
      ...
}


def handle(sid):
    response = requests.get("https://movie.douban.com/top250", headers=headers)
    res = re.findall(r'alt="(.*?)"', response.text, re.S)
    print(str(sid) + ",".join(res))


sid_list = list(range(50))
t1 = time.time()
with ThreadPoolExecutor(8) as executor:
    executor.map(handle, sid_list)
t2 = time.time()
print("耗时:", t2 - t1)  # 1.496328353881836

单线程模式测试如下需要40s,可见多线程应对IO密集型可以实现并发提高效率

t1 = time.time()
for i in range(50):
    handle(i)
t2 = time.time()
print("耗时:", t2 - t1)  # 40s

再测试一下CPU密集型,此处以geopy计算球面距离为例,这个计算包含多个三角函数的计算,耗时1.5s

import time
from concurrent.futures import ThreadPoolExecutor

from geopy.distance import great_circle


def handle(sid):
    for i in range(1000):
        res = great_circle((41.49008, -71.312796), (41.499498, -81.695391)).meters
    print(str(sid) + str(res))


sid_list = list(range(100))
t1 = time.time()
with ThreadPoolExecutor(8) as executor:
    executor.map(handle, sid_list)
t2 = time.time()
print("耗时:", t2 - t1)  # 1.5076820850372314

再测一下单线程,竟然比多线程耗时低,可见在CPU密集型中线程频繁切换反而多线程效率更低

t1 = time.time()
for i in range(100):
    handle(i)
t2 = time.time()
print("耗时:", t2 - t1)  # 1.1195
上一篇 下一篇

猜你喜欢

热点阅读