Python+Redis维护一个代理池
对一个网站过高频率的爬取可能会导致ip被加入黑名单,这样以后对该网站的访问都会被拒绝,这个时候只能利用代理ip了。网上有大量的公开免费代理,然而大多是不能用的,也可以购买付费代理使用,但是手动设置ip非常麻烦,并且可能由于其他人爬取同样站点而被封禁,或者代理服务器故障等原因导致ip不可用,手动筛选可用ip也是非常麻烦的事情。
为了解决这个问题,我们可以维护一个代理池,定期自动检查以剔除其中的不可用代理,并不断爬取新的代理投入使用,项目地址。
1. 结构设计
1.1 存储模块
代理应该是不重复的,并且要标识可用情况,比较好的方法是使用redis的有序集合存储,每一个元素都是一个代理,形式为 ip:端口号 ,此外,每一个元素都有一个分数,用来标识可用情况,分数规则设置为:
- 分数100为可用,定期检测时代理可用就设置为100,不可用就将分数减1,直到减为0后删除
- 新获取代理设置为10,测试可行立即置为100,否则减1
1.2 获取模块
定期从各大代理网站爬取代理,免费付费都可以,成功之后保存到数据库
1.3 测试模块
负责定期检测数据库中的代理,可以设置检测链接,检测在对应网站是否可用
1.4 接口模块
提供一个web api来对外提供随机的可用代理,用flask实现
2. 模块实现
2.1 存储模块(db.py)
第三方库:
- redis
import redis
from random import choice
# 分数设置
MAX_SCORE = 100
MIN_SCORE = 0
INIT_SCORE = 10
# 连接信息
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'
# 数据库最大存储的代理数量
POOL_UPPER_THRESHLD = 10000
class RedisClient:
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, passwd=REDIS_PASSWORD):
"""初始化redis对象"""
self.db = redis.Redis(host=host, port=port, password=passwd, decode_responses=True)
def add(self, proxy, score=INIT_SCORE):
"""添加一个代理,设置初始分数"""
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, {proxy: score})
def random(self):
"""首先随机获取最高分的有效代理,不存在则按排名获取"""
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
# 从分数前50的代理中随机获取一个
result = self.db.zrevrange(REDIS_KEY, 0, 50)
if len(result):
return choice(result)
else:
Exception('无可用代理')
def decrease(self, proxy):
"""代理分数-1,小于指定阈值则删除"""
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print(proxy, score, '-1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print(proxy, score, '移除')
return self.db.zrem(REDIS_KEY, proxy)
def max(self, proxy):
"""更新代理分数到最大值"""
print(proxy, MAX_SCORE)
return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
def count(self):
"""获取代理数量"""
return self.db.zcard(REDIS_KEY)
def all(self):
"""获取全部代理"""
return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
2.2 获取模块(getter.py)
第三方库:
- beautifulsoup4
- requests
from bs4 import BeautifulSoup
from db import RedisClient, POOL_UPPER_THRESHLD
# requests获取一个网页的方法封装到了一个工具类里
from utils import get_page
class ProxyMetaClass(type):
"""元类,初始化类时获取所有以crawl_开头的方法"""
def __new__(mcs, name, bases, attrs):
count = 0
attrs['CrawlFunc'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['CrawlFunc'].append(k)
count += 1
attrs['CrawlFuncCount'] = count
return type.__new__(mcs, name, bases, attrs)
class Crawler(metaclass=ProxyMetaClass):
def get_proxies(self, crawl_func):
"""执行指定方法来获取代理"""
proxies = []
for proxy in eval("self.{}()".format(crawl_func)):
proxies.append(proxy)
return proxies
def crawl_kuaidaili(self, page_count=10):
"""获取快代理网站的免费代理"""
start_url = 'https://www.kuaidaili.com/free/inha/{}/'
urls = [start_url.format(page) for page in range(1, page_count + 1)]
for url in urls:
html = get_page(url)
if html:
soup = BeautifulSoup(html, 'lxml')
trs = soup.find('tbody').find_all('tr')
for tr in trs:
ip = tr.find_all('td')[0].string
port = tr.find_all('td')[1].string
yield ':'.join([ip, port])
class Getter:
def __init__(self):
"""初始化数据库类和代理爬虫类"""
self.redis = RedisClient()
self.crawler = Crawler()
def is_over_threshold(self):
"""判断数据库是否已经存满"""
if self.redis.count() >= POOL_UPPER_THRESHLD:
return True
return False
def run(self):
"""开始抓取各个代理网站的免费代理存入数据库"""
print('开始获取...')
if not self.is_over_threshold():
for i in range(self.crawler.CrawlFuncCount):
crawl_func = self.crawler.CrawlFunc[i]
proxies = self.crawler.get_proxies(crawl_func)
for proxy in proxies:
print(proxy)
self.redis.add(proxy)
if __name__ == '__main__':
a = Getter()
a.run()
这里借助元类实现了自动定义属性 CrawlFunc 来记录Crawler类中所有以 crawl_ 开头的方法,CrawlFuncCount 记录这些方法的数量,这样以后有新的代理网站可以获取代理时,只需要添加 crawl_XXX 方法以相同的方式返回解析后的数据就行了, get_proxies 会依次执行传入的方法列表中的方法, 可以非常方便的扩展。
然后就是实现Getter类来管理存储部分,run方法中先判断是否到达存储阈值,未满则通过 crawler.CrawlFunc 获取所有爬取方法交给 get_proxies 去执行,然后将返回结果存入数据库。这样存储部分的工作就完成了,实例化Getter类并调用run方法即可爬取代理并存入数据库。
2.3 测试模块
第三方库:
- aiohttp
import asyncio
import aiohttp
import time
from db import RedisClient
# 目标网址
TEST_URL = 'http://www.baidu.com'
# 正确的响应码列表
TRUE_STATUS_CODE = [200]
# 同时测试一组代理的数量
BATCH_TEST_SIZE = 50
class Tester:
def __init__(self):
"""初始化数据库管理对象"""
self.redis = RedisClient()
async def test_one_proxy(self, proxy):
"""对目标网站测试一个代理是否可用"""
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
# 解码为字符串
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
if response.status in TRUE_STATUS_CODE:
# 代理可用
self.redis.max(proxy)
print(proxy, 100)
else:
# 代理不可用
self.redis.decrease(proxy)
print(proxy, -1, "状态码错误")
except Exception as e:
self.redis.decrease(proxy)
print(proxy, -1, e.args)
async def start(self):
"""启动协程, 测试所有代理"""
try:
proxies = self.redis.all()
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i: i+BATCH_TEST_SIZE]
tasks = [self.test_one_proxy(proxy) for proxy in test_proxies]
# 并发完成一批任务
await asyncio.gather(*tasks)
time.sleep(5)
except Exception as e:
print('测试器发生错误', e.args)
def run(self):
"""开始运行"""
asyncio.run(self.start())
# # python3.7之前的写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(self.start())
if __name__ == '__main__':
a = Tester()
a.run()
由于代理数量非常大,同步请求的requests很难高效率的完成这个任务,我们使用异步请求库aiohttp,同时用到协程asyncio来成批的检测代理。
测试网站这里设置为百度,使用时可以根据目标不同设置对应网站的网址。还定义了一个正确的状态码列表,目前只有200,某些网站可能会有重定向等操作,可以自行添加状态码。
2.4 API模块
第三方库:
- flask
from flask import Flask, g
import json
from db import RedisClient
app = Flask(__name__)
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
return '<h2>hello</h2>'
@app.route('/get/')
def get_proxy():
"""获取随机可用代理"""
conn = get_conn()
try:
proxy = conn.random()
result = json.dumps({'status': 'success', 'proxy': proxy})
except Exception as e:
result = json.dumps({'status': 'failure', 'info': e})
finally:
return result
@app.route('/count/')
def get_count():
"""获取代理总数"""
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run()
这里使用flask做了一个简单的API,包含欢迎页,获取随机代理页和获取代理总数页,获取代理页返回json格式的数据。
3. 整合运行
模块写好了,接下来只需要以多进程的方式将他们运行起来即可
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
# 周期
TESTER_CYCLE = 20
GETTER_CYCLE = 20
# 模块开关
TESTER_ENABLE = True
GETTER_ENABLE = True
API_ENABLE = True
class Run:
def run_tester(self, cycle=TESTER_CYCLE):
"""定时检测代理可用情况"""
tester = Tester()
while True:
print('开始测试')
tester.run()
time.sleep(cycle)
def run_getter(self, cycle=GETTER_CYCLE):
"""定时获取代理"""
getter = Getter()
while True:
print('开始抓取代理')
getter.run()
time.sleep(cycle)
def run_api(self):
"""启动API接口"""
app.run()
def run(self):
print('代理池开始运行')
if TESTER_ENABLE:
tester_process = Process(target=self.run_tester)
tester_process.start()
if GETTER_ENABLE:
getter_process = Process(target=self.run_getter)
getter_process.start()
if API_ENABLE:
api_process = Process(target=self.run_api)
api_process.start()
if __name__ == '__main__':
a = Run()
a.run()
运行此文件便可以开始整个代理池的运行:
控制台输出获取代理
4. 使用实例
import requests
import json
def get_proxy():
"""尝试获取代理"""
try:
r = requests.get('http://localhost:5000/get/')
r.raise_for_status
r.encoding = r.apparent_encoding
return r.text
except ConnectionError:
return None
r = get_proxy()
if r:
# 加载返回的json数据
data = json.loads(r)
if data['status'] == 'success':
proxy = data['proxy']
# 构造代理字典,根据请求链接不同自动设置
proxies = {
'http': 'http://' + proxy,
'https': 'http://' + proxy
}
try:
# 使用代理访问网站
r = requests.get('http://www.baidu.com/', proxies=proxies)
r.status_code
r.encoding = r.apparent_encoding
print(r.text)
except Exception as e:
print(e.args)
else:
print(data['info'])
else:
print('未正常获取代理')