
Celery | a powerful task queue



Author: wedo

Source: Python Chinese Community

1. What is Celery?

Celery is a simple, flexible, and reliable distributed task execution framework that can run large numbers of tasks concurrently. It follows the classic producer/consumer model: producers submit tasks to a task queue, and many consumers take tasks from the queue and execute them.

1.1 Celery architecture

Celery consists of three parts: the message middleware (Broker), the task execution units (Workers), and the result store (Backend).

  • A task call submits an execution request to the Broker queue

  • For an asynchronous task, a worker takes the task from the queue immediately, executes it, and saves the result in the Backend

  • For a scheduled task, the Celery Beat process periodically sends the task to the Broker queue; workers monitor the queue in real time and execute the tasks they find in it

1.2 Application scenarios

  • Asynchronously executing many long-running tasks, such as uploading large files

  • Executing real-time tasks at scale with cluster deployment, for example serving highly concurrent machine-learning inference

  • Running scheduled tasks, such as periodically sending mail or scanning machine health

2. Installation

Celery is very simple to install. Besides Celery itself, this article uses Redis as the message queue, i.e. the Broker.

# install celery
pip install celery
# install flower (celery monitoring)
pip install flower
pip install redis
# install redis
yum install redis
# start redis
redis-server /etc/redis.conf
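
Before starting any worker it is worth confirming that the broker is reachable. A minimal sketch with the redis-py client installed above (the host, port and db match the BROKER_URL used later in this article and are assumptions about your environment):

# quick sanity check that the Redis broker answers
import redis

client = redis.Redis(host='10.8.238.2', port=6379, db=0)
print(client.ping())  # True if Redis is reachable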

3. Complete example

Developing a Celery application involves four parts:

  • Initializing the Celery instance

  • Defining the tasks (scheduled and real-time)

  • Starting the task workers

  • Calling the tasks

3.1 Project directory

#  Project directory 
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py

3.2 celery Instance initialization

Instantiating Celery mainly involves configuring access to the Broker and Backend and declaring the task modules:

# celery instance initialization
# __init__.py
from celery import Celery
app = Celery('wedo')  # create the Celery instance
app.config_from_object('wedo.config') 

# configuration: wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # broker configuration, using Redis as the message middleware
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # backend configuration, also using Redis
CELERY_RESULT_SERIALIZER = 'json' # result serialization scheme
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # task result expiry time
CELERY_TIMEZONE='Asia/Shanghai'   # timezone
CELERY_IMPORTS = (     # task modules to import; several can be listed
    'wedo.tasks',
    'wedo.period_task'
)
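
As an alternative to a separate config module, the same settings can also be applied directly on the app instance. A minimal sketch, equivalent to the config above but using Celery's new-style lowercase setting names:

# __init__.py (alternative configuration style)
from celery import Celery

app = Celery('wedo')
app.conf.update(
    broker_url='redis://10.8.238.2:6379/0',
    result_backend='redis://10.8.238.2:6379/0',
    result_serializer='json',
    result_expires=60 * 60 * 24,
    timezone='Asia/Shanghai',
    imports=('wedo.tasks', 'wedo.period_task'),
)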

3.3 Definition of task

A Celery task is declared with the @task decorator; in every other respect it is an ordinary function.

# task definitions
# simple tasks: tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

@app.task
def sum(x, y):
    return x + y

@app.task
def mul(x, y):
    time.sleep(5)
    return x * y
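
Since @app.task only wraps an ordinary function, a task can still be called directly for local, synchronous execution; only .delay()/.apply_async() send it through the broker. A quick illustration:

from wedo.tasks import sum

print(sum(1, 2))                # runs in the current process, returns 3 immediately
async_result = sum.delay(1, 2)  # sends the task to the broker; a worker executes it
print(async_result.get())       # blocks until the worker returns 3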

The main difference between a scheduled task and a regular task is declaring when the task should run; the task itself is still declared with the task decorator. There are two ways to specify the schedule:

  • A fixed interval: sender.add_periodic_task(interval_in_seconds, task_signature, name='to_string')

  • A crontab schedule, with minute / hour / day / month / weekday granularity, which can express many kinds of schedules (a few more crontab examples follow the code below)

# task definitions
# scheduled tasks: period_task.py
from wedo import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, to_string.s("celery period task"), name='to_string') # run to_string every 5 seconds
    sender.add_periodic_task(
        crontab(minute='*/10'),      # run every 10 minutes
        send_mail.s('hello, this is a celery'), name='send_mail'
    )

@app.task
def send_mail(content):
    print('send mail, content is %s' % content)

@app.task
def to_string(text):
    return 'this is a %s' % text
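
crontab accepts the same fields as a regular cron entry. A few illustrative schedules (not used by this project, shown only for reference):

from celery.schedules import crontab

crontab(minute=0, hour=0)                     # every day at midnight
crontab(minute=30, hour=7, day_of_week=1)     # every Monday at 07:30
crontab(minute=0, hour='*/3')                 # every 3 hours, on the hour
crontab(minute=0, hour=0, day_of_month='1')   # the first day of every month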

3.4 Starting the worker

Starting task execution involves two pieces: starting the worker, and starting beat for the scheduled tasks.

# -A wedo is the application module
# -l is the log level
# -c is the number of worker processes
celery worker -A wedo  -l debug -c 4

# start in the background
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1 &

# the startup log below lists the 4 registered tasks:
#   . wedo.period_task.send_mail
#   . wedo.period_task.to_string
#   . wedo.tasks.mul
#   . wedo.tasks.sum

 -------------- celery@<hostname> v4.4.2 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         wedo:0x7f05af30d320
- ** ---------- .> transport:   redis://10.8.238.2:6379/0
- ** ---------- .> results:     redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery.accumulate
  . celery.backend_cleanup
...
  . wedo.period_task.send_mail
  . wedo.period_task.to_string
  . wedo.tasks.mul
  . wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] celery@<hostname> ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@<hostname> joined the party

celery beat -A wedo.period_task

celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
    . broker -> redis://10.8.238.2:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
# with -c 4 the worker runs a main process plus 4 child processes, as ps shows:
\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

Stopping the worker and beat:

ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
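
For daemonized deployments, Celery also ships a celery multi helper that starts and stops named worker nodes gracefully instead of killing processes by pattern. A sketch (the pid/log file paths are assumptions):

# start a named worker node in the background
celery multi start w1 -A wedo -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
# stop it, waiting for currently running tasks to finish
celery multi stopwait w1 --pidfile=/var/run/celery/%n.pid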

3.5 Calling tasks

Once the worker is running, task calls submit work to the broker (Redis) and the results come back through the backend. There are two ways to call a task, and they are essentially the same: delay is a shortcut wrapping apply_async, while apply_async supports more call options (an example follows the list below).

  • task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

  • task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
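
For example, apply_async can delay or expire a call, options that delay does not expose. A brief sketch using the mul task defined earlier:

from wedo.tasks import mul

# run no earlier than 10 seconds from now, and drop the task if no worker
# has started it within 120 seconds
result = mul.apply_async(args=(2, 2), countdown=10, expires=120)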

Both apply_async and delay return an AsyncResult, which carries the task's execution state and result. Common operations:

value = result.get() # task return value
print(result.__dict__) # result details
print(result.successful()) # whether the task succeeded
print(result.failed()) # whether the task failed
print(result.ready()) # whether execution has finished
print(result.state) # state: PENDING -> STARTED -> SUCCESS/FAILURE
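
Because the state and result live in the Backend, a result can also be fetched later, or from another process, as long as the task id is kept. A minimal sketch:

from celery.result import AsyncResult
from wedo import app

task_id = result.id                    # store this somewhere (database, session, ...)
later = AsyncResult(task_id, app=app)  # re-attach to the same task at any time
print(later.state, later.result)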

A regular task call:

from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
    result = mul.apply_async(args=(2, 2))
    value = result.get() # blocks until the task completes, then returns its return value
    print(value)
except mul.OperationalError as exc: # errors raised while sending the task
    logger.exception('Sending task raised: %r', exc)

Combined tasks:

  • group: run several tasks in parallel

  • chain: run tasks one after another, with each task's return value passed as the first argument of the next

result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
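
chord is imported above but not shown; it combines the two patterns: a group of tasks runs in parallel and the list of their results is then passed to a single callback task. A brief sketch, where tsum is a hypothetical task (not defined in this article) that sums a list of numbers:

# hypothetical callback task, e.g. added to tasks.py:
#     @app.task
#     def tsum(numbers):
#         return builtins.sum(numbers)
from celery import chord
from wedo.tasks import sum, tsum  # tsum is the hypothetical task sketched above

result = chord(sum.s(i, i) for i in range(5))(tsum.s())
result.get()
# the group produces [0, 2, 4, 6, 8]; tsum sums them to 20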

4. Distributed cluster deployment

As a distributed task queue framework, Celery lets workers run on different servers. Deployment is the same as starting on a single machine: copy the project code to the other servers and run the same command. How does this work? Through the shared Broker queue. With a suitable queue such as Redis, whose single-process, single-threaded design serves the queue, the same task is effectively prevented from being executed by different workers at the same time.

celery worker -A wedo  -l debug -c 4
(Figure in the original article: the distributed cluster layout, with workers on multiple servers sharing one Broker.)

5. Advanced use

The sections above cover Celery's main features. Celery also provides extension points for some special scenarios.

5.1 Task status tracking and logging

Sometimes we need to monitor how tasks execute, for example to send an alert when a task fails.

  • The @app.task decorator accepts a base parameter; pass in a subclass of Task, and its on_* callbacks let you react to the different task outcomes

  • Passing bind=True to @app.task makes the task instance available as self, which provides:

    • self.request: the request attributes of the current task

    • self.update_state: custom task states. The built-in progression is PENDING -> STARTED -> SUCCESS; if you need intermediate states between STARTED and SUCCESS, such as a completion percentage, define them as custom states (a polling sketch follows the code below)

    • self.retry: retry the task

import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """failed callback"""
        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

    def on_success(self, retval, task_id, args, kwargs):
        """success callback"""
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """retry callback"""
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
    logger.info(self.request.__dict__)
    try:
        for i, file in enumerate(file_names):
            print('the file %s is posted' % file)
            if not self.request.called_directly:
                self.update_state(state='PROGRESS',
                    meta={'current': i, 'total': len(file_names)})
            time.sleep(2)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=3, max_retries=5)
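
On the calling side, the custom PROGRESS state and the meta dict passed to update_state can be read from the AsyncResult while the task is running. A minimal sketch (the file names are made up):

import time
from wedo.tasks import post_file

result = post_file.delay(['a.txt', 'b.txt', 'c.txt'])
while not result.ready():
    if result.state == 'PROGRESS':
        # result.info holds the meta dict passed to update_state
        print('%(current)s / %(total)s files posted' % result.info)
    time.sleep(1)
print(result.state)  # SUCCESS, or FAILURE once retries are exhausted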

5.2 Routing tasks to specific workers

To support distribution, Celery can in theory scale out to any number of workers. By default, a submitted task is put into a queue named celery; every online worker pulls from that queue, so any worker may execute the task. Sometimes, because of the nature of a task or the limits of a machine, certain tasks should only run on certain workers. Celery supports this well by distinguishing workers with queues.

  • When starting a worker, -Q specifies the task queue names the worker consumes; several queues can be listed

celery worker -A wedo  -l debug -c 4 -Q celery,hipri

  • When calling a task, queue=... specifies which queue (and therefore which workers) should handle it

result = mul.apply_async(args=(2, 2), queue='hipri')
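
Routing can also be configured centrally instead of being passed at every call site, by mapping task names to queues in the configuration. A sketch in the same old-style setting naming used in config.py above:

# config.py: always send wedo.tasks.mul to the 'hipri' queue
CELERY_ROUTES = {
    'wedo.tasks.mul': {'queue': 'hipri'},
}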

6. Task queue monitoring

To visualize everything going on inside Celery, flower provides a ready-made and very convenient solution.

flower -A wedo --port=6006
# then browse to http://10.8.238.2:6006/
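
The Flower dashboard exposes task details and worker control operations, so it should not be left open to everyone; Flower supports simple HTTP basic auth (a sketch, with made-up credentials):

flower -A wedo --port=6006 --basic_auth=user1:password1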

7. Summary

This article introduced the distributed task queue Celery; questions and discussion are welcome. To summarize:

  • Celery is a distributed task queue that connects task submitters and executing workers through a message queue, in a loosely coupled, scalable way

  • Redis is the recommended message queue for Celery

  • The @app.task decorator turns an ordinary function into a Celery task

  • Workers can be dedicated to specific tasks by consuming different queues

  • The base and bind parameters of @app.task give more control over the task lifecycle

  • flower monitors the whole Celery pipeline

  • celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html

End

