Distributed task queue: Celery usage record
One、Basic introduction
GitHub address: https://github.com/celery/celery
The latest Chinese documentation is hosted at "Preface - Celery Chinese Manual" and contains user guides, tutorials, API references, and more.
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it also provides the tools needed to operate and maintain such a system.
Celery is a message-queue-based tool that can be used for real-time processing and for task scheduling.
Two、Matters needing attention
1、Using Celery with a password-protected Redis
When Celery connects to Redis with a password (xxxxx below is the password), the password must be preceded by a colon:
BROKER_URL = 'redis://:xxxxx@10.xx.xxx.xxx:6379/2'
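The result backend accepts the same URL format; a minimal sketch (the CELERY_RESULT_BACKEND line and its DB number are my additions, not from the original post):
# Result backend uses the same redis://:password@host:port/db format
CELERY_RESULT_BACKEND = 'redis://:xxxxx@10.xx.xxx.xxx:6379/3'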
2、Storing results
Reference: Use Redis - Celery Chinese Manual
If the backend uses a resource to store results (for example Redis), you must call get() or forget() on every AsyncResult instance returned from a task call in order to release that resource. In other words, once you have used a task's result data you need to delete it, otherwise the stored results will keep piling up over time.
For example, if the task is called as result = add.delay(4, 4), the data can be released with result.forget().
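A minimal sketch of the full consume-then-release pattern, assuming the add task from the Celery tutorial:
result = add.delay(4, 4)
print(result.get(timeout=10))  # consume the result (prints 8)
result.forget()                # then release the stored result from the backend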
3、Not storing results
This depends on the business scenario. If you don't care about the result, or the task itself modifies data so that its outcome can be determined from the data, then there is no need to keep the Celery task's exit status. There are two cases:
Note that the settings only take effect after the worker is restarted:
- Do not save any results: set it in the configuration file
CELERY_IGNORE_RESULT = True
- Do not save results for a specific task: ignore_result=True
@app.task(ignore_result=True)
def mytask(…):
    something()
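Note that in Celery 4+ the lowercase setting name is task_ignore_result; if you configure via app.conf (as elsewhere in this post), a sketch of the same global switch would be:
app.conf.task_ignore_result = True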
4、Using queues
Suppose we have two tasks, add and subtract, and we want the add task to be handled with higher priority than the subtract task. We can put the two tasks into different queues and then decide ourselves which queue's tasks to process first. This is configured in the configuration file:
app.conf.update(
    result_expires=3600,
    task_routes={
        "celery-demo.tasks.add": {"queue": "for_add", "routing_key": "for_add"},
        "celery-demo.tasks.subtract": {"queue": "for_subtract", "routing_key": "for_subtract"},
    },
)
This places the add task into a queue named for_add and the subtract task into a queue named for_subtract.
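For reference, a minimal sketch of what the tasks module is assumed to look like (the module/app names follow the tasks / celery_app naming used later in this post; the original does not show these definitions):
# tasks.py -- the two example tasks routed above (a sketch)
from celery_app import app

@app.task
def add(x, y):
    return x + y

@app.task
def subtract(x, y):
    return x - y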
To start a worker that handles only the for_add queue:
celery -A celery_demo worker -Q for_add -l info
To start a worker that handles both the for_add and for_subtract queues:
celery -A celery_demo worker -Q for_add,for_subtract -l info
Note: by default, tasks that are not routed to a specific queue are placed into a queue named celery (the name is historical). This can also be changed via configuration: app.conf.task_default_queue = 'default'
Suppose we start a worker that handles only the for_add queue. From the worker's startup output we can see that both the message broker and the result backend use Redis, with DB 20 and DB 21 respectively.
Now let's call the subtract task from a Python shell:
>>> from tasks import subtract
>>>
>>> subtract.apply_async((2, 3), queue='for_subtract')
<AsyncResult: 4215043c-f2d4-4abc-bf32-78c63a8cdb46>
From the message broker we can see that the for_subtract queue has one unprocessed (backlogged) message. This is expected, because we only started a worker that handles the add queue.
10.xx.xxx.xxx:7555[20]> keys *
celery
_kombu.binding.for_add
_kombu.binding.hipri
hipri
_kombu.binding.celeryev
_kombu.binding.for_subtract
for_subtract
_kombu.binding.celery
_kombu.binding.lowpri
_kombu.binding.celery.pidbox
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LLEN for_subtract
1
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LRANGE for_subtract 0 -1
{"body": "W1syLCAzXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "tasks.subtract", "id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "parent_id": null, "argsrepr": "(2, 3)", "kwargsrepr": "{}", "origin": "[email protected]"}, "properties": {"correlation_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "reply_to": "6e2c2593-ff67-3bab-8c63-19e176a7fdd6", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "for_subtract"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "fd8a1a8c-19e2-4a97-ae2b-bd23fabf2bbd"}}
5、Running the worker as a daemon
Reference: Celery documentation - Running the worker as a daemon
The init scripts and configuration described there must be run as the root user; users without sufficient privileges cannot use the init scripts and should use the celery multi tool instead (or celery worker --detach).
To start a worker in the background, use --detach. An example command:
celery -A celery_demo worker -Q for_add -l info --detach
Or use the multi command:
celery multi start worker1 -A celery_demo worker -Q for_add --pidfile="/home/sokf/rs/celery-demo/pidflie/%n.pid" --logfile="/home/sokf/rs/celery-demo/log/%n%I.log"
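The matching stop command for a worker started with celery multi would look like this (a sketch, reusing the pidfile path from the command above):
celery multi stop worker1 --pidfile="/home/sokf/rs/celery-demo/pidflie/%n.pid"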
6、Using Flower
An example command that persists task state:
celery -A proj flower --persistent=True --db=/flower/flower --max_tasks=100
This limits the number of tasks stored in the database; once the limit is reached, old tasks are discarded.
Reference: persistence - Flush Flower database occasionally and/or exit gracefully from Docker? - Stack Overflow
7、Celery + Flower startup problem
I tried to start Flower with the following command:
celery flower --broker=redis://:xxxx@10.xx.xxx.xxx:75xx/20
It failed with a warning:
[W 220215 16:13:11 connection:632] No hostname was supplied. Reverting to default 'localhost'
Along with this hint:
You have incorrectly specified the following celery arguments after flower command: ['--broker']. Please specify them after celery command instead following this template: celery [celery args] flower [flower args].
Clearly, this means the ['--broker'] argument was placed in the wrong position: "--broker" is a celery argument and must come before flower. After moving it, Flower starts normally and connects to the specified broker:
$ celery --broker='redis://:xxxx@10.xx.xxx.xxx:75xx/20' flower
[I 220215 16:25:03 command:154] Visit me at http://localhost:5555
[I 220215 16:25:03 command:159] Broker: redis://:**@10.xx.xxx.xxx:xx55/20
[I 220215 16:25:03 command:162] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 220215 16:25:03 mixins:226] Connected to redis://:**@10.xx.xxx.xxx:xx55/20
8、Remember to import your task modules in the configuration file, otherwise the following error is reported
This is easy to forget when adding a new task module. For example, after adding a new debugging task module tasks_debug, it must be imported in the configuration, otherwise Celery reports:
Error message: Received unregistered task of type 'tasks_xxx.xxx'. The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
imports = ("tasks", "tasks_debug")
9、Tasks can call each other, but pay attention to parameter passing
Suppose we have two tasks, job_one and job_set, and job_one needs to invoke job_set during its execution. Use Celery's calling methods for this, such as job_set.apply_async, and note that when there is only one positional argument, the trailing comma in the args tuple cannot be omitted. See the code below:
from __future__ import absolute_import, unicode_literals
from celery_app import app


@app.task
def job_one(x, y):
    result = {"message": "success one", "code": 200, "data": {"add": x + y, "k_add": "v_add"}}
    # Correct way to call: hand the result to another queue as a message.
    # Note: with a single argument, the trailing comma in the tuple cannot be omitted.
    job_set.apply_async((result, ), queue='for_job_set')
    # Note: the direct call below does not go through the queue, so don't use it here
    # job_set(result)
    return result


@app.task
def job_set(x):
    result = {"message": "success set", "code": 200, "data": x}
    return result
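As an alternative to calling apply_async inside job_one, the two tasks could also be linked from the caller side with Celery's chain primitive; a hedged sketch (queue name as above):
from celery import chain

# job_one's return value is passed as the first argument to job_set
chain(
    job_one.s(2, 3),
    job_set.s().set(queue="for_job_set"),
).apply_async()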
10、Getting the result of an apply_async call
Reference: [python essays] Celery asynchronous tasks and call return values
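A minimal sketch of retrieving the result of an apply_async call, assuming the add task and for_add queue from the earlier sections:
res = add.apply_async((4, 4), queue="for_add")
print(res.ready())          # False until a worker has finished the task
print(res.get(timeout=10))  # blocks until the result is ready, e.g. 8
print(res.status)           # e.g. 'SUCCESS'
res.forget()                # release the stored result (see section 2)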