读轮子之免费代理池搭建

2018-09-17  本文已影响0人  hellodyp

造轮子很高大上,其实理解一个轮子也是一件麻烦事
往往想理解一个库,打开源代码,我们就会在层层的函数/方法调用栈中迷失...

大的项目比如tornado、flask的源码乍一下手难度太高>_<,不妨从一个小项目开始
因为业务需要,公司的爬虫项目需要搭建一个代理池,确保池中的代理必须是可用状态
面向博客编程后,发现很多代理池轮子
综合评比下来,选用了这个方案:https://yukunweb.com/2018/4/build-free-asynchronous-proxy-pool/
轮子不大,但涉及到的技术概念还蛮多的,该篇博客是代理池实现详情说明文档

整套程序模块化设计,分为以下部分:

程序结构:

ProxyPool \
    Api \
        __init__.py
        api.py
    Spider \
        __init__.py
        get_proxy.py
    Db \
        __init__.py
        db.py
    Schedule \
        __init__.py
        adder.py
        tester.py
        schedule.py
    config.py
    run.py
代理获取

作者选择的免费代理:幻代理66代理快代理西刺代理
然后分析这些代理网站的网页结构,设计规则抓下来并保存
抓取部分代码:

import requests
from lxml import etree
from requests.exceptions import ConnectionError

def parse_url(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0'
    }
    try:
        resp = requests.get(url, headers=headers)
        if resp.status_code == 200:
            return resp.text
        return None
    except ConnectionError:
        print('Error.')
    return None

def proxy_xici():
    url = 'http://www.xicidaili.com/'
    resp = parse_url(url)
    html = etree.HTML(resp)
    ips = html.xpath('//*[@id="ip_list"]/tr/td[2]/text()')
    ports = html.xpath('//*[@id="ip_list"]/tr/td[3]/text()')
    for ip, port in zip(ips, ports):
        proxy = ip + ':' + port
        yield proxy

parse_url是一个网页下载函数
proxy_xici是西刺代理的抓取规则,用xpath解析并使用yield返回组装好的ip+port格式的代理,注意此函数是一个生成器,需要用遍历的方式获取返回内容

这些抓取ip的方法用类来实现,并且做成了可扩展模式
具体就是所有的提取ip的方法统一以proxy_作方法名开头,并把这些方法保存在元类中(其实保存的是方法名的字符串格式数据),创建类的时候自动检查以proxy_开头的方法,然后做一些处理
如果有新的免费代理网站,添加新的方法按照约定的规则命名即可
具体实现:

# Spider/get_proxy.py
class ProxyMetaclass(type):
    """
    元类,在ProxyGetter类中加入
    __CrawlFunc__和__CrawlFuncCount__两个属性
    分别表示爬虫函数和爬虫函数的数量
    """
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k in attrs.keys():
            if k.startswith('proxy_'):
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class ProxyGetter(object, metaclass=ProxyMetaclass):

    def proxy_ip66(self):
        pass

    def proxy_xici(self):
        pass

    def proxy_kuai(self):
        pass

    def proxy_ihuan(self):
        pass

上篇文章刚刚介绍了元类,如果你认真理解的话,这里的代码就不难懂了
如果你理解的更透彻,甚至可以发现这个实现中有一个元类属性是不需要添加的,那就是后话了,下篇文章我们会对这个元类稍作修改使代码更简洁
从上面的代码看,作者在attr中添加了两个属性,一个__CrawlFuncCount__是保存当前类中获取代理的方法数量
另一个__CrawlFunc__是ip代理爬虫方法名的string列表,具体实现是约定如果类中属性以proxy_开头就表明是代理爬虫方法,保存到属性中
注意new方法中使用的是type返回:
return type.__new__(cls, name, bases, attrs)
现学现卖,根据我的上篇文章,其实这样写会更好:
return super(ProxyMetaclass, cls).__new__(cls, name, bases, attrs)

同时类中还提供了一个方法,通过爬虫方法名来调用方法获取代理

class ProxyGetter(object, metaclass=ProxyMetaclass):

    ...

    def get_raw_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Getting', proxy, 'from', callback)
            proxies.append(proxy)
        return proxies

关于eval的语法,作者也有解释:

>>> m = 5
>>> n = 3
>>> eval('m') + eval('n')
8
数据库

持久化用的是mongo数据库,封装了一套简洁api
mongo是非关系型文档类型数据库,数据在库中的体现形式非常类似于json格式
在linux中安装好mongo,终端shell中输入mongo启动客户端
使用use db命令指定库,这一点和mysql一致
但是查看库中某张表的命令是不同的:db.table.find()

> use proxy
switched to db proxy
> db.proxy.find()
{ "_id" : ObjectId("5b9f43666cb73e293b904801"), "proxy" : "112.195.206.72:4226", "num" : 1024, "http/s" : true }
{ "_id" : ObjectId("5b9f45956cb73e293b90480e"), "proxy" : "119.114.123.98:8946", "num" : 1028, "http/s" : false }
{ "_id" : ObjectId("5b9f45966cb73e293b904811"), "proxy" : "116.7.187.99:4223", "num" : 1029, "http/s" : true }
{ "_id" : ObjectId("5b9f49c76cb73e293b904820"), "proxy" : "182.113.33.83:4221", "num" : 1033, "http/s" : true }
{ "_id" : ObjectId("5b9f502c6cb73e293b904822"), "proxy" : "116.149.202.69:6436", "num" : 1034, "http/s" : false }
{ "_id" : ObjectId("5b9f502e6cb73e293b904825"), "proxy" : "182.105.201.64:4162", "num" : 1035, "http/s" : true }
{ "_id" : ObjectId("5b9f50406cb73e293b90482c"), "proxy" : "106.46.136.83:4237", "num" : 1036, "http/s" : false }
{ "_id" : ObjectId("5b9f50426cb73e293b90482f"), "proxy" : "112.83.93.63:2589", "num" : 1037, "http/s" : true }
>

表中的数据类似json格式的键值对,其中必须要有_id字段,如果插入数据的时候未指定id,则mongo会自动插入id

python中和mongo交互,使用的是pymongo库,封装了对数据库的操作
作者在此之上又封装了一层:

# db.py
from pymongo import MongoClient, ASCENDING
# 我们把配置文件放入 config.py 文件内
from config import NAME, HOST, PORT


class MongodbClient(object):

    def __init__(self, table=TABLE, host=HOST, port=PORT):
        self.table = table
        self.client = MongoClient(host, port)
        self.db = self.client.NAME

    @property
    def get_counts(self):
        """获取表里的数据总数"""
        return self.db[self.table].count()

    def change_table(self, table):
        """切换数据库表"""
        self.table = table

    def set_num(self):
        """给每条数据设置唯一自增num"""
        nums = []
        datas = self.get_all()
        if datas:
            for data in datas:
                nums.append(data['num'])
            return max(nums)
        return 0

    def get_new(self):
        """获取最新一条数据"""
        data = self.get_all()[-1] if self.get_all() else None
        return data

    def get_data(self, num):
        """获取指定num的数据"""
        datas = self.get_all()
        if datas:
            data = [i for i in datas if i['num']==num][0]
            return data
        return None

    def get_all(self):
        """获取全部数据"""
        # 判断表里是否有数据
        if self.get_counts != 0:
            # 先排序
            self.sort()
            datas = [i for i in self.db[self.table].find()]
            return datas
        return None

    def put(self, proxy):
        """
        放置代理到数据库
        """
        num = self.proxy_num() + 1
        if self.db[self.table].find_one({'proxy': proxy}):
            self.delete(proxy)
            self.db[self.table].insert({'proxy': proxy, 'num': num})
        else:
            self.db[self.table].insert({'proxy': proxy, 'num': num})

    def delete(self, data):
        """删除数据"""
        self.db[self.table].remove(data)

    def clear(self):
        """清空表"""
        self.client.drop_database(self.table)

    def sort(self):
        """按num键排序"""
        self.db[self.table].find().sort('num', ASCENDING)

在init中初始化数据库、host、端口
put方法向库中插入数据
delete方法根据数据内容删除对应的数据
值得一提的是,插入的每条数据都有一个自增的字段num
这个自己是设置的,sort方法就是根据num来排序

调度器

代理测试:
代理测试的网址可以选取一个页面很小网址,这样可以占用很小的流浪,响应的速度也会有略微提升
作者提出如果用百度作为测试网址,会出现不管代理是否成功,都会有响应200的情况,我测了一下,的确是
作者选择http://2017.ip138.com/ic.asp这个测试ip的站点作为检测站。是否好用未知,我用的http检测站点是:http://mini.eastday.com/assets/v1/js/search_word.js

作者用的是aiohttp这个异步请求库,是一个基于协程异步实现的库,推荐在python3.5以上环境使用

# Schedule/tester.py
import asyncio
import aiohttp

from Db.db import MongodbClient
# 将配置信息放入配置文件config.py
from config import TEST_URL

class ProxyTester(object):
    test_url = TEST_URL

    def __init__(self):
        self._raw_proxies = None

    def set_raw_proxies(self, proxies):
        # 供外部添加需要测试的代理
        self._raw_proxies = proxies
        self._conn = MongodbClient()

    async def test_single_proxy(self, proxy):
       """
       测试一个代理,如果有效,将他放入usable-proxies
       """
        try:
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('Testing', proxy)
                    async with session.get(self.test_url, proxy=real_proxy, timeout=10) as response:
                        if response.status == 200:
                            # 请求成功,放入数据库
                            self._conn.put(proxy)
                            print('Valid proxy', proxy)
                except Exception as e:
                    print(e)
        except Exception as e:
            print(e)

    def test(self):
       """
       异步测试所有代理
       """
        print('Tester is working...')
        try:
            loop = asyncio.get_event_loop()
            tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print('Async Error')

关于aiohttp的用法,大家可以参考这篇文章。也可以直接参考中文文档

使用aiohttp的好处是,当爬虫发出请求等待网路io的时候不会一直阻塞等待,而可以继续其他的任务,比如说请求一百个网页,假设每个网页io耗时90毫秒,如果使用吧requets阻塞请求的话,耗时大概是:100*90毫秒
如果使用aiohttp,总耗时大概是90毫秒多一点
可以发现任务越多的话,耗时的差距也非常大

代理测的入口是test方法:
创建事件循环,用列表生成器产生任务tasks,异步任务注册到事件循环中

test_single_proxy是测试单个代理的方法,注意方法前的async 关键字
async 、await关键字只能在python3.5以上的环境中使用,让我们可以像写同步任务一样去写异步任务,有了async 关键字以后,方法/函数就不是普通方法/函数了,需要注册到事件循环loop = asyncio.get_event_loop()里执行

添加代理

在循环中实现,先检测库中代理数量,如果低于设定的阈值,就调用获取代理类中的方法获取代理检测并入库,如果达到阈值则break循环
在获取代理类中通过遍历CrawlFunc列表拿到获取代理的方法,调用类中的get_raw_proxies拿到代理,然后通过测试类中的set_raw_proxies设置好要测试的代理,在事件循环注册测试入库

# Schedule/adder.py
from Db.db import MongodbClient
from Spider.get_proxy import ProxyGetter
from .tester import ProxyTester


class PoolAdder(object):
    """
    启动爬虫,添加代理到数据库中
    """

    def __init__(self, threshold):
        self._threshold = threshold
        self._conn = MongodbClient()
        self._tester = ProxyTester()
        self._crawler = ProxyGetter()

    def is_over_threshold(self):
        """
        判断数据库中代理数量是否达到设定阈值
        """
        return True if self._conn.get_nums >= self._threshold else False

    def add_to_pool(self):
        """
        补充代理
        """
        print('PoolAdder is working...')
        proxy_count = 0
        while not self.is_over_threshold():
            # 迭代所有的爬虫,元类给ProxyGetter的两个方法
            # __CrawlFuncCount__是爬虫数量,__CrawlFunc__是爬虫方法
            for callback_label in range(self._crawler.__CrawlFuncCount__):
                callback = self._crawler.__CrawlFunc__[callback_label]
                # 调用ProxyGetter()方法进行抓取代理
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # 调用方法测试爬取到的代理
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print('Proxy is enough, waiting to be used...')
                    break
            if proxy_count == 0:
                print('The proxy source is exhausted.')
调度器定时实现

调度器中两个定时任务,用进程来实现(协程配合进程比较简单些),一个是定时取部分代理调用测试类检测,一个是定时检测数据库中的代理是否低于最低阈值,调用添加类添加。

# Schedulr/schedule.py
import time
from multiprocessing import Process

from ProxyPool.db import MongodbClient
from .tester import ProxyTester
from .adder import PoolAdder
from config import VALID_CHECK_CYCLE, POOL_LEN_CHECK_CYCLE \
        POOL_LOWER_THRESHOLD, POOL_UPPER_THRESHOLD

class Schedule(object):

    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """
        从数据库中拿到一半代理进行检查
        """
        conn = MongodbClient()
        tester = ProxyTester()
        while True:
            print('Refreshing ip...')
            # 调用数据库,从左边开始拿到一半代理
            count = int(0.5 * conn.get_nums)
            if count == 0:
                print('Waiting for adding...')
                time.sleep(cycle)
                continue
            raw_proxies = conn.get(count)
            tester.set_raw_proxies(raw_proxies)
            tester.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                upper_threshold=POOL_UPPER_THRESHOLD,
                cycle=POOL_LEN_CHECK_CYCLE):
        """
        如果代理数量少于最低阈值,添加代理
        """
        conn = MongodbClient()
        adder = PoolAdder(upper_threshold)
        while True:
            if conn.get_nums < lower_threshold:
                adder.add_to_pool()
            time.sleep(cycle)

    def run(self):
        print('Ip Processing running...')
        valid_process = Process(target=Schedule.valid_proxy)
        check_process = Process(target=Schedule.check_pool)
        valid_process.start()
        check_process.start()

阈值和两个调度周期在config中:

# config.py
# Pool 的低阈值和高阈值
POOL_LOWER_THRESHOLD = 10
POOL_UPPER_THRESHOLD = 40

# 两个调度进程的周期
VALID_CHECK_CYCLE = 600
POOL_LEN_CHECK_CYCLE = 20

valid_proxy周期的从数据库中左侧(为什么是左侧,详情查看db.py源码,在db中实现了)获取一半的代理然后检测
check_pool方法周期的检测数据库中的代理数量,少于阈值则调用add_to_pool方法补充

API

接口部分用flask实现,比较简单,就不列举出来了
以下是程序入口文件:

# run.py
from Api.api import app
from Schedule.schedule import Schedule


def main():
    # 任务类的两个周期进程就是整个调度器
    s = Schedule()
    s.run()
    app.run()


if __name__ == '__main__':
    main()

项目地址

作者的项目github
感兴趣的童鞋赶紧下载下来好好调试一下吧

上一篇下一篇

猜你喜欢

热点阅读