
Timed task processing based on DRF apscheduler

2022-06-09 15:50:00 laoli815

Because restframework ships with a good built-in web page, I use django-admin to create a new app and install the restframework module. To make the django-admin backend look better I use the popular simpleui, which is built on Element-UI + Vue and gives the old django admin a fresh look.
2. Install the simpleui template
pip install simpleui
Create a static file directory.
In the settings file, configure STATIC_ROOT = os.path.join(BASE_DIR, 'static')
Otherwise python manage.py collectstatic will report an error.
Also add SIMPLEUI_HOME_INFO = False
In settings.py:
INSTALLED_APPS = [
    'simpleui',
    'django.contrib.admin',
    'django.contrib.auth',
]
Note that simpleui must be listed first. Otherwise the default admin templates override it and the theme does not take effect.
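The ordering rule can be checked mechanically. This is only a small sketch: the helper name simpleui_precedes_admin is made up for illustration and is not part of simpleui or Django. It relies on the fact that Django looks up app templates in INSTALLED_APPS order, so the app listed first wins.

```python
def simpleui_precedes_admin(installed_apps):
    """Return True if 'simpleui' is listed before 'django.contrib.admin'."""
    apps = list(installed_apps)
    if "simpleui" not in apps or "django.contrib.admin" not in apps:
        return False
    return apps.index("simpleui") < apps.index("django.contrib.admin")


# Hypothetical app lists, shortened like the one in this article.
GOOD = ["simpleui", "django.contrib.admin", "django.contrib.auth"]
BAD = ["django.contrib.admin", "simpleui", "django.contrib.auth"]

print(simpleui_precedes_admin(GOOD))  # True
print(simpleui_precedes_admin(BAD))   # False
```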
Once simpleui is installed, the django-admin home page takes on the new theme and looks much nicer.
Create the app: django-admin startapp apsched
1. Install django-apscheduler
pip install django-apscheduler
2. Register django-apscheduler
In settings.py, register django-apscheduler as an app:
INSTALLED_APPS = [
    # ... other apps ...
    'django_apscheduler',  # scheduled tasks
]
3. Database migration
python manage.py makemigrations
python manage.py migrate
After the migration, two new tables have been created in the MySQL database.
Since these two tables were created, django-apscheduler must ship its own models, and indeed it does.
serializers.py in the apsched app:

from django_apscheduler.models import DjangoJob
from rest_framework.serializers import ModelSerializer
class DjangoJobSerializer(ModelSerializer):
    class Meta:
        fields = '__all__'
        model = DjangoJob

views.py in the apsched app:

from django.shortcuts import render

# Create your views here.
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore,register_job
from rest_framework.viewsets import ModelViewSet
from django.http import  JsonResponse
import datetime,time
from rest_framework.decorators import action
from .serializers import DjangoJobSerializer
from django_apscheduler.models import DjangoJob
import threading,os
def my_job():
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),threading.current_thread().getName(),threading.get_ident(),os.getpid())
    print("============ Start to rest 10S=====================")
    time.sleep(10)
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),threading.current_thread().getName(),threading.get_ident(),os.getpid())
    print("============ Continue to rest 10S=====================")
    time.sleep(10)
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),threading.current_thread().getName(),threading.get_ident(),os.getpid())
    print("============ Continue to rest 10S=====================")
    time.sleep(10)
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),threading.current_thread().getName(),threading.get_ident(),os.getpid())
    print("================ The rest is over =====================")

from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor

EXECUTORS = {
    'default': ThreadPoolExecutor(2),
    # 'default': ProcessPoolExecutor(5)
}
JOB_DEFAULTS = {
    'coalesce': False,
    'max_instances': 1,
    'misfire_grace_time': None
}
#  Instantiation scheduler 
scheduler=BackgroundScheduler(job_defaults=JOB_DEFAULTS,executors=EXECUTORS,timezone='Asia/Shanghai')
#  The scheduler uses the default DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), 'default')
class DjangoJobViewSet(ModelViewSet):
    permission_classes = []
    authentication_classes = []
    queryset = DjangoJob.objects.all()
    serializer_class = DjangoJobSerializer
    def create(self,request):
        print(dir(request))
        try:
            # id = request.data.get('id')
            trigger_type = request.data.get('trigger_type')
            if trigger_type == "date":
                run_time = request.data.get('run_time')
                job=scheduler.add_job(func=my_job,
                                        trigger=trigger_type,
                                        next_run_time=run_time,
                                        replace_existing=True,
                                        coalesce=False)
                print(job,type(job),job.__getstate__().get('id'))
                print(" Add one-time task succeeded ---[ %s ] " % job.__getstate__().get('id'))
            elif trigger_type == 'interval':
                print(request.data)
                seconds = request.data.get('interval_time')
                seconds = int(seconds)
                print('seconds value is',seconds)
                if seconds <= 0:
                    raise ValueError('Please enter a time interval greater than 0!')
                job=scheduler.add_job(func=my_job,
                                  trigger=trigger_type,
                                  seconds=seconds,
                                  replace_existing=True,
                                  coalesce=False)
                print(" Add interval task succeeded ---[ %s ] " % job.__getstate__().get('id'))
            elif trigger_type == "cron":
                # Parse run_time once with ast.literal_eval, which accepts only
                # Python literals; eval() would execute arbitrary request input.
                import ast
                cron_args = ast.literal_eval(request.data.get("run_time"))
                job=scheduler.add_job(func=my_job, trigger=trigger_type,
                                  day_of_week=cron_args["day_of_week"],
                                  hour=cron_args["hour"], minute=cron_args["minute"],
                                  second=cron_args["second"], replace_existing=True)
                print(" Adding periodic cron task succeeded ---[ %s ] " % job.__getstate__().get('id'))
            return JsonResponse({'msg': ' New task succeeded '})
        except Exception as e:
            return JsonResponse({'msg': f' Failed to add task error:{e}'})
    @action(methods=['POST','GET'],detail=True)
    def pause(self,request,*args,**kwargs):
        print(args,kwargs)
        response = {'status': False}
        try:
            target_id=kwargs['pk']
            scheduler.pause_job(target_id)
            response['status'] = True
            response['msg'] = "job[%s] pause success!" %(target_id)
        except Exception as e:
            response['msg'] = str(e)
        return JsonResponse(response)
    @action(methods=['POST','GET'],detail=True)
    def resume(self,request,pk):
        response = {'status': False}
        try:
            scheduler.resume_job(pk)
            response['status'] = True
            response['msg'] = "job[%s] resume success!" % pk
        except Exception as e:
            response['msg'] = str(e)
        return JsonResponse(response)

    def update(self, request, *args,**kwargs):
        pk=kwargs['pk']
        print('put Method pk The value is :',pk)
        try:
            trigger_type = request.data.get('trigger_type')
            if trigger_type == "date":
                run_time = request.data.get('run_time')
                # modify_job() rejects a trigger given as a string, so use
                # reschedule_job(), which builds the trigger from its alias.
                job = scheduler.reschedule_job(pk, trigger='date', run_date=run_time)
                print(job, type(job), job.__getstate__().get('id'))
                print(" The one-time task was modified successfully ---[ %s ] " % job.__getstate__().get('id'))
            elif trigger_type == 'interval':
                print(request.data)
                seconds = request.data.get('interval_time')
                seconds = int(seconds)
                print('seconds value is', seconds)
                if seconds <= 0:
                    raise ValueError('Please enter a time interval greater than 0!')
                job = scheduler.reschedule_job(pk, trigger='interval', seconds=seconds)
                print(" The interval task was modified successfully ---[ %s ] " % job.__getstate__().get('id'))
            elif trigger_type == "cron":
                # Parse run_time once with ast.literal_eval, which accepts only
                # Python literals; eval() would execute arbitrary request input.
                import ast
                cron_args = ast.literal_eval(request.data.get("run_time"))
                temp_dict=dict(day_of_week=cron_args["day_of_week"],
                               hour=cron_args["hour"], minute=cron_args["minute"],
                               second=cron_args["second"])
                job=scheduler.reschedule_job(pk,trigger='cron',**temp_dict)
                print(" The cron task was modified successfully ---[ %s ] " % job.__getstate__().get('id'))
            return  JsonResponse(dict(msg=f' Modifying a single task succeeded {pk}'))
        except Exception as e:
            return  JsonResponse(dict(msg=f' Failed to modify a single task {pk} Report errors :{e}'))

#  The scheduler starts running 
scheduler.start()

The script above is the key one. It uses restframework's action decorator.
permission_classes = []
authentication_classes = []
Other apps in this project use authentication, which is configured for restframework in settings. To make testing easier, the two attributes above are set to empty lists; otherwise the request is rejected for insufficient permissions.
urls.py:

from django.urls import include, path
from rest_framework import routers

from .views import DjangoJobViewSet

router = routers.DefaultRouter()
router.register('apsjob', DjangoJobViewSet)
urlpatterns = [
    path('', include(router.urls)),
]

The views.py script above handles three kinds of scheduled tasks: interval, cron, and date (a specified date and time).
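How the three trigger types map onto add_job() arguments can be sketched without Django or a running scheduler. The function name build_trigger_kwargs is hypothetical, and the payload keys (trigger_type, run_time, interval_time) are the ones used in the view above. The cron branch parses the string with ast.literal_eval, which only accepts Python literals.

```python
import ast


def build_trigger_kwargs(data):
    """Translate a request payload into keyword arguments for add_job().

    `data` is a plain dict shaped like the payloads used in this article.
    """
    trigger_type = data.get("trigger_type")
    if trigger_type == "date":
        # One-off job: run_time is passed straight through as next_run_time.
        return {"trigger": "date", "next_run_time": data["run_time"]}
    if trigger_type == "interval":
        seconds = int(data["interval_time"])
        if seconds <= 0:
            raise ValueError("interval_time must be greater than 0")
        return {"trigger": "interval", "seconds": seconds}
    if trigger_type == "cron":
        run_time = data["run_time"]
        if isinstance(run_time, str):
            # literal_eval rejects anything that is not a plain literal.
            run_time = ast.literal_eval(run_time)
        fields = ("day_of_week", "hour", "minute", "second")
        return {"trigger": "cron", **{name: run_time[name] for name in fields}}
    raise ValueError(f"unsupported trigger_type: {trigger_type!r}")


print(build_trigger_kwargs({"trigger_type": "interval", "interval_time": "30"}))
# {'trigger': 'interval', 'seconds': 30}
```

The returned dict could then be passed on as scheduler.add_job(func=my_job, **kwargs); that last step is left out so the sketch runs on its own.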
Start the project:

PS F:\pyprogram3\restful_manu> python .\manage.py runserver
Watching for file changes with StatReloader
Performing system checks...

System check identified 7 issues (0 silenced).
June 08, 2022 - 10:51:43
Django version 3.2.13, using settings 'restful_manu.settings'
Starting development server at http://127.0.0.1:8000/
Quit the server with CTRL-BREAK.

Some values have to be passed in from the front end. I did not build one, partly out of laziness and partly because a front end is tedious, so I used postman to simulate the front-end requests. The three trigger types are verified below.
The first is the cron trigger

Four cron jobs have been created. The job function prints the thread name, thread ID, and process ID. EXECUTORS sets up a thread pool of 2 threads, and max_instances: 1 allows only one instance of a given job to run at a time.
The back end also shows the jobs executing successfully.
Django runs the scheduled tasks in a single process with multiple threads. I set up 4 scheduled tasks, all due at the same time, and 2 threads were allocated: the first two tasks ran to completion, then the next two ran.
With 5 scheduled tasks the result is consistent with the above:

2022-06-08 15:10:00 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:10:00 ThreadPoolExecutor-1_1 12196 16936
============ Start to rest 10S=====================
2022-06-08 15:10:10 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:10 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:20 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:20 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_1 12196 16936
================ The rest is over =====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:10:30 ThreadPoolExecutor-1_1 12196 16936
============ Start to rest 10S=====================
2022-06-08 15:10:40 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:40 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:50 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:10:50 ThreadPoolExecutor-1_1 12196 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_0 8424 16936
============ Start to rest 10S=====================
2022-06-08 15:11:00 ThreadPoolExecutor-1_1 12196 16936
================ The rest is over =====================
2022-06-08 15:11:10 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:20 ThreadPoolExecutor-1_0 8424 16936
============ Continue to rest 10S=====================
2022-06-08 15:11:30 ThreadPoolExecutor-1_0 8424 16936
================ The rest is over =====================

One process allocates two threads. When a thread finishes one task it moves on to the next, similar to the thread.start() / thread.join() pattern.
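The same pattern can be reproduced with the standard library alone. This is a sketch rather than APScheduler's own code: it uses concurrent.futures.ThreadPoolExecutor directly, with a stand-in job and a shortened sleep, to show five jobs sharing two worker threads inside one process, as in the log above.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # current and highest number of running jobs
pids = set()                       # process IDs seen by the jobs


def job(n):
    """Stand-in for my_job: sleeps briefly and records concurrency."""
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        pids.add(os.getpid())
    time.sleep(0.1)  # shortened from the 30 seconds used in the article
    with lock:
        state["running"] -= 1
    return n


# Five jobs due at the same moment, two worker threads.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(job, range(5)))

# peak never exceeds the pool size, and only one process ID is recorded.
print(results, state["peak"], len(pids))
```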
The restframework web page shows that the action endpoints are exposed.
delete, pause, and resume all work.
max_instances is the number of instances of the same job that may run at the same time. For example, if job A starts at 11:00 and is scheduled every 10 minutes but has not finished by 11:10, the 11:10 run of job A is skipped instead of starting a second instance.
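A toy simulation makes the max_instances rule concrete. It is not APScheduler code: simulate_max_instances is a hypothetical helper, times are plain minute offsets, and it assumes that an overlapping run is skipped (APScheduler logs this as "maximum number of running instances reached") while the run already in progress carries on.

```python
def simulate_max_instances(fire_times, duration, max_instances=1):
    """Return (started, skipped) fire times for a job with a fixed run duration.

    A run is skipped when `max_instances` earlier runs are still in progress
    at its fire time.
    """
    started, skipped = [], []
    for t in fire_times:
        # Runs that began earlier and have not finished by time t.
        active = [s for s in started if s + duration > t]
        if len(active) >= max_instances:
            skipped.append(t)
        else:
            started.append(t)
    return started, skipped


# Job A fires every 10 minutes from 11:00 (minute 0) but takes 15 minutes.
started, skipped = simulate_max_instances([0, 10, 20, 30], duration=15)
print(started, skipped)  # [0, 20] [10, 30]
```

With max_instances=2 the same schedule starts every run, because at most one earlier run is still in progress at each fire time.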
The second is the interval trigger
The last is the date trigger
The run can be observed to succeed. This kind of scheduled task runs only once and is short-lived: once it has run, the job no longer exists, a flash in the pan from beginning to end.
Note that changing a job's run-time parameters directly in the database does not work. I tried it and it had no effect.
The change has to go through the scheduler's own functions. I tried modify_job first:

scheduler.modify_job('61cf7ed98e974522a61e69d4e5e78fe6',jobstore=None,func=my_job,trigger='cron', day_of_week=2,hour=14, minute=50,second=00, replace_existing=True)

But it reported 'Expected a trigger instance, got str instead'. After reading the source code I changed the call as shown below, but modify_job still did not work, whereas reschedule_job succeeded (see the code above).

tmp_dict = {"seconds": 10}
temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=tmp_dict)
result = scheduler.modify_job(pk, trigger=temp_trigger)

reschedule_job itself also ends by calling modify_job:

        trigger = self._create_trigger(trigger, trigger_args)
        now = datetime.now(self.timezone)
        next_run_time = trigger.get_next_fire_time(None, now)
        return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)

django-apscheduler cannot be run with the multi-process executor here; enabling it raises an error. It is enabled like this:

EXECUTORS = {
    # 'default': ThreadPoolExecutor(2),
    'default': ProcessPoolExecutor(5)
}

The error:

django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Error running job 4de15bdff2484c1ea5c303fc755fe021
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Parameters of the cron trigger

year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
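The numeric ranges in this list can be checked before a job is handed to the scheduler. The helper below is a hypothetical sketch, not part of APScheduler: it checks only plain integers and leaves string expressions such as "*/5" or "mon-fri" for the scheduler to parse.

```python
CRON_RANGES = {
    "month": (1, 12),
    "day": (1, 31),
    "week": (1, 53),
    "day_of_week": (0, 6),
    "hour": (0, 23),
    "minute": (0, 59),
    "second": (0, 59),
}


def check_cron_fields(fields):
    """Return a list of error strings for integer cron fields out of range."""
    errors = []
    for name, value in fields.items():
        if name not in CRON_RANGES:
            continue  # year, start_date, end_date, timezone are not range-checked
        if isinstance(value, bool) or not isinstance(value, int):
            continue  # string expressions are left to the scheduler
        low, high = CRON_RANGES[name]
        if not low <= value <= high:
            errors.append(f"{name}={value} is outside {low}-{high}")
    return errors


print(check_cron_fields({"day_of_week": 2, "hour": 14, "minute": 50, "second": 0}))
# []
print(check_cron_fields({"hour": 24, "minute": "*/5"}))
# ['hour=24 is outside 0-23']
```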

In this article the create and update methods of the API are overridden by hand, so editing a job through the default form on the restframework page has no effect.

Original site

Copyright notice
This article was written by [laoli815]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/160/202206091523454129.html