Using django-celery for asynchronous tasks in Python 3.7

2019-02-11  finlu

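These notes were written after taking the imooc (慕课网) course "Python 异步任务队列 Celery 使用" (Using Celery, the Python asynchronous task queue). Because I use Python 3.7, the instructor's code needed some modifications before it would run in this environment.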

Environment:
python: python3.7
django: 2.1.5
celery: 4.2.0
django-celery: 3.2.2
flower: 0.9.2
kombu: 4.3.0
tornado: 5.1.1

What is Celery?

Use cases

Installation and configuration

Python virtual environment tools

  1. virtualenv & virtualenvwrapper

  2. pyenv

  3. pipenv

  4. venv

Using Celery

  1. Configure Celery

    from celery import Celery

    broker = 'redis://localhost:6379/1'
    backend = 'redis://localhost:6379/2'
    app = Celery('my_task', broker=broker, backend=backend)
    
  2. Register a task

    import time

    @app.task   # turn `add` into an asynchronous task
    def add(x, y):
        print('enter call function ...')
        time.sleep(5)
        return x + y
    
  3. Submit a task to Celery

    • Windows 64-bit users: add the following code to the task script
      import os
      
      os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
      
    • Submit the task
      result = add.delay(2, 15)

      # Or use apply_async, passing the positional arguments as a tuple
      # result = add.apply_async((2, 15))
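
As a rough analogy for what delay does, the sketch below uses the standard library's concurrent.futures instead of Celery. That substitution is an assumption made so the example runs without a broker or a worker. submit returns a handle immediately and the value is fetched later. With Celery, the object returned by add.delay(2, 15) plays a similar role, and result.get(timeout=...) blocks until the worker has produced the return value. The pool size and the shortened sleep are illustrative choices.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    """Stand-in for the Celery task above, with a shorter sleep."""
    time.sleep(0.2)
    return x + y


# The thread pool plays the role of the Celery worker (analogy only).
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(add, 2, 15)   # returns at once, like add.delay(2, 15)
    value = future.result(timeout=5)   # blocks until done, like result.get(timeout=5)

print(value)
```

The caller is free to do other work between submitting and fetching the result, which is the point of handing the slow function to a worker.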
      

Organizing Celery tasks in a dedicated package

  1. Create a package named celery_app in the current project to hold the Celery tasks and config, and set up the Celery instance in __init__.py

    from celery import Celery
    import os
    
    os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
    
    app = Celery('demo')
    
    # Load the configuration module through the `Celery` instance
    app.config_from_object('celery_app.celery_config')
    
  2. Add the Celery configuration in celery_config.py

    # celery_config.py
    from datetime import timedelta
    from celery.schedules import crontab
    
    
    BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    
    
    CELERY_TIMEZONE = 'Asia/Shanghai'  # defaults to UTC
    
    # Import the listed task modules
    CELERY_IMPORTS = (
        'celery_app.task1',
        'celery_app.task2',
    )
    
    
    # Tasks for `celery` to run on a schedule
    CELERYBEAT_SCHEDULE = {
        'task1': {
            'task': 'celery_app.task1.add',
            'schedule': timedelta(seconds=500),
            'args': (2, 8)
        },
        'task2': {
            'task': 'celery_app.task2.multiply',
            'schedule': crontab(hour=16, minute=32),
            'args': (4,6)
        }
    }
    
  3. Write the Celery task

    # task1.py
    import time
    
    from celery_app import app
    
    @app.task
    def add(x, y):
        time.sleep(3)
        return x + y
    
  4. Submit tasks to Celery from an external script

    # app.py
    from celery_app import task1, task2
    
    task1.add.delay(2, 5)
    task2.multiply.apply_async(args=(2, 5))  # extra options can be passed here, e.g. the queue to use
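
To make the two schedule styles in celery_config.py concrete, here is a standard-library sketch (not Celery's scheduler) of when each entry would next fire. timedelta(seconds=500) means every 500 seconds after the previous run, while crontab(hour=16, minute=32) means once a day at 16:32 in the configured time zone. The helper names next_interval_run and next_daily_run are hypothetical and exist only for this illustration.

```python
from datetime import datetime, timedelta


def next_interval_run(last_run, every):
    """Next firing time for an interval schedule such as timedelta(seconds=500)."""
    return last_run + every


def next_daily_run(now, hour, minute):
    """Next firing time for a daily schedule such as crontab(hour=16, minute=32)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2019, 2, 11, 17, 0, 0)
interval_next = next_interval_run(now, timedelta(seconds=500))
daily_next = next_daily_run(now, hour=16, minute=32)
print(interval_next)  # 2019-02-11 17:08:20
print(daily_next)     # 2019-02-12 16:32:00
```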
    

Starting Celery

Error when running celery worker on Python 3.7, and how to fix it

from . import async, base
                ^
SyntaxError: invalid syntax

Issue report and discussion:

https://github.com/celery/celery/issues/4500

Fix:

https://github.com/celery/celery/pull/4852/commits/d737dec3c943632f21f73a2235409c29e3fe63e3

Using Celery in Django

Running python manage.py celery worker -l INFO fails with:

Traceback (most recent call last):
  File "manage.py", line 15, in <module>
    execute_from_command_line(sys.argv)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 381, in execute_from_command_line
    utility.execute()
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 375, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 224, in fetch_command
    klass = load_command_class(app_name, subcommand)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 36, in load_command_class
    module = import_module('%s.management.commands.%s' % (app_name, name))
  File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\commands\celery.py", line 11, in <module>
    class Command(CeleryCommand):
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\commands\celery.py", line 15, in Command
    base.get_options() +
TypeError: can only concatenate tuple (not "NoneType") to tuple
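
The last line of the traceback says that a tuple was concatenated with None. A minimal reproduction, independent of djcelery, looks like the sketch below. get_options here is a hypothetical stand-in that returns None, which is an assumption about what one of the operands in the options expression evaluated to in this environment. The option value is illustrative as well.

```python
def get_options():
    """Hypothetical stand-in for a method that returns None instead of a tuple."""
    return None


existing_options = ('--verbosity',)  # illustrative value, not djcelery's real options

try:
    options = existing_options + get_options()
except TypeError as exc:
    message = str(exc)
    print(message)

# A generic guard: treat a missing value as an empty tuple.
safe_options = existing_options + (get_options() or ())
```

The guard on the last line is only a general pattern for this kind of error. It is not the change applied to djcelery in this article.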

References:

https://stackoverflow.com/questions/49085230/django-celery-typeerror-can-only-concatenate-tuple-not-nonetype-to-tuple
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

Solution:

  1. Comment out the options section in djcelery\management\commands\celery.py
    # celery.py
    from __future__ import absolute_import, unicode_literals
    
    from celery.bin import celery
    
    from djcelery.app import app
    from djcelery.management.base import CeleryCommand
    
    base = celery.CeleryCommand(app=app)
    
    
    class Command(CeleryCommand):
        """The celery command."""
        help = 'celery commands, see celery help'
        # options = (CeleryCommand.options +
        #            base.get_options() +
        #            base.preload_options)
    
        def run_from_argv(self, argv):
            argv = self.handle_default_options(argv)
            base.execute_from_commandline(
                ['{0[0]} {0[1]}'.format(argv)] + argv[2:],
            )
    
  2. After this change, running python manage.py celery worker -l INFO fails again, this time with the following error:
    [2019-02-11 01:11:40,836: CRITICAL/MainProcess] Unrecoverable error: SyntaxError('invalid syntax', ('C:\\Users\\jzw\\Desktop\\celery_learn\\dj_celery\\venv\\lib\\site-packages\\celery\\backends\\redis.py', 22, 19, 'from . import async, base\n'))
    Traceback (most recent call last):
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: 'backend'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\worker\worker.py", line 205, in start
        self.blueprint.start(self)
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\bootsteps.py", line 115, in start
        self.on_start()
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 139, in on_start
        self.emit_banner()
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 154, in emit_banner
        ' \n', self.startup_info(artlines=not use_image))),
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 217, in startup_info
        results=self.app.backend.as_uri(),
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 44, in __get__
        value = obj.__dict__[self.__name__] = self.__get(obj)
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 1196, in backend
        return self._get_backend()
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 914, in _get_backend
        self.loader)
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 70, in by_url
        return by_name(backend, loader), url
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 50, in by_name
        cls = symbol_by_name(backend, aliases)
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\imports.py", line 56, in symbol_by_name
        module = imp(module_name, package=package, **kwargs)
      File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
      File "<frozen importlib._bootstrap>", line 983, in _find_and_load
      File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 724, in exec_module
      File "<frozen importlib._bootstrap_external>", line 860, in get_code
      File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
      File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
      File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\backends\redis.py", line 22
        from . import async, base
                          ^
    SyntaxError: invalid syntax
    

Explanation: Python 3.7 made async a reserved keyword, so a statement such as from . import async, base is no longer valid syntax and Python raises a SyntaxError when it loads the file.
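
A quick way to confirm this on your own interpreter, using only the standard library, is sketched here. The helper name compiles is hypothetical. It only checks whether a line parses and does not execute the import.

```python
import keyword
import sys


def compiles(source):
    """Return True if `source` parses as valid Python on this interpreter."""
    try:
        compile(source, '<example>', 'exec')
        return True
    except SyntaxError:
        return False


print(sys.version_info[:2])
print(keyword.iskeyword('async'))                    # True on Python 3.7 and later
print(compiles('from . import async, base'))         # False on Python 3.7 and later
print(compiles('from . import asynchronous, base'))  # True: the line only has to parse here
```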

Fix:

Following the proposal from the Celery maintainers, async.py is renamed to asynchronous.py. So we only need to rename celery\backends\async.py to celery\backends\asynchronous.py and replace every occurrence of async in celery\backends\redis.py with asynchronous.
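
The manual steps could be scripted roughly as follows. This is a sketch under stated assumptions. patch_backends is a hypothetical helper, and the demo runs against a throwaway directory with illustrative file contents rather than a real Celery installation. To try it on a real environment you would pass the path to your own site-packages\celery\backends directory, ideally after backing it up.

```python
import re
import tempfile
from pathlib import Path


def patch_backends(backends_dir):
    """Rename async.py and update references to it in redis.py."""
    backends_dir = Path(backends_dir)
    old_module = backends_dir / 'async.py'
    if old_module.exists():
        old_module.rename(backends_dir / 'asynchronous.py')

    redis_module = backends_dir / 'redis.py'
    source = redis_module.read_text(encoding='utf-8')
    # \b restricts the match to the whole word `async`.
    patched = re.sub(r'\basync\b', 'asynchronous', source)
    redis_module.write_text(patched, encoding='utf-8')
    return patched


# Demo on a temporary directory that imitates the layout (not a real install).
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp)
    (demo / 'async.py').write_text('# placeholder module\n', encoding='utf-8')
    (demo / 'redis.py').write_text(
        'from . import async, base\n'
        'class ResultConsumer(async.BaseResultConsumer):\n'
        '    pass\n',
        encoding='utf-8',
    )
    result = patch_backends(demo)
    renamed = (demo / 'asynchronous.py').exists()

print(result)
```

A whole-word replacement also rewrites the word async inside comments and strings, which is harmless here but worth knowing. Other modules that import the old module name, if any, would need the same change.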

  1. Re-run python manage.py celery worker. The Celery worker now starts successfully.

Celery monitoring tool: flower

Deploying and managing Celery with supervisor
