Python 异步IO和协程

2019-02-21  本文已影响0人  马本不想再等了

01. 并发、并行、同步、异步、阻塞、非阻塞概念理解

并发、并行

并发:是指一个时间段内,有几个程序在同一个CPU上运行,但是任意时刻只有一个程序在CPU上运行。由于CPU的运行速度极快,可以在多个程序之间切换,这样造成一个假象就是多个程序同时在运行。
并行:是指任意时刻点上,有多个程序同时运行在多个CPU上。并行的数量与CPU的核心数一致,即CPU为4核,并行数量最多为4。

同步、异步

同步:是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式。
异步:是指代码调用IO操作时,不必等待IO操作完成就返回的调用方式。

阻塞、非阻塞

阻塞:是指调用函数的时候当前线程被挂起。
非阻塞:是指调用函数的时候当前线程不会被挂起,而是立即返回。

02. IO多路复用

select,poll,epoll都是IO多路复用的机制,但三者本质上都是同步IO,因为他们都需要在读写事件就绪后自己负责读写,也就是说这个读写过程是阻塞的,而异步IO则无需自己负责读写,异步IO的实现会负责把数据从内核拷贝到用户空间。
相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件符管理多个文件描述符,将用户关系的文件描述的事件存放到内核的一个事件表中,这样在用户空间和内和空间的copy只需要一次。
epoll并不代表一定比select好
在高并发的情况下,连接活跃度不是很高,epoll比select好
在并发不高的情况下,同时连接很活跃,select比epoll好

02. 使用select完成http请求

这里跟着学习的内容写下这段代码,实际并没有理解,其中涉及到回调+事件循环+select(poll/epoll),在单线程中实现并发,待补充基础知识后,返回来再次学习。也欢迎读者进行指点。

# 使用select完成http请求
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


selector = DefaultSelector()
urls = ["http://www.baidu.com"]
stop = False


class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf-8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf-8")
            # 使去掉头部信息
            html_data = data.split("\r\n\r\n")[1]
            print(data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True


    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        # 建立socket链接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass
        # 注册
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop():
    # 事件循环,不停的请求socket的状态调用对应的回调函数
    # 1. select本身是支持register模式
    # 2. socket状态变化之后的回调是由程序员完成的
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    # 回调+事件循环+select(poll\epoll)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()

回调的痛楚:1. 可读性差 2. 共享状态管理难 3. 异常处理困难

03. coroutine - 协程的理解

抛出的问题:

  1. 回调模式编码复杂度高
  2. 同步编程额并发性不高
  3. 多线程编程需要线程间同步,使用lock机制又会降低并发性能

解决方案:

  1. 采用同步的方式去编写异步的代码
  2. 采用单线程去切换任务:
  3. 线程是有操作系统去切换的,单线程切换意味着需要自己去调度任务
  4. 不再需要lock,单线程内切换函数,性能远高于线程切换,并发性更高
    这时候就需要有一个可以暂定的函数,并且在需要时还可以恢复函数继续执行,接着协程就出现了。
    协程理解:有多个入口的函数、可以暂停的函数、可以暂停的函数(可以向暂停的地方传入值)
    然而生成器有一个很大的特点就是可以暂停!! 可以尝试使用生成器去完成协程。

04. 生成器进阶 - send、close和throw方法

生成器不但可以产出值,还可以接收值

send()调试

def gen_func():
    # 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
    html = yield "http://projectsedu.com"
    print(html)
    return "maben"


if __name__ == "__main__":
    gen = gen_func()
    # 在调用send发送非none值之前,我们必须启动生成器,方式有两种:1.gen.send(None),2.next(gen)
    url = gen.send(None)
    html = "maben"

close调试

def gen_func():
    # 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
    try:
        html = yield "http://projectsedu.com"
    except BaseException:
        pass
    yield 2
    yield 3
    return "maben"

if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.close()
    print("maben")

    # GenerationExit是继承自BaseException,而不是Exception

throw调试

def gen_func():
    # 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
    try:
        yield "http://projectsedu.com"
    except Exception as e:
        pass
    yield 2
    yield 3
    return "maben"

if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.throw(Exception, "download error")
    print(next(gen))
    gen.throw(Exception, "download error")

经过上面的调试,可以发现生成器可以进行启动、关闭、传入值、处理异常等,更够符合协程的需要。

05. 查看生成器状态

# 生成器是可以暂停的函数
import inspect


def gen_func():
    yield 1
    return  "maben"

if __name__ == '__main__':
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))
    next(gen)
    print(inspect.getgeneratorstate(gen))
    try:
        next(gen)
    except StopIteration:
        pass
    print(inspect.getgeneratorstate(gen))
>>> GEN_CREATED
>>> GEN_SUSPENDED
>>> GEN_CLOSED

06. 生成器进阶 - yield from

yield from 与 yield 的区别

def g1(iterable):
    yield iterable

def ge(iterable):
    yield from iterable

for value in g1(range(5)):
    print(value)
for value in ge(range(5)):
    print(value)
>>> range(0, 5)
>>> 0
>>> 1
>>> 2
>>> 3
>>> 4

yield from 关键作用

def g1(gen):
    yield  from gen

def main():
    g = g1()
    g.send(None)

# 1.main(调用方) g1(委托生成器) gen(子生成器)
# 2.yield from 会在调用方和子生成器之间建立一个双向通道

yield from工作原理(暂时不理解,给自己挖的坑。。。)

"""
源码先省略
总结:
1. 子生成器产生的值,都是直接传递给调用方的;调用方法通过.send()发送的值都是直接传递给子生成器的;
如果发送的是None,会调用生成器的__next__()方法,如果不是None,会调用子生成器的.send()方法;
2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上“冒泡”;
5. 传入委托生成器的异常里,除了GeneratorEixt之外,其他的所有异常全部传递给子生成器的.throw()方法;
如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上“冒泡”;
6. 如果在委托生成器上调用.close()或传入GenerationExit异常,会调用子生成器的.close()方法,没有的话就不用调用了。
如果在调用.close()的时候那么就想上“冒泡”,否则的话委托生成器会抛出GeneratorExit异常。

"""

07. async 和 await 原生协程

# python为了将语义变得更加明确,就引入了async和await关键词用于定义原生的协程。

async def downloader(url):
    return "maben"


async def download_url(url):
    # dosomething
    html = await downloader(url)

    return html

if __name__ == '__main__':
    coro = download_url("http://www.imooc.com")
    # next(None)
    coro.send(None)
>>> StopIteration: maben
上一篇下一篇

猜你喜欢

热点阅读