Dynamically modifying scheduled tasks with Celery in a Sanic-based service
2022-06-26 07:12:00 【Moelimoe】
A note before we start
Celery is not currently compatible with asyncio. A coroutine has to be wrapped in an ordinary synchronous function before it can be registered as a task and scheduled, and using Celery together with asyncio may still cause unexpected problems. Until Celery integrates asyncio, which the project has promised (for the second time) for version 6.0, this approach needs careful consideration.
If your project is built on asyncio and does not need the many complex scheduling features described in the Celery documentation, the lightweight package APScheduler can meet your needs, and it is compatible with asyncio frameworks.
How the feature works
This is a scheduled-task feature built on a Sanic service and Celery. The principle of the implementation is roughly as follows:
- Server: our Sanic service. It receives and responds to requests; after receiving a task request, it hands the scheduled alert task over to Celery asynchronously, without blocking
- Beat (Scheduler): triggers tasks periodically (periodic or scheduled tasks set up in advance). A task is executed when a worker is available. Our service uses Redis as the beat scheduler
- Queue: the queue that receives tasks and keeps them moving in and out in order. It is implemented by Celery itself
- Worker: executes tasks
- Result Store (result backend): where task results are stored, so that they can be retrieved when needed. Results are given an expiration time. Our service uses Redis as the result store
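To make the roles above concrete, here is a toy, in-process model of the flow from beat to queue to worker to result store. It is only a sketch and does not use Celery or Redis: the names `ToyResultStore`, `beat_tick` and `worker_drain` are hypothetical, and time is passed in as a plain number so that the behaviour is deterministic.

```python
from collections import deque


class ToyResultStore:
    """Keeps task results until they expire (stands in for the Redis result backend)."""

    def __init__(self, expires_seconds):
        self.expires_seconds = expires_seconds
        self._results = {}

    def save(self, task_id, value, now):
        # Store the value together with the moment it stops being valid
        self._results[task_id] = (value, now + self.expires_seconds)

    def get(self, task_id, now):
        value, deadline = self._results.get(task_id, (None, 0.0))
        return value if now < deadline else None


def beat_tick(schedule, queue, now):
    """Beat: enqueue every entry whose interval has elapsed."""
    for entry in schedule:
        if now - entry["last_run"] >= entry["every"]:
            entry["last_run"] = now
            queue.append((entry["name"], entry["func"], entry["args"]))


def worker_drain(queue, store, now):
    """Worker: take tasks off the queue in order and store their results."""
    while queue:
        name, func, args = queue.popleft()
        store.save(name, func(*args), now)


def add(a, b):
    return a + b


# One entry that is due immediately and then every 5 "seconds"
schedule = [{"name": "add", "func": add, "args": (1, 2), "every": 5, "last_run": -5}]
queue = deque()
store = ToyResultStore(expires_seconds=600)

beat_tick(schedule, queue, now=0)
worker_drain(queue, store, now=0)
```

In the real setup, beat and the worker are separate processes and the queue and result store live in Redis, but the division of responsibilities is the same.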
Example usage
Directory structure of the sanic-celery server example
The main things to look at are celery_app, query, and the top-level sanic_server.py. settings.py stores the root directory of the project:
import os
import sys
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, CELERY_BASE_DIR)
celery
Starting the celery app:
- Create the celery app and load its startup configuration (the configuration only has to be added before celery is started from the command line)
- For the contents of the configuration file, refer to the official documentation. Below is a simple example configuration with comments. Note that from Celery 4.x onward, configuration variables should be lowercase

# -*- coding:utf-8 -*-
from celery import Celery
from . import config
app = Celery("app_name")
app.config_from_object(config)
config.py
broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
redbeat_redis_url = 'redis://localhost:6379/3'
redbeat_key_prefix = 'roiq_redbeat_key_prefix:'
# Expiration time of task results; the default is one day. Accepts seconds or a timedelta object, see https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
result_expires = 600
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True
# (!) All tasks must be imported here in advance
imports = (
"query.tasks",
"send_email.tasks"
)
For more details on these parameters, refer to the official documentation.
The beat scheduler handles periodic and delayed tasks. Outside Django, Celery does not by default support modifying scheduled tasks while the service is running. Our business requires adding, modifying and viewing tasks at runtime, so we introduced redbeat, a module that uses Redis as the beat scheduler. See the redbeat documentation for usage; only its common operations (create, update and delete) are needed here.
After installing redbeat by following its getting-started guide, start Celery with redbeat as the beat scheduler. If redbeat_redis_url is not configured, the broker is also used as the beat store by default.
celery Start command
On Windows, beat has to be started separately from the worker and the broker
Start Celery with redbeat as the beat scheduler
Execute on the command line :celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10
- -A is the location of the celery app; here the __init__.py of celery_app contains the celery app
- beat specifies that beat should be started (it is not started by default)
- -S specifies the scheduler class used by beat
- -l is the log level; it accepts keywords such as info and debug
- --max-interval specifies how often beat checks for newly modified tasks. The default is 5 minutes; it is set to 10 seconds here for easier debugging, so that changes show up almost in real time
start-up worker
Run on the command line: celery -A celery_app worker -l debug -P gevent. To run on Windows you need to install gevent (pip install gevent); on Linux the -P option is not needed
For more parameters and details, see celery --help, celery worker --help and celery beat --help
After starting the Celery services, test modifying tasks at runtime
Modifying tasks at runtime with redbeat
redbeat supports modifying tasks while Celery is running. Make sure that the Celery app, worker and beat services, as well as Redis and any other storage services, are all running
Define a simple task :
query/tasks.py
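# -*- coding:utf-8 -*-
import asyncio
import time

import pandas as pd
from celery_app import app


async def countdown_task(a, b):
    """A simple stand-in for the SQL query task."""
    await asyncio.sleep(1)
    for i in range(3):
        print(f"-------{i}---------")
        time.sleep(1)  # blocking sleep, kept from the original to simulate slow work
    return a + b


@app.task
def sync_countdown_task(a, b):
    # A Celery task is called synchronously, so no event loop is running here and
    # asyncio.get_running_loop() would raise RuntimeError. asyncio.run() creates
    # a loop, runs the coroutine to completion and closes the loop again.
    return asyncio.run(countdown_task(a, b))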
Because all the methods used in the project are asynchronous coroutines, a coroutine has to be converted into an ordinary function before it can be registered as a Celery task
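If many coroutines need this conversion, the wrapping can be factored into a small decorator. The following is a minimal sketch using only the standard library; `as_sync` and `add_later` are hypothetical names, and it assumes the wrapped function is called from a thread that has no running event loop, since `asyncio.run()` refuses to run inside one.

```python
import asyncio
import functools


def as_sync(coro_func):
    """Wrap a coroutine function so it can be called like a normal function."""

    @functools.wraps(coro_func)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop, runs the coroutine
        # to completion, and closes the loop afterwards.
        return asyncio.run(coro_func(*args, **kwargs))

    return wrapper


async def add_later(a, b):
    """Hypothetical coroutine standing in for an async query."""
    await asyncio.sleep(0)
    return a + b


sync_add = as_sync(add_later)
```

The resulting synchronous function could then be registered with `@app.task` in the same way as the hand-written wrapper. Whether this is safe with a particular worker pool should be checked in your own environment.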
sanic_server.py
# -*- coding:utf-8 -*-
import asyncio
from datetime import timedelta
from celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry
from sanic import Sanic
from sanic import response
from celery_app import app as celery_app
from celery_app.config import redbeat_key_prefix
from query.tasks import sync_countdown_task
sanic_app = Sanic("sanic_celery")
loop = asyncio.get_event_loop()
# Create a scheduled task; it has to be added to beat without restarting the celery service
async def query_task_create(request):
    """Create a periodic query task through this API."""
    tasks = "query.tasks"  # module that contains the task (down to the .py file)
    sche = schedule(timedelta(seconds=5))
    task_name = sync_countdown_task.__name__
    task = f"{tasks}.{task_name}"
    entry = RedBeatSchedulerEntry(task_name, task, sche, args=(1, 2), app=celery_app)
    print(entry)
    key = entry.key  # save the key to the database ...
    entry.save()
    return response.text(f"schedule2 created..., task key is: {key}")
async def schedule_disable(request):
    task_name = sync_countdown_task.__name__
    key = redbeat_key_prefix + task_name  # the key is the prefix plus the entry name
    entry = RedBeatSchedulerEntry.from_key(key, celery_app)
    entry.enabled = False
    entry.save()
    print(entry)
    return response.text("schedule disabled..")
async def schedule_enable(request):
    task_name = sync_countdown_task.__name__
    key = redbeat_key_prefix + task_name
    entry = RedBeatSchedulerEntry.from_key(key, celery_app)
    entry.enabled = True
    entry.save()
    print(entry)
    return response.text("schedule enabled..")
async def schedule_delete(request):
    # In production, taken from the request (and stored in / read from the database)
    task_name = sync_countdown_task.__name__
    task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
    entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)
    print(entry)
    entry.delete()
    print("Deleted the entry: ", entry)
    return response.text(task_name + " deleted")
async def schedule_update(request):
    # In production, taken from the request (and stored in / read from the database)
    task_name = sync_countdown_task.__name__
    task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
    # Load the entry by its task key
    # (!) Handle the case where the task has been deleted and the key no longer exists
    entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)
    print(entry)
    # Modify the schedule
    entry.schedule = schedule(timedelta(seconds=3))
    # Modify the arguments
    entry.args = (3, 4)
    entry.save()
    print(entry)
    return response.text(task_name + " updated")
async def schedule_info(request):
    """Return the stored entry of the scheduled task as text."""
    task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
    entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)
    return response.text(f"{entry}")
# TODO: 1. set the expiration time of result storage; 2. when adding and updating tasks, handle the case where the key does not exist
sanic_app.add_route(query_task_create, "/create2")
sanic_app.add_route(schedule_update, "/update")
sanic_app.add_route(schedule_delete, "/delete")
sanic_app.add_route(schedule_disable, "/disable")
sanic_app.add_route(schedule_enable, "/enable")
sanic_app.add_route(schedule_info, "/info")
if __name__ == '__main__':
    sanic_app.run(port=4321)
Note: the key/task_key used for update and delete should be stored in and read from a database in production
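The key handling can be isolated in two small helpers, sketched below with the standard library only. `make_task_key` and `load_entry_or_none` are hypothetical names, and a plain dict stands in for Redis. The sketch assumes that the real loader raises `KeyError` for a missing key, which is how `RedBeatSchedulerEntry.from_key` is expected to behave; verify this against the redbeat version you have installed.

```python
def make_task_key(prefix, task_name):
    """Build the key of a scheduled task: the redbeat key prefix plus the entry name."""
    return f"{prefix}{task_name}"


def load_entry_or_none(key, loader):
    """Return loader(key), or None when the key no longer exists.

    `loader` is any callable that raises KeyError for a missing key.
    """
    try:
        return loader(key)
    except KeyError:
        return None


# Stand-in for Redis: a dict lookup raises KeyError for a missing key
fake_store = {"roiq_redbeat_key_prefix:sync_countdown_task": "entry"}
key = make_task_key("roiq_redbeat_key_prefix:", "sync_countdown_task")
```

In the server, the loader would be something like `lambda k: RedBeatSchedulerEntry.from_key(k, app=celery_app)`, and a `None` result could be turned into a 404 response instead of an unhandled exception.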
Workflow for setting up scheduled tasks
- Set up the Celery configuration and store it in config.py (it can also be stored in other ways)
- Create the app and load the configuration
- Write the tasks and the APIs that the server exposes
- Run beat with a command like celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10, and run the worker with a command like celery -A celery_app worker -l debug -P gevent -E
- Run the Sanic service
- Based on the parameters passed to the API, create scheduled tasks with redbeat.RedBeatSchedulerEntry; use RedBeatSchedulerEntry.from_key() to fetch and modify them
- Based on the user and product passed to the API, return the list of configured scheduled tasks so that users can view and operate on them
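The last two steps can be sketched without Celery: parse the request parameters into a task description, then record the task key per user so that it can be listed later. Everything here is hypothetical (`TaskSpec`, `parse_task_params`, `TaskRegistry`, and the parameter names `name`, `interval` and `args`), and the in-memory dict only stands in for the database mentioned above.

```python
from dataclasses import dataclass


@dataclass
class TaskSpec:
    name: str
    interval_seconds: int
    args: tuple = ()


def parse_task_params(params):
    """Validate raw request parameters (a dict of strings) into a TaskSpec."""
    interval = int(params["interval"])
    if interval <= 0:
        raise ValueError("interval must be a positive number of seconds")
    # "1,2" -> (1, 2); a missing or empty value -> ()
    args = tuple(int(x) for x in params.get("args", "").split(",") if x)
    return TaskSpec(params["name"], interval, args)


class TaskRegistry:
    """In-memory stand-in for the database table that maps users to task keys."""

    def __init__(self, key_prefix):
        self.key_prefix = key_prefix
        self._keys_by_user = {}

    def register(self, user, spec):
        key = f"{self.key_prefix}{spec.name}"
        self._keys_by_user.setdefault(user, []).append(key)
        return key

    def list_keys(self, user):
        return list(self._keys_by_user.get(user, []))


registry = TaskRegistry("roiq_redbeat_key_prefix:")
spec = parse_task_params({"name": "sync_countdown_task", "interval": "5", "args": "1,2"})
key = registry.register("alice", spec)
```

A create handler would build `schedule(timedelta(seconds=spec.interval_seconds))` from the spec and save the entry, while a list handler would load each key returned by `list_keys()` with `RedBeatSchedulerEntry.from_key()`.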