A reproduction repo for a scheduling bug in Airflow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

The repo has 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.
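
For reference, here is a minimal sketch of what such a DAG can look like. The DAG id, schedule, and task body are illustrative assumptions rather than the actual DAG files in this repo; the relevant detail is max_active_tasks=2 while all tasks share the default_pool.

# Illustrative sketch only: names, schedule and task body are assumptions.
from datetime import datetime, timedelta
import time

from airflow import DAG
from airflow.operators.python import PythonOperator

def _work():
    time.sleep(60)  # stand-in for a long-running task

with DAG(
    dag_id="slow_dag_sketch",              # hypothetical name
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=5),   # the slow DAG runs every 5 days
    max_active_tasks=2,                    # at most 2 tasks of this DAG may run or queue at once
    catchup=False,
) as dag:
    tasks = [
        PythonOperator(task_id=f"task_{i}", python_callable=_work)
        for i in range(600)                # ~600 tasks in the slow DAG
    ]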

My Airflow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was left at the value it was initially set to. default_pool_task_slot_count was set to 6 because it seemed reasonable that, if all 3 of my DAGs execute at the same time, the maximum number of tasks that can run concurrently is 3 * 2 = 6.

The problem:

Almost every time one of the DAGs is running, no tasks from the other DAGs will start until all of the tasks from the first one finish. That is, if the slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days before even a single task starts.

[Screenshot: the_bug]

The logs for the scheduler look like this:

scheduler_1  |
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  |     <TaskInstance: ...>
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
scheduler_1  |

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999
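
If your setup configures Airflow through environment variables rather than airflow.cfg (for example in the docker-compose file; how this repo actually wires its config is an assumption here), the same overrides map to Airflow's AIRFLOW__CORE__* variables:

AIRFLOW__CORE__PARALLELISM=1000
AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT=999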

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following (a simplified, self-contained sketch follows the list):

  1. Receive max_tis (== parallelism from the config minus the number of currently running tasks) as an argument. In my case that would be 32 - 2 = 30.
  2. Calculate how many free slots all the pools have combined. In my case that would be 6 - 2 = 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks in the scheduled state that belong to unpaused DAGs with running DAG runs, ordered by DAG run execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.
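
To make the starvation concrete, here is a self-contained, pure-Python simulation of the selection logic described above. It is a paraphrase of the scheduler method, not the real Airflow code; the numbers mirror my setup.

from dataclasses import dataclass

@dataclass
class TI:
    dag_id: str
    execution_date: int  # simplified ordering key

def select_tis(scheduled, running_per_dag, max_active_tasks, parallelism, pool_slots):
    running_total = sum(running_per_dag.values())
    # Steps 1-3: cap max_tis by free parallelism and free pool slots.
    max_tis = min(parallelism - running_total, pool_slots - running_total)
    # Steps 4-5: order by execution date, then LIMIT to max_tis rows.
    candidates = sorted(scheduled, key=lambda ti: ti.execution_date)[:max_tis]
    # Step 6: per-DAG max_active_tasks check.
    queued = []
    for ti in candidates:
        if running_per_dag.get(ti.dag_id, 0) >= max_active_tasks:
            continue  # DAG already at its limit, skip
        queued.append(ti)
        running_per_dag[ti.dag_id] = running_per_dag.get(ti.dag_id, 0) + 1
    return queued

# Slow_DAG's run started first (earlier execution date) and has ~600 scheduled
# tasks; DAG_1's run starts later with ~240 tasks.
scheduled = [TI("Slow_DAG", 1) for _ in range(600)] + [TI("DAG_1", 2) for _ in range(240)]
print(select_tis(scheduled, {"Slow_DAG": 2}, max_active_tasks=2,
                 parallelism=32, pool_slots=6))
# -> []  the 4-row LIMIT window contains only Slow_DAG tasks, and all of them are skipped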

Now, since the tasks are ordered by execution date, the query returns tasks for the DAG whose run started first, which let's say was DAG_2 with its 600+ tasks. The LIMIT operation returns 4 of them. None of these tasks can be run, since 2 tasks from that DAG are already running and its max_active_tasks is 2. So the scheduler just waits until one of those tasks finishes and then starts another task from the same DAG, while every other DAG starves. Raising parallelism and default_pool_task_slot_count makes max_tis, and therefore the LIMIT, large enough that the query reaches past DAG_2's backlog and also returns tasks from the other DAGs, so they can finally be queued.
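
Running the same simulation from above with the raised limits shows the effect of the workaround:

print(select_tis(scheduled, {"Slow_DAG": 2}, max_active_tasks=2,
                 parallelism=1000, pool_slots=999))
# -> two DAG_1 task instances are queued: the LIMIT window is now wide enough
#    to reach past Slow_DAG's backlog, so the other DAGs are no longer starved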
