asyncio

2023-05-22  本文已影响0人  木叶苍蓝
  1. 协程
    想学 asyncio,得先了解协程,协程是根本!
    协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行。
def func1():
    print(1)
    time.sleep(1)
    print(2)

def func2():
    print(3)
    time.sleep(1)
    print(4)

func1()
func2()

上述代码是普通的函数定义和执行,按流程分别执行两个函数中的代码,并先后会输出1,2,3,4。但如果介入协程技术那么就可以实现函数见代码切换执行,最终输入:1,3,2,4。
在Python 中有多种方式可以实现协程,例如:

1.1 greenlet
greenlet 是一个第三方模块,需要提前安装 pip3 install greenlet 才能使用

from greenlet import greenlet

def func1():
    print(1)
    gr2.switch()
    print(2)

def func2():
    print(3)
    gr1.switch()
    print(4)

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()

注意:switch 中也可以传递参数用于切换执行时相互传递值。

1.2 yield
基于 Python 的生成器的 yield 和 yield form 关键字实现协程代码

def func1():
    yield 1
    yield form func2()
    yield 2

def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)

注意:yield form 关键字是在 Python3.3 中引入的

1.3 asyncio
在 Python3.4 之前官方未提供协程的类库,一般大家都是使用 greenlet 等其他方式来实现。在 Python3.4 发布后官方正式支持协程,即:asyncio 模块

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其他任务
    print(2) 

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其他任务
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:基于 asyncio 模块实现的协程比之前的要更厉害,因为它的内部还集成了遇到IO耗时操作自动切换的功能。
1.4 async & await
async & await 关键字在 Python3.5 版本中正式引入,基于它编写的协程代码其实就是上一个实例的加强版。让代码可以更加简便。
Python 3.8 之后,@sayncio.coroutine 装饰器就会被移除,推荐使用 async & awit 关键字来实现协程代码

import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func2():
    print(3)
    await syncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1.5 小结
关于协程有很多种实现方式,目前主流使用的是 Python 官方推荐的asyncio 模块和 async & await 关键字的方式,例如:在 tonado, sanic, fastapi, django3 中均已支持。
接下来,我们也会针对 asyncio 模块 + async & await 关键字进行更加详细的讲解。

  1. 协程的意义
    通过学习,我们已经了解到协程可以通过一个线程在多个上下文中进行来回切换执行。
    但是,协程来回切换执行的意义何在呢?
"""
下载图片使用第三方模块 requests, 请提前安装 pip3 install requests
"""
import requests
def download_image(url):
    print("开始下载:", url)
    # 发送网络请求,下载图片
    response = requests.get(url)
    print("下载完成")
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == "__main__":
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    for item in url_list:
        download_image(item)
"""
下载图片使用第三方模块 aiohttp 请提前安装 aiohttp
"""
import aiohttp
import asyncio

async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file__name, mode='wb') as file_object:
            file_object.write(content)

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)
if __name__ == '__main__':
    asyncio.run(main())

上述两种的执行对比之后发现,基于协程的异步编程要比同步编程的效率高了很多。因为:

  1. 异步编程
    基于 async & await 关键字的协程可以实现异步编程,这也是目前 python 异步相关的主流技术。
    想要正在的了解 Python 中内置的异步编程,根据下文的顺序一点点来看。

3.1 事件循环
事件循环,可以把它当做是一个 while 循环,这个 while 循环在周期性的运行并执行一个任务,在特定条件下终止循环。

# 伪代码
任务列表 = [任务1, 任务2, 任务3, ...]
while True:
    可执行的任务列表,以完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'以完成'的任务返回
    for 就绪任务 in 以准备就绪的任务列表:
        执行已就绪的任务
    for 以完成的任务 in 以完成的任务列表:
        在任务列表中移除 以完成的任务
  
    如果 任务列表 中的任务都已完成,则终止循环

在编写程序时候可以通过如下代码来获取和创建事件循环

import asyncio
loop = asyncio.get_event_loop()

3.2 协程和异步编程
协程函数,定义形式为 async def 的函数。
协程对象,调用协程函数所返回的对象

# 定义一个协程函数
async def func():
    pass

# 调用协程函数,返回一个协程对象
result  = func()

注意:调佣协程函数时,函数内部代码不会执行,只是会返回一个协程对象。
3.2.1 基本应用
程序中,如果想要执行协程函数的内部代码,需要事件循环协程对象配合才能实现,如:

import asyncio

asyn def func():
    print("协程内部代码")

# 调用协程函数,返回一个协程对象
result = func()

# 方式一
loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.run_until_complete(result)  # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止
# 方式二
# 本质上是一样的,内部先创建事件循环,然后执行 run_until_complete,一个简便的写法
asyncio.run(result)  # Python3.7 中加入的 asyncio 模块

这个过程可以理解为:将协程当做任务添加到事件循环的任务列表,然后事件循环检测列表中的协程是否已准备就绪(默认可以理解为就绪状态),如果准备就绪则执行其内部代码。
3.2.2 await
await 是一个只能在协程函数中使用的关键字,用于遇到 IO 操作时挂起当前协程(任务),当前协程(任务)挂起过程中事件循环可以去执行其他协程(任务),当前协程IO处理完成时,可以再次切换回来执行 await 之后的代码。

import asyncio

async def func():
    print("执行协程函数内部代码")
    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程
    response = await asyncio.sleep(2)
    print("IO 请求结束,结果为:", response) 

result = func()
asyncio.run(result)
import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    # 遇到 IO 操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
    response = await others()
    print("IO 请求结束,结果为:", response)

asyncio.run(func())
import asynic

async def others():
    print("start")
    await asynic.sleep(2)
    print("end")

async def func():
    print("执行协程函数内部代码")
    response1 = await others()
    print("IO 请求结束,结果为:", response1)
    response2 = await others()
    print("IO 请求结束,结果为:", response2)

asyncio.run(func())

3.2.3 Task 对象
Tasks 用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。
本质上是将协程对象封装成 task 对象,并将协程立即加入事件循环,同时追踪协程的状态。
注意:asyncio.create_task() 函数在 Python3.7中被加入。在 Python3.7 之前,可以改用低层级的 asyncio.ensure_future()

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    # 创建协程,将协程封装到一个 Task 对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print("main 结束")

    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())
import asyncio
async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]
    print("main 结束")
    # 当执行某协程遇到IO操作时,会自动切换执行其他任务。
    # 此处 await  是等待所有协程执行完毕,并将所有协程的返回值保存到 done。
    # 如果设置了 timeout 值,则意味着此处最多等待的秒,完成的协程返回值写入到 done 中,未完成的则写到 pending 中。
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)
  
asyncio.run(main())

注意:asyncio.wait 源码内部会对列表中的每个协程执行 ensure_future 从而封装为 Task 对象,所以在和 wait 配置使用时 task_list 的值为 [func(), func()] 也是可以的。

import asyncio

async def func():
    print("执行协程函数内部代码")
    response = await asyncio.sleep(2)
    print("IO 请求结束,结果为:", response)

coroutine_list = [func(), func()]
# 错误:coroutine_list = [asyncio.create_task(func()), asyncio.create_task(func())]
# 此处不能直接 asyncio.create_task,因为将 Task 立即加入到事件循环的任务列表
# 但此时事件循环还未创建,所以会报错
# 使用 asyncio.wait 将列表封装为一个协程,并调用 asyncio.run 实现执行两个协程
# asynico.wait 内部会对列表中的每个协程执行 ensure_future,封装为 Task 对象。
done, pending = asyncio.run(asyncio.wait(coroutine_list))

3.2.4 asyncio.Future 对象
asyncio 中的 Future 对象是一个相对更偏向底层的的对象,通常我们不会直接用这个对象,而是直接使用 Task 对象来完成任务和状态的追踪(Task 是 Future 的子类)
Future 为我们提供了异步编程中的最终结果的处理(Task 类也具备状态处理的功能)

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future 对象),这个任务什么都不干
    fut = loop.create_future()
    # 等待任务最终结果(Future 对象) ,没有结则会一直等下去
    await fut

asyncio.run(main(
import asyncio
async def set_after():
    await asyncio.sleep(2)
    fut.set_result("666")

async def main();
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future 对象),没绑定任何行为,则这个任务永远不知道什么时候结束
    fut = loop.create_future()
    # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在2s之后,会给 fut 赋值
    # 即手动设置 future 任务的最终结果,那么 fut 就可以结束了
    await loop.create_task(set_after(fut))
    # 等待 Future 对象获取最终结果,否则一直等下去
    data = await fut

asyncio.run(main())

Future 对象本身函数进行绑定,所以想要让事件循环获取 Future 的结果,则需要手动设置。而Task 对象继承了 Future 对象,其实就对 Future 进行扩展,他可以实现在对应绑定函数执行完成之后,自动执行 set_result ,从而实现自动结束。
虽然,平时使用的是 Task 对象,但对于结果的处理本质是基于Future 对象来实现的。
扩展:支持 await 对象 语法的对象被称为可等待对象,所以 协程对象,Task对象,Future 对象,都是可以被称为可等待对象。

3.2.5 future.Future 对象
在 Python 的 concurrent.futures 模块中也有一个 Future 对象,这个对象是基于线程池和进程池实现异步操作时使用的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

pool = ThreadPoolEexecutor(max_workers=5)
# 或 pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    fut = pool.submit(func, i)
    print(fut)

两个 Future 对象是不同的,他们是为了不同的应用场景而设计,例如: concurrent.futures.Future 不支持 await 语法等。
在 Python 提供了一个将 futures.Future 对象包装成 asyncio.Future 对象的函数 asynic.wrap_future。
接下来你肯定问:为什么 Python 会提供这种功能?
其实,一般在程序开发中我们要么统一使用 asycio 的协程实现异步操作,要么都使用进程池和线程池实现异步操作。但如果二者混搭时,那么就会用到此功能。

import time
import asyncio
import concurrent.futures

def func1():
    # 某个耗时操作
    time.sleep(2)
    return "success"


async def main():
    loop = asyncio.get_running_loop()
    #  1. Run in the default loop's executor(默认ThreadPoolExecutor)
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数,并返回一个 concurrent.futures.Future 对象
    # 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asycio.Future 对象。
    # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 asyncio.Future 对象才能使用。
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print("default thread pool", result)
    # 2. Run in a custom thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func1)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func1)
        print("custom process pool", result)

asyncio.run(main())

应用场景:当项目以协程式的异步编程开发时,如果要使用一个第三方模块,而第三方模块不支持协程方式异步编程时,就需要用到这个功能。例如:

import asyncio
import requests

async def download_image(url): 
    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
    print("开始下载:", url)
    loop = asyncio.get_event_loop()
    # requests 模块默认不支持异步操作,所以就使用线程池来配合实现
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print("下载完成")
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == "__main__":
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'  
]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

3.2.6 异步迭代器
什么是异步迭代器?
实现了 __aiter__()__anext__() 方法的对象。__anext__必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。
什么是异步可迭代对象?
可以在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。

import asyncio
class Reader(object):
    ''' 自定义异步迭代器 (同时也是异步可迭代对象) '''
    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val

async def func():
    # 创建异步可迭代对象
    async_iter = Reader()
    # async for 必须要放在 async def  函数内,否则语法错误
    async for item in async_iter:
        print(item)

asyncio.run(func())

异步迭代器其实没有什么太大的作用,只是支持了 async for 语法

3.2.6 异步上下文管理器
此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。

import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = conn

    async def do_something(self):
        # 异步操作数据库
        return 666

    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
asyncio.run(func())

这个异步上下文管理器还是比较有用的,平时在开过程中 打卡,处理,关闭 操作时,就可以用这种方式来处理。
3.3 小结
在程序中只要看到 asyncawait 关键字,其内部就是基于协程实现的异步编程,这种异步编程是通过一个线程在 IO 等待时间去执行其他任务,从而实现并发。
以上就是异步编程的常见操作。
https://docs.python.org/zh-cn/3.8/library/asyncio.html

  1. uvloop
    Python 标准库中提供了 asyncio 模块,用于支持基于协程的异步编程。
    uvloop 是 asyncio 中的事件循环替代方案,替换后可以使得 asyncio 性能提高。事实上,uvloop 要比 nodejs,gevent 等其他 python 异步框架至少快2倍,性能可以比肩Go语言。
    安装 uvloop pip3 install uvloop

在项目中想要使用 uvloop 替换 asyncio 的事件循环也非常简单,只要在代码中这么做就行:

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写 asyncio 的代码,与之前写的代码一致
# 内部的事件循环自动变成 uvloop 
asyncio.run(...)

注意:知名的 asgi uvicorn 内部就是使用的 uvloop 的事件循环

  1. 事件案例
    为了更好理解,上述所有实例的 IO 情况都是以 asyncio.sleep为例,而真实的项目开发中会用到很多 IO 的情况。
    5.1 异步Redis
    当通过 Python 去操作 Redis 时,链接和设置值,获取值这些都涉及网络 IO 请求,使用 asyncio 异步的方式可以在 IO 等待时去做一些其他任务,从而提升性能。

安装 Python 异步操作 Redis 模块
pip3 install aioredis

实例一:异步操作 Redis

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
import aioredis

async def execute(address, password):
    print("开始执行", address)
    # 网络 IO 操作:创建 Redis 连接
    redis = await aioredis.create_redis(address, password=password)
    # 网络 IO 操作:在 Redis 中设置哈希值 car, 内部在设 三个键值对 {car:{key1:1,key2:2,key3:3}}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # 网络 IO 操作:去 Redis 中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # 网络IO操作:关闭 redis 连接
    await redis.wait_closed()
    print("结束", address)

asyncio.run(execute('redis://127.0.0.1:6379', "123456"))

实例二:连接多个 Redis 做操作(遇到IO会切换其他任务,提供了性能)

import asyncio
import aioredis

async def execute(address, password):
    print("开始执行", address)
    # 网络IO操作:先去连接 47.93.4.197:6379,遇到IO则自动切换任务,去连接47.93.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
    # 网络 IO 操作:遇到 IO 会自动 切换任务
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # 网络 IO 操作:遇到 IO 会自动 切换任务
    result = await redis.hgetall("car", encoding='utf-8')
    print(result)
    # 网络 IO 操作:遇到 IO 会自动 切换任务
    await redis.wait_closed()
    print("结束: ", address)

task_list = [
    execute('redis://47.93.4.197:6379', "root!2345"),
    execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))

5.2 异步MySQL
当通过 Python 去操作 MySQL 时,连接,执行SQL,关闭都涉及网络IO请求,使用 asyncio 异步的方式可以在IO等待时去做一些其他任务,从而提升性能。
安装 Python 异步操作 Mysql 模块
pip3 install aiomysql

import asyncio
import aiomysql

async def execute():
    # 网络操作:连接 MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123456', db='mysql')
    # 网络 IO 操作:创建 cursor
    cur = await conn.cursor()
    # 网络 IO 操作:执行 SQL
    await cur.execute("SELECT Host, User FROM user")
    # 网络 IO 操作:获取 sql 结果
    result = await cur.fetchall()
    print(result)
    # 网络 IO 操作:关闭链接
    await cur.close()
    conn.close()

asyncio.run(execute()) 

实例2:

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import asyncio
import aiomysql


async def execute(host, password):
    print("开始", host)
    # 网络 IO 操作,先去连接 47.93.40.197  遇到 IO 则自动切换任务,去连接 47.93.40.198:6379
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    # 网络 IO 操作:遇到IO操作会自动切换任务
    cur = await conn.cursor()
    # 网络 IO 操作,遇到 IO 会自动切换任务
    awai cur.execute("SELECT Host, User FROM usr.")
    # 网络 IO 操作:遇到IO会自动切换任务
    result = await cur.fetchall()
    print(result)
    # 网络 IO 操作:遇到 IO 会自动切换任务
    await cur.close()
    conn.close()
    print("结束", host)

task_list = [
    execute("47.93.40.197", "root!123"),
    execute("47.93.40.198", "root!123")
]

asyncio.run(asyncio.wait(task_list))

5.3 FastAPI 框架

FastAPI 是一款用于构建 API 的高性能 web 框架,框架基于 Python3.6+ 的 type hints 搭建。
接下来的异步实例以 FastAPI 和 uvicorn 来讲解(uvicorn 是一个支持异步的 asgi)

pip3 install fastapi

安装 uvicorn ,本质上为 web 提供 socket server 的支持 asgi (一般支持异步称 asgi,不支持异步称 wsgi)

pip3 install uvicorn

实例

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool("redis://47.193.14.198:6379", password="root123", minsize=1, maxsize=10)

@app.get("/")
def index():
    "普通操作接口"
    return {"message": "Hello World"}

@app.get("/red")
async def red():
    "异步操作接口"
    print("请求来了")
    await asyncio.sleep(3)
    # 连接池获取一个连接
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
    # 设置值
    await redis.hmset_dict("car", key1=1, key2=2, key3=3)
    # 读取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    # 连接归还连接池
    REDIS_POOL.release(conn)
    return result

if __name__ == "__main__":
    uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

在多个用户并发请求的情况下,异步方式来编写的接口可以在 IO 等待中去处理其他的请求,提高性能。
例如:同时有两个用户并发向接口发送请求,服务器只有一个线程,同一时刻只有一个请求被处理。异步请求可以提供并发是因为:当视图函数在处理第一个请求时,第二个请求此时是等待被处理的状态,当第一个请求遇到 IO 等待时,会自动切换去接收并处理第二个请求,当遇到 IO 时自动切换至其他请求,一旦有请求 IO 执行完毕,则会再次回到指定向下继续执行其他功能代码。

下面基于上下文管理,来实现自动化管理的案例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool("redis://127.0.0.1:6379", password="root123", minsize=1, maxsize=10)

@app.get("/")
def index():
    "普通操作接口"
    return {"message": "Hello World"}

@app.get("/red")
async def red():
    " 异步操作接口 "
    print("请求来了")
    async with REDIS_POOL.get() as conn:
        redis = Redis(conn)
        # 设置值
        await redis.hmset_dict("car", key1=1, key2=2, key3=3)
        # 读取值
        result = await redis.hgetall("car", encoding='utf-8')
        print(result)

if __name__ == "__main__":
    uvicorn.run("fast3:app", host="127.0.0.1", port=5000, log_level="info")

5.4 爬虫

在编写爬虫应用时,需要通过网络 IO 去请求目标数据,这种情况适合使用异步编程来提升性能,接下来我们使用支持异步编程的 aiohttp 模块来实现

pip3 install aiohttp

实例

import aiohtttp
import asyncio

async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl = False) as response:
        text = await response.text()
        print("得到结果:", url, len(text))

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            "https://python.org",
            "https://www.baidu.com",
            "https://www.pythonav.com"
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run(main())

总结

为了提升性能越来越多的框架都在向异步编程靠拢,例如: sanic,tornado, django3.0, django channels 组件等,用更少的资源可以处理更多的事。

实例:

import asyncio
import aiohttp

from bs4 import BeautifulSoup

async def fetch_content(url):
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=True)) as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = await fetch_content(url)
    init_soup = BeautifulSoup(init_page, "lxml")

    movie_names, urls_to_fetch, movie_dates = [], [], []

    all_movies = init_soup.find("div", id="showing-soon")
    for each_movie in all_movies.find_all("div", class_="item")
        all_a_tag = each_movie.find_all("a")
        all_li_tag = each_movie.find_all("li")

        movie_names.append(all_a_tag[1].text)
        urls_to_fetch.append(all_a_tag[1]["href"])
        movie_dates.append(all_li_tag[0].text)

    tasks = [fetch_content(url) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)
  
    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = BeautifulSoup(page, 'lxml') 
        img_tag = soup_item.find('img')
        print("{} {} {}".format(movie_name, movie_date, img_tag['src']))


if __name__ == "__main__":
    asyncio.run(main())
上一篇 下一篇

猜你喜欢

热点阅读