Celery | A Task Queue Powerhouse

Author: wedo
Source: Python Chinese Community
1. What is Celery?
Celery is a simple, flexible, and reliable distributed task execution framework that supports running large numbers of tasks concurrently. Celery uses the classic producer-consumer model: producers submit tasks to a task queue, and many consumers take tasks from the queue and execute them.
1.1 Celery architecture
Celery consists of three parts: the message broker (Broker), the task execution units (Workers), and the result store (Backend).

A task call submits a task execution request to the Broker queue.
For an asynchronous task, a worker takes the task from the queue and executes it immediately; the result is saved in the Backend.
For a scheduled task, the Celery Beat process periodically sends tasks to the Broker queue, and workers monitoring the queue pick them up and execute them.
1.2 Application scenarios
Asynchronous execution of many long-running tasks, such as uploading large files
Large-scale real-time task execution with cluster deployment support, such as high-concurrency machine learning inference
Scheduled task execution, such as sending emails or scanning machine status on a schedule
2. Installation
Celery is very simple to install. Besides Celery itself, this article uses Redis as the message queue, i.e. the Broker.
# install celery
pip install celery
# install flower, the celery monitoring tool
pip install flower
# install the redis Python client
pip install redis
# install redis (CentOS)
yum install redis
# start redis
redis-server /etc/redis.conf
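Before going further, it can help to confirm that Redis is reachable from Python. A minimal sanity check, assuming Redis listens on localhost:6379 (adjust the host to match your deployment):
# minimal broker connectivity check (host/port are assumptions)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())  # True means Redis is reachable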
3. Complete example
Developing a Celery application involves four parts:
Celery instance initialization
Task definition (scheduled and real-time tasks)
Starting the task workers
Calling the tasks
3.1 Project directory
# Project directory
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py
3.2 Celery instance initialization
Instantiating Celery mainly involves configuring access to the Broker and Backend and declaring the task modules.
# celery instance initialization
# __init__.py
from celery import Celery

app = Celery('wedo')  # create the Celery instance
app.config_from_object('wedo.config')  # load configuration from wedo.config

# configuration: wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0'  # Broker configuration, using Redis as the message broker
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0'  # Backend configuration, also Redis here
CELERY_RESULT_SERIALIZER = 'json'  # result serialization format
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # result expiry time, in seconds
CELERY_TIMEZONE = 'Asia/Shanghai'  # timezone configuration
CELERY_IMPORTS = (  # task modules to import; multiple modules may be listed
    'wedo.tasks',
    'wedo.period_task'
)
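Celery 4+ also accepts an equivalent lowercase settings style (the uppercase names above continue to work); a sketch of the same configuration in that form:
# config.py, lowercase naming introduced in Celery 4
broker_url = 'redis://10.8.238.2:6379/0'
result_backend = 'redis://10.8.238.2:6379/0'
result_serializer = 'json'
result_expires = 60 * 60 * 24
timezone = 'Asia/Shanghai'
imports = ('wedo.tasks', 'wedo.period_task')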
3.3 Task definition
Celery declares a task through the @task decorator; apart from that, the function is written like any ordinary one.
# task definitions
# simple tasks: tasks.py
import time

from wedo import app

@app.task
def sum(x, y):
    return x + y

@app.task
def mul(x, y):
    time.sleep(5)
    return x * y
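Note that a decorated task remains an ordinary function: calling it directly runs it synchronously in the current process without touching the broker, which is convenient for testing:
sum(2, 3)        # 5, runs synchronously, no broker involved
sum.delay(2, 3)  # returns an AsyncResult, runs on a worker via the broker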
The main difference between a scheduled task and a real-time task is declaring when to execute it; the task itself is still declared through the task decorator. There are two ways to specify when a task runs:
By frequency: sender.add_periodic_task(interval_in_seconds, task_signature, name='to_string')
By crontab: minute / hour / day / month / weekday granularity, which supports complex schedules
# task definitions
# scheduled tasks: period_task.py
from celery.schedules import crontab

from wedo import app

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run to_string every 5 seconds
    sender.add_periodic_task(5.0, to_string.s("celery period task"), name='to_string')
    # run send_mail every 10 minutes
    sender.add_periodic_task(
        crontab(minute='*/10'),
        send_mail.s('hello, this is a celery'), name='send_mail'
    )

@app.task
def send_mail(content):
    print('send mail, content is %s' % content)

@app.task
def to_string(text):
    return 'this is a %s' % text
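Periodic tasks can also be declared in configuration via beat_schedule instead of the signal handler; a sketch of an equivalent setup (the task names assume the module layout above):
# alternative: declare the schedule in configuration
from celery.schedules import crontab

app.conf.beat_schedule = {
    'to_string-every-5s': {
        'task': 'wedo.period_task.to_string',
        'schedule': 5.0,
        'args': ('celery period task',),
    },
    'send_mail-every-10min': {
        'task': 'wedo.period_task.send_mail',
        'schedule': crontab(minute='*/10'),
        'args': ('hello, this is a celery',),
    },
}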
3.4 Starting the workers
Startup is split into two parts: starting the workers and, for scheduled tasks, starting beat.
# -A wedo  the application module
# -l       the log level
# -c       the number of worker processes
celery worker -A wedo -l debug -c 4
# start in the background
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1 &
# the log below shows the 4 registered tasks:
# . wedo.period_task.send_mail
# . wedo.period_task.to_string
# . wedo.tasks.mul
# . wedo.tasks.sum
 -------------- celery@<hostname> v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         wedo:0x7f05af30d320
- ** ---------- .> transport:   redis://10.8.238.2:6379/0
- ** ---------- .> results:     redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  ...
  . wedo.period_task.send_mail
  . wedo.period_task.to_string
  . wedo.tasks.mul
  . wedo.tasks.sum
  ...

[2020-04-25 23:35:27,617: INFO/MainProcess] celery@<hostname> ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@<hostname> joined the party
# start beat for the scheduled tasks
celery beat -A wedo.period_task
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
. broker -> redis://10.8.238.2:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
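For development, beat can also be embedded inside a worker with the -B option, so a separate beat process is not needed. Note that only one beat scheduler should run at a time, so this does not suit multi-worker production setups:
celery worker -A wedo -l debug -c 4 -B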
# the worker runs as one parent process plus 4 prefork child processes (-c 4)
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
Stopping the worker and beat:
ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 Calling tasks
Once the workers are started, tasks are submitted to the broker (Redis) by task calls, and the execution results are returned. There are two main ways to call a task, and they are essentially the same: delay is a wrapper around apply_async, while apply_async supports more call-time options.
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
apply_async and delay return an AsyncResult, which holds the task's execution status and result. Common operations:
value = result.get()        # the task's return value
print(result.__dict__)      # result details
print(result.successful())  # whether it succeeded
print(result.failed())      # whether it failed
print(result.ready())       # whether execution has finished
print(result.state)         # state: PENDING -> STARTED -> SUCCESS/FAILURE
A regular task call:
from celery import group, chain, chord
from celery.utils.log import get_logger

from wedo.tasks import sum, mul, post_file

logger = get_logger(__name__)
try:
    result = mul.apply_async(args=(2, 2))
    value = result.get()  # blocks until the task completes, then returns its return value
    print(value)
except mul.OperationalError as exc:  # task exception handling
    logger.exception('Sending task raised: %r', exc)
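apply_async also accepts execution options beyond the argument lists. Two commonly used ones are countdown and expires; the values below are illustrative:
# run 10 seconds from now; discard the task if it has not started within 120 seconds
result = mul.apply_async(args=(2, 2), countdown=10, expires=120)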
Combining tasks:
group: multiple tasks executed in parallel
chain: multiple tasks executed in sequence, where each task's return value becomes the first argument of the next
result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1, 2), sum.s(3), mul.s(3))()
result.get()
# ((1 + 2) + 3) * 3 = 18
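A third primitive, chord (imported above), combines the two: a group runs in parallel and the list of its results is passed to a callback task. A minimal sketch; total() is a hypothetical callback added for illustration, not part of the original example:
# chord: parallel group + callback over the collected results
from celery import chord

from wedo import app
from wedo.tasks import sum

@app.task
def total(numbers):
    # hypothetical callback: receives the list of results from the group header
    t = 0
    for n in numbers:
        t += n
    return t

result = chord(sum.s(i, i) for i in range(5))(total.s())
result.get()
# total of [0, 2, 4, 6, 8] = 20
In practice the callback would live in wedo.tasks so that workers register it too; a chord also requires a result backend, which the Redis backend configured above supports.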
4. Distributed cluster deployment
As a distributed task queue framework, Celery can run workers on different servers, and deployment is the same as starting on a single machine: copy the project code to the other servers and run the same command. How is this achieved? Through the shared Broker queue. With a suitable broker such as Redis, whose single-process, single-threaded design serializes queue operations, the same task is effectively prevented from being executed by different workers at the same time.
celery worker -A wedo -l debug -c 4
The distributed cluster looks like this:

5. Advanced usage
The sections above cover Celery's main features. Celery also provides extension points for some special scenarios.
5.1 Task status tracking and logging
Sometimes we need to monitor task execution, for example to send an alert notification after a failure.
The @app.task decorator accepts a base parameter taking a custom Task subclass, whose on_* callbacks hook into the different task outcomes.
Passing bind=True to @app.task exposes the Task instance as self, giving access to:
self.request: the task's request parameters
self.update_state: set a custom task state. The built-in progression is PENDING -> STARTED -> SUCCESS; to report something between STARTED and SUCCESS, such as a completion percentage, define a custom state.
self.retry: retry the task
import time

import celery
from celery.utils.log import get_task_logger

from wedo import app

logger = get_task_logger(__name__)

class TaskMonitor(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """failure callback"""
        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

    def on_success(self, retval, task_id, args, kwargs):
        """success callback"""
        logger.info('task id: {}, args: {}, successful!'.format(task_id, args))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """retry callback"""
        logger.info('task id: {}, args: {}, retry! exc: {}'.format(task_id, args, exc))

@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
    logger.info(self.request.__dict__)
    try:
        for i, file in enumerate(file_names):
            print('the file %s is posted' % file)
            if not self.request.called_directly:
                # report progress via a custom state
                self.update_state(state='PROGRESS',
                                  meta={'current': i, 'total': len(file_names)})
            time.sleep(2)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=3, max_retries=5)
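On the caller side, the custom PROGRESS state can be polled through the AsyncResult. A minimal sketch, assuming post_file is importable from wedo.tasks as in the earlier import; the file names are illustrative:
import time

from wedo.tasks import post_file

result = post_file.delay(['a.txt', 'b.txt', 'c.txt'])
while not result.ready():
    if result.state == 'PROGRESS':
        info = result.info  # the meta dict passed to update_state
        print('progress: %s / %s' % (info['current'], info['total']))
    time.sleep(1)
print(result.state)  # SUCCESS or FAILURE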
5.2 Routing tasks to specific workers
To support distribution, Celery can in theory scale out to any number of workers. By default, a submitted task is placed in a queue named celery, every online worker pulls from that queue, and any worker may execute the task. Sometimes, because of the nature of a task or the constraints of a particular machine, certain tasks can only run on certain workers. Celery supports this well by using queues to distinguish workers.
When starting a worker, -Q specifies the task queue names the worker consumes; multiple names can be given:
celery worker -A wedo -l debug -c 4 -Q celery,hipri
When calling the task, queue= specifies the queue (and therefore the workers) that should handle it:
result = mul.apply_async(args=(2, 2), queue='hipri')
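Routing can also be configured centrally instead of at each call site. A sketch using the routes setting in the same uppercase style as the config above (the queue assignment is illustrative):
# config.py: send mul to the hipri queue by default
CELERY_ROUTES = {
    'wedo.tasks.mul': {'queue': 'hipri'},
}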
6. Task queue monitoring
To visualize everything happening in Celery, Flower provides a ready-made and very convenient solution:
flower -A wedo --port=6006
# visit http://10.8.238.2:6006/ in a browser
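If the dashboard is exposed beyond localhost, Flower supports simple protection via HTTP basic auth (credentials below are illustrative):
flower -A wedo --port=6006 --basic_auth=admin:secret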


7. Summary
This article introduced the distributed task queue Celery; comments and discussion are welcome. To summarize:
Celery is a distributed queue that connects task submitters and executors (workers) through a message queue, a loosely coupled and scalable model
Redis is the recommended message queue for Celery
The @app.task decorator turns an ordinary function into a Celery task
Celery workers can be made to consume specific tasks by assigning them different queues
The base and bind parameters of @app.task provide finer control over the task lifecycle
Flower monitors the whole Celery pipeline
celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html
End