Python进程+协程——从零开始搭建异步爬虫(1)
很多人入门 Python 是从爬虫开始的,笔者也不例外。爬取大量网页需要用到多进程、多线程、协程等等特性,而这类代码的编写往往比较繁琐,如果经常需要爬取不同的网页,我们往往会用到 scrapy 等爬虫框架以减少工作量。笔者最近正好需要大量爬取一些内容,本着学习的目的,我们不使用爬虫框架,从零开始搭建一个简单的异步爬虫。
目标
- 利用协程异步请求网页。
- 利用多进程加快爬取速度。
- 提供需要请求的链接,设定好处理流程后,程序自动按顺序处理。
简易流程图
![](https://img.haomeiwen.com/i5343278/b946191cff627e37.jpg)
需要用到的库
环境搭建
mkdir AiospiderWorkshop
cd AiospiderWorkshop
- 创建虚拟环境
pipenv install --python 3.8
- 更换
pipenv
源为阿里云源
# 打开 Pipfile 文件,修改 source 部分如下
[[source]]
url = "https://mirrors.aliyun.com/pypi/simple"
verify_ssl = true
name = "aliyun"
# 保存后重启终端(否则换源可能未生效)
- 进入虚拟环境,并安装
aiohttp
# 进入刚才创建的文件夹,并进入环境安装 aiohttp
cd AiospiderWorkshop
pipenv shell
pipenv install aiohttp
创建任务类 Workshop
和任务分配类 Works
此部分需要你对 Python
的 asyncio
库有基本了解即可。
- 导入需要的库
import asyncio
import time
from random import randint
from functools import wraps
- 首先创建一个任务类
Workshop
,函数async_simulation
用于模拟异步操作
class Workshop:
async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
- 运行上述模拟异步操作,查看运行情况
在模拟运行之前,先创建一个装饰器用于记录程序运行时间
def print_run_time(func):
# 记录运行程序运行时间的装饰器
@wraps(func) # 保持被装饰的函数名不变,否则多进程调用出错
def wrap(*args, **kwargs):
start = time.time()
f = func(*args, **kwargs)
end = time.time()
print("运行时间:{:.2f}s".format(end - start))
return f
return wrap
运行查看结果
class Works:
async def run_works(self):
workshop = Workshop()
for i in range(3):
await workshop.async_simulation()
@print_run_time
def async_run(self):
asyncio.run(self.run_works())
if __name__ == "__main__":
works = Works()
works.async_run()
# 运行结果
等待3秒
等待2秒
等待3秒
运行时间:8.01s
我们看到程序并没有异步执行,要实现异步执行,我们需要对 Works
类的 run_works
函数做如下修改
class Works:
async def run_works(self):
task_lst = list()
for i in range(3):
workshop = Workshop()
task = asyncio.create_task(workshop.async_simulation())
task_lst.append(task)
for task in task_lst:
await task
# 运行结果
等待3秒
等待1秒
等待2秒
运行时间:3.01s
程序成功实现异步执行
在多进程中调用任务分配类
此部分需要你对 Python
的 multiprocessing
库有基本了解
程序多进程化非常简单,进行如下修改即可
from multiprocessing import Process
if __name__ == "__main__":
works = Works()
p_run_works = Process(target=works.async_run)
p_run_works.start()
p_run_works.join()
创建 App 类
如果程序初始化和多进程逻辑都放在 main
主函数中执行,程序功能越复杂, main
主函数也会越复杂,因此创建一个 App
类负责程序的初始化运行。对代码如下部分做修改。
class App:
def __init__(self):
self.works = Works()
def async_run(self):
p_run_works = Process(target=self.works.async_run)
p_run_works.start()
p_run_works.join()
if __name__ == "__main__":
app = App()
app.async_run()
创建信息传递类 Bridge
从前面的简易流程图可以看出,我们程序的运行依赖多个队列传输数据,因此,我们创建一个 Bridge
类负责所有共享数据的保存和操作。
Bridge
类中的 init_works
方法。将 workshop_lst
中的任务传入任务队列 work_queue
中,用于程序开始运行时任务初始化。
from multiprocessing import Manager
class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
def put_work_queue(self, workshop):
self.work_queue.put_nowait(workshop)
def get_work_queue(self):
return self.work_queue.get_nowait()
def work_queue_empty(self):
return self.work_queue.empty()
def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
- 修改
Works
类,使其可以利用信息传递类中的队列等共享数据。
修改 async_run
函数。调用 Works
类中的 async_run
函数时需要传入 bridge
。bridge
为信息传递类 Bridge
的实例,可通过其操作任务队列。
修改 run_works
方法,其运行时,如果任务队列 work_queue
不为空,则从队列中取出所有任务异步执行。
class Works:
def __init__(self):
self.bridge = None
async def run_works(self):
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.async_simulation())
task_lst.append(task)
for task in task_lst:
await task
@print_run_time
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.run_works())
- 修改
App
类,使其可以传入bridge
。并且在async_run
方法中调用bridge.init_works(workshop_lst)
初始化任务队列。
class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()
def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge, ))
p_run_works.start()
p_run_works.join()
- 修改主函数
main
传入任务列表
if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(Workshop())
app = App()
app.async_run(work_lst)
修改至此,程序已可以正常运行
增加按步骤运行函数的能力
到目前为止,我们的程序已经实现了传入任务列表并利用协程异步执行的功能,但还有一个问题需要解决:只能运行特定的方法,而爬取数据的处理往往需要经过很多步骤。
为了解决这个问题,我们在 Workshop
类中提供一个统一的函数运行接口 run_next_process
,每个函数运行结束后,设定好下次运行的函数,并返回实例自身或者需要运行的其他实例。同时修改 Works
类,使其获得返回的实例后将返回的实例再次传入队列中,以便下次调用。
- 修改
Workshop
类
增加了 _next_process
属性、set_start_process
方法、set_next_process
方法、set_end
方法、is_end
方法、run_next_process
方法。
_next_process
属性:用于存储下一步需要调用的函数,初始化为 None
。
set_start_process
方法:需要在实例化同时调用,设定程序第一个调用的函数。
set_next_process
方法:在程序运行时调用,设定下一个调用的函数。
set_end
方法:将 _next_process
属性设置为 \EOF
,标记所有流程均完成。
is_end
方法:返回流程是否全部结束。
run_next_process
方法:调用当前设定的函数,得到异步调用返回的实例并返回,如果无返回值,则返回当前实例自身。
class Workshop:
def __init__(self):
self._next_process = None
def set_start_process(self, func):
self._next_process = func
def set_next_process(self, func):
self._next_process = func
def set_end(self):
self._next_process = "/EOF"
def is_end(self):
return self._next_process == "/EOF"
async def run_next_process(self):
workshop = await self._next_process()
if workshop:
return workshop
else:
return self
- 继承
Workshop
类创建MyWorkshop
类,在其中编写业务业务代码,控制流程。
async_simulation
方法:模拟第一个异步操作,在其中调用set_next_process
方法设定下一个需要调用的函数。
print_end
方法:模拟第二步操作,在其中调用 set_end
方法设定流程结束。
注意:所有的业务流程函数均需要为异步函数。
class MyWorkshop(Workshop):
def __init__(self):
super().__init__()
self.set_start_process(self.async_simulation)
async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
self.set_next_process(self.print_end)
async def print_end(self):
# 模拟第二步操作
print("End")
self.set_end()
- 修改
Bridge
类
因为任务 workshop
需要再次进入任务队列中,所以在 Works
类中不能再用 work_queue
是否为空来判断程序是否应该结束了,因此对 Bridge
类进行一些改造,添加运行任务计数,并提供是否应该结束程序的判断函数。
在保持原有属性和方法不变的情况下,增加 config_dict
属性、init_config
方法、flag_start
方法、work_end
方法、work_cnt_increase
方法、work_cnt_decrease
方法,修改 add_works
方法。
config_dict
属性:记录运行过程中的一些关键数据,如正在运行的任务数量 running_work_cnt
、任务开始标记 work_start_flag
,通过 init_config
方法初始化。因为这些常数数据在进入不同进程时会复制一份新数据,因此常数数据共享应该在可变数据类型下共享,比如这里采用 dict
共享数据。
flag_start
方法:在程序最开始运行时调用,标记程序已开始运行
work_end
方法:返回程序是否完全结束,判断方法为 work_start_flag
标记为 True
且 running_work_cnt
为 0
。
work_cnt_increase
方法:标记正在运行中的任务增加。
work_cnt_decrease
方法:标记正在运行中的任务减少。
修改 init_works
方法:接收 workshop
列表,将其压入任务队列,入队时调用 self.bridge.work_cnt_increase()
方法增加正在运行的程序计数。
class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
self.config_dict = manager.dict()
self.init_config()
def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
self.work_cnt_increase()
def init_config(self):
self.config_dict["running_work_cnt"] = 0
self.config_dict["work_start_flag"] = False
def flag_start(self):
self.config_dict["work_start_flag"] = True
def work_end(self):
return self.config_dict["work_start_flag"]\
and not self.config_dict["running_work_cnt"]
def work_cnt_increase(self):
self.config_dict["running_work_cnt"] += 1
def work_cnt_decrease(self):
self.config_dict["running_work_cnt"] -= 1
- 修改
Works
类
我们已经修改 Workshop
类使其每次运行完一个方法后,返回实例自身,因此需要修改 run_works
方法,拿到返回实例,并视情况将其重新加入队列。增加 distribute_works
方法处理返回的实例。并在 distribute_works
方法中处理运行中程序的计数问题。
run_works
方法:修改循环条件,由 not bridge.work_queue_empty()
变为 not bridge.work_end()
。await task
后调用 distribute_works
方法处理返回的实例。asyncio.create_task
中调用 Workshop
的 run_next_process()
方法。
distribute_works
方法:调用 Workshop
的 is_end()
方法判断是否已完成所有流程,如未完成,任务重新入队,如已完成,调用 bridge.work_cnt_decrease()
减少正在运行的程序计数。
最后,在 run_works
方法中调用 bridge.flag_start()
方法标记程序开始运行。
class Works:
def distribute_works(self, task):
workshop = task.result()
if not workshop.is_end():
self.bridge.put_work_queue(workshop)
else:
self.bridge.work_cnt_decrease()
async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)
- 修改
main
主函数
用 MyWorkshop()
代替 Workshop()
。
if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(MyWorkshop())
app = App()
app.async_run(work_lst)
此时,程序能够正常运行,并按顺序调用业务流程函数。
# 运行结果
等待1秒
等待1秒
等待3秒
End
End
End
运行时间:3.01s
至此,我们成功的在多进程中利用协程实现了多任务异步执行和多流程次序执行,将来可以通过增减进程的方式灵活的调节速率。
本节完整代码
import asyncio
import time
from random import randint
from functools import wraps
from multiprocessing import Process
from multiprocessing import Manager
def print_run_time(func):
# 记录运行程序运行时间的装饰器
@wraps(func) # 保持被装饰的函数名不变,否则多进程调用出错
def wrap(*args, **kwargs):
start = time.time()
f = func(*args, **kwargs)
end = time.time()
print("运行时间:{:.2f}s".format(end - start))
return f
return wrap
class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
self.config_dict = manager.dict()
self.init_config()
def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
self.work_cnt_increase()
def init_config(self):
self.config_dict["running_work_cnt"] = 0
self.config_dict["work_start_flag"] = False
def flag_start(self):
self.config_dict["work_start_flag"] = True
def work_end(self):
return self.config_dict["work_start_flag"] \
and not self.config_dict["running_work_cnt"]
def work_cnt_increase(self):
self.config_dict["running_work_cnt"] += 1
def work_cnt_decrease(self):
self.config_dict["running_work_cnt"] -= 1
def put_work_queue(self, workshop):
self.work_queue.put_nowait(workshop)
def get_work_queue(self):
return self.work_queue.get_nowait()
def work_queue_empty(self):
return self.work_queue.empty()
class Workshop:
def __init__(self):
self._next_process = None
def set_start_process(self, func):
self._next_process = func
def set_next_process(self, func):
self._next_process = func
def set_end(self):
self._next_process = "/EOF"
def is_end(self):
return self._next_process == "/EOF"
async def run_next_process(self):
workshop = await self._next_process()
if workshop:
return workshop
else:
return self
class MyWorkshop(Workshop):
def __init__(self):
super().__init__()
self.set_start_process(self.async_simulation)
async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
self.set_next_process(self.print_end)
async def print_end(self):
# 模拟第二步操作
print("End")
self.set_end()
class Works:
def __init__(self):
self.bridge = None
def distribute_works(self, task):
workshop = task.result()
if not workshop.is_end():
self.bridge.put_work_queue(workshop)
else:
self.bridge.work_cnt_decrease()
async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)
@print_run_time
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.run_works())
class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()
def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge, ))
p_run_works.start()
p_run_works.join()
if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(MyWorkshop())
app = App()
app.async_run(work_lst)
本节总结
在本节中,我们成功的在多进程中利用协程实现了多任务异步执行和多流程按次序执行,将来可以通过增减进程的方式灵活的调节速率,本文开头的目标第2、3点已基本完成。下一节将继续改造代码,增加网页请求功能,实现一个简单的异步爬虫,实现每次爬新网页只需要关注网络请求、网页解析和数据处理,多进程和异步请求部分由爬虫自身处理。