Building a ProxyPool for Crawlers

2018-05-31  LinxsCoding

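Preface

When writing data-scraping scripts, we constantly run into anti-crawling measures such as header checks and IP-based blocking. The previous post covered building a Cookies pool, which lets us log in to a site and scrape pages that sit behind a login wall. But a Cookies pool alone is not enough: however many Cookies you hold, every request still comes from a single node. Some sites, such as 58, apply IP-based anti-crawling, so once your request rate gets high your IP risks being banned. Why not just throttle the request rate? Throttling lowers throughput, and it does not necessarily get around IP-based blocking anyway. So we need to build and maintain a proxy pool.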

Step 1 ---- Implementation approach



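Step 2 ---- Modules and their functions

The proxy pool consists of five modules: storage, getter, checker, API, and scheduler. The figure below shows the structure of the proxy pool.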

proxy.jpg

Step 3 ---- Implementing each module

-- Storage module --



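from random import choice

from redis import StrictRedis

# HOST, PORT, PASSWD, REDIS_KEY, MIN_SCORE, MAX_SCORE, INITIAL_SCORE and
# PoolEmptyError are defined elsewhere in the project and are not shown here.
# The sorted-set calls use the redis-py >= 3.0 signatures:
# zadd(name, {member: score}) and zincrby(name, amount, member).


class RedisClient(object):
    def __init__(self):
        '''
        With decode_responses=True, values are returned as str; otherwise they are bytes.
        '''
        self.db = StrictRedis(host=HOST, port=PORT, password=PASSWD, decode_responses=True)

    def random(self):
        # Prefer proxies that currently hold the maximum score
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            # zrevrange takes rank indices, not scores, so this picks among
            # the highest-ranked entries (ranks MIN_SCORE..MAX_SCORE)
            result = self.db.zrevrange(REDIS_KEY, MIN_SCORE, MAX_SCORE)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def add(self, proxy, score=INITIAL_SCORE):
        # Only add proxies that are not stored yet, so existing scores are kept
        if self.db.zscore(REDIS_KEY, proxy) is None:
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def decrease(self, proxy):
        # Look up the proxy's current score
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print("Proxy {} , score {} , minus 1".format(proxy, score))
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print("Proxy {} , current score {} , removed".format(proxy, score))
            self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        print("Proxy {} is usable , set to {}".format(proxy, MAX_SCORE))
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    # count() and all() are not shown in the original listing, but the checker
    # and API modules call them; these are assumed minimal versions.
    def count(self):
        return self.db.zcard(REDIS_KEY)

    def all(self):
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)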
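The scoring rules of the storage module can be tried out without a Redis server. The following is only a minimal in-memory sketch of the same lifecycle, not the project's code: the class name `InMemoryPool` and the values `MIN_SCORE = 0`, `MAX_SCORE = 100` and `INITIAL_SCORE = 10` are assumptions for illustration, since the post does not state the real settings.

```python
from random import choice

# Assumed values; the real ones live in the project's settings.
MIN_SCORE = 0
MAX_SCORE = 100
INITIAL_SCORE = 10


class InMemoryPool:
    """Dict-backed stand-in that mirrors the scoring rules of RedisClient."""

    def __init__(self):
        self.scores = {}  # proxy -> score

    def add(self, proxy, score=INITIAL_SCORE):
        # Only new proxies get the initial score; known ones are left alone
        if proxy not in self.scores:
            self.scores[proxy] = score

    def max(self, proxy):
        # A proxy that passed a check is promoted straight to MAX_SCORE
        self.scores[proxy] = MAX_SCORE

    def decrease(self, proxy):
        # A failed check costs one point; a proxy already at MIN_SCORE is removed
        score = self.scores.get(proxy)
        if score and score > MIN_SCORE:
            self.scores[proxy] = score - 1
        else:
            self.scores.pop(proxy, None)

    def random(self):
        # Prefer proxies at MAX_SCORE, otherwise fall back to any stored proxy
        # (the Redis version falls back to the top-ranked entries instead)
        best = [p for p, s in self.scores.items() if s == MAX_SCORE]
        if best:
            return choice(best)
        if self.scores:
            return choice(list(self.scores))
        raise LookupError("proxy pool is empty")


pool = InMemoryPool()
pool.add("1.1.1.1:80")
pool.add("2.2.2.2:8080")
pool.max("2.2.2.2:8080")      # passed a check
pool.decrease("1.1.1.1:80")   # failed a check: 10 -> 9
print(pool.scores)
print(pool.random())          # the only proxy at MAX_SCORE
```

The point of the scheme is that a proxy is not dropped on its first failure: it has to fail repeatedly before it leaves the pool, while a single success restores it to the top.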

-- Getter module --


class ProxyMetaClass(type):
    def __new__(cls, name, bases, attrs):
        # Record the name of every attribute containing 'crawle', so the crawl
        # methods can be discovered later without listing them by hand
        crawl_funcs = [k for k in attrs if 'crawle' in k]
        attrs['__CrawleFunc__'] = crawl_funcs
        attrs['__CrawleCount__'] = len(crawl_funcs)
        return type.__new__(cls, name, bases, attrs)
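To see what the metaclass does in practice, here is a small self-contained sketch. `DemoCrawler` and its methods are hypothetical and return hard-coded values; only the metaclass logic follows the listing above.

```python
class ProxyMetaClass(type):
    def __new__(cls, name, bases, attrs):
        # Collect the names of all attributes that contain 'crawle'
        crawl_funcs = [k for k in attrs if 'crawle' in k]
        attrs['__CrawleFunc__'] = crawl_funcs
        attrs['__CrawleCount__'] = len(crawl_funcs)
        return type.__new__(cls, name, bases, attrs)


class DemoCrawler(metaclass=ProxyMetaClass):
    # Hypothetical crawl methods that return fixed proxies instead of scraping
    def crawle_site_a(self):
        return ["1.1.1.1:80"]

    def crawle_site_b(self):
        return ["2.2.2.2:8080", "3.3.3.3:3128"]

    def helper(self):
        # Not collected: the name does not contain 'crawle'
        return "ignored"


crawler = DemoCrawler()
collected = []
for func_name in DemoCrawler.__CrawleFunc__:
    # Look each crawl method up by name and call it
    collected.extend(getattr(crawler, func_name)())

print(DemoCrawler.__CrawleFunc__)
print(DemoCrawler.__CrawleCount__)
print(collected)
```

With this design, supporting a new proxy site only requires adding another `crawle_*` method; nothing else has to be registered.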



from time import sleep

# get_pages and the parse_* functions are project helpers not shown in this post.


class Crawler(object, metaclass=ProxyMetaClass):
    def get_proxies(self, callback):
        proxies = []
        if callback == 'crawle_daili66':
            return self.process_daili66(callback)
        else:
            # Look the crawl method up by name instead of using eval;
            # "or []" covers crawl methods that return None when a page fails
            for proxy in getattr(self, callback)() or []:
                print("Got proxy {}".format(proxy))
                proxies.append(proxy)
            return proxies

    def process_daili66(self, callback):
        proxies = []
        # 66ip is paginated: crawl pages 1 to 10
        for page_count in range(1, 11):
            for proxy in getattr(self, callback)(page_count=page_count) or []:
                print("Got proxy {}".format(proxy))
                proxies.append(proxy)
        return proxies

    def crawle_xiciproxy(self):
        start_url = "http://www.xicidaili.com/"
        html = get_pages(start_url, "xici")
        if html:
            return parse_xiciproxy(html)

    def crawle_daili66(self, page_count=1):
        start_url = "http://www.66ip.cn/{}.html"
        # Pause between pages to avoid hammering the site
        sleep(5)
        html = get_pages(start_url.format(page_count), website="66")
        if html:
            print("Proxies on page {}".format(page_count))
            return parse_66(html)

    def crawle_guobanjia(self):
        start_url = "http://www.goubanjia.com/"
        html = get_pages(start_url, 'goubanjia')
        if html:
            return parse_guobanjia(html)
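The scheduler further down calls `Getter().run()`, but the `Getter` class itself does not appear in this post. A plausible minimal sketch, assuming it simply walks `__CrawleFunc__`, calls `get_proxies` for each name and stores the results, could look like this. `StubCrawler` and `StubStore` are hypothetical stand-ins so the example runs without network or Redis; the real `Getter` takes no constructor arguments and would presumably create its own `Crawler()` and `RedisClient()`.

```python
class StubCrawler:
    """Hypothetical stand-in for Crawler: fixed proxies, no network access."""
    __CrawleFunc__ = ["crawle_site_a", "crawle_site_b"]

    def get_proxies(self, callback):
        return getattr(self, callback)()

    def crawle_site_a(self):
        return ["1.1.1.1:80"]

    def crawle_site_b(self):
        return ["2.2.2.2:8080", "1.1.1.1:80"]  # contains a duplicate


class StubStore:
    """Hypothetical stand-in for RedisClient.add with an assumed initial score of 10."""

    def __init__(self):
        self.scores = {}

    def add(self, proxy, score=10):
        # Same rule as the storage module: existing proxies keep their score
        if proxy not in self.scores:
            self.scores[proxy] = score


class Getter:
    def __init__(self, crawler, store):
        self.crawler = crawler
        self.store = store

    def run(self):
        # Call every crawl method discovered by the metaclass and store the results
        for callback in self.crawler.__CrawleFunc__:
            for proxy in self.crawler.get_proxies(callback):
                self.store.add(proxy)


store = StubStore()
Getter(StubCrawler(), store).run()
print(store.scores)
```

Because `add` ignores proxies that are already stored, running the getter repeatedly does not reset scores that the checker has adjusted in the meantime.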


-- Checker module --


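import asyncio
from time import sleep

import aiohttp

# BATCH_SIZE comes from the project's settings.


class CheckUp(object):
    def __init__(self):
        self.db = RedisClient()

    async def check_batch(self, proxies):
        # Check one batch of proxies concurrently
        await asyncio.gather(*(self.check_single_proxy(proxy) for proxy in proxies))

    def run(self):
        # 1. get all proxies
        # 2. split them into batches of BATCH_SIZE
        # 3. run each batch asynchronously
        print("Start test")
        try:
            proxies = self.db.all()
            for i in range(0, len(proxies), BATCH_SIZE):
                tasks_proxies = proxies[i:i + BATCH_SIZE]
                # asyncio.run (Python 3.7+) replaces get_event_loop() plus
                # asyncio.wait(), which rejects bare coroutines on Python 3.11+
                asyncio.run(self.check_batch(tasks_proxies))
                sleep(3)
        except Exception as e:
            print(e.args)
            print("CheckUp occurs Error")

    async def check_single_proxy(self, proxy):
        # ssl=False replaces the deprecated verify_ssl=False (aiohttp 3.x)
        connection = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=connection) as session:
            # Test whether the proxy is usable (request logic not shown in this post)
            ...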
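The body of `check_single_proxy` is left out above. As a hedged sketch of how such a check is commonly written: fetch a test URL through the proxy, then promote the proxy on success or penalise it on failure. `TEST_URL`, `VALID_STATUS_CODES`, `fetch_status_via_proxy`, `RecordingStore` and `fake_fetch` are all assumptions introduced for this example, and the network call is isolated in one function so the demonstration at the bottom runs offline.

```python
import asyncio

TEST_URL = "http://httpbin.org/get"   # assumed test target
VALID_STATUS_CODES = [200]            # assumed statuses that count as "usable"


async def fetch_status_via_proxy(proxy, url=TEST_URL, timeout=15):
    """Return the HTTP status of `url` when fetched through `proxy` (host:port)."""
    import aiohttp  # third-party; imported here so the offline demo works without it
    connector = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get(
            url,
            proxy="http://" + proxy,
            timeout=aiohttp.ClientTimeout(total=timeout),
            allow_redirects=False,
        ) as response:
            return response.status


async def check_single_proxy(db, proxy, fetch_status=fetch_status_via_proxy):
    """Promote the proxy to the maximum score on success, decrease it on failure."""
    try:
        status = await fetch_status(proxy)
    except Exception:
        # Connection errors and timeouts both count as a failed check
        db.decrease(proxy)
        return False
    if status in VALID_STATUS_CODES:
        db.max(proxy)
        return True
    db.decrease(proxy)
    return False


class RecordingStore:
    """Hypothetical stand-in for RedisClient that only records calls."""

    def __init__(self):
        self.calls = []

    def max(self, proxy):
        self.calls.append(("max", proxy))

    def decrease(self, proxy):
        self.calls.append(("decrease", proxy))


async def fake_fetch(proxy):
    # Pretend that one proxy answers and every other one times out
    if proxy == "1.1.1.1:80":
        return 200
    raise asyncio.TimeoutError()


async def demo():
    store = RecordingStore()
    results = await asyncio.gather(
        check_single_proxy(store, "1.1.1.1:80", fake_fetch),
        check_single_proxy(store, "2.2.2.2:8080", fake_fetch),
    )
    return results, store.calls


results, calls = asyncio.run(demo())
print(results)
print(calls)
```

In real use, `fetch_status` would be left at its default so the request actually goes through the proxy; the fake version only exists to show the promote/penalise branches.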



-- API module --



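from flask import Flask, g

app = Flask(__name__)


@app.route("/")
def index():
    return "<h2>Welcome to Proxy Pool System</h2>"


def get_conn():
    # Reuse one RedisClient per application context
    if not hasattr(g, "redis"):
        g.redis = RedisClient()
    return g.redis


@app.route('/count')
def get_count():
    conn = get_conn()
    # A Flask view cannot return a bare int, so convert the count to str
    return str(conn.count())


@app.route('/random')
def random():
    conn = get_conn()
    return conn.random()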
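On the crawler side, using the API comes down to requesting `/random` and turning the returned `host:port` string into a proxy setting. The sketch below uses only the standard library; the address `http://127.0.0.1:5555` and the helper names are assumptions (the post does not give `API_HOST` or `API_PORT`), and a fake opener replaces the running service so the example works offline.

```python
import io
from urllib.request import urlopen

API_BASE = "http://127.0.0.1:5555"  # assumed API_HOST / API_PORT


def fetch_random_proxy(api_base=API_BASE, opener=urlopen):
    """Ask the pool's /random endpoint for one proxy as 'host:port'."""
    with opener(api_base + "/random") as response:
        return response.read().decode("utf-8").strip()


def to_proxies_mapping(proxy):
    """Build the scheme -> proxy URL mapping that HTTP clients such as requests accept."""
    return {"http": "http://" + proxy, "https": "http://" + proxy}


def fake_opener(url):
    # Offline stand-in for the running API: always answers with one proxy
    assert url.endswith("/random")
    return io.BytesIO(b"1.2.3.4:8080\n")


proxy = fetch_random_proxy(opener=fake_opener)
proxies = to_proxies_mapping(proxy)
print(proxies)
```

Against a live pool, calling `fetch_random_proxy()` with the default opener would hit the Flask service, and the resulting mapping could be passed to the HTTP client for the next request.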



-- Scheduler module --



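import multiprocessing
from time import sleep

# CYCLE_GETTER, CYCLE_CHECKUP, API_HOST, API_PORT, API_PROCSS, GETTER_PROCESS
# and CHECKUP_PROCESS come from the project's settings (names kept as in the
# original listing). Getter is the getter-module class that drives Crawler;
# it is not shown in this post.


class Scheduler(object):
    def getter(self, cycle=CYCLE_GETTER):
        # Fetch new proxies every `cycle` seconds
        getter = Getter()
        while True:
            print("Start to get proxy")
            getter.run()
            sleep(cycle)

    def checkup(self, cycle=CYCLE_CHECKUP):
        # Re-test the stored proxies every `cycle` seconds
        check = CheckUp()
        while True:
            print("Start to checkup proxy")
            check.run()
            sleep(cycle)

    def api(self):
        app.run(API_HOST, API_PORT)

    def run(self):
        # Each enabled module runs in its own process
        print("Proxy Pool start run")
        if API_PROCSS:
            api_process = multiprocessing.Process(target=self.api)
            api_process.start()

        if GETTER_PROCESS:
            getter_process = multiprocessing.Process(target=self.getter)
            getter_process.start()

        if CHECKUP_PROCESS:
            checkup_process = multiprocessing.Process(target=self.checkup)
            checkup_process.start()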
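The listings above rely on a number of settings that the post never shows. For orientation, a settings module might look roughly like the sketch below. The names are taken from the code above; every value is an assumption chosen for illustration, not the project's actual configuration.

```python
# Hypothetical settings; names come from the listings, all values are assumed.

# Redis connection and sorted-set key
HOST = "127.0.0.1"
PORT = 6379
PASSWD = None
REDIS_KEY = "proxies"

# Scoring: new proxies start low and are promoted to MAX_SCORE once verified
MIN_SCORE = 0
MAX_SCORE = 100
INITIAL_SCORE = 10

# Checker: how many proxies are tested concurrently per batch
BATCH_SIZE = 100

# Scheduler: seconds between getter runs and between checker runs
CYCLE_GETTER = 20
CYCLE_CHECKUP = 20

# API address
API_HOST = "127.0.0.1"
API_PORT = 5555

# Switches for the three processes (API_PROCSS is spelled as in the Scheduler listing)
API_PROCSS = True
GETTER_PROCESS = True
CHECKUP_PROCESS = True

# Which processes the scheduler would start with these switches
enabled = [
    name
    for name, switch in (
        ("api", API_PROCSS),
        ("getter", GETTER_PROCESS),
        ("checkup", CHECKUP_PROCESS),
    )
    if switch
]
print(enabled)
```

Keeping the three switches separate makes it possible to run, for example, only the API process on one machine while the getter and checker run elsewhere against the same Redis instance.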
        


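Conclusion

With a proxy pool we can switch IPs at random (not every proxy will work) to counter IP-based anti-crawling. This is necessary for scraping data at scale, and it is also one of the most basic skills expected of a crawler engineer.

Finally, run it and you should see results like the following:

运行.png 代理池.png

The full code is published on GitHub.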
