码农行者 码农行者
首页
  • Python

    • 语言特性
    • Django相关
    • Tornado
    • Celery
  • Golang

    • golang学习笔记
    • 对比python学习go
    • 模块学习
  • JavaScript

    • Javascript
  • 数据结构预算法笔记
  • ATS
  • Mongodb
  • Git
云原生
运维
垃圾佬的快乐
  • 数据库
  • 机器学习
  • 杂谈
  • 面试
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

DeanWu

软件工程师
首页
  • Python

    • 语言特性
    • Django相关
    • Tornado
    • Celery
  • Golang

    • golang学习笔记
    • 对比python学习go
    • 模块学习
  • JavaScript

    • Javascript
  • 数据结构预算法笔记
  • ATS
  • Mongodb
  • Git
云原生
运维
垃圾佬的快乐
  • 数据库
  • 机器学习
  • 杂谈
  • 面试
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Python

    • 语言特性

    • Django

    • Best.Practices.for.Django

    • Djangorestfulframework

    • Celery

      • 「译」在django中使用celery实现异步任务
      • Tornado

      • Flask

      • FastApi

      • virtualenv

    • Golang

    • Javascript

    • 开发语言
    • Python
    • Celery
    DeanWu
    2015-12-03
    目录

    「译」在django中使用celery实现异步任务

    # 在django中使用celery

    在使用celery时,你需要定义一个celery的实例(叫做 app )。如果你有一个现成的django项目,如下:

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
    - manage.py
    
    1
    2
    3
    4
    5

    建议如下方式创建celery实例。

    创建 proj/proj/celery.py 文件,如下:

    proj/proj/celery.py

    from __future__ import absolute_import
    
    import os
    
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    from django.conf import settings  # noqa
    
    app = Celery('proj')
    
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    接下来,你需要把这个app放到proj/proj/__init__.py模块中,确保django启动的时候这个app能够被加载。

    从而提供给@shared_task(稍后讲到)注解使用。

    proj/proj/__init__.py

    from __future__ import absolute_import
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app  # noqa
    
    1
    2
    3
    4
    5

    注意,这个项目实例的布局,适合于大型项目。对于简单的小项目来说,你可以用一个模块来同时定义celery实例和任务。

    参考celery 入门教程 (opens new window)教程。

    让我们逐个分析下第一个模块都做了什么。

    首先,我们导入future 模块,让我们的celery.py 不会与其他类库产生冲突。

    from __future__ import absolute_import
    

    接下来,我们为celery 设置类默认的django项目 DJANGO_SETTINGS_MODULE。

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    

    这个设置让celery程序知道我们的django项目配置在哪。这句必须放在celery每次被实例化之前。

    接下来,实例化 celery 。

    app = Celery('proj')
    

    在django项目中,你只需要一个celery实例就可以了。

    我们可以把celery的配置放到django的settings配置文件中,如下:

    app.config_from_object('django.conf:settings')
    

    此处的参数你可以传递对象,但是建议字符串,这样使用windows 或execv 的时候,无需序列化对象。

    接下来,将所有可以重复操作的任务tasks.py 放到一个django的app中,celery使用如下的机制来发现此模块。

    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    

    tasks.py 的放至如下:

    - app1/
        - app1/tasks.py
        - app1/models.py
    - app2/
        - app2/tasks.py
        - app2/models.py
    

    这样,您就不必在各个模块手动添加到CELERY_IMPORTS参数。lambda 函数会使app在调用的时候能够被自动的发现,并且使你输入的模块不会影响Django的设置对象。

    最后,debug_task 例子是绑定了自己的request请求信息的一个任务函数。

    Finally, the debug_task example is a task that dumps its own request information.

    This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

    # Using the @shared_task decorator

    你的task任务写在了django的app中,这个app可能不依赖与django项目本身。所以你不能讲celery的实例一如此app中。

    该@shared_task装饰可以让你创建任务,而无需任何具体的celery实例:

    注:此时需要把 django 的app 添加到 settings的app列表(INSTALLED_APPS)中。

    demoapp/tasks.py:

    from __future__ import absolute_import
    
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    # Using the Django ORM/Cache as a result backend.

    如果你想讲celery的返回结果使用django的ORM(或SQLAlchemy)存到数据库中,你需要安装 django-celery 模块库。

    django-celery 模块库结果默认使用django的ORM 和 Cache 框架。

    使用步骤如下:

    1、安装 django-celery 库:

    $ pip install django-celery
    

    2、Add djcelery to INSTALLED_APPS.

    3、创建celery的数据表

    If you are using south for schema migrations, you’ll want to:
    
    $ python manage.py migrate djcelery
    For those who are not using south, a normal syncdb will work:
    
    $ python manage.py syncdb
    
    >=django1.7 
    可直接使用django自带的数据库同步命令:
    $ python manage.py makemigrations
    $ python manage.py migrate
    

    4、配置celery。

    # For the database backend you must use:
    app.conf.update(
        CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    )
    # For the cache backend you can use:
    app.conf.update(
        CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
    )
    # 此处配置用来存储任务执行结果。
    

    除了这种方式外,你可以把 CELERY_RESULT_BACKEND 放到你的django的settings配置文件中。

    # Starting the worker process

    在生产环境中,你将希望在后台运行worker - 请参阅 Running the worker as a daemon (opens new window) -

    但对于测试和开发它能够通过使用 celery 启动工作实例的管理命令非常有用,

    就像你使用Django的runserver:

    celery -A proj worker -l info
    

    获取完整的celery命令列表,请使用:

    celery help 
    

    # 2015年12月10日 update:

    按照此教程,启动celery后,报错误如下:

    [2016-05-29 01:19:24,751: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
    Trying again in 2.00 seconds...
    

    原因是,celery 需要一个 broker 来发送和接收消息,支持:rabbitmq、redis、数据库等作为这个broker。上边的配置中没有设置这个borker,

    celery 默认使用 rabbitmq来作为broker。所以,报错链接不上rabbitmq。

    解决办法:

    1、按装rabbitmq 显式的指定 broker为你配置的rabbitmq。

    CELERY_BROKER_URL = 'amqp://guest:guest@localhost//' 
    

    2、使用django数据库作为 broker:

    # 增加配置
    BROKER_URL = 'django://'
    # 增加app实例配置
    INSTALLED_APPS = ('kombu.transport.django', )
    

    broker 官方建议使用 rabbitmq 或 redis ,其他的broker 仅为开发测试,性能不稳定。

    官网详细介绍:http://docs.jinkan.org/docs/celery/getting-started/brokers/index.html

    # 2016年5月29日 更新:

    在搭建django + celery 使用总结:

    1、使用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)这个配置,让celery 自动发现django app 下定义的task 时, django app下的task 文件名 必须为 tasks.py。

    2、celery 3.1 后,和django的结合无需再依靠django-celery。 可按以上方式搭建。具体官方地址 这里 (opens new window)

    3、todo 后续补充

    # 各种功能搭建实例源码

    源码 (opens new window)


    ps: 个人英文水平有限,还请各位批评指正。

    # 参考:

    http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html (opens new window)

    #Django#Celery#翻译
    上次更新: 2023/03/28, 16:27:19
    「译」Django restful framework 性能优化
    我心中的 tornado 最佳实践

    ← 「译」Django restful framework 性能优化 我心中的 tornado 最佳实践→

    最近更新
    01
    chromebox/chromebook 刷bios步骤
    03-01
    02
    redis 集群介绍
    11-28
    03
    go语法题二
    10-09
    更多文章>
    Theme by Vdoing | Copyright © 2015-2024 DeanWu | 遵循CC 4.0 BY-SA版权协议
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式