当前位置:网站首页>Distributed task queue: Celery usage record
Distributed task queue: Celery usage record
2022-07-01 18:03:00 【RS don't forget your original intention】
Catalog
Two 、 matters needing attention
1、Celery Use with password Redis
7、Celery + Flower The start-up problem of
9、 Mission task Can call each other , But you need to pay attention to parameter passing
One 、 Basic introduction
Github Address :https://github.com/celery/celery
The latest Chinese documents are hosted in Preface - Celery Chinese Manual in , Contains user guides 、 course 、API Interfaces, etc. .
Celery It's a very simple one 、 flexible 、 Reliable distributed system , Can be used to handle a large number of messages , It also provides a set of tools for operating the system .
Celery Is a message queuing tool , It can be used to process real-time data and task scheduling .
Two 、 matters needing attention
1、Celery Use with password Redis
celery Use a password to connect redis, among xxxxx It's a password , The password must be preceded by a colon .
BROKER_URL='redis://:[email protected]:6379/2'
2、 Store results
Please refer to : Use Redis - Celery Chinese Manual
If the backend uses resources to store results ( such as Redis), You must return each after calling the task AsyncResult The instance get() or forget() , Release resources . It means that after using the result data of the run , You need to delete the data , Otherwise, with the migration of time , The amount of data will soar .
If the task is :result = add.delay(4, 4), Then the way to release data can be :result.forget()
3、 Do not store results
This depends on the specific business scenario , If you don't care about the result , Or the execution of the task itself will have an impact on the data , The result of execution can be known by judging the data, so there is no need to return celery Exit status of the task , There are two situations :
Note that the restart takes effect after setting :
- All results are not saved : Profile Settings
CELERY_IGNORE_RESULT = True
- Specify that the task is not saved :ignore_result=True
@app.task(ignore_result=True)
def mytask(…):
something()
4、 About the use of queues
Suppose we have two 2 Mission , Namely add and subtract, We want to make add This addition task takes precedence over subtract Subtraction task is performed , We You can put two tasks in different queues , It's up to us to decide which task to perform first , We can configure this in the configuration file
app.conf.update(
result_expires=3600,
task_routes = {
"celery-demo.tasks.add": {"queue": "for_add", "routing_key": "for_add"},
"celery-demo.tasks.subtract": {"queue": "for_subtract", "routing_key": "for_subtract"},
},
)
I will add This function task is placed in a function called for_add In the queue , take subtract This function task is placed in a function called for_subtract In the queue .
If you start worker Handling only for_add The task of this queue , The start command is as follows :
celery -A celery_demo worker -Q for_add -l info
If you want to start worker At the same time for_add as well as for_subtract Queue tasks , The start command is as follows :
celery -A celery_demo worker -Q for_add,for_subtract -l info
explain : By default, tasks without a queue are generally placed in celery This queue is for processing ( The historical reason is named celery), Of course, it can also be modified through configuration .app.conf.task_default_queue = 'default'
Suppose we start a worker, And refers to treatment for_add Mission , as follows :
Ps: From the printed results, we can see that the message oriented middleware (broker) And storage result terminal (backend) We all use redis,DB Respectively 20 and 21.
Let's go now subtract function , as follows , Use Python Terminal :
>>> from tasks import subtract
>>>
>>> subtract.apply_async((2, 3), queue='for_subtract')
<AsyncResult: 4215043c-f2d4-4abc-bf32-78c63a8cdb46>
From message oriented middleware (broker) It can be seen that ,for_subtract There is a message in the queue that has not been processed ( a backlog ). This is in line with expectations , Because we only started one processing only add Mission worker.
10.xx.xxx.xxx:7555[20]> keys *
celery
_kombu.binding.for_add
_kombu.binding.hipri
hipri
_kombu.binding.celeryev
_kombu.binding.for_subtract
for_subtract
_kombu.binding.celery
_kombu.binding.lowpri
_kombu.binding.celery.pidbox
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LLEN for_subtract
1
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LRANGE for_subtract 0 -1
{"body": "W1syLCAzXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "tasks.subtract", "id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "parent_id": null, "argsrepr": "(2, 3)", "kwargsrepr": "{}", "origin": "[email protected]"}, "properties": {"correlation_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "reply_to": "6e2c2593-ff67-3bab-8c63-19e176a7fdd6", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "for_subtract"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "fd8a1a8c-19e2-4a97-ae2b-bd23fabf2bbd"}}
5、 Run as a daemons worker
Reference resources :Celery course - Run as a daemons worker
The customization script and configuration script described earlier in this article must use root user , Users with insufficient permissions cannot use initialization scripts , have access to celery multi Tools ( perhaps celery worker --detach).
Background start worker, have access to --detach, An example command is as follows :
celery -A celery_demo worker -Q for_add -l info --detach
Or use multi command :
celery multi start worker1 -A celery_demo worker -Q for_add --pidfile="/home/sokf/rs/celery-demo/pidflie/%n.pid" --logfile="/home/sokf/rs/celery-demo/log/%n%I.log"
6、Flower Use
Examples of persistent task commands are as follows :
celery -A proj flower --persistent=True --db=/flower/flower --max_tasks=100
This limits number of tasks that will be stored in the db. Once limit is reached, it will discard old tasks.
Reference resources :persistence - Flush Flower database occasionally and/or exit gracefully from Docker? - Stack Overflow
7、Celery + Flower The start-up problem of
I try to start with the following command flower:
celery flower --broker=redis://:[email protected]:75xx/20
The result is wrong :
[W 220215 16:13:11 connection:632] No hostname was supplied. Reverting to default 'localhost'
See the prompt message :
You have incorrectly specified the following celery arguments after flower command: ['--broker']. Please specify them after celery command instead following this template: celery [celery args] flower [flower args].
Obviously , mean ['--broker'] The configuration of is not correct , because "--broker" yes celery Parameters of , Should be placed in flower In front of , Then we modify the startup command :
celery flower --broker=redis://:[email protected]:75xx/20
Now it can start normally , And connect the specified broker 了 .
$ celery --broker='redis://:[email protected]:75xx/20' flower
[I 220215 16:25:03 command:154] Visit me at http://localhost:5555
[I 220215 16:25:03 command:159] Broker: redis://:**@10.xx.xxx.xxx:xx55/20
[I 220215 16:25:03 command:162] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 220215 16:25:03 mixins:226] Connected to redis://:**@10.xx.xxx.xxx:xx55/20
8、 Remember in the configuration file import Your task file , Otherwise, the following mistakes will be reported
It's easy to forget when adding a new task file , For example, we added a new debugging task file tasks_debug, This is the time import
Error message :Received unregistered task of type 'tasks_xxx.xxx'.The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
imports = ("tasks", "tasks_debug")
9、 Mission task Can call each other , But you need to pay attention to parameter passing
Let's say we are now 2 A mission :job_one and job_set,job_one In the process of execution, it will call job_set The logic of , You need to use celery Method debugging , such as :job_set.apply_async, And when there is only one parameter, the following comma cannot be omitted , For details, please refer to the following code :
from __future__ import absolute_import, unicode_literals
from celery_app import app
@app.task
def job_one(x, y):
result = {"message": "success one", "code": 200, "data": {"add": x + y, "k_add": "v_add"}}
# The correct way to call , Pass it to another queue for processing as a message , Note that when there is only one parameter , The following comma cannot be less
job_set.apply_async((result, ), queue='for_job_set')
# Note that the following call is invalid
# job_set(result)
return result
@app.task
def job_set(x):
result = {"message": "success set", "code": 200, "data": x}
return result
10、 Get the apply_async Results of execution
Reference resources :【python Essays 】celery Asynchronous task and call return value
Reference resources :
边栏推荐
- Session layer of csframework, server and client (1)
- 2022 Heilongjiang latest fire protection facility operator simulation test question bank and answers
- Depth first traversal and breadth first traversal [easy to understand]
- Gameframework eating guide
- Function, condition, regular expression
- New 95 community system whole station source code
- js如何将带有分割符的字符串转化成一个n维数组
- MFC obtains local IP (used more in network communication)
- 聊聊项目经理最爱使用的工具
- JS how to convert a string with a delimiter into an n-dimensional array
猜你喜欢
2022 Heilongjiang latest fire protection facility operator simulation test question bank and answers
[splishsplash] about how to receive / display user parameters, MVC mode and genparam on GUI and JSON
New patent applications and transfers
Yuancosmos game farmersworld farmers world - core content of the second conference in China!
Heavy disclosure! Hundreds of important information systems have been invaded, and the host has become a key attack target
[C supplement] [string] display the schedule of a month by date
Check log4j problems using stain analysis
Data warehouse (3) star model and dimension modeling of data warehouse modeling
June issue | antdb database participated in the preparation of the "Database Development Research Report" and appeared on the list of information technology and entrepreneurship industries
The method of real-time tracking the current price of London Silver
随机推荐
Work and leisure suggestions of old programmers
Petrv2: a unified framework for 3D perception of multi camera images
2022 Heilongjiang latest fire protection facility operator simulation test question bank and answers
[Verilog quick start of Niuke network question brushing series] ~ priority encoder circuit ①
C language implementation of sum of two numbers [easy to understand]
Detailed explanation of ArrayList expansion
Easycvr accesses the equipment through the national standard gb28181 protocol. What is the reason for the automatic streaming of the equipment?
golang中的select详解
Is it reasonable and safe to open a securities account for 10000 shares free of charge? How to say
Key points on February 15, 2022
ISO 27001 Information Security Management System Certification
Kia recalls some K3 new energy with potential safety hazards
Rotation order and universal lock of unity panel
Detailed explanation of select in golang
Pyqt5, draw a histogram on the control
An example of data analysis of an old swatch and an old hard disk disassembly and assembly combined with the sensor of an electromagnetic press
Setting up a time server requires the client to automatically synchronize the time of the server at 9 a.m. every day
Irradiance, Joule energy, exercise habits
Technical secrets of ByteDance data platform: implementation and optimization of complex query based on Clickhouse
DNS