为爬虫搭建一个ProxyPool
前言
在日常写数据采集脚本的时候,我们总会遇到各种反爬手段,比如:headers反爬,ip反爬等。上一篇我们知道了Cookies池的搭建过程,通过Cookies池可以登录站点抓取设置登录限制的页面数据。但是仅仅有Cookies池是不够的,因为你Cookies的数量再多,也只是单节点,有一些网站如58会设置IP反爬,你的频率快了之后,ip可能会有被封的风险。那我限制下速度不行吗? 限制了速度,其实也就降低了效率,还有这并不一定能有效绕过IP反爬。所以,我们就有必要去构建和维护一个代理池
Step 1 ---- 实现思路
-
获取代理:
从各个代理网站抓取免费代理
-
存储代理:
选定一种数据结构(这里采用redis数据库中的有序集合,原因稍后说)用来存储抓取下来的代理
-
检测代理:
设置一个测试URL,然后使用代理去请求,根据响应的状态码判断代理是否可用。
-
接口:
提供给爬虫一个接口,返回一个随机代理(这里使用flask)
Step2 ---- 模块构成及其功能
代理池有5个模块构成:存储模块,获取模块,检测模块,接口模块,调度模块。下图展示的是代理池的结构
proxy.jpg-
获取模块
从多个站点中抓取代理存放到数据库中,我们要测试是否有用,就必须设计一个筛选可用代理的规则(这里我使用
Python3网络爬虫开发实战
一书作者的方法)。
筛选规则:在获取代理后,在将它写入数据库前给它设置一个初始score,如果检测到该代理可用,就讲它设置为100,不可用减一;当减到0时,从数据库中删除。然后设置一个随机函数,用于返回随机代理
-
存储模块
获取到一个代理之后,我们需要对其设置score(设置score的目的是用于筛选出可用代理)。这里我们选用redis中的有序集合,是因为这个数据结构里面带有score字段,我们可以根据score字段进行排序,而且集合内所有元素不重复。
-
检测模块
拿出每一个代理进行请求测试,根据代理筛选规则设置代理分数
-
接口模块
在检测模块运行完之后,代理池中会留下两种代理。一种是可用代理(
score=100
),另一种是不可用代理(score<100
,失败次数太多或是代理不稳定),通过flask构建一个爬虫接口返回score=100
的代理。 -
调度模块
如果构建一个代理池,你是先
采集代理
-->存储代理
-->检测代理
这样的顺序执行的话,那效率就有点低了,因为需要等待上一个任务完成之后才能进行下一个任务。因此,这里我们采用多进程和异步来实现对各个模块的调度(异步的语法要Python3.5以上才支持。)
Step3 ---- 各个模块实现
-- 存储模块 --
class RedisClient(object):
def __init__(self):
'''
decode_responses参数设置为true,写入的value值为str,否则为字节型
'''
self.db = StrictRedis(host=HOST,port=PORT,password=PASSWD,decode_responses=True)
pass
def random(self):
result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
if len(result):
return choice(result)
else:
result = self.db.zrevrange(REDIS_KEY,MIN_SCORE,MAX_SCORE)
if len(result):
return choice(result)
else:
raise PoolEmptyError
def add(self,proxy,score=INITIAL_SCORE):
if not self.db.zscore(REDIS_KEY,proxy):
return self.db.zadd(REDIS_KEY,score,proxy)
def decrease(self,proxy):
# 获取proxy对应的分数
score = self.db.zscore(REDIS_KEY,proxy)
if score and score > MIN_SCORE:
print("代理 {} , 分数 {} , 减1".format(proxy,score))
return self.db.zincrby(REDIS_KEY,proxy,-1)
else:
print("代理 {} , 当前分数 {} , 移除".format(proxy,score))
self.db.zrem(REDIS_KEY,proxy)
def exists(self,proxy):
return not self.db.zscore(REDIS_KEY,proxy) == None
def max(self,proxy):
print("代理 {} , 可用 , 设置为{}".format(proxy,MAX_SCORE))
return self.db.zadd(REDIS_KEY,MAX_SCORE,proxy)v
-- 获取模块 --
class ProxyMetaClass(type):
def __new__(cls,name,bases,attrs):
attrs['__CrawleFunc__'] = []
count = 0
for k,v in attrs.items():
if 'crawle' in k:
attrs['__CrawleFunc__'].append(k)
count += 1
attrs['__CrawleCount__'] = count
return type.__new__(cls,name,bases,attrs)
class Crawler(object,metaclass=ProxyMetaClass):
def get_proxies(self,callack):
proxies = []
if callack == 'crawle_daili66':
return self.process_daili66(callack)
else:
for proxy in eval("self.{}()".format(callack)):
print("成功获取代理 {}".format(proxy))
proxies.append(proxy)
return proxies
def process_daili66(self,callback):
proxies = []
for page_count in range(1,11):
for proxy in eval("self.{}(page_count={})".format(callback,page_count)):
print("成功获取代理 {}".format(proxy))
proxies.append(proxy)
return proxies
def crawle_xiciproxy(self):
start_url = "http://www.xicidaili.com/"
html = get_pages(start_url,"xici")
if html:
return parse_xiciproxy(html)
def crawle_daili66(self,page_count=1):
start_url = "http://www.66ip.cn/{}.html"
sleep(5)
html = get_pages(start_url.format(page_count),website="66")
if html:
print("第{}页代理".format(page_count))
return parse_66(html)
def crawle_guobanjia(self):
start_url = "http://www.goubanjia.com/"
html = get_pages(start_url,'goubanjia')
if html:
return parse_guobanjia(html)
pass
-- 检测模块 --
class CheckUp(object):
def __init__(self):
self.db = RedisClient()
def run(self):
# get all proxies
# get event loop
# split proxies to serval part
# pack a task list
# call run to exec asynic
print("Start test")
try:
proxies = self.db.all()
loop = asyncio.get_event_loop()
for i in range(0,len(proxies),BATCH_SIZE):
tasks_proxies = proxies[i:i + BATCH_SIZE]
tasks = [self.check_single_proxy(proxy) for proxy in tasks_proxies]
loop.run_until_complete(asyncio.wait(tasks))
sleep(3)
except Exception as e:
_ = e
print(e.args)
print("CheckUp occurs Error")
async def check_single_proxy(self,proxy):
connection = aiohttp.TCPConnector(verify_ssl=False)
async with aiohttp.ClientSession(connector=connection) as session:
# 测试代理可用性
-- 接口模块 --
@app.route("/")
def index():
return "<h2>Welcome to Proxy Pool System</h2>"
def get_conn():
if not hasattr(g,"redis"):
g.redis = RedisClient()
return g.redis
@app.route('/count')
def get_count():
conn = get_conn()
return conn.count()
@app.route('/random')
def random():
conn = get_conn()
return conn.random()
-- 调度模块 --
class Scheduler(object):
def getter(self,cylcle=CYCLE_GETTER):
getter = Getter()
while True:
print("Start to get proxy")
getter.run()
sleep(cylcle)
def checkup(self,cycle=CYCLE_CHECKUP):
check = CheckUp()
while True:
print("Start to checkup proxy")
check.run()
sleep(cycle)
def api(self):
app.run(API_HOST,API_PORT)
def run(self):
print("Proxy Pool start run")
if API_PROCSS:
api_process = multiprocessing.Process(target=self.api)
api_process.start()
if GETTER_PROCESS:
getter_process = multiprocessing.Process(target=self.getter)
getter_process.start()
if CHECKUP_PROCESS:
checkup_process = multiprocessing.Process(target=self.checkup)
checkup_process.start()
最后