简介
- Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。
- Celery 是一款专注于实时处理的异步任务队列,可用于处理实时数据以及任务调度。
Celery架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
使用场景
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
Celery的安装配置
pip install celery
中间人(Broker)消息中间件:RabbitMQ / Redis
RabbitMQ
RabbitMQ 是默认的中间人(Broker),只需要配置连接的URL即可,不需要安装额外的的配置以及初始化配置信息
1 | broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost' |
有关 Celery 各种中间人(Broker)的配置列表,请查阅代理设置,并且按照说明设置用户名和密码。
RabbitMQ
的安装和使用这里不多赘述,有空新开一篇文章讲解
Redis
Redis 的配置非常的简单,只需要配置 Redis 的 URL :
1 | app.conf.broker_url = 'redis://localhost:6379/0' |
URL 的格式为:
1 | redis://:password@hostname:port/db_number |
URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库)
Celery 也可以连接 Redis 哨兵也是非常简单的:
1 | app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381' |
RabbitMQ / Redis 比较
个人比较喜欢Redis作为Broker,Redis安装和使用比较简单,之所以Redis能够用作消息队列,因为其中的列表数据类型,可以存储多个有序的字符串,既然是有序的,就满足消息队列的特点,使用lpush
+rpop
或者rpush
+lpop
实现消息队列。除此之外,redis支持阻塞操作,在弹出元素的时候使用阻塞命令来实现阻塞队列。
以上已经在Redis分类的文章中有写到,新版Redis的特性也能更好地支持消息队列,这一块的知识后续新开一个文章进行学习。
Celery异步任务
创建任务
1 | from celery import Celery |
调用任务
需要调用我们创建的实例任务,可以通过 delay()
进行调用。
delay()
是 apply_async()
的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks
):
1 | from tasks import add |
保存结果
如果需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。Celery 内置了一些后端结果:SQLAlchemy/Django ORM、Memcached、Redis、 RPC (RabbitMQ/AMQP)以及自定义的后端结果存储中间件。
针对本次实例,我们使用 RPC 作为结果后端,将状态信息作为临时消息回传。后端通过 backend 参数指定给 Celery(或者通过配置模块中的 result_backend 选项设定):
1 | app = Celery('tasks', backend='rpc://', broker='pyamqp://') |
例如,可以使用Redis作为Celery结果后端,使用RabbitMQ作为中间人(Broker)可以使用以下配置(这种组合比较流行):
1 | app = Celery('tasks', backend='redis://localhost', broker='pyamqp://') |
ready()
可以检测是否已经处理完毕:
1 | 4, 4) result = add.delay( |
整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:
1 | 1) result.get(timeout= |
如果任务出现异常,get()
会再次引发异常,可以通过 propagate 参数进行覆盖:
1 | False) result.get(propagate= |
如果任务出现异常,可以通过以下命令进行回溯:
1 | result.traceback |
Celery执行定时任务
celery beat
是一个调度程序;它定期启动任务,然后由集群中的可用节点执行任务。
设定时间让celery执行一个任务
1 | from celery_app_task import add |
类似于contab的定时任务
多任务结构中celery.py配置如下
1 | from datetime import timedelta |
启动一个beat:celery beat -A celery_task -l info
启动work执行:celery worker -A celery_task -l info -P eventlet
Django中使用Celery
在Django项目中异步任务和周期任务是必不可少的:
添加Celery配置项
在项目目录下创建celeryconfig.py(也可以写在settings.py中)
1 | import djcelery |
在app目录下创建tasks.py
1 | from celery import task |
视图函数views.py
1 | from django.shortcuts import render,HttpResponse |
Django的settings.py
1 | # 注册app |
周期任务
celery beat是用来开启定时任务调度的,一般用法为:启动celery beat,然后启动worker,让beat去调用worker里面的任务。
1 | from celery.schedules import crontab |
总结
写了这么多,从Celery简介、架构,到消息队列选型,再Python中如何使用Celery、Django项目怎么配置集成Celery,也算是Celery的超详细讲解了,用了这么久Celery,总算填了一个坑哈哈。
想要学习Celery更多的设计、调度、优化、路由、并发、信号、调试、测试、扩展等详细内容,可以参考 https://www.celerycn.io/
参考链接: