Python Processes + Coroutines: Building an Asynchronous Crawler from Scratch (2)
In the previous part, we successfully used coroutines inside multiple processes to run tasks asynchronously and to execute multi-step workflows in order. In this part we keep building on that code and add web-request functionality to produce a simple asynchronous crawler: for each new page you want to crawl, you only need to care about the network request, page parsing, and data processing; the multiprocessing and asynchronous-request plumbing is handled by the crawler itself.
Detailed Flowchart
(Figure: architecture diagram of the asynchronous crawler)
Required Libraries
BeautifulSoup: a Python library for extracting data from HTML and XML files.
# Installation
cd AiospiderWorkshop
pipenv shell
pipenv install beautifulsoup4
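If you have not used BeautifulSoup before, the minimal sketch below shows the only calls this section relies on, find_all and .string; the HTML snippet and the class name "demo" are invented purely for illustration.

from bs4 import BeautifulSoup

html = '<div><h2 class="demo">霸王别姬</h2><h2 class="demo">Léon</h2></div>'
soup = BeautifulSoup(html, "html.parser")    # parse the HTML string
for tag in soup.find_all(class_="demo"):     # find every tag with class "demo"
    print(tag.string)                        # print the tag's text content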
Creating the Downloader Class
Our practice target is https://scrape.center/, the scraping practice site built by Cui Qingcai (崔庆才). We use one of its simplest pages, https://ssr1.scrape.center/page/1. Reading this section requires a basic familiarity with the BeautifulSoup and aiohttp libraries.
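The only aiohttp pattern the downloader needs is: open a ClientSession, GET a url, read the response body as text. A minimal standalone sketch, assuming nothing beyond a reachable url:

import asyncio
from aiohttp import ClientSession

async def fetch(url):
    # one session, one GET request, return the response body as text
    async with ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

if __name__ == "__main__":
    html = asyncio.run(fetch("https://ssr1.scrape.center/page/1"))
    print(len(html))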
Create a new .py file to verify the functionality of the Downloader class.
- Write a helper function that extracts the movie names from a page and prints them to the screen.
from bs4 import BeautifulSoup

def extract_movie_name(html):
    soup = BeautifulSoup(html, "html.parser")
    name_tags = soup.find_all(class_="m-b-sm")
    for name_tag in name_tags:
        print(name_tag.string)
- Create the Downloader class
The Downloader class has two main methods, get_async and download.
download: opens a session and asynchronously requests every url in the url list.
get_async: requests a page and returns its html.
import asyncio
from aiohttp import ClientSession

class Downloader:
    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def download(self):
        async with ClientSession() as session:
            url_lst = [
                "https://ssr1.scrape.center/page/1",
                "https://ssr1.scrape.center/page/2"
            ]
            download_tasks = list()
            for url in url_lst:
                download_task = asyncio.create_task(self.get_async(session, url))
                download_tasks.append(download_task)
            for task in download_tasks:
                await task
                result = task.result()
                extract_movie_name(result)

    def async_run(self):
        asyncio.run(self.download())
- Write the main entry point
if __name__ == "__main__":
    downloader = Downloader()
    downloader.async_run()
At this point, the download class runs as expected.
# Output
霸王别姬 - Farewell My Concubine
这个杀手不太冷 - Léon
肖申克的救赎 - The Shawshank Redemption
...
Integrating the Downloader
So far the downloader is still a standalone piece of functionality. We need to integrate the download method into the existing code, call it from its own process, and exchange data through a download queue.
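The mechanism that makes this possible is that a queue created by multiprocessing.Manager() can be shared across processes, so the work process and the download process can pass Workshop objects back and forth. The sketch below shows just that idea in isolation; the producer and consumer functions are hypothetical and not part of the crawler.

from multiprocessing import Manager, Process

def producer(queue):
    for i in range(3):
        queue.put_nowait("page-{}".format(i))  # hand work to the other process
    queue.put_nowait(None)                     # sentinel: nothing more to send

def consumer(queue):
    while True:
        item = queue.get()                     # blocks until an item arrives
        if item is None:
            break
        print("got", item)

if __name__ == "__main__":
    manager = Manager()
    queue = manager.Queue()                    # proxy object usable from both processes
    p_producer = Process(target=producer, args=(queue,))
    p_consumer = Process(target=consumer, args=(queue,))
    p_producer.start()
    p_consumer.start()
    p_producer.join()
    p_consumer.join()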
- Modify the Bridge class
Add the download-queue functionality; the existing code is unchanged.
download_queue: the download queue.
put_download_queue, get_download_queue, and download_queue_empty do exactly what their names say.
class Bridge:
    def __init__(self):
        manager = Manager()
        self.download_queue = manager.Queue()

    def put_download_queue(self, workshop):
        self.download_queue.put_nowait(workshop)

    def get_download_queue(self):
        return self.download_queue.get_nowait()

    def download_queue_empty(self):
        return self.download_queue.empty()
- Modify the Workshop class
Add three attributes: url, need_download, and html.
class Workshop:
    def __init__(self, url, need_download):
        self.url = url
        self.need_download = need_download
        self.html = None
        self._next_process = None
- Modify the MyWorkshop class
Update the initializer to match the changes in Workshop, and replace the two mock steps from the previous part with a slightly adapted version of this section's extract_movie_name method.
class MyWorkshop(Workshop):
    def __init__(self, url, need_download):
        super().__init__(url, need_download)
        self.set_start_process(self.extract_movie_name)

    async def extract_movie_name(self):
        soup = BeautifulSoup(self.html, "html.parser")
        name_tags = soup.find_all(class_="m-b-sm")
        for name_tag in name_tags:
            print(name_tag.string)
        self.set_end()
- Modify the Downloader class
Change the __init__ and async_run methods so that the class can receive and store the message-passing Bridge instance.
Add a get_page method: it receives a workshop, hands its url to get_async for downloading, stores the downloaded html in the workshop's html attribute, then sets the workshop's need_download attribute to False and returns the workshop.
Change the download method: like Works, it uses bridge.work_end() to decide whether the program is finished; it takes workshops from the download_queue, hands them to get_page, and puts the returned workshops into the work queue work_queue for the next processing step.
class Downloader:
    def __init__(self):
        self.bridge = None

    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def get_page(self, session, workshop):
        workshop.html = await self.get_async(session, workshop.url)
        workshop.need_download = False
        return workshop

    async def download(self):
        while not self.bridge.work_end():
            async with ClientSession() as session:
                download_tasks = list()
                while not self.bridge.download_queue_empty():
                    workshop = self.bridge.get_download_queue()
                    task = asyncio.create_task(self.get_page(session, workshop))
                    download_tasks.append(task)
                for task in download_tasks:
                    await task
                    workshop = task.result()
                    self.bridge.put_work_queue(workshop)

    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.download())
- Modify the Works class
Change the run_works method: after taking a workshop from work_queue, check whether it needs downloading; if so, push it onto the download queue download_queue and let the download process handle it.
Everything else stays the same.
class Works:
    async def run_works(self):
        self.bridge.flag_start()
        while not self.bridge.work_end():
            task_lst = list()
            while not self.bridge.work_queue_empty():
                workshop = self.bridge.get_work_queue()
                if workshop.need_download:
                    self.bridge.put_download_queue(workshop)
                    continue
                task = asyncio.create_task(workshop.run_next_process())
                task_lst.append(task)
            for task in task_lst:
                await task
                self.distribute_works(task)
- Modify the App class
The downloader is launched as an additional process.
class App:
    def __init__(self):
        self.works = Works()
        self.bridge = Bridge()
        self.download = Downloader()

    def async_run(self, workshop_lst):
        self.bridge.init_works(workshop_lst)
        p_run_works = Process(target=self.works.async_run,
                              args=(self.bridge,))
        p_download = Process(target=self.download.async_run,
                             args=(self.bridge,))
        p_run_works.start()
        p_download.start()
        p_run_works.join()
        p_download.join()
- Modify the main entry point
In the main section, build a list of Workshop objects and hand it to App to run.
if __name__ == "__main__":
    work_lst = list()
    url_template = "https://ssr1.scrape.center/page/{}"
    for i in range(1, 11):
        url = url_template.format(str(i))
        work_lst.append(
            MyWorkshop(url=url, need_download=True)
        )
    app = App()
    app.async_run(work_lst)
At this point, the program runs end to end.
# Output
霸王别姬 - Farewell My Concubine
...
魂断蓝桥 - Waterloo Bridge
Run time: 2.26s
Complete Code for This Section
import asyncio
import time
from functools import wraps
from multiprocessing import Process
from multiprocessing import Manager

from aiohttp import ClientSession
from bs4 import BeautifulSoup


def print_run_time(func):
    # Decorator that measures and prints a function's run time
    @wraps(func)  # keep the wrapped function's name unchanged, otherwise the multiprocessing call fails
    def wrap(*args, **kwargs):
        start = time.time()
        f = func(*args, **kwargs)
        end = time.time()
        print("Run time: {:.2f}s".format(end - start))
        return f
    return wrap

class Bridge:
    def __init__(self):
        manager = Manager()
        self.work_queue = manager.Queue()
        self.download_queue = manager.Queue()
        self.config_dict = manager.dict()
        self.init_config()

    def init_config(self):
        self.config_dict["running_work_cnt"] = 0
        self.config_dict["work_start_flag"] = False

    def init_works(self, workshop_lst):
        for workshop in workshop_lst:
            self.put_work_queue(workshop)
            self.work_cnt_increase()

    def flag_start(self):
        self.config_dict["work_start_flag"] = True

    def work_end(self):
        return self.config_dict["work_start_flag"]\
            and not self.config_dict["running_work_cnt"]

    def work_cnt_increase(self):
        self.config_dict["running_work_cnt"] += 1

    def work_cnt_decrease(self):
        self.config_dict["running_work_cnt"] -= 1

    def put_work_queue(self, workshop):
        self.work_queue.put_nowait(workshop)

    def get_work_queue(self):
        return self.work_queue.get_nowait()

    def work_queue_empty(self):
        return self.work_queue.empty()

    def put_download_queue(self, workshop):
        self.download_queue.put_nowait(workshop)

    def get_download_queue(self):
        return self.download_queue.get_nowait()

    def download_queue_empty(self):
        return self.download_queue.empty()

class Workshop:
    def __init__(self, url, need_download):
        self.url = url
        self.need_download = need_download
        self.html = None
        self._next_process = None

    def set_start_process(self, func):
        self._next_process = func

    def set_next_process(self, func):
        self._next_process = func

    def set_end(self):
        self._next_process = "/EOF"

    def is_end(self):
        return self._next_process == "/EOF"

    async def run_next_process(self):
        workshop = await self._next_process()
        if workshop:
            return workshop
        else:
            return self

class Works:
    def __init__(self):
        self.bridge = None

    def distribute_works(self, task):
        workshop = task.result()
        if not workshop.is_end():
            self.bridge.put_work_queue(workshop)
        else:
            self.bridge.work_cnt_decrease()

    async def run_works(self):
        self.bridge.flag_start()
        while not self.bridge.work_end():
            task_lst = list()
            while not self.bridge.work_queue_empty():
                workshop = self.bridge.get_work_queue()
                if workshop.need_download:
                    self.bridge.put_download_queue(workshop)
                    continue
                task = asyncio.create_task(workshop.run_next_process())
                task_lst.append(task)
            for task in task_lst:
                await task
                self.distribute_works(task)

    @print_run_time
    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.run_works())

class Downloader:
    def __init__(self):
        self.bridge = None

    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def get_page(self, session, workshop):
        workshop.html = await self.get_async(session, workshop.url)
        workshop.need_download = False
        return workshop

    async def download(self):
        while not self.bridge.work_end():
            async with ClientSession() as session:
                download_tasks = list()
                while not self.bridge.download_queue_empty():
                    workshop = self.bridge.get_download_queue()
                    task = asyncio.create_task(self.get_page(session, workshop))
                    download_tasks.append(task)
                for task in download_tasks:
                    await task
                    workshop = task.result()
                    self.bridge.put_work_queue(workshop)

    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.download())

class App:
    def __init__(self):
        self.works = Works()
        self.bridge = Bridge()
        self.download = Downloader()

    def async_run(self, workshop_lst):
        self.bridge.init_works(workshop_lst)
        p_run_works = Process(target=self.works.async_run,
                              args=(self.bridge,))
        p_download = Process(target=self.download.async_run,
                             args=(self.bridge,))
        p_run_works.start()
        p_download.start()
        p_run_works.join()
        p_download.join()

class MyWorkshop(Workshop):
    def __init__(self, url, need_download):
        super().__init__(url, need_download)
        self.set_start_process(self.extract_movie_name)

    async def extract_movie_name(self):
        soup = BeautifulSoup(self.html, "html.parser")
        name_tags = soup.find_all(class_="m-b-sm")
        for name_tag in name_tags:
            print(name_tag.string)
        self.set_end()

if __name__ == "__main__":
    work_lst = list()
    url_template = "https://ssr1.scrape.center/page/{}"
    for i in range(1, 11):
        url = url_template.format(str(i))
        work_lst.append(
            MyWorkshop(url=url, need_download=True)
        )
    app = App()
    app.async_run(work_lst)
Summary
With the changes in this section we now have a simple asynchronous crawler. For a new batch of pages, you only need to subclass Workshop and implement your own crawling steps, as sketched below. Of course, it can only handle the simplest jobs for now: error handling, custom request parameters, proxies, logging, and so on have not been considered and will have to be filled in gradually as the crawler gets used.
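As an illustration of that claim, here is a hedged sketch of what a new, two-step workshop might look like; the class name ScoreWorkshop, the CSS class score, and the save_score step are all assumptions, meant only to show how set_next_process chains one stage to the next. An instance would be added to work_lst in main exactly like MyWorkshop.

class ScoreWorkshop(Workshop):
    def __init__(self, url, need_download):
        super().__init__(url, need_download)
        self.scores = []
        self.set_start_process(self.extract_score)   # first stage runs after the page is downloaded

    async def extract_score(self):
        soup = BeautifulSoup(self.html, "html.parser")
        for tag in soup.find_all(class_="score"):    # "score" is an assumed class name
            self.scores.append(tag.string)
        self.set_next_process(self.save_score)       # chain the next stage

    async def save_score(self):
        print(self.url, self.scores)                 # stand-in for real data processing
        self.set_end()                               # mark this workshop as finished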