Timed task processing based on DRF apscheduler
2022-06-09 15:50:00 【laoli815】
Django REST framework (restframework) ships with a good browsable web interface, so I used django-admin to create a new app and installed the restframework module. To make the Django admin backend look better, I used the popular simpleui. It is built on Element-UI + Vue and gives the old Django admin a new look.
2. Install the simpleui template
pip install simpleui
Create a static file directory.
In settings.py, set STATIC_ROOT = os.path.join(BASE_DIR, 'static')
Otherwise python manage.py collectstatic will report an error.
Also add SIMPLEUI_HOME_INFO = False
In INSTALLED_APPS, add simpleui:
INSTALLED_APPS = [
    'simpleui',
    'django.contrib.admin',
    'django.contrib.auth',]
Note that simpleui must be on the first line. Otherwise it is overridden by admin and the theme does not take effect.
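The settings above can be put together roughly as follows. This is only a sketch of a settings.py fragment: BASE_DIR is a placeholder (a generated settings.py normally derives it from __file__), and simpleui_precedes_admin is a hypothetical helper added here to make the ordering rule explicit.

```python
import os
from pathlib import Path

# Placeholder: a generated settings.py normally derives BASE_DIR from __file__.
BASE_DIR = Path.cwd()

# collectstatic copies static files here; the article notes that it
# reports an error when STATIC_ROOT is not configured.
STATIC_ROOT = os.path.join(BASE_DIR, "static")

# Setting taken from the article.
SIMPLEUI_HOME_INFO = False

INSTALLED_APPS = [
    "simpleui",  # must come before django.contrib.admin
    "django.contrib.admin",
    "django.contrib.auth",
    # ... the rest of the project's apps
]


def simpleui_precedes_admin(apps):
    """Return True when simpleui is listed ahead of the stock admin app."""
    return apps.index("simpleui") < apps.index("django.contrib.admin")


print(simpleui_precedes_admin(INSTALLED_APPS))
```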
Once it is installed, the Django admin home page changes to the following. The interface immediately looks much better.
Create the apsched app: django-admin startapp apsched
1. Install django-apscheduler
pip install django-apscheduler
2. Register django-apscheduler
In settings.py, register django-apscheduler as an app:
INSTALLED_APPS = [
    …
    'django_apscheduler',  # scheduled tasks
]
3. Database migration
python manage.py makemigrations
python manage.py migrate
Here you can see that two tables have been created in the MySQL database.
Since these two tables were created, we can infer that the django-apscheduler module ships with its own models, and that is indeed the case.
serializers.py in the apsched app:
from django_apscheduler.models import DjangoJob
from rest_framework.serializers import ModelSerializer


class DjangoJobSerializer(ModelSerializer):
    class Meta:
        fields = '__all__'
        model = DjangoJob
views.py in the apsched app:
import ast
import datetime
import os
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from django.http import JsonResponse
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet

from .serializers import DjangoJobSerializer


def log_step(message):
    # Print a timestamp, thread name, thread ID and process ID, then the message.
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
          threading.current_thread().name, threading.get_ident(), os.getpid())
    print(message)


def my_job():
    # Demo task: sleeps 3 x 10 seconds and logs which thread and process run it.
    log_step("============ Start to rest 10S=====================")
    time.sleep(10)
    log_step("============ Continue to rest 10S=====================")
    time.sleep(10)
    log_step("============ Continue to rest 10S=====================")
    time.sleep(10)
    log_step("================ The rest is over =====================")


EXECUTORS = {
    'default': ThreadPoolExecutor(2),
    # 'default': ProcessPoolExecutor(5)
}
JOB_DEFAULTS = {
    'coalesce': False,
    'max_instances': 1,
    'misfire_grace_time': None
}
# Instantiate the scheduler
scheduler = BackgroundScheduler(job_defaults=JOB_DEFAULTS, executors=EXECUTORS, timezone='Asia/Shanghai')
# The scheduler uses DjangoJobStore() as its default job store
scheduler.add_jobstore(DjangoJobStore(), 'default')


def parse_cron_fields(raw):
    # run_time arrives as a string holding a dict literal. ast.literal_eval only
    # accepts literals, unlike eval(), which would execute code sent by the client.
    fields = ast.literal_eval(raw)
    return {key: fields[key] for key in ('day_of_week', 'hour', 'minute', 'second')}


def parse_interval_seconds(raw):
    seconds = int(raw)
    if seconds <= 0:
        raise ValueError('Please enter a time interval greater than 0!')
    return seconds


class DjangoJobViewSet(ModelViewSet):
    permission_classes = []
    authentication_classes = []
    queryset = DjangoJob.objects.all()
    serializer_class = DjangoJobSerializer

    def create(self, request, *args, **kwargs):
        try:
            trigger_type = request.data.get('trigger_type')
            if trigger_type == "date":
                # The date trigger takes its fire time as run_date.
                run_time = request.data.get('run_time')
                job = scheduler.add_job(func=my_job,
                                        trigger=trigger_type,
                                        run_date=run_time,
                                        replace_existing=True,
                                        coalesce=False)
                print("One-off task added ---[ %s ]" % job.id)
            elif trigger_type == 'interval':
                seconds = parse_interval_seconds(request.data.get('interval_time'))
                job = scheduler.add_job(func=my_job,
                                        trigger=trigger_type,
                                        seconds=seconds,
                                        replace_existing=True,
                                        coalesce=False)
                print("Interval task added ---[ %s ]" % job.id)
            elif trigger_type == "cron":
                cron_fields = parse_cron_fields(request.data.get("run_time"))
                job = scheduler.add_job(func=my_job, trigger=trigger_type,
                                        replace_existing=True, **cron_fields)
                print("Cron task added ---[ %s ]" % job.id)
            else:
                raise ValueError('Unknown trigger_type: %s' % trigger_type)
            return JsonResponse({'msg': 'New task succeeded'})
        except Exception as e:
            return JsonResponse({'msg': f'Failed to add task, error: {e}'})

    @action(methods=['POST', 'GET'], detail=True)
    def pause(self, request, *args, **kwargs):
        response = {'status': False}
        try:
            target_id = kwargs['pk']
            scheduler.pause_job(target_id)
            response['status'] = True
            response['msg'] = "job[%s] pause success!" % target_id
        except Exception as e:
            response['msg'] = str(e)
        return JsonResponse(response)

    @action(methods=['POST', 'GET'], detail=True)
    def resume(self, request, pk):
        response = {'status': False}
        try:
            scheduler.resume_job(pk)
            response['status'] = True
            response['msg'] = "job[%s] resume success!" % pk
        except Exception as e:
            response['msg'] = str(e)
        return JsonResponse(response)

    def update(self, request, *args, **kwargs):
        pk = kwargs['pk']
        try:
            # modify_job() rejects a trigger given as a string ("Expected a trigger
            # instance, got str instead"), so every branch uses reschedule_job().
            trigger_type = request.data.get('trigger_type')
            if trigger_type == "date":
                run_time = request.data.get('run_time')
                job = scheduler.reschedule_job(pk, trigger='date', run_date=run_time)
                print("One-off task modified ---[ %s ]" % job.id)
            elif trigger_type == 'interval':
                seconds = parse_interval_seconds(request.data.get('interval_time'))
                job = scheduler.reschedule_job(pk, trigger='interval', seconds=seconds)
                print("Interval task modified ---[ %s ]" % job.id)
            elif trigger_type == "cron":
                cron_fields = parse_cron_fields(request.data.get("run_time"))
                job = scheduler.reschedule_job(pk, trigger='cron', **cron_fields)
                print("Cron task modified ---[ %s ]" % job.id)
            else:
                raise ValueError('Unknown trigger_type: %s' % trigger_type)
            return JsonResponse(dict(msg=f'Task {pk} modified successfully'))
        except Exception as e:
            return JsonResponse(dict(msg=f'Failed to modify task {pk}, error: {e}'))


# Start the scheduler
scheduler.start()
The script above is the key one. It uses restframework's action decorator.
permission_classes = []
authentication_classes = []
Other apps in this project apply authentication, which is configured in the restframework section of settings. To make testing easier, the two attributes above are set to empty lists. Otherwise the request is rejected with a message that the permission is insufficient.
urls.py:
from django.urls import include, path
from rest_framework import routers

from .views import DjangoJobViewSet

router = routers.DefaultRouter()
router.register('apsjob', DjangoJobViewSet)

urlpatterns = [
    path('', include(router.urls)),
]
The views.py script above handles three kinds of scheduled task: interval, cron, and date (run at a specified date).
Start the project:
PS F:\pyprogram3\restful_manu> python .\manage.py runserver
Watching for file changes with StatReloader
Performing system checks...
System check identified 7 issues (0 silenced).
June 08, 2022 - 10:51:43
Django version 3.2.13, using settings 'restful_manu.settings'
Starting development server at http://127.0.0.1:8000/
Quit the server with CTRL-BREAK.
Some parameters have to be passed in by a front end. Building one was too much trouble, so I used Postman to simulate the front-end requests. The three trigger types are verified below.
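For readers without Postman, the request bodies can be sketched in Python. The field names trigger_type, run_time and interval_time come from views.py above. The helper build_payload and all concrete values (the date, the interval, the cron fields) are illustrative assumptions, and the final check only mirrors the fact that the view parses the cron run_time from a string.

```python
import ast
import json


def build_payload(trigger_type, **fields):
    """Build the JSON body read by the create/update views (hypothetical helper)."""
    if trigger_type == "date":
        body = {"trigger_type": "date", "run_time": fields["run_time"]}
    elif trigger_type == "interval":
        body = {"trigger_type": "interval", "interval_time": fields["seconds"]}
    elif trigger_type == "cron":
        # The view parses run_time from a string, so the cron fields are
        # sent as a serialized dict rather than a nested JSON object.
        body = {"trigger_type": "cron", "run_time": json.dumps(fields["cron"])}
    else:
        raise ValueError(f"unknown trigger type: {trigger_type}")
    return json.dumps(body)


# Illustrative values only.
date_body = build_payload("date", run_time="2022-06-09 15:00:00")
interval_body = build_payload("interval", seconds=30)
cron_body = build_payload(
    "cron",
    cron={"day_of_week": "0-6", "hour": 15, "minute": 10, "second": 0},
)

# Round trip: what the view would see after decoding the cron body.
received = json.loads(cron_body)
cron_fields = ast.literal_eval(received["run_time"])
print(cron_fields["hour"], cron_fields["minute"])
```

Each body would be sent with POST to the router's apsjob list URL to create a job, or with PUT to the detail URL to update one. The exact URL prefix depends on how the project includes the app's urls.py.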
The first is the cron trigger.
You can see that four cron tasks have been created. The job code prints the thread name, thread ID and process ID. EXECUTORS configures a thread pool with 2 threads, and max_instances: 1 limits each job to one running instance at a time.
The backend log also shows that the tasks ran successfully.
You can see that Django runs the scheduled tasks in a single process with multiple threads. I set up 4 scheduled tasks that all fire at the same time. Two threads are allocated: the first two tasks run to completion, then the next two start.
With 5 scheduled tasks the behaviour matches the result above:
2022-06-08 15:10:00 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:10:00 ThreadPoolExecutor-1_1 12196 16936
============ Start to rest 10S=====================
2022-06-08 15:10:10 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:10 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:20 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:20 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_1 12196 16936
================ The rest is over =====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_1 12196 16936
============ Start to rest 10S=====================
2022-06-08 15:10:40 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:40 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:50 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:50 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_1 12196 16936
================ The rest is over =====================
2022-06-08 15:11:10 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:20 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:30 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================
One process allocates two threads. When a thread finishes one task it moves on to the next, similar to the thread.start() / thread.join() model.
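The same batching can be reproduced with the standard library alone. The sketch below is not the scheduler's code: it runs 5 short tasks on a pool of 2 threads from concurrent.futures, with 0.2 seconds standing in for the 30-second job, and run_task is a made-up stand-in for my_job.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_task(number):
    """Sleep briefly and report which thread handled the task."""
    time.sleep(0.2)
    return number, threading.current_thread().name


started_at = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Five tasks are submitted at once, but only two run at any moment.
    results = list(pool.map(run_task, range(5)))
elapsed = time.monotonic() - started_at

threads_used = {name for _, name in results}
print(f"pid={os.getpid()} tasks={len(results)} "
      f"threads={sorted(threads_used)} elapsed={elapsed:.1f}s")
```

Five tasks on two workers need three rounds, so the elapsed time is about three task durations rather than one, which is the pattern visible in the log above.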
On the restframework browsable page you can see that the extra actions are exposed.
The delete, pause and resume functions all work.
max_instances is the number of instances of the same job that may run at the same time. For example, job A runs every 10 minutes and starts at 11:00. If that run has not finished by 11:10, the 11:10 run of job A is skipped.
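The rule can be worked through with a small simulation. This is a simplified model written for this article, not APScheduler's implementation: simulate is a hypothetical function, times are minutes after 11:00, and the 15-minute duration is an assumed value.

```python
def simulate(fire_times, duration, max_instances=1):
    """Return (started, skipped) fire times for a job that takes `duration` minutes."""
    running_until = []  # end times of the runs still in progress
    started, skipped = [], []
    for fire_time in fire_times:
        # Forget the runs that have already finished by this fire time.
        running_until = [end for end in running_until if end > fire_time]
        if len(running_until) >= max_instances:
            skipped.append(fire_time)
        else:
            started.append(fire_time)
            running_until.append(fire_time + duration)
    return started, skipped


# The job fires every 10 minutes from 11:00 and each run takes 15 minutes.
started, skipped = simulate([0, 10, 20, 30], duration=15)
print("started:", started, "skipped:", skipped)
```

With max_instances=1 every second run is skipped. Raising max_instances to 2 in the same model lets all four runs start, because at most two of them overlap.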
The second is the interval trigger.
The last is the date trigger.
You can see that it ran successfully. This kind of scheduled task runs only once and is short-lived: once it has run, the task no longer exists. It is a flash in the pan from start to finish.
Note that changing a job's run-time parameters directly in the database does not work. I tried it and it had no effect.
The change has to go through the scheduler's own functions. I first tried modify_job:
scheduler.modify_job('61cf7ed98e974522a61e69d4e5e78fe6', jobstore=None, func=my_job, trigger='cron', day_of_week=2, hour=14, minute=50, second=00, replace_existing=True)
But it reported 'Expected a trigger instance, got str instead'. After reading the source I changed the call to pass a trigger instance, as in the code below. modify_job still did not take effect, whereas reschedule_job modified the job successfully.
tmp_dict = {"seconds": 10}
temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=tmp_dict)
result = scheduler.modify_job(pk, trigger=temp_trigger)
reschedule_job itself finishes by calling modify_job:
trigger = self._create_trigger(trigger, trigger_args)
now = datetime.now(self.timezone)
next_run_time = trigger.get_next_fire_time(None, now)
return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)
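The source excerpt shows that reschedule_job passes both a new trigger and a freshly computed next_run_time. The toy model below illustrates why that second value matters. It is not APScheduler code: ToyJob, modify_trigger_only and reschedule are made-up names, and the interval values are arbitrary. It may also explain why changing only the trigger appeared to have no effect, though that is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class ToyJob:
    interval: timedelta
    next_run_time: datetime


def modify_trigger_only(job, new_interval):
    """Swap the trigger but keep the stored next_run_time."""
    job.interval = new_interval


def reschedule(job, new_interval, now):
    """Swap the trigger and recompute next_run_time from it."""
    job.interval = new_interval
    job.next_run_time = now + new_interval


now = datetime(2022, 6, 8, 15, 0, 0)
job_a = ToyJob(timedelta(hours=1), now + timedelta(hours=1))
job_b = ToyJob(timedelta(hours=1), now + timedelta(hours=1))

modify_trigger_only(job_a, timedelta(seconds=10))
reschedule(job_b, timedelta(seconds=10), now)

print("trigger only:", job_a.next_run_time)
print("rescheduled: ", job_b.next_run_time)
```

In this model the job whose trigger alone was changed still waits for its old next run time, while the rescheduled job picks up the new interval immediately.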
django-apscheduler cannot be run in multi-process mode here. Enabling it raises an error. The configuration that enables it:
EXECUTORS = {
    # 'default': ThreadPoolExecutor(2),
    'default': ProcessPoolExecutor(5)
}
The error:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Error running job 4de15bdff2484c1ea5c303fc755fe021
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Parameters of the cron trigger
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
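The numeric ranges in the list above can be checked before a request reaches the scheduler. The sketch below is a hypothetical pre-check, not part of APScheduler: CRON_RANGES copies the ranges from the list, and check_cron_fields only validates plain integers, leaving names and expressions such as mon-fri to the scheduler.

```python
CRON_RANGES = {
    "month": (1, 12),
    "day": (1, 31),
    "week": (1, 53),
    "day_of_week": (0, 6),
    "hour": (0, 23),
    "minute": (0, 59),
    "second": (0, 59),
}


def check_cron_fields(fields):
    """Return error messages for plain-integer fields that are out of range."""
    errors = []
    for name, value in fields.items():
        if name not in CRON_RANGES:
            continue  # year, start_date, end_date and timezone are not checked here
        if isinstance(value, str) and not value.isdigit():
            continue  # names and expressions are left to the scheduler
        low, high = CRON_RANGES[name]
        if not low <= int(value) <= high:
            errors.append(f"{name}: {value} is outside {low}-{high}")
    return errors


print(check_cron_fields({"day_of_week": 2, "hour": 14, "minute": 50, "second": 0}))
print(check_cron_fields({"hour": 24, "minute": "61"}))
```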
In this article the create and update functions of the API were rewritten by hand, so a modification submitted through the restframework browsable page will not take effect when it calls them.