测试开发程序员我的Python自学之路

Python爬虫实战-使用Scrapy框架爬取土巴兔(三)

2017-03-18  本文已影响2601人  imflyn

通过上一篇文章Python爬虫实战-使用Scrapy框架爬取土巴兔(二)我们创建了工程目录与完成了基本配置。接下来就要开始做中间件的编写。

该系列其它文章


该篇文章主要讲Scrapy中自定义中间件(MIDDLEWARES)的作用与代码编写。

一.下载器中间件(Downloader Middleware)

1.下载器中间件的作用

下载器中间件是介于Scrapy的request/response处理的钩子框架。 是用于全局修改Scrapy request和response的一个轻量、底层的系统。

2.下载器中间件的配置

要使用自定义的下载器中间件,需要加入到settings.py文件的DOWNLOADER_MIDDLEWARES配置中:

DOWNLOADER_MIDDLEWARES = {
    'msic.scrapy.middlewares.CustomUserAgentMiddleware': 2,
    'tubatu.middlewares.RedirectionMiddleware': 998,
}

if USE_PROXY:
    DOWNLOADER_MIDDLEWARES['msic.scrapy.middlewares.CustomHttpProxyMiddleware'] = 1
    DOWNLOADER_MIDDLEWARES['msic.scrapy.middlewares.CatchExceptionMiddleware'] = 999

该配置是一个字典类型,键为中间件类的路径,值为其中间件的顺序,值越小越先被调用。

3.下载器中间件的编写

爬取土巴兔网站时,土巴兔对IP会有限制,如果一段时间内同一ip访问请求过多。服务端的http响应会直接返回503的error code。所以我们必须编写一些下载器中间件来应对这样的限制。同时我们也需要自己建立一个ip代理池来为爬虫中的http请求维护可用的代理ip。

import random

from msic.common import log, agents
from msic.proxy.proxy_pool import proxy_pool

class CatchExceptionMiddleware(object):
    def process_response(self, request, response, spider):
        if response.status < 200 or response.status >= 400:
            try:
                proxy_pool.add_failed_time(request.meta['proxy'].replace('http://', ''))
            except KeyError:
                pass
        return response

    def process_exception(self, request, exception, spider):
        try:
            proxy_pool.add_failed_time(request.meta['proxy'].replace('http://', ''))
        except Exception:
            pass


class CustomHttpProxyMiddleware(object):
    def process_request(self, request, spider):
        try:
            request.meta['proxy'] = "http://%s" % proxy_pool.random_choice_proxy()
        except Exception as e:
            log.error(e)


class CustomUserAgentMiddleware(object):
    def process_request(self, request, spider):
        agent = random.choice(agents.AGENTS_ALL)
        request.headers['User-Agent'] = agent
from scrapy import Spider

class RedirectionMiddleware(object):
    ERROR_COUNT = 0

    def process_response(self, request, response, spider: Spider):
        if response.status == 302 or response.status == 503:
            self.ERROR_COUNT += 1
            print('错误次数%s' % self.ERROR_COUNT)
            if self.ERROR_COUNT > 100:
                spider.close(spider, 'http status error')
        return response

    def process_exception(self, request, exception, spider):
        pass

下载器中间件主要是覆写三个回调方法:

二.Spider中间件(Spider Middleware)

1.Spider中间件的作用

Spider中间件是介入到Scrapy的spider处理机制的钩子框架,通过它来处理发送给Spiders的response及spider产生的item和request。

2.Spider中间件的配置

要使用自定义的下载器中间件,需要加入到settings.py文件的SPIDER_MIDDLEWARES配置中:

SPIDER_MIDDLEWARES = {
    'myproject.middlewares.CustomSpiderMiddleware': 543,
}

该配置同样是一个字典类型,键为中间件类的路径,值为其中间件的顺序,值越小越先被调用。

由于工程中没有使用到,所以不做过多说明,有疑问可以参考官方文档

三.IP代理池

不光是土巴兔,很多网站都会都爬虫做自己的限制。限制在一定时间内访问请求过多的IP,所以我们不得不使用ip代理池来保证我们的爬虫能够长时间运作。
代理池的运行机制:
我们在爬虫启动时先去获取最新的ip代理。并将ip都存入到数据库中。所以我们在数据库中会存一份ip列表如图1。在scpray在爬取网站数据时http请求可以根据自定义的算法将数据库中可靠的ip做为代理,不能使用本机的ip直接访问目标网站,否则爬取了一段时间你的ip就被封了。

图1
如果你的代理ip质量足够稳定,那么你可能不用在这上面多花心思。但如果我们用的代理ip不够稳定,那么我们就要强化我们的代理池。

下面放上关键代码
代理IP实体类:

from msic.common import utils


class Proxy(object):
    def __init__(self):
        self.ip = ''
        self.response_speed = -1
        self.validity = False
        self.origin = ''
        self.create_time = ''
        self.update_time = ''
        self.failed_count = 0

    @staticmethod
    def create(ip, origin):
        proxy = Proxy()
        proxy.ip = ip
        proxy.origin = origin
        proxy.create_time = utils.get_utc_time()  # 格式: 2017-03-18T06:16:26.887Z
        proxy.update_time = proxy.create_time
        proxy.failed_count = 0
        proxy.response_speed = -1
        proxy.validity = False
        return proxy

启动爬虫任务

def start(self):
        #抓取ip代理任务
        self.crawl_proxy_task(False)

        def task():
            self.check_ip_availability_task()
            schedule = Scheduler()
            # 代理池IP自检,60分钟运行一次
            schedule.every(60).minutes.do(self.check_ip_availability_task)

            while True:
                schedule.run_pending()
                time.sleep(1)

        thread = threading.Thread(target=task)
        thread.start()

抓取IP代理

def crawl_proxy_task(self, check_num: bool = True):
    if check_num:
        count = self.collection.count()
        #如果数据库中IP数大于最小ip数则不抓取
        if count > MIN_PROXY_COUNT:
            return
    utils.log("开始抓取代理")
    #具体抓取逻辑
    proxy_list = proxy_strategy.crawl_proxy()
    utils.log("开始保存")
    for proxy in proxy_list:
        if not self.collection.find_one({'ip': proxy.ip}):
            self.collection.insert_one(proxy.__dict__)
            utils.log('保存了:' + proxy.ip)
    utils.log("保存结束")

代理池自检

def check_ip_availability_task(self):
    #redis获取上次自检时间,如果未达到设定时间则不在检查
    last_check_time = self.redis_client.get(REDIS_KEY_LAST_CHECK_IP_TIME)
    now_time = datetime.utcnow().timestamp()
    if last_check_time is not None and (now_time - float(last_check_time)) < (TASK_INTERVAL * 60):
        return
    self.redis_client.set(REDIS_KEY_LAST_CHECK_IP_TIME, now_time)

    proxy_list = self.collection.find()
    for proxy in proxy_list:
        ip = proxy['ip']
        start_time = time.time()
        response = utils.http_request('http://lwons.com/wx', timeout=10)
        is_success = response.status_code == 200
        response.close()
        if not is_success:
            #如果请求失败,直接删除IP
            try:
                self.collection.delete_one({'ip': ip})
            except:
                pass
            utils.log('Check ip %s FAILED' % ip)
        else:
            #如果请求成功,在数据库中记录该ip最后响应的时间,下次取ip时优先取出使用
            elapsed = round(time.time() - start_time, 4)
            try:
                self.collection.update_one({'ip': ip},
                                           {"$set": {'update_time': utils.get_utc_time(), 'response_speed': elapsed, 'validity': True}})
            except:
                pass
            utils.log('Check ip %s SUCCESS' % ip)

http请求失败,在数据库中处理请求失败的IP

def add_failed_time(self, ip):
    proxy = self.collection.find_one({'ip': ip})
    if proxy is not None:
        failed_count = proxy['failed_count'] + 1
        utils.log("ip: %s 失败次数+1 已失败次数%s次" % (ip, failed_count))
        if failed_count <= FAILED_COUNT_BORDER:
            #如果未达到最大失败次数,则在数据库中添加一次失败
            try:
                self.collection.update_one({'ip': ip}, {"$set": {'update_time': utils.get_utc_time(), 'failed_count': failed_count}})
            except:
                pass
        else:
            #达到最大失败次数,则在数据库中删除
            try:
                self.collection.delete_one({'ip': ip})
            except:
                pass
    #检查数据库中IP是否足够
    self.crawl_proxy_task()

Scrapy的中间件在取出代理池中IP时,优先取出失败次数少,最近http响应成功的有效IP

def random_choice_proxy(self) -> str:
    proxy = self.collection.find().sort(
        [("failed_count", pymongo.ASCENDING), ("validity", pymongo.DESCENDING), ("response_speed", pymongo.ASCENDING),
         ("update_time", pymongo.DESCENDING)])
    return proxy[0]['ip']

最后

爬虫的下载中间件和代理池都已创建完毕,接下来就要编写具体的爬取规则Python爬虫实战-使用Scrapy框架爬取土巴兔(四)

附:

详细的项目工程在Github中,如果觉得还不错的话记得Star哦。

上一篇下一篇

猜你喜欢

热点阅读