Python进阶|并发编程
本篇来源于对小帅 b“通往Python高手之路”相关文章的学习
一、基础概念
首先我们需要了解一系列的基础概念:
1.进程
打开win的任务管理器,就可以看到电脑相关的进程。进程就是电脑里正在运行的程序(运行程序就会产生相应的进程),每个进程会占用系统的内存和CPU资源。
其中,我们会看到系统可以运行多个进程,我们称之为“多进程”。它允许电脑同时上网,听音乐等。但这个同时是相对而言的,是对多核CPU而言。如果电脑的CPU是单核的,那么一次只能执行一个进程。
2.线程
比如打开网易云音乐,我们既能听歌,又能看到同步滚动的歌词,即这个进程(网易云音乐)可以执行多个任务。这里的任务就是线程,可以同时做不同的任务,就是多线程。
3.并发
并发是指某一段时间内同时运行多个进程。有关双十一的报道常会提到“高并发”。
4.并行
并行是指某一时间点同时运行多个进程,而不是某一时间段内。
5.串行
当有多个任务时,按照顺序来执行任务。
6.线程同步
线程同步是指串行线程的实现方式,当说同步线程时,不是说同时进行多个任务,而是说一个线程执行完了才可以执行下一个线程。关于同步,常会提到“锁”。在执行多线程时,如果要同步,就需要加一把锁,来确保线程是同步的。
7.线程异步
与线程同步相对,即执行主线程时,可以同时去执行别的线程,不需要等到一个线程执行完后才执行下一个线程。
8.阻塞
阻塞是指调用后等待的状态,要等到有数据的时候才返回。
9.协程
协程可以理解为一种线程,只不过是轻量级的,可以由用户控制(线程是系统控制)。一个线程可以有多个协程,可以提高性能。
二、多线程和多进程
多线程(threading)和多进程(processing)的主要作用是是提高程序和 CPU 的使用率,从而高效运行,看起来像同一时间能完成多件事情。
- threadIng更多的是用在频繁 I/O (Input/Output)的并发场景中,比如爬取网站信息。
- processing则更多应用在充分使用CPU的并行场景,比如处理器是八核的,为了是程序运行能充分利用其八个核,可以使用多进程。
多线程和多进程听起来比较高级,但是实现起来并不难。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 2, 10)
以上代码表示,开启1个线程计算2的10次方。
Python提供了futures
,它封装好了多线程和多进程的方法和属性。这里,以爬取豆瓣电影top250为例,对比下普通方法和多线程方法。
#普通方法(单线程)
%%time
import requests
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}
def requests_douban(url):
try:
response = requests.get(url, headers = headers)
if response.status_code == 200:
return response.content
except requests.RequestException:
return None
def main(page):
url = "https://movie.douban.com/top250?start=" + str(page*25) + "&filter="
print(url)
html = requests_douban(url)
print(f"通过 {url} :获取了 {len(html)} 数据量")
if __name__ == "__main__":
for i in range(0, 10):
main(i)
消耗的时间为:
Wall time: 2.93 s
%%time
import requests
import concurrent.futures
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}
def request_douban(url):
try:
response = requests.get(url, headers = headers)
if response.status_code == 200:
print(f"通过 {url} :获取了 {len(html)} 数据量")
return response.content
except requests.RequestException:
return None
if __name__ == "__main__":
urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(request_douban, urls)
消耗的时间为:
Wall time: 841 ms
可以看出,使用多线程方法要比单线程的快。
关于多进程,直接把ThreadPoolExecutor
改为ProcessPoolExecutor
就可以。
上面用到了submit
和map
,前者是传入参数而调用具体方法,后者是传入可迭代对象调用具体方法。
前面我们都用到了concurrent.futures
,这是Python3.2新加入的模。在学习之前,我们先了解下``Future`模式,它其实是生产-消费者模型的一种扩展,在生产-消费者模型中,生产者不关心消费者什么时候处理完数据,也不关心消费者处理的结果。而在 Future 中,我们可以让生产者等待消息处理完成,如果需要的话,我们还可以获取相关的计算结果。
当我们executor.submit 方法之后,就会得到一个 Future对象。它里面封装了一些异步操作的方法,比较常用的有:
- done() : 如果调用已被取消或正常结束那么返回 True 。
- result(timeout=None):返回调用返回的结果。
- add_done_callback(fn):当 future 被取消或完成运行时,将会调用 fn 方法。
- as_completed(fs): 当 future 执行完之后会返回一个迭代器。
比如,执行完成时回调一个done
方法:
import requests
import concurrent.futures
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}
def request_douban(url):
try:
response = requests.get(url, headers = headers)
if response.status_code == 200:
print(f"通过 {url} :获取了 {len(html)} 数据量")
return response.content
except requests.RequestException:
return None
def done(fn):
if fn.done():
print("调用结束")
if __name__ == "__main__":
urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for url in urls:
future = executor.submit(request_douban, url)
future.add_done_callback(done)
以上,def done(fn)
自定义函数作用是是:如果future
执行结束,就打印“调用结束”。而future.add_done_callback(done)
是执行完成后用来调用我们自定义的函数。
还可以把每次得到的future
放入队列里等待执行:
import requests
import concurrent.futures
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}
def request_douban(url):
try:
response = requests.get(url, headers = headers)
if response.status_code == 200:
print(f"通过 {url} :获取了 {len(html)} 数据量")
return response.content
except requests.RequestException:
return None
def done(fn):
if fn.done():
print("调用结束")
if __name__ == "__main__":
urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
task = []
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for url in urls:
future = executor.submit(request_douban, url)
future.add_done_callback(done)
task.append(future)
for future in task:
try:
data = future.result()
print(data)
except Exception as ex:
print("出现异常")
三、协程
协程可以理解为一种线程,只不过是轻量级的,可以由用户控制(线程是系统控制)。一个线程可以有多个协程,可以提高性能。协程的主要特点是,异步任务的调度可以由我们来决定。
在Python3.7中提供了用于实现协程的asyncio
模块。
#普通
import time
def main():
for i in range(3):
print("怕什么真理无穷")
time.sleep(1)
print("进一寸有一寸的欢喜")
print("----------")
if __name__ == "__main__":
start = time.perf_counter()
main()
end = time.perf_counter()
print("时间:", end-start)
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
时间: 3.0063941000000796
#协程
import time
import asyncio
async def main():
for i in range(3):
print("怕什么真理无穷")
await asyncio.sleep(1)
print("进一寸有一寸的欢喜")
print("----------")
if __name__ == "__main__":
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("时间:", end-start)
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
时间: 3.0115504999999994
在上面的代码中,导入了asyncio
模块,它可以让我们使用到协程,同时,在 main
函数前面加了关键词 asyncio
,这使得该函数变成了异步函数,即现在的 main
变成了一个协程对象。
print(main())
<coroutine object main at 0x00000219164C82C8>
coroutine 就是协程的意思
在这个异步函数里,我们又定义了一个关键词await
,它能够将执行阻塞,等执行完毕后才返回,这里是让程序等待一秒执行打印“进一寸有一寸的欢喜”。最后是通过asyncio.run(main())
来运行异步函数。
但会发现,加了协程后的代码速度并没有快多少。这是因为asyncio.run() 去执行 main 函数的时候,每次都遇到 wait ,然后就阻塞在这里,完了才继续。说白了,每次都执行1个任务,这就体现不出协程的用武之地了。
所以,为了看到协程的优势,我们需要创建多个任务,这里要用到asyncio.create_task()
#协程
import time
import asyncio
async def main():
print("怕什么真理无穷")
await asyncio.sleep(1)
print("进一寸有一寸的欢喜")
print("----------")
async def task():
tasks = [asyncio.create_task(main()) for i in range(3)]
for task in tasks:
await task
if __name__ == "__main__":
start = time.perf_counter()
asyncio.run(task())
end = time.perf_counter()
print("时间:", end-start)
怕什么真理无穷
怕什么真理无穷
怕什么真理无穷
进一寸有一寸的欢喜
----------
进一寸有一寸的欢喜
----------
进一寸有一寸的欢喜
----------
时间: 1.0051481999998941
很明显,耗时仅1秒。需要注意的是,怕什么真理无穷
是在main函数的 await asyncio.sleep(1)之前,所以每个任务执行到这里时,遇到await就会跳出当前任务,开始下一个任务,1秒后,事件调度器就会重新调度开始的任务,往下执行进一寸有一寸的欢喜
。
通过协程的方式,可以高效地执行并发任务,通过asynico
模块的await
和create_task
方式有效控制任务的切换。
参考资料:
Python concurrent.future 使用教程及源码初剖
执行多线程/多进程后得到的 Future 操作
协程在手,说走就走