Python之Celery

简介

  • Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。
  • Celery 是一款专注于实时处理的异步任务队列,可用于处理实时数据以及任务调度。

Celery架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery的安装配置

pip install celery

中间人(Broker)消息中间件:RabbitMQ / Redis

RabbitMQ

RabbitMQ 是默认的中间人(Broker),只需要配置连接的URL即可,不需要安装额外的的配置以及初始化配置信息

1
broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'

有关 Celery 各种中间人(Broker)的配置列表,请查阅代理设置,并且按照说明设置用户名和密码。

RabbitMQ的安装和使用这里不多赘述,有空新开一篇文章讲解

Redis

Redis 的配置非常的简单,只需要配置 Redis 的 URL :

1
app.conf.broker_url = 'redis://localhost:6379/0'

URL 的格式为:

1
redis://:password@hostname:port/db_number

URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库)

Celery 也可以连接 Redis 哨兵也是非常简单的:

1
2
app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
app.conf.broker_transport_options = {'master_name':'cluster1'}

RabbitMQ / Redis 比较

  • RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的。

  • Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失。

个人比较喜欢Redis作为Broker,Redis安装和使用比较简单,之所以Redis能够用作消息队列,因为其中的列表数据类型,可以存储多个有序的字符串,既然是有序的,就满足消息队列的特点,使用lpush+rpop或者rpush+lpop实现消息队列。除此之外,redis支持阻塞操作,在弹出元素的时候使用阻塞命令来实现阻塞队列。

以上已经在Redis分类的文章中有写到,新版Redis的特性也能更好地支持消息队列,这一块的知识后续新开一个文章进行学习。

Celery异步任务

创建任务

tasks.py

1
2
3
4
5
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks):

1
2
>>> from tasks import add
>>> add.delay(4, 4)

保存结果

如果需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。Celery 内置了一些后端结果:SQLAlchemy/Django ORM、MemcachedRedis、 RPC (RabbitMQ/AMQP)以及自定义的后端结果存储中间件。

针对本次实例,我们使用 RPC 作为结果后端,将状态信息作为临时消息回传。后端通过 backend 参数指定给 Celery(或者通过配置模块中的 result_backend 选项设定):

1
app = Celery('tasks', backend='rpc://', broker='pyamqp://')

例如,可以使用Redis作为Celery结果后端,使用RabbitMQ作为中间人(Broker)可以使用以下配置(这种组合比较流行):

1
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

ready() 可以检测是否已经处理完毕:

1
2
3
>>> result = add.delay(4, 4)
>>> result.ready()
False

整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

1
2
>>> result.get(timeout=1)
8

如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

1
>>> result.get(propagate=False)

如果任务出现异常,可以通过以下命令进行回溯:

1
>>> result.traceback

Celery执行定时任务

celery beat 是一个调度程序;它定期启动任务,然后由集群中的可用节点执行任务。

设定时间让celery执行一个任务

1
2
3
4
5
6
7
8
9
10
11
12
from celery_app_task import add
from datetime import datetime

ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = add.apply_async(args=[4, 3], eta=task_time)

类似于contab的定时任务

多任务结构中celery.py配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
'celery_task.tasks1',
'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_task.tasks1.test_celery',
# 每隔2秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=2),
# 传递参数
'args': ('test',)
},
# 'add-every-12-seconds': {
# 'task': 'celery_task.tasks1.test_celery',
# 每年4月11号,8点42分执行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': (16, 16)
# },
}

启动一个beat:celery beat -A celery_task -l info

启动work执行:celery worker -A celery_task -l info -P eventlet

Django中使用Celery

在Django项目中异步任务和周期任务是必不可少的:

添加Celery配置项

在项目目录下创建celeryconfig.py(也可以写在settings.py中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import djcelery
djcelery.setup_loader()
# CELERY 配置,申明任务的文件路径,即包含有 @task 装饰器的函数文件
CELERY_IMPORTS=(
'app01.tasks',
)
# 有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
# 允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

在app目录下创建tasks.py

1
2
3
4
5
6
from celery import task
@task
def add(a,b):
with open('a.text', 'a', encoding='utf-8') as f:
f.write('a')
print(a+b)

视图函数views.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')

Django的settings.py

1
2
3
4
5
6
7
8
9
# 注册app
INSTALLED_APPS = [
'djcelery',
'app'
]
from djagocele import celeryconfig
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

周期任务

celery beat是用来开启定时任务调度的,一般用法为:启动celery beat,然后启动worker,让beat去调用worker里面的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery.schedules import crontab
from celery.task import task, periodic_task

# crontab设置每天8点
# @periodic_task(run_every=(crontab(minute=0, hour=8)))
# crontab设置每分钟
@periodic_task(run_every=crontab(minute="*/1"))
def distribute_task():
now = datetime.datetime.now()
# 使用周期任务调动异步任务
get_alarm_list.apply_async(args=(now,))

@task()
def get_alarm_list(source_time):
print(source_time)

总结

写了这么多,从Celery简介、架构,到消息队列选型,再Python中如何使用Celery、Django项目怎么配置集成Celery,也算是Celery的超详细讲解了,用了这么久Celery,总算填了一个坑哈哈。

想要学习Celery更多的设计、调度、优化、路由、并发、信号、调试、测试、扩展等详细内容,可以参考 https://www.celerycn.io/


参考链接:

Celery 源码 - Github

django-celery - Github

django-celery-beat 源码 - Github

《Celery 中文手册》

Celery - 简书

欢迎关注我的其它发布渠道