Celery部署爬虫(一)

2019-12-30  本文已影响0人  鬼子口音
celery

Celery - 分布式任务队列

用官方文档的原话说 ,Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。

它是一个任务队列,专注于实时处理,同时还支持任务调度。

Celery是用Python编写的,但协议可以用任何语言实现。

除了 Python 之外,还有 Node.js 和 PHP 客户端。

生产者消费者模式

Celery的架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

celery消息中间件,也就是所谓的中间人,官方支持的两种稳定的消息队列数据库,一个是 RabbitMQ,另一个 Redis。当然选用其他数据库也是可行的,比如, MongoDB 等,用的比较多的当然就是高性能Redis数据库啦,当然MQ也同样强大。 Worker 进程会持续监视队列中是否有需要处理的新任务(如果有就消费,没有则持续监听队列)。

任务执行单元

Worker 是 Celery 提供的任务执行的单元,Worker 并发的运行在分布式的系统节点中,也就是充当了任务工人的角色(消费者),用于系统调度。

在开启中间消息队列之后,任务单元会监听消息队列并从中间件里消费任务,执行任务单元,将结果存入后端数据库。

任务结果存储

Celery支持以不同方式存储任务的结果,后端存储包括 Redis,MongoDB,Mysql 等等。

然后就该配置一些环境依赖了

pip install celery
pip install redis
ubuntu 建议用4.0以上版本 redis>=3.2  否则报错
windows 使用3.x  (其实使用4.0 和 低版本的redis也可以的 但是不建议) 当然3.x 对应低版本的redis==2.10.6

接下来就需要测试一下Celery能否正常工作,运行一个简单的爬虫任务感受一下。

# task.py
from celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost',27017)
db = client['baike']
@app.task
def get_url(link):
    item = {}
    headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
    res = requests.get(link,headers=headers)
    res.encoding = 'UTF-8'
    doc = etree.HTML(res.text)
    content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
    print(res.status_code)
    print(link,'\t','++++++++++++++++++++')
    item['link'] = link
    data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
    item['data'] = data
    if db['Baike'].insert(dict(item)):
        print("OK ...")
    else:
        print('......')

在 app 的实例中,使用redis数据库作为中间消息队列,mongodb 数据库作为后端存储。并用 app.task 声明一个任务函数。

在终端键入

celery -A task worker -l info -P gevent -c 10

监听 redis 消息队列,-A 参数表示的是 Celery 的名称,这里就是 task.py, worker 是一个执行任务角色,-l 是日志等级,-P 指定并发的方法。

这里使用 gevent 并发10个线程,-c 表示并发个数。

注意:windows下gevent用法不兼容,报错。

开启后的界面是这样的

image

调用任务

# run_task.py
from task import get_url
from urls import url_list

def delay(url):
    result = get_url.delay(url)
    return result

def run():
    with open('./url.txt', 'r') as f:
        for url in f.readlines():
            delay(url.strip('\n'))

if __name__ == '__main__':
    run()

任务调度的日志信息会在终端打印出来,说明确实有 worker 得到系统的调度,并从消息队列消费任务,查看 redis 数据库发现已经有了缓存信息。

总之,这一篇先提前体验了一把celery是如何工作的
然而,更详细的,还在后面

欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
个人技术博客:http://www.gzky.live

上一篇下一篇

猜你喜欢

热点阅读