Python+Redis维护一个代理池

2019-04-29  本文已影响0人  Assassin007

对一个网站过高频率的爬取可能会导致ip被加入黑名单,这样以后对该网站的访问都会被拒绝,这个时候只能利用代理ip了。网上有大量的公开免费代理,然而大多是不能用的,也可以购买付费代理使用,但是手动设置ip非常麻烦,并且可能由于其他人爬取同样站点而被封禁,或者代理服务器故障等原因导致ip不可用,手动筛选可用ip也是非常麻烦的事情。

为了解决这个问题,我们可以维护一个代理池,定期自动检查以剔除其中的不可用代理,并不断爬取新的代理投入使用,项目地址

1. 结构设计


1.1 存储模块

代理应该是不重复的,并且要标识可用情况,比较好的方法是使用redis的有序集合存储,每一个元素都是一个代理,形式为 ip:端口号 ,此外,每一个元素都有一个分数,用来标识可用情况,分数规则设置为:

1.2 获取模块

定期从各大代理网站爬取代理,免费付费都可以,成功之后保存到数据库

1.3 测试模块

负责定期检测数据库中的代理,可以设置检测链接,检测在对应网站是否可用

1.4 接口模块

提供一个web api来对外提供随机的可用代理,用flask实现

2. 模块实现


2.1 存储模块(db.py)

第三方库:

import redis
from random import choice

# 分数设置
MAX_SCORE = 100
MIN_SCORE = 0
INIT_SCORE = 10

# 连接信息
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

# 数据库最大存储的代理数量
POOL_UPPER_THRESHLD = 10000


class RedisClient:

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, passwd=REDIS_PASSWORD):
        """初始化redis对象"""
        self.db = redis.Redis(host=host, port=port, password=passwd, decode_responses=True)

    def add(self, proxy, score=INIT_SCORE):
        """添加一个代理,设置初始分数"""
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """首先随机获取最高分的有效代理,不存在则按排名获取"""
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            # 从分数前50的代理中随机获取一个
            result = self.db.zrevrange(REDIS_KEY, 0, 50)
            if len(result):
                return choice(result)
            else:
                Exception('无可用代理')

    def decrease(self, proxy):
        """代理分数-1,小于指定阈值则删除"""
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print(proxy, score, '-1')
            return self.db.zincrby(REDIS_KEY, proxy, -1)
        else:
            print(proxy, score, '移除')
            return self.db.zrem(REDIS_KEY, proxy)

    def max(self, proxy):
        """更新代理分数到最大值"""
        print(proxy, MAX_SCORE)
        return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

    def count(self):
        """获取代理数量"""
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """获取全部代理"""
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

2.2 获取模块(getter.py)

第三方库:

from bs4 import BeautifulSoup

from db import RedisClient, POOL_UPPER_THRESHLD
# requests获取一个网页的方法封装到了一个工具类里
from utils import get_page


class ProxyMetaClass(type):
    """元类,初始化类时获取所有以crawl_开头的方法"""

    def __new__(mcs, name, bases, attrs):
        count = 0
        attrs['CrawlFunc'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['CrawlFunc'].append(k)
                count += 1
        attrs['CrawlFuncCount'] = count
        return type.__new__(mcs, name, bases, attrs)


class Crawler(metaclass=ProxyMetaClass):

    def get_proxies(self, crawl_func):
        """执行指定方法来获取代理"""
        proxies = []
        for proxy in eval("self.{}()".format(crawl_func)):
            proxies.append(proxy)
        return proxies

    def crawl_kuaidaili(self, page_count=10):
        """获取快代理网站的免费代理"""
        start_url = 'https://www.kuaidaili.com/free/inha/{}/'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            html = get_page(url)
            if html:
                soup = BeautifulSoup(html, 'lxml')
                trs = soup.find('tbody').find_all('tr')
                for tr in trs:
                    ip = tr.find_all('td')[0].string
                    port = tr.find_all('td')[1].string
                    yield ':'.join([ip, port])


class Getter:
    def __init__(self):
        """初始化数据库类和代理爬虫类"""
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """判断数据库是否已经存满"""
        if self.redis.count() >= POOL_UPPER_THRESHLD:
            return True
        return False

    def run(self):
        """开始抓取各个代理网站的免费代理存入数据库"""
        print('开始获取...')
        if not self.is_over_threshold():
            for i in range(self.crawler.CrawlFuncCount):
                crawl_func = self.crawler.CrawlFunc[i]
                proxies = self.crawler.get_proxies(crawl_func)
                for proxy in proxies:
                    print(proxy)
                    self.redis.add(proxy)


if __name__ == '__main__':
    a = Getter()
    a.run()

这里借助元类实现了自动定义属性 CrawlFunc 来记录Crawler类中所有以 crawl_ 开头的方法,CrawlFuncCount 记录这些方法的数量,这样以后有新的代理网站可以获取代理时,只需要添加 crawl_XXX 方法以相同的方式返回解析后的数据就行了, get_proxies 会依次执行传入的方法列表中的方法, 可以非常方便的扩展。

然后就是实现Getter类来管理存储部分,run方法中先判断是否到达存储阈值,未满则通过 crawler.CrawlFunc 获取所有爬取方法交给 get_proxies 去执行,然后将返回结果存入数据库。这样存储部分的工作就完成了,实例化Getter类并调用run方法即可爬取代理并存入数据库。

2.3 测试模块

第三方库:

import asyncio
import aiohttp
import time

from db import RedisClient


# 目标网址
TEST_URL = 'http://www.baidu.com'
# 正确的响应码列表
TRUE_STATUS_CODE = [200]
# 同时测试一组代理的数量
BATCH_TEST_SIZE = 50


class Tester:

    def __init__(self):
        """初始化数据库管理对象"""
        self.redis = RedisClient()

    async def test_one_proxy(self, proxy):
        """对目标网站测试一个代理是否可用"""
        conn = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    # 解码为字符串
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
                    if response.status in TRUE_STATUS_CODE:
                        # 代理可用
                        self.redis.max(proxy)
                        print(proxy, 100)
                    else:
                        # 代理不可用
                        self.redis.decrease(proxy)
                        print(proxy, -1, "状态码错误")
            except Exception as e:
                self.redis.decrease(proxy)
                print(proxy, -1, e.args)

    async def start(self):
        """启动协程, 测试所有代理"""
        try:
            proxies = self.redis.all()
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i: i+BATCH_TEST_SIZE]
                tasks = [self.test_one_proxy(proxy) for proxy in test_proxies]
                # 并发完成一批任务
                await asyncio.gather(*tasks)
                time.sleep(5)
        except Exception as e:
            print('测试器发生错误', e.args)

    def run(self):
        """开始运行"""
        asyncio.run(self.start())
        # # python3.7之前的写法
        # loop = asyncio.get_event_loop()
        # loop.run_until_complete(self.start())


if __name__ == '__main__':
    a = Tester()
    a.run()

由于代理数量非常大,同步请求的requests很难高效率的完成这个任务,我们使用异步请求库aiohttp,同时用到协程asyncio来成批的检测代理。

测试网站这里设置为百度,使用时可以根据目标不同设置对应网站的网址。还定义了一个正确的状态码列表,目前只有200,某些网站可能会有重定向等操作,可以自行添加状态码。

2.4 API模块

第三方库:

from flask import Flask, g
import json

from db import RedisClient


app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    return '<h2>hello</h2>'


@app.route('/get/')
def get_proxy():
    """获取随机可用代理"""
    conn = get_conn()
    try:
        proxy = conn.random()
        result = json.dumps({'status': 'success', 'proxy': proxy})
    except Exception as e:
        result = json.dumps({'status': 'failure', 'info': e})
    finally:
        return result


@app.route('/count/')
def get_count():
    """获取代理总数"""
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()

这里使用flask做了一个简单的API,包含欢迎页,获取随机代理页和获取代理总数页,获取代理页返回json格式的数据。

3. 整合运行


模块写好了,接下来只需要以多进程的方式将他们运行起来即可

from multiprocessing import Process

from api import app
from getter import Getter
from tester import Tester


# 周期
TESTER_CYCLE = 20
GETTER_CYCLE = 20

# 模块开关
TESTER_ENABLE = True
GETTER_ENABLE = True
API_ENABLE = True


class Run:
    def run_tester(self, cycle=TESTER_CYCLE):
    """定时检测代理可用情况"""
        tester = Tester()
        while True:
            print('开始测试')
            tester.run()
            time.sleep(cycle)

    def run_getter(self, cycle=GETTER_CYCLE):
        """定时获取代理"""
        getter = Getter()
        while True:
            print('开始抓取代理')
            getter.run()
            time.sleep(cycle)

    def run_api(self):
    """启动API接口"""
        app.run()

    def run(self):
        print('代理池开始运行')
        if TESTER_ENABLE:
            tester_process = Process(target=self.run_tester)
            tester_process.start()
        if GETTER_ENABLE:
            getter_process = Process(target=self.run_getter)
            getter_process.start()
        if API_ENABLE:
            api_process = Process(target=self.run_api)
            api_process.start()


if __name__ == '__main__':
    a = Run()
    a.run()

运行此文件便可以开始整个代理池的运行:

控制台输出
获取代理

4. 使用实例


import requests
import json


def get_proxy():
    """尝试获取代理"""
    try:
        r = requests.get('http://localhost:5000/get/')
        r.raise_for_status
        r.encoding = r.apparent_encoding
        return r.text
    except ConnectionError:
        return None


r = get_proxy()
if r:
    # 加载返回的json数据
    data = json.loads(r)
    if data['status'] == 'success':
        proxy = data['proxy']
        # 构造代理字典,根据请求链接不同自动设置
        proxies = {
            'http': 'http://' + proxy,
            'https': 'http://' + proxy
        }
        try:
            # 使用代理访问网站
            r = requests.get('http://www.baidu.com/', proxies=proxies)
            r.status_code
            r.encoding = r.apparent_encoding
            print(r.text)
        except Exception as e:
            print(e.args)
    else:
        print(data['info'])
else:
    print('未正常获取代理')
上一篇 下一篇

猜你喜欢

热点阅读