当前位置:网站首页>celery最佳实践
celery最佳实践
2022-07-06 10:20:00 【全栈程序员站长】
大家好,又见面了,我是全栈君。
作为一个Celery使用重度用户。看到Celery Best Practices这篇文章。不由得菊花一紧。
干脆翻译出来,同一时候也会添加我们项目中celery的实战经验。
至于Celery为何物,看这里Celery。
通常在使用Django的时候,你可能须要运行一些长时间的后台任务,没准你可能须要使用一些能排序的任务队列,那么Celery将会是一个非常好的选择。
当把Celery作为一个任务队列用于非常多项目中后,作者积累了一些最佳实践方式,譬如怎样用合适的方式使用Celery,以及一些Celery提供的可是还未充分使用的特性。
1,不要使用数据库作为你的AMQP Broker
数据库并非天生设计成能用于AMQP broker的。在生产环境下,它非常有可能在某时候当机(PS,当掉这点我认为不论什么系统都不能保证不当吧!!
。)。
作者猜想为啥非常多人使用数据库作为broker主要是由于他们已经有一个数据库用来给web app提供数据存储了。于是干脆直接拿来使用。设置成Celery的broker是非常easy的。而且不须要再安装其它组件(譬如RabbitMQ)。
假设有例如以下场景:你有4个后端workers去获取并处理放入到数据库里面的任务,这意味着你有4个进程为了获取最新任务,须要频繁地去轮询数据库。没准每一个worker同一时候还有多个自己的并发线程在干这事情。
某一天。你发现由于太多的任务产生。4个worker不够用了,处理任务的速度已经大大落后于生产任务的速度,于是你不停去添加worker的数量。突然,你的数据库由于大量进程轮询任务而变得响应缓慢,磁盘IO一直处于高峰值状态,你的web应用也開始受到影响。这一切,都由于workers在不停地对数据库进行DDOS。
而当你使用一个合适的AMQP(譬如RabbitMQ)的时候,这一切都不会发生,以RabbitMQ为例。首先,它将任务队列放到内存里面。你不须要去訪问硬盘。其次,consumers(也就是上面的worker)并不须要频繁地去轮询由于RabbitMQ能将新的任务推送给consumers。
当然,假设RabbitMQ真出现故障了。至少也不会影响到你的web应用。
这也就是作者说的不用数据库作为broker的原因,而且非常多地方都提供了编译好的RabbitMQ镜像,你都能直接使用,譬如这些。
对于这点。我是深表赞同的。我们系统大量使用Celery处理异步任务,大概平均一天几百万的异步任务,曾经我们使用的mysql。然后总会出现任务处理延时太严重的问题,即使添加了worker也不管用。于是我们使用了redis。性能提升了非常多。至于为啥使用mysql非常慢,我们没去深究,没准也还真出现了DDOS的问题。
2,使用很多其它的queue(不要仅仅用默认的)
Celery非常easy设置,通常它会使用默认的queue用来存放任务(除非你显示指定其它queue)。通常写法例如以下:
@app.task()
def my_taskA(a, b, c):
print("doing something here...")
@app.task()
def my_taskB(x, y):
print("doing something here...")
这两个任务都会在同一个queue里面运行。这样写事实上非常有吸引力的,由于你仅仅须要使用一个decorator就能实现一个异步任务。作者关心的是taskA和taskB没准是全然两个不同的东西,或者一个可能比还有一个更加重要,那么为什么要把它们放到一个篮子里面呢?(鸡蛋都不能放到一个篮子里面,是吧!)没准taskB事实上不怎么重要,可是量太多,以至于重要的taskA反而不能高速地被worker进行处理。添加workers也解决不了这个问题,由于taskA和taskB仍然在一个queue里面运行。
3。使用具有优先级的workers
为了解决2里面出现的问题,我们须要让taskA在一个队列Q1,而taskB在还有一个队列Q2运行。同一时候指定x workers去处理队列Q1的任务,然后使用其它的workers去处理队列Q2的任务。使用这样的方式,taskB可以获得足够的workers去处理,同一时候一些优先级workers也能非常好地处理taskA而不须要进行长时间的等待。
首先手动定义queue
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
然后定义routes用来决定不同的任务去哪一个queue
CELERY_ROUTES = {
'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
最后再为每一个task启动不同的workerscelery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B
在我们项目中。会涉及到大量文件转换问题,有大量小于1mb的文件转换,同一时候也有少量将近20mb的文件转换。小文件转换的优先级是最高的,同一时候不用占用非常多时间,但大文件的转换非常耗时。假设将转换任务放到一个队列里面,那么非常有可能由于出现转换大文件,导致耗时太严重造成小文件转换延时的问题。
所以我们依照文件大小设置了3个优先队列。而且每一个队列设置了不同的workers。非常好地攻克了我们文件转换的问题。
4,使用Celery的错误处理机制
大多数任务并没有使用错误处理,假设任务失败,那就失败了。在一些情况下这非常不错。可是作者见到的多数失败任务都是去调用第三方API然后出现了网络错误。或者资源不可用这些错误。而对于这些错误。最简单的方式就是重试一下,或许就是第三方API暂时服务或者网络出现故障,没准立即就好了,那么为什么不试着重试一下呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
作者喜欢给每一个任务定义一个等待多久重试的时间,以及最大的重试次数。当然还有更具体的參数设置,自己看文档去。
对于错误处理,我们由于使用场景特殊,比如一个文件转换失败,那么不管多少次重试都会失败。所以没有添加重试机制。
5,使用Flower
Flower是一个非常强大的工具,用来监控celery的tasks和works。
这玩意我们也没怎么使用。由于多数时候我们都是直接连接redis去查看celery相关情况了。貌似挺傻逼的对不,尤其是celery在redis里面存放的数据并不能方便的取出来。
6,没事别太关注任务退出状态
一个任务状态就是该任务结束的时候成功还是失败信息,没准在一些统计场合,这非常实用。但我们须要知道。任务退出的状态并非该任务运行的结果,该任务运行的一些结果由于会对程序有影响,一般会被写入数据库(比如更新一个用户的朋友列表)。
作者见过的多数项目都将任务结束的状态存放到sqlite或者自己的数据库,可是存这些真有必要吗,没准可能影响到你的web服务的。所以作者通常设置CELERY_IGNORE_RESULT = True
去丢弃。
对于我们来说,由于是异步任务,知道任务运行完毕之后的状态真没啥用。所以果断丢弃。
7,不要给任务传递 Database/ORM 对象
这个事实上就是不要传递Database对象(比如一个用户的实例)给任务。由于没准序列化之后的数据已经是过期的数据了。
所以不妨直接传递一个user id,然后在任务运行的时候实时的从数据库获取。
对于这个,我们也是如此,给任务仅仅传递相关id数据。譬如文件转换的时候,我们仅仅会传递文件的id,而其它文件信息的获取我们都是直接通过该id从数据库里面取得。
最后
后面就是我们自己的感触了,上面作者提到的Celery的使用,真的可以算是非常好地实践方式。至少如今我们的Celery没出过太大的问题,当然小坑还是有的。至于RabbitMQ,这玩意我们是真没用过,效果怎么样不知道。至少比mysql好用吧。
最后。附上作者的一个Celery Talk https://denibertovic.com/talks/celery-best-practices/。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/117403.html原文链接:https://javaforall.cn
边栏推荐
- Jerry's setting currently uses the dial. Switch the dial through this function [chapter]
- 当保存参数使用结构体时必备的开发技巧方式
- 2022 Summer Project Training (I)
- Redis的五种数据结构
- 转载:基于深度学习的工业品组件缺陷检测技术
- HMS Core 机器学习服务打造同传翻译新“声”态,AI让国际交流更顺畅
- Codeforces Round #803 (Div. 2)
- FMT open source self driving instrument | FMT middleware: a high real-time distributed log module Mlog
- DNS hijacking
- Transfer data to event object in wechat applet
猜你喜欢
传输层 拥塞控制-慢开始和拥塞避免 快重传 快恢复
On time and parameter selection of asemi rectifier bridge db207
Take you through ancient Rome, the meta universe bus is coming # Invisible Cities
当保存参数使用结构体时必备的开发技巧方式
微信为什么使用 SQLite 保存聊天记录?
Scratch epidemic isolation and nucleic acid detection Analog Electronics Society graphical programming scratch grade examination level 3 true questions and answers analysis June 2022
Jerry is the custom background specified by the currently used dial enable [chapter]
78 year old professor Huake has been chasing dreams for 40 years, and the domestic database reaches dreams to sprint for IPO
Excellent open source fonts for programmers
關於這次通信故障,我想多說幾句…
随机推荐
Windows连接Linux上安装的Redis
Fleet tutorial 13 basic introduction to listview's most commonly used scroll controls (tutorial includes source code)
F200——搭载基于模型设计的国产开源飞控系统无人机
从交互模型中蒸馏知识!中科大&美团提出VIRT,兼具双塔模型的效率和交互模型的性能,在文本匹配上实现性能和效率的平衡!...
面试突击62:group by 有哪些注意事项?
Jerry's watch deletes the existing dial file [chapter]
容器里用systemctl运行服务报错:Failed to get D-Bus connection: Operation not permitted(解决方法)
STM32+ESP8266+MQTT协议连接OneNet物联网平台
微信为什么使用 SQLite 保存聊天记录?
Interview shock 62: what are the precautions for group by?
Virtual machine VirtualBox and vagrant installation
Jerry's access to additional information on the dial [article]
Appium automated test scroll and drag_ and_ Drop slides according to element position
RB157-ASEMI整流桥RB157
关于这次通信故障,我想多说几句…
解读云原生技术
【Swoole系列2.1】先把Swoole跑起来
递归的方式
std::true_type和std::false_type
Insert dial file of Jerry's watch [chapter]