The scrapy distributed deduplication component: source code and how it works

2019-03-15  Python之战

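scrapy_redis subclasses Scrapy's deduplication component and overrides some of its methods. Stock Scrapy deduplicates internally on a single machine, whereas a distributed crawl needs several crawlers on several machines to deduplicate cooperatively. Instances of the same spider running on different machines therefore have to deduplicate in one shared place, and that place is a Redis set.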
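As a rough illustration of how a project is pointed at that shared place, a settings fragment might look like the sketch below. The setting names are the ones scrapy_redis documents; the Redis URL is a placeholder assumed for this example and does not come from the article.

```python
# settings.py (sketch): route scheduling and deduplication through Redis.
# Every machine running the spider would use the same values, so that all
# of them talk to the same Redis instance.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Placeholder address, assumed for illustration only.
REDIS_URL = "redis://localhost:6379/0"
```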

First, look at the source of the scrapy_redis deduplication component, dupefilter:

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from .connection import get_redis_from_settings

DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

logger = logging.getLogger(__name__)

# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.
    This class can also be used with default Scrapy's scheduler.
    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.
        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.
        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.
        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.
        Parameters
        ----------
        settings : scrapy.settings.Settings
        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.
        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.
        Parameters
        ----------
        crawler : scrapy.crawler.Crawler
        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.
        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        bool
        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        str
        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.
        Parameters
        ----------
        reason : str, optional
        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.
        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider
        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

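from_settings and from_crawler need little explanation: they read the settings, connect to Redis, and set the key. The methods that matter are request_seen and request_fingerprint. request_seen calls self.request_fingerprint, which in turn calls the request_fingerprint function imported from scrapy.utils.request to generate the request's fingerprint.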
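The behaviour of request_seen can be tried without a Redis server. The sketch below is a minimal illustration, not the library's code: InMemorySetServer and SketchDupeFilter are hypothetical names, the key "myspider:dupefilter" is made up, and the stand-in only mimics the one property of SADD that request_seen relies on (it reports how many members were newly added).

```python
class InMemorySetServer:
    """Hypothetical stand-in for a Redis client; it only mimics SADD."""

    def __init__(self):
        self._sets = {}

    def sadd(self, key, member):
        members = self._sets.setdefault(key, set())
        if member in members:
            return 0  # nothing added: the member was already present
        members.add(member)
        return 1  # one new member added


class SketchDupeFilter:
    """Mirrors the request_seen logic shown above, without Scrapy."""

    def __init__(self, server, key):
        self.server = server
        self.key = key

    def request_seen(self, fingerprint):
        added = self.server.sadd(self.key, fingerprint)
        return added == 0


# Two filters play the role of the same spider on two machines:
# both use the same server and the same key.
shared = InMemorySetServer()
worker_a = SketchDupeFilter(shared, "myspider:dupefilter")
worker_b = SketchDupeFilter(shared, "myspider:dupefilter")

first = worker_a.request_seen("abc123")   # fingerprint is new
second = worker_b.request_seen("abc123")  # same fingerprint, other worker
print(first, second)  # prints: False True
```

Because both workers write to the same key, a fingerprint recorded by one is immediately visible to the other, which is the point of moving the set out of the process.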

Next, look at the source of request_fingerprint in scrapy.utils.request:

"""
This module provides some useful functions for working with
scrapy.http.Request objects
"""

from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse

from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str

from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached

_fingerprint_cache = weakref.WeakKeyDictionary()
def request_fingerprint(request, include_headers=None):
    """
    Return the request fingerprint.
    The request fingerprint is a hash that uniquely identifies the resource the
    request points to. For example, take the following two urls:
    http://www.example.com/query?id=111&cat=222
    http://www.example.com/query?cat=222&id=111
    Even though those are two different URLs both point to the same resource
    and are equivalent (ie. they should return the same response).
    Another example are cookies used to store session ids. Suppose the
    following page is only accesible to authenticated users:
    http://www.example.com/members/offers.html
    Lot of sites use a cookie to store the session id, which adds a random
    component to the HTTP Request and thus should be ignored when calculating
    the fingerprint.
    For this reason, request headers are ignored by default when calculating
    the fingeprint. If you want to include specific headers use the
    include_headers argument, which is a list of Request headers to include.
    """
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

def request_authenticate(request, username, password):
    """Autenticate the given request (in place) using the HTTP basic access
    authentication mechanism (RFC 2617) and the given username and password
    """
    request.headers['Authorization'] = basic_auth_header(username, password)

def request_httprepr(request):
    """Return the raw HTTP representation (as bytes) of the given request.
    This is provided only for reference since it's not the actual stream of
    bytes that will be send when performing the request (that's controlled
    by Twisted).
    """
    parsed = urlparse_cached(request)
    path = urlunparse(('', '', parsed.path or '/', parsed.params, parsed.query, ''))
    s = to_bytes(request.method) + b" " + to_bytes(path) + b" HTTP/1.1\r\n"
    s += b"Host: " + to_bytes(parsed.hostname or b'') + b"\r\n"
    if request.headers:
        s += request.headers.to_string() + b"\r\n"
    s += b"\r\n"
    s += request.body
    return s

def referer_str(request):
    """ Return Referer HTTP header suitable for logging. """
    referrer = request.headers.get('Referer')
    if referrer is None:
        return referrer
    return to_native_str(referrer, errors='replace')

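request_fingerprint returns a fingerprint that identifies a request, and URLs that differ only in the order of their query parameters get the same fingerprint. The fingerprint is a SHA-1 hash over request.method, the canonicalized request.url, and request.body. The function also accepts an optional list, include_headers, naming the fields of request.headers that should be folded into the hash. Cookies, for example, can take part in the fingerprint this way, so two requests for the same URL whose listed header fields differ are treated as different requests and are not deduplicated against each other.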
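The hashing scheme can be approximated with the standard library alone. This is a simplified sketch under stated assumptions, not Scrapy's implementation: sort_query and sketch_fingerprint are hypothetical helpers, and sorting the query string stands in for w3lib's canonicalize_url, which does considerably more.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def sort_query(url):
    """Simplified canonicalization: sort query parameters, drop the fragment."""
    parts = urlsplit(url)
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ""))


def sketch_fingerprint(method, url, body=b"", headers=None, include_headers=None):
    """Hash method, canonical URL and body, plus any explicitly listed headers."""
    fp = hashlib.sha1()
    fp.update(method.encode())
    fp.update(sort_query(url).encode())
    fp.update(body or b"")
    headers = {k.lower(): v for k, v in (headers or {}).items()}
    for name in sorted(h.lower() for h in (include_headers or [])):
        if name in headers:
            fp.update(name.encode())
            fp.update(headers[name].encode())
    return fp.hexdigest()


a = sketch_fingerprint("GET", "http://www.example.com/query?id=111&cat=222")
b = sketch_fingerprint("GET", "http://www.example.com/query?cat=222&id=111")
print(a == b)  # prints: True, parameter order does not matter

url = "http://www.example.com/members/offers.html"
c = sketch_fingerprint("GET", url, headers={"Cookie": "sid=1"},
                       include_headers=["Cookie"])
d = sketch_fingerprint("GET", url, headers={"Cookie": "sid=2"},
                       include_headers=["Cookie"])
print(c == d)  # prints: False, the listed header now affects the hash
```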

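Once a request's fingerprint is available, it is checked against the deduplication set in Redis. If the fingerprint is already there, the request is dropped; if not, the fingerprint is added to the set and the scheduler is told that the request may enter the queue. In the code above the check and the add are a single SADD call, whose return value tells which of the two cases occurred. That is the whole deduplication process.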
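The hand-off between the dedup filter and the scheduler can be sketched as follows. This is an in-memory illustration under assumptions, not the scrapy_redis scheduler: SketchRequest and SketchScheduler are hypothetical names, a Python set stands in for the Redis set, and a deque stands in for the request queue. The dont_filter flag is Scrapy's per-request switch for bypassing deduplication.

```python
from collections import deque


class SketchRequest:
    """Hypothetical request carrying a precomputed fingerprint."""

    def __init__(self, fingerprint, dont_filter=False):
        self.fingerprint = fingerprint
        self.dont_filter = dont_filter


class SketchScheduler:
    def __init__(self):
        self.seen = set()     # stands in for the Redis set
        self.queue = deque()  # stands in for the request queue

    def request_seen(self, request):
        if request.fingerprint in self.seen:
            return True
        self.seen.add(request.fingerprint)
        return False

    def enqueue_request(self, request):
        # Drop the request only if filtering is enabled and it was seen before.
        if not request.dont_filter and self.request_seen(request):
            return False
        self.queue.append(request)
        return True


scheduler = SketchScheduler()
results = [
    scheduler.enqueue_request(SketchRequest("fp-1")),
    scheduler.enqueue_request(SketchRequest("fp-1")),
    scheduler.enqueue_request(SketchRequest("fp-1", dont_filter=True)),
]
print(results, len(scheduler.queue))  # prints: [True, False, True] 2
```

Unlike this in-process sketch, the Redis-backed filter gets the check and the insert from one SADD command, so two machines submitting the same fingerprint at the same moment cannot both see it as new.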
