python小小白

Python进阶|并发编程

2020-01-03  本文已影响0人  凡有言说

本篇来源于对小帅 b“通往Python高手之路”相关文章的学习

一、基础概念
首先我们需要了解一系列的基础概念:
1.进程
打开win的任务管理器,就可以看到电脑相关的进程。进程就是电脑里正在运行的程序(运行程序就会产生相应的进程),每个进程会占用系统的内存和CPU资源。

其中,我们会看到系统可以运行多个进程,我们称之为“多进程”。它允许电脑同时上网,听音乐等。但这个同时是相对而言的,是对多核CPU而言。如果电脑的CPU是单核的,那么一次只能执行一个进程。

2.线程
比如打开网易云音乐,我们既能听歌,又能看到同步滚动的歌词,即这个进程(网易云音乐)可以执行多个任务。这里的任务就是线程,可以同时做不同的任务,就是多线程。

3.并发
并发是指某一段时间内同时运行多个进程。有关双十一的报道常会提到“高并发”。

4.并行
并行是指某一时间点同时运行多个进程,而不是某一时间段内。

5.串行
当有多个任务时,按照顺序来执行任务。

6.线程同步
线程同步是指串行线程的实现方式,当说同步线程时,不是说同时进行多个任务,而是说一个线程执行完了才可以执行下一个线程。关于同步,常会提到“锁”。在执行多线程时,如果要同步,就需要加一把锁,来确保线程是同步的。

7.线程异步
与线程同步相对,即执行主线程时,可以同时去执行别的线程,不需要等到一个线程执行完后才执行下一个线程。

8.阻塞
阻塞是指调用后等待的状态,要等到有数据的时候才返回。

9.协程
协程可以理解为一种线程,只不过是轻量级的,可以由用户控制(线程是系统控制)。一个线程可以有多个协程,可以提高性能。

二、多线程和多进程
多线程(threading)和多进程(processing)的主要作用是是提高程序和 CPU 的使用率,从而高效运行,看起来像同一时间能完成多件事情。

多线程和多进程听起来比较高级,但是实现起来并不难。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 10)

以上代码表示,开启1个线程计算2的10次方。

Python提供了futures,它封装好了多线程和多进程的方法和属性。这里,以爬取豆瓣电影top250为例,对比下普通方法和多线程方法。

#普通方法(单线程)
%%time

import requests

headers = {
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}

def requests_douban(url):
    try:
        response = requests.get(url, headers = headers)
        if response.status_code == 200:
            return response.content
    except requests.RequestException:
        return None
    
def main(page):
    url = "https://movie.douban.com/top250?start=" + str(page*25) + "&filter="
    print(url)
    html = requests_douban(url)
    print(f"通过 {url} :获取了 {len(html)} 数据量")

if __name__ == "__main__":
    for i in range(0, 10):
        main(i)

消耗的时间为:

Wall time: 2.93 s
%%time

import requests
import concurrent.futures

headers = {
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}

def request_douban(url):
    try:
        response = requests.get(url, headers = headers)
        if response.status_code == 200:
            print(f"通过 {url} :获取了 {len(html)} 数据量")
            return response.content
    except requests.RequestException:
        return None
    
if __name__ == "__main__":
    urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(request_douban, urls)

消耗的时间为:

Wall time: 841 ms

可以看出,使用多线程方法要比单线程的快。

关于多进程,直接把ThreadPoolExecutor改为ProcessPoolExecutor就可以。

上面用到了submitmap,前者是传入参数而调用具体方法,后者是传入可迭代对象调用具体方法。

前面我们都用到了concurrent.futures,这是Python3.2新加入的模。在学习之前,我们先了解下``Future`模式,它其实是生产-消费者模型的一种扩展,在生产-消费者模型中,生产者不关心消费者什么时候处理完数据,也不关心消费者处理的结果。而在 Future 中,我们可以让生产者等待消息处理完成,如果需要的话,我们还可以获取相关的计算结果。

当我们executor.submit 方法之后,就会得到一个 Future对象。它里面封装了一些异步操作的方法,比较常用的有:

比如,执行完成时回调一个done方法:

import requests
import concurrent.futures

headers = {
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}

def request_douban(url):
    try:
        response = requests.get(url, headers = headers)
        if response.status_code == 200:
            print(f"通过 {url} :获取了 {len(html)} 数据量")
            return response.content
    except requests.RequestException:
        return None

def done(fn):
    if fn.done():
        print("调用结束")
    
if __name__ == "__main__":
    urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for url in urls:
            future = executor.submit(request_douban, url)
            future.add_done_callback(done)

以上,def done(fn)自定义函数作用是是:如果future执行结束,就打印“调用结束”。而future.add_done_callback(done)是执行完成后用来调用我们自定义的函数。

还可以把每次得到的future放入队列里等待执行:

import requests
import concurrent.futures

headers = {
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
}

def request_douban(url):
    try:
        response = requests.get(url, headers = headers)
        if response.status_code == 200:
            print(f"通过 {url} :获取了 {len(html)} 数据量")
            return response.content
    except requests.RequestException:
        return None

def done(fn):
    if fn.done():
        print("调用结束")
    
if __name__ == "__main__":
    urls = ["https://movie.douban.com/top250?start=" + str(i*25) + "&filter=" for i in range(0, 10)]
    task = []
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for url in urls:
            future = executor.submit(request_douban, url)
            future.add_done_callback(done)
            task.append(future)
    
    for future in task:
        try:
            data = future.result()
            print(data)
        except Exception as ex:
            print("出现异常")

三、协程
协程可以理解为一种线程,只不过是轻量级的,可以由用户控制(线程是系统控制)。一个线程可以有多个协程,可以提高性能。协程的主要特点是,异步任务的调度可以由我们来决定。

在Python3.7中提供了用于实现协程的asyncio模块。

#普通
import time

def main():
    for i in range(3):
        print("怕什么真理无穷")
        time.sleep(1)
        print("进一寸有一寸的欢喜")
        print("----------")
        
if __name__ == "__main__":
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print("时间:", end-start)


怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
时间: 3.0063941000000796
#协程
import time
import asyncio

async def main():
    for i in range(3):
        print("怕什么真理无穷")
        await asyncio.sleep(1)
        print("进一寸有一寸的欢喜")
        print("----------")
        
if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    print("时间:", end-start)


怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
怕什么真理无穷
进一寸有一寸的欢喜
----------
时间: 3.0115504999999994

在上面的代码中,导入了asyncio模块,它可以让我们使用到协程,同时,在 main函数前面加了关键词 asyncio,这使得该函数变成了异步函数,即现在的 main变成了一个协程对象。

print(main())

<coroutine object main at 0x00000219164C82C8>

coroutine 就是协程的意思

在这个异步函数里,我们又定义了一个关键词await,它能够将执行阻塞,等执行完毕后才返回,这里是让程序等待一秒执行打印“进一寸有一寸的欢喜”。最后是通过asyncio.run(main())来运行异步函数。

但会发现,加了协程后的代码速度并没有快多少。这是因为asyncio.run() 去执行 main 函数的时候,每次都遇到 wait ,然后就阻塞在这里,完了才继续。说白了,每次都执行1个任务,这就体现不出协程的用武之地了。

所以,为了看到协程的优势,我们需要创建多个任务,这里要用到asyncio.create_task()

#协程
import time
import asyncio

async def main():
        print("怕什么真理无穷")
        await asyncio.sleep(1)
        print("进一寸有一寸的欢喜")
        print("----------")
        
async def task():
    tasks = [asyncio.create_task(main()) for i in range(3)]
    
    for task in tasks:
        await task
        
if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(task())
    end = time.perf_counter()
    print("时间:", end-start)


怕什么真理无穷
怕什么真理无穷
怕什么真理无穷
进一寸有一寸的欢喜
----------
进一寸有一寸的欢喜
----------
进一寸有一寸的欢喜
----------
时间: 1.0051481999998941

很明显,耗时仅1秒。需要注意的是,怕什么真理无穷是在main函数的 await asyncio.sleep(1)之前,所以每个任务执行到这里时,遇到await就会跳出当前任务,开始下一个任务,1秒后,事件调度器就会重新调度开始的任务,往下执行进一寸有一寸的欢喜

通过协程的方式,可以高效地执行并发任务,通过asynico模块的awaitcreate_task方式有效控制任务的切换。

参考资料:
Python concurrent.future 使用教程及源码初剖
执行多线程/多进程后得到的 Future 操作
协程在手,说走就走

上一篇下一篇

猜你喜欢

热点阅读