Getting Started with Scrapy (Part 2)
Pausing and resuming crawls
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/jobs.html
To enable persistence support, you only need to set a job directory through the JOBDIR setting. This directory stores all the request data needed to keep the state of a single job (i.e. one spider run). Note that the directory must not be shared by different spiders, or even by different jobs/runs of the same spider; it holds the state of exactly one job.
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
You can then stop the spider safely at any time (press Ctrl-C or send a signal) and resume it later with the same command:
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
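If you prefer not to pass -s on the command line every time, the same setting can also live in the project settings; a minimal sketch (the directory name is just an example):
# settings.py -- persist crawl state (pending requests, seen fingerprints, spider state) here
JOBDIR = 'crawls/somespider-1'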
How deduplication works
In the Scrapy source, open scrapy/dupefilters.py; part of it reads:
class RFPDupeFilter(BaseDupeFilter):
"""Request Fingerprint duplicates filter"""
def __init__(self, path=None, debug=False):
self.file = None
self.fingerprints = set()
self.logdupes = True
self.debug = debug
self.logger = logging.getLogger(__name__)
if path:
self.file = open(os.path.join(path, 'requests.seen'), 'a+')
self.file.seek(0)
self.fingerprints.update(x.rstrip() for x in self.file)
@classmethod
def from_settings(cls, settings):
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(job_dir(settings), debug)
def request_seen(self, request):
fp = self.request_fingerprint(request)
if fp in self.fingerprints:
return True
self.fingerprints.add(fp)
if self.file:
self.file.write(fp + os.linesep)
def request_fingerprint(self, request):
return request_fingerprint(request)
def close(self, reason):
if self.file:
self.file.close()
def log(self, request, spider):
if self.debug:
msg = "Filtered duplicate request: %(request)s"
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes:
msg = ("Filtered duplicate request: %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)")
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
self.logdupes = False
spider.crawler.stats.inc_value('dupefilter/filtered', spider=spider)
It defines a request_seen method, which is called from scrapy/core/scheduler.py:
class Scheduler(object):
...
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
dqok = self._dqpush(request)
if dqok:
self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
else:
self._mqpush(request)
self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
self.stats.inc_value('scheduler/enqueued', spider=self.spider)
return True
...
Back in the request_seen method:
def request_seen(self, request):
fp = self.request_fingerprint(request)
if fp in self.fingerprints:
return True
self.fingerprints.add(fp)
if self.file:
self.file.write(fp + os.linesep)
    # the request_fingerprint used here comes from `from scrapy.utils.request import request_fingerprint`
def request_fingerprint(self, request):
return request_fingerprint(request)
scrapy/utils/request.py
This function hashes the request and finally produces a digest (fp.hexdigest()):
def request_fingerprint(request, include_headers=None):
"""
Return the request fingerprint.
The request fingerprint is a hash that uniquely identifies the resource the
request points to. For example, take the following two urls:
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111
Even though those are two different URLs both point to the same resource
and are equivalent (ie. they should return the same response).
Another example are cookies used to store session ids. Suppose the
following page is only accesible to authenticated users:
http://www.example.com/members/offers.html
Lot of sites use a cookie to store the session id, which adds a random
component to the HTTP Request and thus should be ignored when calculating
the fingerprint.
For this reason, request headers are ignored by default when calculating
the fingeprint. If you want to include specific headers use the
include_headers argument, which is a list of Request headers to include.
"""
if include_headers:
include_headers = tuple(to_bytes(h.lower())
for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
cache[include_headers] = fp.hexdigest()
return cache[include_headers]
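As a quick sanity check of the docstring above, the two "equivalent" URLs really do produce the same fingerprint, because canonicalize_url sorts the query arguments; a small sketch:
from scrapy import Request
from scrapy.utils.request import request_fingerprint

r1 = Request('http://www.example.com/query?id=111&cat=222')
r2 = Request('http://www.example.com/query?cat=222&id=111')
print(request_fingerprint(r1) == request_fingerprint(r2))  # True: the query string is canonicalized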
As you can see, the dedup fingerprint is sha1(method + canonicalized URL + body); headers are mixed in only when include_headers is passed. Two requests therefore count as duplicates only when all of these match exactly, so the default filter catches exact repeats rather than "similar" pages, and the share of duplicates it removes in practice may not be large.
If you want to compute your own deduplication fingerprint, implement your own filter and configure it. The filter below deduplicates on the URL alone:
from scrapy.dupefilters import RFPDupeFilter

class SeenURLFilter(RFPDupeFilter):
    """A dupe filter that considers only the URL"""

    def __init__(self, path=None):
        self.urls_seen = set()
        RFPDupeFilter.__init__(self, path)

    def request_seen(self, request):
        if request.url in self.urls_seen:
            return True
        else:
            self.urls_seen.add(request.url)

Don't forget to enable it in the settings:
DUPEFILTER_CLASS = 'scraper.custom_filters.SeenURLFilter'
Telnet
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/telnetconsole.html
Scrapy runs a telnet service that we can use to read some performance metrics. Connect to port 6023 with the telnet command and you get a Python console running inside the crawler process. Be careful: if you run a blocking call there, such as time.sleep(), the running crawler will be blocked while it executes. The built-in est() function prints a set of engine status metrics.
Open a terminal and run the following:
$ telnet localhost 6023
>>> est()
...
len(engine.downloader.active) : 16
...
len(engine.slot.scheduler.mqs) : 4475
...
len(engine.scraper.slot.active) : 115
engine.scraper.slot.active_size : 117760
engine.scraper.slot.itemproc_size : 105
We ignored the dqs metric here. If you have persistence enabled (i.e. the JOBDIR setting is set), you will also see a non-zero dqs value (len(engine.slot.scheduler.dqs)); in that case add dqs to mqs for the analysis below.
- mqs means there are many requests waiting in the scheduler (4475 requests). That is fine.
- len(engine.downloader.active) means 16 requests are currently being downloaded. This matches our CONCURRENT_REQUESTS setting, so that is fine too.
- len(engine.scraper.slot.active) tells us 115 responses are currently being processed in the scraper; their total size comes from engine.scraper.slot.active_size, about 115 KB. Besides these responses, 105 Items are being processed in the pipelines (engine.scraper.slot.itemproc_size), which means roughly 10 responses are still being handled by the spider callbacks.
Overall, the downloader is clearly the bottleneck of the system: a lot of requests (mqs) are queued up in front of it and it is fully utilized, while the workload after it is more or less stable (you can confirm this by calling est() a few more times).
Another source of information is the stats object, which is normally printed when the crawl finishes. In the telnet console we can fetch it at any time as a dict via stats.get_stats() and print it with the p() function:
>>> p(stats.get_stats())
{'downloader/request_bytes': 558330, ... 'item_scraped_count': 2485, ...}
Stats collection
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/stats.html
scrapy/statscollectors.py
"""
Scrapy extension for collecting scraping stats
"""
import pprint
import logging
logger = logging.getLogger(__name__)
class StatsCollector(object):
def __init__(self, crawler):
self._dump = crawler.settings.getbool('STATS_DUMP')
self._stats = {}
def get_value(self, key, default=None, spider=None):
return self._stats.get(key, default)
def get_stats(self, spider=None):
return self._stats
def set_value(self, key, value, spider=None):
self._stats[key] = value
def set_stats(self, stats, spider=None):
self._stats = stats
def inc_value(self, key, count=1, start=0, spider=None):
d = self._stats
d[key] = d.setdefault(key, start) + count
def max_value(self, key, value, spider=None):
self._stats[key] = max(self._stats.setdefault(key, value), value)
def min_value(self, key, value, spider=None):
self._stats[key] = min(self._stats.setdefault(key, value), value)
def clear_stats(self, spider=None):
self._stats.clear()
def open_spider(self, spider):
pass
def close_spider(self, spider, reason):
if self._dump:
logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats),
extra={'spider': spider})
self._persist_stats(self._stats, spider)
def _persist_stats(self, stats, spider):
pass
class MemoryStatsCollector(StatsCollector):
def __init__(self, crawler):
super(MemoryStatsCollector, self).__init__(crawler)
self.spider_stats = {}
def _persist_stats(self, stats, spider):
self.spider_stats[spider.name] = stats
class DummyStatsCollector(StatsCollector):
def get_value(self, key, default=None, spider=None):
return default
def set_value(self, key, value, spider=None):
pass
def set_stats(self, stats, spider=None):
pass
def inc_value(self, key, count=1, start=0, spider=None):
pass
def max_value(self, key, value, spider=None):
pass
def min_value(self, key, value, spider=None):
pass
Collecting 404 pages
class JobboleSpider(scrapy.Spider):
name = 'jobbole'
allowed_domains = ['blog.jobbole.com']
start_urls = ['http://blog.jobbole.com/all-posts/']
    # collect the 404 URLs and their count
handle_httpstatus_list = [404,]
def __init__(self):
self.fail_urls = []
super(JobboleSpider, self).__init__()
def parse(self, response):
if response.status == 404:
self.fail_urls.append(response.url)
self.crawler.stats.inc_value('failed_url')
...
Signals
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/signals.html
Handle fail_urls when the spider closes:
# assumes: from scrapy.xlib.pydispatch import dispatcher   (newer Scrapy: from pydispatch import dispatcher)
#          from scrapy import signals
def __init__(self):
    self.fail_urls = []
    super(JobboleSpider, self).__init__()
    dispatcher.connect(self.handle_spider_closed, signal=signals.spider_closed)

def handle_spider_closed(self, spider, reason):
    # spider_closed delivers the spider and the close reason (not a response)
    self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls))
...
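On newer Scrapy versions the usual way to hook the same signal is through crawler.signals.connect in from_crawler rather than pydispatch; a minimal sketch reusing the class and stat names from the example above:
from scrapy import signals
import scrapy

class JobboleSpider(scrapy.Spider):
    name = 'jobbole'

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super(JobboleSpider, cls).from_crawler(crawler, *args, **kwargs)
        # connect the handler through the crawler's signal manager
        crawler.signals.connect(spider.handle_spider_closed, signal=signals.spider_closed)
        return spider

    def __init__(self, *args, **kwargs):
        super(JobboleSpider, self).__init__(*args, **kwargs)
        self.fail_urls = []

    def handle_spider_closed(self, spider, reason):
        self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls))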
Extensions
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/extensions.html
The scrapy/extensions package contains some example extensions.
Distributed crawls
http://doc.scrapy.org/en/master/topics/practices.html#distributed-crawls
Scrapy does not ship a built-in mechanism for distributed (multi-server) crawling. There are still ways to distribute a crawl, depending on how you want to split the work.
If you have many spiders, the simplest way to spread the load is to run several Scrapyd instances and distribute the spiders across the machines.
If instead you want to run a single spider on several machines, you can partition the URLs to crawl and send one partition to each spider. For example:
First, prepare the list of URLs to crawl and split it into files, one per partition:
http://somedomain.com/urls-to-crawl/spider1/part1.list
http://somedomain.com/urls-to-crawl/spider1/part2.list
http://somedomain.com/urls-to-crawl/spider1/part3.list
Then start the spider on 3 different Scrapyd servers. The spider receives a (spider) argument part that tells it which partition to crawl (see the sketch after the curl commands below for one way to consume it):
curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1
curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2
curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3
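How the spider consumes the part argument is up to you; one possible sketch (the spider itself is hypothetical, and response.text assumes a reasonably recent Scrapy, older versions would use response.body_as_unicode()):
import scrapy

class Spider1(scrapy.Spider):
    name = 'spider1'

    def __init__(self, part=None, *args, **kwargs):
        super(Spider1, self).__init__(*args, **kwargs)
        self.part = part          # passed by scrapyd via -d part=N

    def start_requests(self):
        # fetch the URL list for this partition, then crawl every URL in it
        list_url = 'http://somedomain.com/urls-to-crawl/spider1/part%s.list' % self.part
        yield scrapy.Request(list_url, callback=self.parse_url_list)

    def parse_url_list(self, response):
        for line in response.text.splitlines():
            url = line.strip()
            if url:
                yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        pass  # extract items here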
Distributed crawling with scrapy-redis
https://github.com/rmax/scrapy-redis
Redis command reference
http://redisdoc.com/
pip install scrapy-redis
Scrapy is a general-purpose crawling framework, but it has no built-in support for distributed crawling. Scrapy-redis makes distributed Scrapy crawls easier by providing a set of Redis-based components (components only, not a complete framework).
Scrapy-redis provides the following four components, which means these four parts of Scrapy get replaced or adapted:
[scrapy-redis architecture diagram: Redis added on top of the standard Scrapy architecture]
- Scheduler
- Duplication Filter
- Item Pipeline
- Base Spider
As the diagram above shows, scrapy-redis adds Redis to the Scrapy architecture and builds the following components on top of Redis features (a minimal settings configuration that wires them into a project is sketched after the component descriptions below):
- Scheduler
Scrapy reworked Python's collections.deque (a double-ended queue) into its own Scrapy queue (https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py), but multiple Scrapy spiders cannot share one crawl queue, i.e. Scrapy itself does not support distributed crawling. scrapy-redis solves this by swapping the Scrapy queue for Redis (a Redis-backed queue): the requests to crawl live on a single redis-server, so several spiders can read from the same database.
The part of Scrapy that deals with the crawl queue directly is the Scheduler: it enqueues new requests (pushing them into the Scrapy queue) and pops the next request to crawl (out of the Scrapy queue). It organizes the pending requests as a dict of queues keyed by priority, for example:
{
    priority 0 : queue 0
    priority 1 : queue 1
    priority 2 : queue 2
}
A request's priority decides which queue it is pushed into, and higher-priority requests are popped first. Managing this priority-keyed dict of queues requires the Scheduler to provide a set of supporting methods. The original Scheduler cannot work against a shared Redis queue, so the scheduler component from scrapy-redis is used instead.
- Duplication Filter
Scrapy implements request deduplication with a Python set: the fingerprint of every request already sent is kept in the set, and each new request's fingerprint is checked against it. If the fingerprint is already there, the request has been sent before; otherwise processing continues. The core of this duplicate check looks like this:
def request_seen(self, request):
    # self.fingerprints is the in-memory set of request fingerprints
    fp = self.request_fingerprint(request)
    # this membership test is the core of the duplicate check
    if fp in self.fingerprints:
        return True
    self.fingerprints.add(fp)
    if self.file:
        self.file.write(fp + os.linesep)
In scrapy-redis, deduplication is handled by the Duplication Filter component, which exploits the fact that a Redis set never holds duplicates. The scrapy-redis scheduler receives requests from the engine, adds each request's fingerprint to a Redis set to check whether it has been seen, and pushes the requests that are not duplicates into the Redis request queue.
When the engine asks for a request (on behalf of the spider), the scheduler pops one request from the Redis request queue according to priority and returns it to the engine, which hands it to the spider for processing.
- Item Pipeline
The engine passes the scraped Items (returned by the spider) to the Item Pipeline; the scrapy-redis Item Pipeline stores these Items in the Redis items queue.
The modified Item Pipeline makes it easy to pull Items out of the items queue by key, which is what enables a cluster of item-processing workers.
- Base Spider
Scrapy's original Spider class is no longer used directly. The rewritten RedisSpider inherits from both Spider and RedisMixin; RedisMixin is the class that reads URLs from Redis.
When we define a spider that inherits from RedisSpider, setup_redis is called; it connects to the Redis database and hooks up signals:
One fires when the spider is idle: it calls spider_idle, which calls schedule_next_requests to keep the spider alive and then raises the DontCloseSpider exception.
The other (connected in older scrapy-redis versions) fires when an item is scraped: it calls item_scraped, which also calls schedule_next_requests to fetch the next request.
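Before diving into the source, here is a minimal settings configuration that wires these components into a project (taken from the typical scrapy-redis setup; adjust REDIS_URL to your own server):
# settings.py -- enable the scrapy-redis components
SCHEDULER = "scrapy_redis.scheduler.Scheduler"               # redis-backed scheduler
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"   # redis-set based dedup
SCHEDULER_PERSIST = True                                     # keep the queue/dupefilter between runs
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,             # push serialized items into redis
}
REDIS_URL = 'redis://localhost:6379'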
scrapy-redis source walkthrough
There is not much code in scrapy-redis; the heavy lifting is done by the redis and scrapy libraries, and the project itself is the glue that binds them together. Below we go through what each source file implements and how the pieces add up to a distributed crawler.
connection.py
Instantiates the Redis connection from the settings. It is called by the dupefilter and the scheduler; in short, everything that reads from or writes to Redis goes through this module.
import six
from scrapy.utils.misc import load_object
from . import defaults
# connect to the Redis database
# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
'REDIS_URL': 'url',
'REDIS_HOST': 'host',
'REDIS_PORT': 'port',
'REDIS_ENCODING': 'encoding',
}
def get_redis_from_settings(settings):
"""Returns a redis client instance from given Scrapy settings object.
This function uses ``get_client`` to instantiate the client and uses
``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
can override them using the ``REDIS_PARAMS`` setting.
Parameters
----------
settings : Settings
A scrapy settings object. See the supported settings below.
Returns
-------
server
Redis client instance.
Other Parameters
----------------
REDIS_URL : str, optional
Server connection URL.
REDIS_HOST : str, optional
Server host.
REDIS_PORT : str, optional
Server port.
REDIS_ENCODING : str, optional
Data encoding.
REDIS_PARAMS : dict, optional
Additional client parameters.
"""
params = defaults.REDIS_PARAMS.copy()
params.update(settings.getdict('REDIS_PARAMS'))
# XXX: Deprecate REDIS_* settings.
for source, dest in SETTINGS_PARAMS_MAP.items():
val = settings.get(source)
if val:
params[dest] = val
# Allow ``redis_cls`` to be a path to a class.
if isinstance(params.get('redis_cls'), six.string_types):
params['redis_cls'] = load_object(params['redis_cls'])
return get_redis(**params)
# Backwards compatible alias.
from_settings = get_redis_from_settings
def get_redis(**kwargs):
"""Returns a redis client instance.
Parameters
----------
redis_cls : class, optional
Defaults to ``redis.StrictRedis``.
url : str, optional
If given, ``redis_cls.from_url`` is used to instantiate the class.
**kwargs
Extra parameters to be passed to the ``redis_cls`` class.
Returns
-------
server
Redis client instance.
"""
redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
url = kwargs.pop('url', None)
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)
defaults.py
Default scrapy-redis configuration:
import redis
# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
PIPELINE_KEY = '%(spider)s:items'
REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
# socket timeout, connect timeout, retry behaviour, encoding
REDIS_PARAMS = {
'socket_timeout': 30,
'socket_connect_timeout': 30,
'retry_on_timeout': True,
'encoding': REDIS_ENCODING,
}
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
dupefilter.py
Performs request deduplication, quite neatly, using the Redis set data structure. Note, however, that the scheduler does not schedule requests out of the dupefilter key maintained by this module; scheduling uses the queue implemented in queue.py. When a request is not a duplicate it is pushed into that queue, and it is popped from the queue when it gets scheduled.
import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings
logger = logging.getLogger(__name__)
# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
"""Redis-based request duplicates filter.
This class can also be used with default Scrapy's scheduler.
"""
logger = logger
def __init__(self, server, key, debug=False):
"""Initialize the duplicates filter.
Parameters
----------
server : redis.StrictRedis
The redis server instance.
key : str
Redis key Where to store fingerprints.
debug : bool, optional
Whether to log filtered requests.
"""
self.server = server
self.key = key
self.debug = debug
self.logdupes = True
@classmethod
def from_settings(cls, settings):
"""Returns an instance from given settings.
This uses by default the key ``dupefilter:<timestamp>``. When using the
``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
it needs to pass the spider name in the key.
Parameters
----------
settings : scrapy.settings.Settings
Returns
-------
RFPDupeFilter
A RFPDupeFilter instance.
"""
server = get_redis_from_settings(settings)
# XXX: This creates one-time key. needed to support to use this
# class as standalone dupefilter with scrapy's default scheduler
# if scrapy passes spider on open() method this wouldn't be needed
# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
@classmethod
def from_crawler(cls, crawler):
"""Returns instance from crawler.
Parameters
----------
crawler : scrapy.crawler.Crawler
Returns
-------
RFPDupeFilter
Instance of RFPDupeFilter.
"""
return cls.from_settings(crawler.settings)
def request_seen(self, request):
"""Returns True if request was already seen.
Parameters
----------
request : scrapy.http.Request
Returns
-------
bool
"""
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
return added == 0
def request_fingerprint(self, request):
"""Returns a fingerprint for a given request.
Parameters
----------
request : scrapy.http.Request
Returns
-------
str
"""
return request_fingerprint(request)
@classmethod
def from_spider(cls, spider):
settings = spider.settings
server = get_redis_from_settings(settings)
dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
key = dupefilter_key % {'spider': spider.name}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
def close(self, reason=''):
"""Delete data on close. Called by Scrapy's scheduler.
Parameters
----------
reason : str, optional
"""
self.clear()
def clear(self):
"""Clears fingerprints data."""
self.server.delete(self.key)
def log(self, request, spider):
"""Logs given request.
Parameters
----------
request : scrapy.http.Request
spider : scrapy.spiders.Spider
"""
if self.debug:
msg = "Filtered duplicate request: %(request)s"
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes:
msg = ("Filtered duplicate request %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)")
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
self.logdupes = False
This file looks more involved because it rewrites the request deduplication that Scrapy already provides. When Scrapy runs on a single machine, it only needs to look at the in-memory request queue or the persisted request queue to decide whether the request it is about to send has already been issued or is currently being scheduled; everything is local. In a distributed run, the schedulers on all hosts must consult the same request pool in the same database to decide whether a request is a duplicate.
The file subclasses BaseDupeFilter and overrides its methods to implement Redis-based deduplication. As the source shows, scrapy-redis reuses Scrapy's own fingerprint function, request_fingerprint, which was covered in the deduplication section above.
The class connects to Redis and inserts fingerprints into a Redis set under one key. That key is the same for every instance of a given spider (Redis is a key-value store, so identical keys reach identical values; using the spider name plus a dupefilter suffix as the key means crawler instances on different hosts, as long as they run the same kind of spider, all hit the same set, and that set is their shared deduplication pool). If sadd returns 0, the fingerprint already exists in the set (a set holds no duplicates), so request_seen returns True and the request is treated as a duplicate; if sadd returns 1, a new fingerprint was added, so the request is not a duplicate, request_seen returns False, and the new fingerprint has conveniently already been stored in the database.
The DupeFilter check is used in the scheduler class: every request is checked before it enters scheduling, and duplicates are simply discarded rather than wasting resources on them.
picklecompat.py
"""A pickle wrapper module with protocol=-1 by default."""
try:
import cPickle as pickle # PY2
except ImportError:
import pickle
def loads(s):
return pickle.loads(s)
def dumps(obj):
return pickle.dumps(obj, protocol=-1)
This implements loads and dumps, i.e. a serializer.
Redis cannot store complex objects (keys must be strings, and values can only be strings, lists of strings, sets of strings, hashes and the like), so everything we store has to be serialized first. Here that is done with Python's pickle module, which works on both Python 2 and 3. This serializer is mainly used later by the scheduler to store request objects.
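A tiny round-trip sketch of what the scheduler later does with this serializer (using the scrapy.utils.reqser helpers that queue.py imports, which exist in the Scrapy versions discussed here):
from scrapy import Request
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat

req = Request('http://www.example.com', priority=5)
data = picklecompat.dumps(request_to_dict(req))      # bytes, safe to store as a redis value
restored = request_from_dict(picklecompat.loads(data))
print(restored.url, restored.priority)               # http://www.example.com 5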
pipelines.py
This is what makes distributed item processing possible: it stores Items in Redis. Because it needs to read the settings, it uses the from_crawler() function.
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):
"""Pushes serialized item into a redis list/queue
Settings
--------
REDIS_ITEMS_KEY : str
Redis key where to store items.
REDIS_ITEMS_SERIALIZER : str
Object path to serializer function.
"""
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
"""Initialize pipeline.
Parameters
----------
server : StrictRedis
Redis client instance.
key : str
Redis key where to store items.
serialize_func : callable
Items serializer function.
"""
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
"""Returns redis key based on given spider.
Override this function to use a different key depending on the item
and/or spider.
"""
return self.key % {'spider': spider.name}
The pipelines file implements an item pipeline class, the same kind of object as a regular Scrapy item pipeline. It uses the REDIS_ITEMS_KEY we configure in the settings as the key, serializes each Item, and stores it in the corresponding Redis value (you can see that the value is a list, and each Item becomes one element of that list). This pipeline collects the extracted Items mainly to make later processing convenient, whether that happens centrally on one server or separately on each node; a small consumer sketch follows below.
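For the "process items centrally" option, a separate worker can simply drain the items queue; a sketch assuming the default key pattern '%(spider)s:items' and a spider named 'myspider' (items are JSON, since the default serializer is ScrapyJSONEncoder):
import json
import redis

r = redis.StrictRedis(host='localhost', port=6379)
while True:
    # blpop blocks until an item is available and returns (key, data)
    _, data = r.blpop('myspider:items')
    item = json.loads(data)
    print(item)   # store it in a database, write it to a file, etc.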
queue.py
This file implements several container (queue) classes that talk to Redis; when they do, requests are encoded and decoded (serialized and deserialized).
from scrapy.utils.reqser import request_to_dict, request_from_dict
from . import picklecompat
class Base(object):
"""Per-spider base queue class"""
def __init__(self, server, spider, key, serializer=None):
"""Initialize per-spider redis queue.
Parameters
----------
server : StrictRedis
Redis client instance.
spider : Spider
Scrapy spider instance.
key: str
Redis key where to put and get messages.
serializer : object
Serializer object with ``loads`` and ``dumps`` methods.
"""
if serializer is None:
# Backward compatibility.
# TODO: deprecate pickle.
serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError("serializer does not implement 'loads' function: %r"
% serializer)
if not hasattr(serializer, 'dumps'):
raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
% serializer)
self.server = server
self.spider = spider
self.key = key % {'spider': spider.name}
self.serializer = serializer
def _encode_request(self, request):
"""Encode a request object"""
obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request):
"""Decode an request previously encoded"""
obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def __len__(self):
"""Return the length of the queue"""
raise NotImplementedError
def push(self, request):
"""Push a request"""
raise NotImplementedError
def pop(self, timeout=0):
"""Pop a request"""
raise NotImplementedError
def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)
# first in, first out: a queue
class FifoQueue(Base):
"""Per-spider FIFO queue"""
def __len__(self):
"""Return the length of the queue"""
return self.server.llen(self.key)
    # push at the head, pop at the tail
def push(self, request):
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
"""Pop a request"""
if timeout > 0:
data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.rpop(self.key)
if data:
return self._decode_request(data)
# priority queue (redis sorted set)
class PriorityQueue(Base):
"""Per-spider priority queue abstraction using redis' sorted set"""
def __len__(self):
"""Return the length of the queue"""
return self.server.zcard(self.key)
def push(self, request):
"""Push a request"""
data = self._encode_request(request)
score = -request.priority
# We don't use zadd method as the order of arguments change depending on
# whether the class is Redis or StrictRedis, and the option of using
# kwargs only accepts strings, not bytes.
self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0):
"""
Pop a request
timeout not support in this queue class
"""
# use atomic range/remove using multi/exec
pipe = self.server.pipeline()
pipe.multi()
pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
results, count = pipe.execute()
if results:
return self._decode_request(results[0])
# last in, first out: a stack
class LifoQueue(Base):
"""Per-spider LIFO queue."""
def __len__(self):
"""Return the length of the stack"""
return self.server.llen(self.key)
    # push at the head, pop at the head
def push(self, request):
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
"""Pop a request"""
if timeout > 0:
data = self.server.blpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.lpop(self.key)
if data:
return self._decode_request(data)
# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
scheduler.py
This extension replaces Scrapy's built-in scheduler (it is pointed to by the SCHEDULER setting) and is what implements the distributed scheduling of the crawl. The data structures it relies on are the ones implemented in queue.
The two kinds of distribution scrapy-redis offers, distributed crawling and distributed item processing, are implemented by the scheduler module and the pipelines module respectively; the other modules described here play supporting roles for these two.
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
"""Redis-based scheduler
Settings
--------
SCHEDULER_PERSIST : bool (default: False)
Whether to persist or clear redis queue.
SCHEDULER_FLUSH_ON_START : bool (default: False)
Whether to flush redis queue on start.
SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
How many seconds to wait before closing if no message is received.
SCHEDULER_QUEUE_KEY : str
Scheduler redis key.
SCHEDULER_QUEUE_CLASS : str
Scheduler queue class.
SCHEDULER_DUPEFILTER_KEY : str
Scheduler dupefilter redis key.
SCHEDULER_DUPEFILTER_CLASS : str
Scheduler dupefilter class.
SCHEDULER_SERIALIZER : str
Scheduler serializer.
"""
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None):
"""Initialize scheduler.
Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.
"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val
# Support serializer as a path to a module.
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
self.df = load_object(self.dupefilter_cls).from_spider(spider)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
This file rewrites the scheduler class to replace the original scrapy.core.scheduler. The scheduling logic itself does not change much; the main difference is that Redis becomes the storage medium, so all crawler instances are scheduled in a unified way. The scheduler is responsible for scheduling each spider's requests. When it is initialized, it reads from the settings which queue and dupefilter classes to use (usually the defaults above) and which keys they use (usually the spider name plus a queue or dupefilter suffix, so different instances of the same spider share the same data). Whenever a request is to be scheduled, enqueue_request is called: the scheduler asks the dupefilter whether the request is a duplicate, and if not, pushes it into the queue container (FIFO, LIFO or priority, configurable in the settings). When it is time to hand out work, next_request is called: the scheduler pops a request through the queue container's interface and sends it to the corresponding spider so the spider can crawl it.
spiders.py
The spider here reads the URLs to crawl from Redis and crawls them; if the crawl yields more URLs, it keeps going until all requests are finished, then reads more URLs from Redis and repeats the cycle.
Analysis: the spider uses the signals.spider_idle signal to monitor the crawler's state. When it is idle, it hands new requests built with make_requests_from_url(url) back to the engine, which passes them to the scheduler.
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection, defaults
from .utils import bytes_to_str
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
if self.redis_batch_size is None:
# TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE',
settings.getint('CONCURRENT_REQUESTS'),
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
if self.redis_encoding is None:
self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis.
By default, ``data`` is an encoded URL. You can override this method to
provide your own message decoding.
Parameters
----------
data : bytes
Message from redis.
"""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: True)
Use SET operations to retrieve messages from the redis queue.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
The changes to the spider are also modest. Through the connect interface, the spider_idle signal is bound to the spider; when the spider is initialized, setup_redis establishes the Redis connection, and later next_requests pulls start URLs out of Redis. The key it reads from is defined by REDIS_START_URLS_KEY in the settings (default '<name>:start_urls'); REDIS_START_URLS_AS_SET only controls whether that key is a set or a list. Note that this start-URL pool is not the same thing as the queue pool described above: the queue pool is used for scheduling, while the start-URL pool holds the entry URLs. Both live in Redis, but under different keys, so think of them as different tables. From a small number of start URLs the spider can discover many new URLs, which go to the scheduler for deduplication and scheduling. When the spider has exhausted the URLs in the scheduling pool, the spider_idle signal fires, which triggers the spider's next_requests function and reads another batch of URLs from the start-URL pool in Redis.
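Putting it together, a typical RedisSpider and the line that seeds its start-URL pool might look like this (the spider name and URL are just examples; with REDIS_START_URLS_AS_SET left at False the pool is a plain list, hence lpush):
from scrapy_redis.spiders import RedisSpider

class MySpider(RedisSpider):
    name = 'myspider'
    redis_key = 'myspider:start_urls'   # where next_requests() reads entry URLs from

    def parse(self, response):
        yield {'url': response.url}

# Seeding the pool from any machine, e.g. in a small script (or via redis-cli lpush):
import redis
redis.StrictRedis(host='localhost', port=6379).lpush(
    'myspider:start_urls', 'http://blog.jobbole.com/all-posts/')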
utils.py
String compatibility between Python 2 and Python 3:
import six
def bytes_to_str(s, encoding='utf-8'):
"""Returns a str if a bytes object is given."""
if six.PY3 and isinstance(s, bytes):
return s.decode(encoding)
return s
Summary
By rewriting the scheduler and spider classes, this project makes scheduling and spider start-up talk to Redis; by providing new dupefilter and queue classes, it makes deduplication and the scheduling container talk to Redis as well. Because the crawler processes on every host access the same Redis database, scheduling and deduplication are managed centrally, which is what makes the crawler distributed.
When a spider is initialized, a matching scheduler object is initialized with it; that scheduler reads the settings and configures its scheduling container (queue) and its deduplication tool (dupefilter). Whenever the spider produces a request, the Scrapy core hands it to that spider's scheduler, which checks it against Redis for duplicates and, if it is new, adds it to the scheduling pool in Redis. When the scheduling conditions are met, the scheduler pops a request out of the Redis scheduling pool and sends it to the spider to crawl. Once the spider has crawled all the URLs currently available and the scheduler finds that this spider's Redis scheduling pool is empty, the spider_idle signal fires; on receiving it, the spider connects to Redis, reads a new batch of entry URLs from the start-URL pool, and the whole cycle repeats.
What scrapy-redis schedules are full Request objects, which carry quite a lot of data (not just the URL but also the callback, headers, and so on). This can slow the crawl down and consume a lot of Redis storage, so keeping things efficient requires decent hardware, especially for the Redis host.
Bloom Filter
https://piaosanlang.gitbooks.io/spiders/09day/section9.1.html
https://pypi.org/project/pybloomfiltermmap3/#description
https://pypi.org/project/pybloom_live
scrapy-redis deduplication
scrapy_redis deduplicates with the Redis set data structure; what it deduplicates is the request fingerprint, computed as described in the deduplication section above.
def request_seen(self, request):
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
return added == 0
If you want to optimize with a Bloom filter, you can modify the dedup function request_seen:
def request_seen(self, request):
    fp = self.request_fingerprint(request)
    if self.bf.isContains(fp):    # already seen
        return True
    else:
        self.bf.insert(fp)
        return False
Here self.bf is an instance of the BloomFilter class below (a sketch of where to create it follows the class):
# encoding=utf-8
import redis
from hashlib import md5
class SimpleHash(object):
def __init__(self, cap, seed):
self.cap = cap
self.seed = seed
def hash(self, value):
ret = 0
for i in range(len(value)):
ret += self.seed * ret + ord(value[i])
return (self.cap - 1) & ret
class BloomFilter(object):
def __init__(self, host='localhost', port=6379, db=0, blockNum=1, key='bloomfilter'):
"""
:param host: the host of Redis
:param port: the port of Redis
        :param db: which db in Redis
:param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
:param key: the key's name in Redis
"""
self.server = redis.Redis(host=host, port=port, db=db)
        self.bit_size = 1 << 31  # Redis strings max out at 512 MB; 2^31 bits = 2^28 bytes = 256 MB are used here
self.seeds = [5, 7, 11, 13, 31, 37, 61]
self.key = key
self.blockNum = blockNum
self.hashfunc = []
for seed in self.seeds:
self.hashfunc.append(SimpleHash(self.bit_size, seed))
    def isContains(self, str_input):
        if not str_input:
            return False
        m5 = md5()
        m5.update(str_input.encode('utf-8'))   # md5 needs bytes on Python 3
        str_input = m5.hexdigest()
        ret = True
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            ret = ret & self.server.getbit(name, loc)
        return ret

    def insert(self, str_input):
        m5 = md5()
        m5.update(str_input.encode('utf-8'))
        str_input = m5.hexdigest()
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            self.server.setbit(name, loc, 1)

if __name__ == '__main__':
    """ The first run prints "not exists!"; later runs print "exists!" """
    bf = BloomFilter()
    if bf.isContains('http://www.baidu.com'):   # check whether the string has been seen
        print('exists!')
    else:
        print('not exists!')
        bf.insert('http://www.baidu.com')
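One way (a sketch, not part of scrapy-redis itself) to wire this BloomFilter into the scrapy-redis dupefilter is to subclass RFPDupeFilter, create self.bf there, and point DUPEFILTER_CLASS at the subclass:
from scrapy_redis.dupefilter import RFPDupeFilter
from myproject.bloomfilter import BloomFilter   # the class defined above; the module path is hypothetical

class BloomRFPDupeFilter(RFPDupeFilter):
    def __init__(self, server, key, debug=False):
        super(BloomRFPDupeFilter, self).__init__(server, key, debug=debug)
        # the BloomFilter above opens its own redis connection (localhost by default)
        # and keeps its bitmaps under a separate key
        self.bf = BloomFilter(key=key + ':bloom')

    def request_seen(self, request):
        fp = self.request_fingerprint(request)
        if self.bf.isContains(fp):
            return True
        self.bf.insert(fp)
        return False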
Bloom filter deduplication backed by Redis combines the Bloom filter's capacity for deduplicating at very large scale with Redis's persistence, and being Redis-based it also extends naturally to deduplication across distributed machines.
Deploying Scrapy with scrapyd
https://github.com/scrapy/scrapyd
Extras
How to avoid crawl loops
In Scrapy's default configuration, requests are deduplicated by their fingerprint (essentially the URL), which is enough for ordinary sites. Some sites, however, take SEO to extremes: to make crawlers fetch more, they generate links dynamically based on the request, which can lead a crawler into huge numbers of random pages or even an endless loop.
There are two ways to deal with this:
(1) In settings.py, cap the crawl depth (a global setting, implemented by DepthMiddleware):
DEPTH_LIMIT = 20
(2) Check the response yourself in parse (a per-spider approach):
def parse(self, response):
    if response.meta['depth'] > 100:
        print('Loop?')   # probably stuck in a loop; stop following links from here