当前位置:网站首页>Celery | 任务队列神器
Celery | 任务队列神器
2022-06-10 04:11:00 【Python研究所】

作者:wedo实验君
来源:Python中文社区
1.什么是celery
celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。
1.1 celery架构
Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend)

任务调用提交任务执行请求给Broker队列
如果是异步任务,worker会立即从队列中取出任务并执行,执行结果保存在Backend中
如果是定时任务,任务由Celery Beat进程周期性地将任务发往Broker队列,Worker实时监视消息队列获取队列中的任务执行
1.2 应用场景
大量的长时间任务的异步执行, 如上传大文件
大规模实时任务执行,支持集群部署,如支持高并发的机器学习推理
定时任务执行,如定时发送邮件,定时扫描机器运行情况
2.安装
celery安装非常简单, 除了安装celery,本文中使用redis作为消息队列即Broker
# celery 安装
pip install celery
# celery 监控 flower
pip install flower
pip install redis
# redis 安装
yum install redis
# redis启动
redis-server /etc/redis.conf
3. 完整例子
celery的应用开发涉及四个部分
celery 实例初始化
任务的定义(定时和实时任务)
任务worker的启动
任务的调用
3.1 项目目录
# 项目目录
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py
3.2 celery 实例初始化
celery的实例化,主要包括执行Broker和backend的访问方式,任务模块的申明等
# celery 实例初始化
# __init__.py
from celery import Celery
app = Celery('wedo') # 创建 Celery 实例
app.config_from_object('wedo.config')
# 配置 wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个
'wedo.tasks',
'wedo.period_task'
)
3.3 任务的定义
celery中通过@task的装饰器来进行申明celery任务,其他操作无任何差别
# 任务的定义
# 简单任务 tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
@app.task
def sum(x, y):
return x + y
@app.task
def mul(x, y):
time.sleep(5)
return x * y
定时任务和实时任务的区别主要是要申明何时执行任务,任务本身也是通过task装饰器来申明 何时执行任务有2种
指定频率执行:sender.add_periodic_task(时间频率单位s, 任务函数, name='to_string')
crontab方式:分钟/小时/天/月/周粒度, 可以支持多种调度
# 任务的定义
# 定时任务 period_task.py
from wedo import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string') # 每5秒执行add
sender.add_periodic_task(
crontab(minute='*/10'), #每10分钟执行一次
send_mail.s('hello, this is a celery'), name='send_mail'
)
@app.task
def send_mail(content):
print('send mail, content is %s' % content)
@app.task
def to_string(text):
return 'this is a %s' % text
3.4 任务worker的启动
任务启动分为worker启动和定时任务beat启动
# -A wedo为应用模块
# -l为日志level
# -c 为进程数
celery worker -A wedo -l debug -c 4
# 后台启动
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1
# 从下面的日志可以看出启动了4个任务
# . wedo.period_task.send_mail
# . wedo.period_task.to_string
# . wedo.tasks.mul
# . wedo.tasks.sum
-------------- [email protected] v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: wedo:0x7f05af30d320
- ** ---------- .> transport: redis://10.8.238.2:6379/0
- ** ---------- .> results: redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
...
. wedo.period_task.send_mail
. wedo.period_task.to_string
. wedo.tasks.mul
. wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] [email protected] ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] [email protected] joined the party
celery beat -A wedo.period_task
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
. broker -> redis://10.8.238.2:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
# worker启动是4个进程
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
worker和beat的停止
ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 任务的调用
任务worker已经启动好了,通过任务调用传递给broker(redis),并返回任务执行结果 任务调用主要有两种,本质是一致的,delay是apply_async的封装,apply_async可以支持更多的任务调用配置
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
apply_async和delay会返回一个异步的任务结果,AsyncResult中存储了任务的执行状态和结果,常用的操作
value = result.get() # 任务返回值
print(result.__dict__) # 结果信息
print(result.successful()) # 是否成功
print(result.fail()) # 是否失败
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAIL
常规任务:
from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
result = mul.apply_async(args=(2, 2))
value = result.get() # 等待任务执行完毕后,才会返回任务返回值
print(value)
except mul.OperationalError as exc: # 任务异常处理
logger.exception('Sending task raised: %r', exc)
组合任务:
多个任务并行执行, group
多个任务链式执行,chain:第一个任务的返回值作为第二个的输入参数,以此类推
result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
4. 分布式集群部署
celery作为分布式的任务队列框架,worker是可以执行在不同的服务器上的。部署过程和单机上启动是一样。只要把项目代码copy到其他服务器,使用相同命令就可以了。可以思考下,这个是怎么实现的?对了,就是通过共享Broker队列。使用合适的队列,如redis,单进程单线程的方式可以有效的避免同个任务被不同worker同时执行的情况。
celery worker -A wedo -l debug -c 4
分布式集群如下:

5. 进阶使用
在前面已经了解了celery的主要的功能了。celery还为一些特别的场景提供了需要扩展的功能
5.1 任务状态跟踪和日志
有时候我们需要对任务的执行情况做一些监控,比如失败后报警通知。
celery在装饰器@app.task中提供了base参数,传入重写的Task模块,重新on_*函数就可以控制不同的任务结果
在@app.task提供bind=True,可以通过self获取Task中各种参数
self.request:任务的各种参数
self.update_state: 自定义任务状态, 原有的任务状态:PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之间的一个状态,比如执行的百分比之类,可以通过自定义状态来实现
self.retry: 重试
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
logger = logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""failed callback"""
logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))
def on_success(self, retval, task_id, args, kwargs):
"""success callback"""
logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""retry callback"""
logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))
@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
logger.info(self.request.__dict__)
try:
for i, file in enumerate(file_names):
print('the file %s is posted' % file)
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(file_names)})
time.sleep(2)
except Exception as exec:
raise self.retry(exc=exec, countdown=3, max_retries=5)
5.2 任务指定特定的worker执行
celery做为支持分布式,理论上可以无限扩展worker。默认情况下celery提交任务后,任务会放入名为celery的队列,所有在线的worker都会从任务队列中获取任务,任一个worker都有可能执行这个任务。有时候,有时候任务的特殊性或者机器本身的限制,某些任务只能跑在某些worker上。celery提供了queue在区别不同的worker,很好的支持这种情况。
启动worker时,-Q 指定worker支持的任务列队名, 可以支持多个队列名哦
celery worker -A wedo -l debug -c 4 -Q celery,hipri
任务调用时,
queue=*来指定需要执行worker
result = mul.apply_async(args=(2, 2), queue='hipri')
6. 任务队列监控
如果你想通过可视化的方式,查看celery的一切。flower提供可行的解决方案,十分的方便
flower -A wedo --port=6006
# web访问 http://10.8.238.2:6006/


7. 总结
本文和大家了介绍了分布式的队列celery, 妥妥的很全吧, 欢迎交流。总结下内容:
celery为分布式队列, 通过消息队列连接任务提交和执行者worker, 松耦合模式,可扩展
celery消息队列建议为redis
celery通过@app.task装饰把普通任务变成celery Task
celery worker 通过不同queue支持特定的worker消费特定的任务
@app.task中可以同步base和bind参数获取更过的控制任务生命周期
flower监控celery全过程
celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html
完
往期推荐
文章好看点这里

边栏推荐
- 从当前文件夹打开 VS Code
- [learn FPGA programming from scratch -12]: quick start chapter - operation step 3 (functional simulation) -1-mentor HDL simulation tool Modelsim se 10.4 introduction and installation
- [in depth study of 4g/5g/6g topic -28]: 5g NR startup process 4.6 - msg5 (rrcsetupcomplete) message scheduling
- Award winning research
- Getting started with JDBC example
- Lit (I): create components
- Pytoch cpu/gpu installation method.
- [深入研究4G/5G/6G专题-27]: 5G NR开机流程4.5 - RRC连接应答消息MSG4、PUCCH上行控制信道首次调度、UCI与HARQ应答
- Google Earth Engine(GEE)——GPWv411:平均行政单位面积数据集
- LeetCode 力扣周赛 296
猜你喜欢

Lit (I): create components
![[semi supervised classification] semi supervised web page classification based on K-means and label+propagation](/img/3c/d709544542d3db8b28263f87f42f7e.png)
[semi supervised classification] semi supervised web page classification based on K-means and label+propagation

idea中怎样编写Scala代码

OpenAI新博客:训练大模型的技术

How to view Scala source code in idea

SSTI (template injection) - (7)

用80%的图表满足日常工作,用剩下20%的图表建立核心竞争力!

Open vs code from current folder
![[机缘参悟-18]:儒释道,不同人生阶段,不同选择](/img/3c/68eec2d6e29b703d2182b4cbba91e1.png)
[机缘参悟-18]:儒释道,不同人生阶段,不同选择

Assembly: assembly instruction classification
随机推荐
How to write Scala code in idea
[机缘参悟-20]:鬼谷子-捭阖篇-一开一合,一动一静,一张一弛谓之捭阖,职场应用。
golang学习之五:error、painc、recover
idea中怎样编写Scala代码
什么时候用@ComponentScan?与@MapperScan有什么区别?
每天都在写Getter、Setter方法,我不耐烦了,于是用了神器MapperStruct,crud效率一下子提高了! (最好使用硬编码)
Jmeter测试TCP百万连接
Is the online account opening channel reliable? Is it safe?
Fleeing Beijing, Shanghai and Guangzhou at the age of 35, and giving away food when unemployed at the age of 40, the "dignity" of middle-aged people lies in investing in themselves
阿里云首次年度盈利,国内云厂商何时迎来集体回报期?
SSTI (template injection) - (8)
Using VBA's WebBrowser control to realize single sign on (SSO) in Excel
7- computer essential tools
10n60-asemi medium and small power MOS transistor 10n60
epoll的两种模式和epoll反应堆
[in depth study of 4g/5g/6g topic -25]: 5g NR startup process 4.3 - first scheduling of RRC connection request message msg3/rrcsetuprequest and Pusch uplink channel
[深入研究4G/5G/6G专题-27]: 5G NR开机流程4.5 - RRC连接应答消息MSG4、PUCCH上行控制信道首次调度、UCI与HARQ应答
[in depth study of 4g/5g/6g topic -29]: 5g NR startup process 5.1 - NR network architecture, core network configuration of base stations
[机缘参悟-19]:鬼谷子-捭阖篇-阴阳即矛盾,是万事万物的基本属性,也是鬼谷子哲学的根基之一
Lit(一):创建组件