Celery distributed tasks for Python crawlers (pitfalls encountered)

2019-11-27  嗨_小罗哥

1. A brief introduction to Celery and RabbitMQ

Celery is a distributed task queue for Python; RabbitMQ is the AMQP message broker used here (see BROKER_URL below) to carry task messages from the scheduler to the workers.

2. Distributed tasks

(1) Project structure
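The structure screenshot from the original post is not reproduced here; judging from include=['tasks'], config_from_object('celeryconfig'), and from celery_test import app in the code below, the project presumably consists of three flat modules:

celery_test.py    # Celery application instance
celeryconfig.py   # broker, queue, routing and beat configuration
tasks.py          # the spider task definitions
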
# celery_test.py: create the Celery application
from __future__ import absolute_import
from celery import Celery

app = Celery(include=['tasks'])          # register the task module tasks.py
app.config_from_object('celeryconfig')   # load settings from celeryconfig.py

if __name__ == '__main__':
    app.start()                          # hand sys.argv over to the celery command line

# celeryconfig.py: broker, queues, routing and beat schedule
# -*- coding: UTF-8 -*-
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from kombu import Queue, Exchange
from celery.schedules import crontab
BROKER_URL='amqp://guest:guest@localhost:5672//'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # store task results in Redis
# connection pool size between Celery and the broker (10 is the default)
BROKER_POOL_LIMIT = 10

CELERY_ACKS_LATE = True                  # acknowledge a task only after it completes, so tasks are redelivered if a worker dies
CELERY_IGNORE_RESULT = True              # do not store task return values
CELERY_DISABLE_RATE_LIMITS = True        # skip rate-limit bookkeeping
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}  # visibility timeout in seconds (used by Redis/SQS transports)
CELERYD_MAX_MEMORY_PER_CHILD = 600       # max resident memory (KiB) per worker child before it is replaced
CELERYD_MAX_TASKS_PER_CHILD = 1          # recycle each worker child after every task
CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
TIME_ZONE = 'Asia/Shanghai'
# queue definitions: one default queue plus one queue per spider
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('spider_001', Exchange('spider_001'), routing_key='spider_001'),
    Queue('spider_002', Exchange('spider_002'), routing_key='spider_002'),
    Queue('spider_003', Exchange('spider_003'), routing_key='spider_003'),
)
# task-to-queue routing
CELERY_ROUTES = {
    'tasks.daily_spider_001': {'queue': 'spider_001', 'routing_key': 'spider_001'},
    'tasks.daily_spider_002': {'queue': 'spider_002', 'routing_key': 'spider_002'},
    'tasks.daily_spider_003': {'queue': 'spider_003', 'routing_key': 'spider_003'}
}

# periodic (beat) schedule
CELERYBEAT_SCHEDULE = {
    'daily_spider_001': {
        'task': 'tasks.daily_spider_001',
        'schedule': timedelta(seconds=10),  # run every 10 seconds
        # 'args': (16, 16)
    },
    'daily_spider_002': {
        'task': 'tasks.daily_spider_002',
        'schedule': timedelta(seconds=11),  # run every 11 seconds
    },
    'daily_spider_003': {
        'task': 'tasks.daily_spider_003',
        'schedule': timedelta(seconds=12),  # run every 12 seconds
    },
}
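
The crontab import at the top of celeryconfig.py is not used above; if a cron-style schedule were wanted instead of a plain interval, a beat entry could look like the following sketch (the 02:30 time is illustrative only):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'daily_spider_001': {
        'task': 'tasks.daily_spider_001',
        # run once a day at 02:30 (Asia/Shanghai, per CELERY_TIMEZONE) instead of every 10 seconds
        'schedule': crontab(hour=2, minute=30),
    },
}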

# tasks.py: the spider tasks; trivial arithmetic stands in for real crawler code
from __future__ import absolute_import
from celery_test import app

@app.task
def daily_spider_001():
    return 1 + 2


@app.task
def daily_spider_002():
    return 2 + 2


@app.task
def daily_spider_003():
    return 3 + 2
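
With the routes above, each task lands in its own queue. As a quick sanity check (assuming RabbitMQ is running locally and a worker is consuming the spider_001 queue), a task can also be sent by hand:

from tasks import daily_spider_001

# send the task through the broker; CELERY_ROUTES delivers it to the spider_001 queue
async_result = daily_spider_001.delay()

# note: with CELERY_IGNORE_RESULT = True and no result backend configured,
# the return value computed by the worker cannot be fetched via async_result.get()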

Starting the processes:

Start the beat scheduler, which pushes the periodic tasks onto the queues:

celery beat -A celery_test

Start a worker to consume and execute the tasks:

celery -A celery_test worker -l info

Or run beat and a worker together in one process (convenient for development):

celery -A celery_test worker -B -l info
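
To actually distribute the crawl, each machine can run a worker pinned to its own queue with -Q, while beat runs on only one machine so every task is scheduled just once; for example:

# machine 1: consume only the spider_001 queue
celery -A celery_test worker -Q spider_001 -l info

# machine 2: consume the other two queues
celery -A celery_test worker -Q spider_002,spider_003 -l info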