Distributed task queue: Celery usage record
2022-07-01 18:03:00 【RS don't forget your original intention】
Contents
II. Notes
1. Using Celery with a password-protected Redis
7. Problems starting Celery + Flower
9. Tasks can call each other, but pay attention to argument passing
I. Basic introduction
GitHub: https://github.com/celery/celery
The latest Chinese documentation is hosted in the Celery Chinese Manual (see its "Preface" page) and includes user guides, tutorials, and the API reference.
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it also provides the tools needed to operate such a system.
Celery is a task queue focused on real-time processing that also supports task scheduling.
II. Notes
1. Using Celery with a password-protected Redis
To connect Celery to Redis with a password, put the password (xxxxx here) in the URL. The password must be preceded by a colon.
BROKER_URL = 'redis://:xxxxx@<host>:6379/2'
2. Storing results
Reference: "Using Redis" in the Celery Chinese Manual.
If the result backend consumes resources to store results (Redis, for example), you must call get() or forget() on every AsyncResult instance returned after calling a task, so that the resources are released. In other words, once you have used a task's result data you need to delete it; otherwise the amount of stored data keeps growing over time.
If the task call is result = add.delay(4, 4), the data can be released with result.forget().
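As a rough sketch of the release step described above, the helper below reads a result and then forgets it. `fetch_and_release` is a hypothetical name, not part of Celery; it only assumes that the object passed in offers `get(timeout=...)` and `forget()`, as Celery's `AsyncResult` does. The `FakeResult` class is a stand-in so the sketch runs without a broker.

```python
def fetch_and_release(result, timeout=10):
    """Return the task's value, then ask the backend to drop the stored result.

    `result` is expected to behave like celery.result.AsyncResult:
    get(timeout=...) blocks for the value, forget() discards the stored copy.
    """
    try:
        return result.get(timeout=timeout)
    finally:
        # Runs on success and on error, so the stored entry is not left
        # behind just because the caller no longer needs it.
        result.forget()


class FakeResult:
    """Stand-in for AsyncResult (assumption: used only to demo the helper)."""

    def __init__(self, value):
        self.value = value
        self.forgotten = False

    def get(self, timeout=None):
        return self.value

    def forget(self):
        self.forgotten = True


demo = FakeResult(8)
print(fetch_and_release(demo), demo.forgotten)  # 8 True
```

With a real task this would be called as fetch_and_release(add.delay(4, 4)).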
3. Not storing results
This depends on the business scenario. If you do not care about the result, or the task itself modifies data so that its outcome can be determined by inspecting that data, there is no need to return the task's exit status to Celery. There are two cases.
Note: the setting takes effect only after a restart.
- Do not save any results: set this in the configuration file
CELERY_IGNORE_RESULT = True
- Do not save results for a specific task: pass ignore_result=True
@app.task(ignore_result=True)
def mytask(…):
    something()
4. Using queues
Suppose we have two tasks, add and subtract, and we want the add task to take precedence over the subtract task. We can put the two tasks in different queues and decide ourselves which queue gets processed first. This is set up in the configuration:
app.conf.update(
    result_expires=3600,
    task_routes={
        "celery-demo.tasks.add": {"queue": "for_add", "routing_key": "for_add"},
        "celery-demo.tasks.subtract": {"queue": "for_subtract", "routing_key": "for_subtract"},
    },
)
This places the add task in a queue named for_add and the subtract task in a queue named for_subtract.
To start a worker that handles only the for_add queue, use:
celery -A celery_demo worker -Q for_add -l info
To start a worker that handles both the for_add and for_subtract queues, use:
celery -A celery_demo worker -Q for_add,for_subtract -l info
Note: tasks without an explicit queue go to the queue named celery by default (the name is historical). The default can be changed in the configuration:
app.conf.task_default_queue = 'default'
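The effect of the routing table and the default queue can be sketched with a plain dictionary lookup. This is a simplified model, not Celery's actual router: `queue_for` is a hypothetical helper and it handles exact task names only (Celery also accepts glob patterns and regular expressions as keys). It illustrates one practical point: a route applies only when its key matches the registered task name, and any other task falls back to the default queue.

```python
# Route table copied from the configuration in this section.
TASK_ROUTES = {
    "celery-demo.tasks.add": {"queue": "for_add", "routing_key": "for_add"},
    "celery-demo.tasks.subtract": {"queue": "for_subtract", "routing_key": "for_subtract"},
}
DEFAULT_QUEUE = "celery"  # Celery's default queue name unless task_default_queue is set


def queue_for(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE):
    """Simplified model: exact-name lookup with a fallback to the default queue."""
    route = routes.get(task_name)
    return route["queue"] if route else default


print(queue_for("celery-demo.tasks.add"))  # for_add
print(queue_for("tasks.subtract"))         # celery (name does not match any key)
```

If a task is registered under a different name than the route key (the queued message later in this section shows the name tasks.subtract), the route would not apply under this model. Passing queue= explicitly to apply_async, as the example in this section does, takes priority over the routing table.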
Suppose we start a worker that handles only the for_add queue:
Ps: The worker's startup output shows that both the message broker and the result backend use Redis, on DB 20 and DB 21 respectively.

Now let's call the subtract function from a Python shell:
>>> from tasks import subtract
>>>
>>> subtract.apply_async((2, 3), queue='for_subtract')
<AsyncResult: 4215043c-f2d4-4abc-bf32-78c63a8cdb46>
The broker shows one unprocessed (backlogged) message in the for_subtract queue. This is expected, because the only worker we started handles add tasks only.
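The entry that LRANGE returns in the Redis session below is a JSON envelope whose "body" field is base64-encoded (note "body_encoding": "base64"). The following sketch uses only the standard library to decode that body and recover the task arguments. `decode_body` is a hypothetical helper name, and the three-part layout (args, kwargs, embedded options) is read off this particular message rather than assumed for every Celery version.

```python
import base64
import json

# "body" value copied from the queued message shown in this section.
BODY = "W1syLCAzXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d"


def decode_body(body):
    """Decode a base64-encoded JSON body into (args, kwargs, embed)."""
    args, kwargs, embed = json.loads(base64.b64decode(body))
    return args, kwargs, embed


args, kwargs, embed = decode_body(BODY)
print(args, kwargs)  # [2, 3] {}
```

The decoded arguments match the (2, 3) passed to subtract.apply_async.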
10.xx.xxx.xxx:7555[20]> keys *
celery
_kombu.binding.for_add
_kombu.binding.hipri
hipri
_kombu.binding.celeryev
_kombu.binding.for_subtract
for_subtract
_kombu.binding.celery
_kombu.binding.lowpri
_kombu.binding.celery.pidbox
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LLEN for_subtract
1
10.xx.xxx.xxx:7555[20]>
10.xx.xxx.xxx:7555[20]> LRANGE for_subtract 0 -1
{"body": "W1syLCAzXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "tasks.subtract", "id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "parent_id": null, "argsrepr": "(2, 3)", "kwargsrepr": "{}", "origin": "[email protected]"}, "properties": {"correlation_id": "4b2279e9-9b62-4d45-84b5-ad8840fa5b12", "reply_to": "6e2c2593-ff67-3bab-8c63-19e176a7fdd6", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "for_subtract"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "fd8a1a8c-19e2-4a97-ae2b-bd23fabf2bbd"}}
5. Running the worker as a daemon
Reference: Celery tutorial, "Running the worker as a daemon".
The init scripts and configuration scripts described in that tutorial must be run as root. Users without sufficient privileges cannot use the init scripts; they can use the celery multi tool instead (or celery worker --detach).
To start a worker in the background, use --detach. For example:
celery -A celery_demo worker -Q for_add -l info --detach
Or use the multi command:
celery multi start worker1 -A celery_demo worker -Q for_add --pidfile="/home/sokf/rs/celery-demo/pidflie/%n.pid" --logfile="/home/sokf/rs/celery-demo/log/%n%I.log"
6. Using Flower
An example command that persists task state:
celery -A proj flower --persistent=True --db=/flower/flower --max_tasks=100
This limits the number of tasks that will be stored in the db. Once the limit is reached, it will discard old tasks.
Reference: "persistence - Flush Flower database occasionally and/or exit gracefully from Docker?" on Stack Overflow
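Flower is launched as a subcommand of celery, so its options fall into two groups: options for celery itself (such as -A) go before flower, and Flower's own options (such as --persistent, --db and --max_tasks) go after it. A small sketch of assembling such a command line in Python follows; `flower_command` is a hypothetical helper, not part of Celery or Flower.

```python
import shlex


def flower_command(celery_args, flower_args):
    """Build argv following the layout: celery [celery args] flower [flower args]."""
    return ["celery", *celery_args, "flower", *flower_args]


argv = flower_command(
    ["-A", "proj"],
    ["--persistent=True", "--db=/flower/flower", "--max_tasks=100"],
)
# shlex.join renders the list as a shell-safe command string.
print(shlex.join(argv))
```

The resulting list can be passed to subprocess.run(argv) to start Flower from a script.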
7. Problems starting Celery + Flower
I tried to start Flower with the following command:
celery flower --broker=redis://:xxxxx@10.xx.xxx.xxx:75xx/20
It failed with:
[W 220215 16:13:11 connection:632] No hostname was supplied. Reverting to default 'localhost'
The log also contained this hint:
You have incorrectly specified the following celery arguments after flower command: ['--broker']. Please specify them after celery command instead following this template: celery [celery args] flower [flower args].
In other words, --broker was given in the wrong position: it is an argument of celery, so it must come before flower. The corrected start command is:
celery --broker=redis://:xxxxx@10.xx.xxx.xxx:75xx/20 flower
Now Flower starts normally and connects to the specified broker.
$ celery --broker='redis://:xxxxx@10.xx.xxx.xxx:75xx/20' flower
[I 220215 16:25:03 command:154] Visit me at http://localhost:5555
[I 220215 16:25:03 command:159] Broker: redis://:**@10.xx.xxx.xxx:xx55/20
[I 220215 16:25:03 command:162] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 220215 16:25:03 mixins:226] Connected to redis://:**@10.xx.xxx.xxx:xx55/20
8. Remember to import your task files in the configuration, otherwise the error below is reported
This is easy to forget when adding a new task file. For example, after adding a new debugging task file tasks_debug, it must be added to imports.
Error message:
Received unregistered task of type 'tasks_xxx.xxx'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
The fix is to list the new module in the imports setting:
imports = ("tasks", "tasks_debug")
9. Tasks can call each other, but pay attention to argument passing
Suppose we have two tasks, job_one and job_set, and job_one invokes job_set's logic while it runs. The call has to go through Celery's calling API, for example job_set.apply_async, and when there is only one positional argument the trailing comma in the args tuple must not be omitted. See the code below:
from __future__ import absolute_import, unicode_literals
from celery_app import app


@app.task
def job_one(x, y):
    result = {"message": "success one", "code": 200, "data": {"add": x + y, "k_add": "v_add"}}
    # Correct: send job_set to another queue as a message.
    # With a single argument, the trailing comma is required so that args is a tuple.
    job_set.apply_async((result,), queue='for_job_set')
    # Note: a direct call runs job_set synchronously inside job_one
    # instead of sending it to the for_job_set queue.
    # job_set(result)
    return result


@app.task
def job_set(x):
    result = {"message": "success set", "code": 200, "data": x}
    return result
10. Getting the result of apply_async
Reference: [Python notes] Celery asynchronous tasks and call return values
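As a minimal sketch of reading the outcome of apply_async: the call returns an AsyncResult, and its value can be read once the task has finished, provided a result backend is configured. `wait_for_result` is a hypothetical helper that polls `ready()` and then calls `get()`; it assumes only those two methods, both of which AsyncResult provides. The `FakeResult` class is a stand-in so the sketch runs without a broker.

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.5):
    """Poll until the task finishes, then return its value; raise on timeout."""
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)
    return result.get()


class FakeResult:
    """Stand-in for AsyncResult that becomes ready after a few polls (demo only)."""

    def __init__(self, value, polls_until_ready=2):
        self.value = value
        self.polls_left = polls_until_ready

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0

    def get(self):
        return self.value


print(wait_for_result(FakeResult({"code": 200}), interval=0.01))  # {'code': 200}
```

With a real task, result.get(timeout=10) on the value returned by apply_async achieves the same thing in one call. Inside a task, waiting on another task's result this way is best avoided; Celery's documentation warns against synchronous subtasks.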
