使用Docke配置Rabbitmq及Celery的使用

2019-08-04  本文已影响0人  越大大雨天

消息队列选择

这里对之前celery异步任务的使用做个总结,在生产环境使用celery时,最好选择rabbitmq作为消息队列更为稳定,测试时也可以使用redis,简单快捷。

RabbitMQ架构

Rabbitmq架构示意图

基于AMQP协议,使用Channel进行连接和数据传输

使用docker安装rabbitmq

rabbitmq的正产安装比较麻烦,所以这里选择使用docker进行安装。

sudo docker pull rabbitmq:management

如果速度很慢,可以去阿里云:https://cr.console.aliyun.com/
免费注册并获取镜像加速器,配置方法很简单,官方有说明。

sudo docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
  1. 启动可视化管理界面:sudo rabbitmq-plugins enable rabbitmq_management,使用http://server-ip:15672访问界面
  2. 创建普通用户:sudo rabbitmqctl add_user [username] [password]
  3. 设置用户为管理员:sudo rabbitmqctl set_user_tags [username] administrator
  4. 用户添加管理权限:sudo rabbitmqctl set_permissions -p /[username] '.''.''.*'
  1. 查看用户:sudo rabbitmqctl list_users
  2. 查看队列:sudo rabbitmqctl list_queues
  3. 查看交换机:sudo rabbitmqctl list_exchanges
  4. 查看连接:sudo rabbitmqctl list_connections
  5. 查看消费者:sudo rabbitmqctl list_consumers

Celery工作流

Celery工作流

创建Celery实例

在tasks.py文件中创建Celery实例,broker为中间人,用来存放任务,可使用rabbitmqredis;backend用于储存任务执行的结果,这里也可不指定。

from celery import Celery
imort time
# 实例化Celery对象,指定rabbitmq
app = Celery("task1", broker="amqp://",backend="rpc://")
@app.task
def add(x, y):
    print("计算2个值的和: %s %s" % (x, y))
    return x+y

@app.task
def multi(x,y):
    #模拟阻塞任务
    time.sleep(3)
    print("发送短信成功")
    return x * y
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dailyfresh.settings')

启动celery worker监听

若不需要创建日志文件时,按如下启动worker监听:
切换到task1所在目录,使用命令开启监听:task1为创建实例时指定的名字

celery -A task1 worker --loglevel=info 

配置日志文件

若需要独立的日志文件记录Celery运行状态,需要使用celery自带的日志记录器。

from celery.utils.log import get_task_logger
logger = get_task_logger('mycelery')

logger.info('Refresh task start and refresh success')

并在启动celery worker的时候指定日志存放位置,不指定的情况将会默认生成在/var/lib/docker/目录。

celery -A task worker --loglevel=info -f [存放日志的绝对路径]

调用任务,异步执行

在其他地方导入celery实例,调用任务,该任务将以非阻塞的方式异步执行。

from tasks import add, multi
print('开始')
add.delay(1,2)
multi.delay(4,4)
print('结束')

你可以发现,celery自动在后台完成了任务,原本会阻塞主程序的任务在后台以异步的方式完成了。

定时任务

在celery4版本中,定时任务可以像普通任务一样单独定义,这里需要用到@app.on_after_configure.connect装饰器,详细情况有兴趣可以单独查找,网上资料很多,个人使用不多,因为在某些场景,celery的定时任务不能动态添加。对于定时任务更倾向于使用APScheduler,在框架中使用更方便简单。

使用场景

你能想到在某些特殊场景下,这种任务队列的机制是很有用的。比如邮件、短信发送等任务,需要调用第三方进行,而这些任务的耗时情况使我们不能控制的,为了不阻塞主程序,可以直接使用Celery+Rabbitmq或者Redis,将这些任务直接扔到消息队列中,让他们以异步的方式自动调用执行。

最后

这里这是对celery的最基本实用做了介绍,需要了解更多功能可以查看更多其他资料,比如任务重试、异常处理、返回结果获取等。需要注意的是,如果修改了任务内容,需要重启worker监听和IDE再执行才能生效。

上一篇下一篇

猜你喜欢

热点阅读