A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Anki Cards for the HSK vocabulary Chinese-German

Anki-HanyuShuipingKaoshi Anki Cards for the HSK vocabulary Chinese-German Das Deck baut auf folgenden Quellen auf: China Endecken Wortschatz von wohok

1 Jan 07, 2022
GA SEI Unit 4 project backend for Bloom.

Grow Your OpportunitiesTM Background Watch the Bloom Intro Video At Bloom, we believe every job seeker deserves an opportunity to find meaningful work

Jonathan Herman 3 Sep 20, 2021
Kellogg bad | Union good | Support strike funds

KelloggBot Credit to SeanDaBlack for the basis of the script. req.py is selenium python bot. sc.js is a the base of the ios shortcut [COMING SOON] Set

407 Nov 17, 2022
banking system with python, beginner friendly, preadvanced level

banking-system-python banking system with python, beginner friendly, preadvanced level Used topics Functions else/if/elif dicts methods parameters hol

Razi Falah 1 Feb 03, 2022
WinBoost: Boost your windows system.

Winboost runs a complete checkup of your entire system locating junk files, speed-reducing issues and causes of any system or application glitches or crashes. Through a lot of research and testing, w

Smit Parmar 4 Oct 01, 2021
A python script for osu!lazer rulesets auto update.

osu-lazer-rulesets-autoupdater A python script for osu!lazer rulesets auto update. How to use: 如何使用: You can refer to the python script. The begining

3 Jul 26, 2022
Use Fofa、shodan、zoomeye、360quake to collect information(e.g:domain,IP,CMS,OS)同时调用Fofa、shodan、zoomeye、360quake四个网络空间测绘API完成红队信息收集

Cyberspace Map API English/中文 Development fofaAPI Completed zoomeyeAPI shodanAPI regular 360 quakeAPI Completed Difficulty APIs uses different inputs

Xc1Ym 61 Oct 08, 2022
This is a Fava extension to display a grouped portfolio view in Fava for a set of Beancount accounts.

Fava Portfolio Summary This is a Fava extension to display a grouped portfolio view in Fava for a set of Beancount accounts. It can also calculate MWR

18 Dec 26, 2022
Project issue to website data transformation toolkit

braintransform Project issue to website data transformation toolkit. Introduction The purpose of these scripts is to be able to dynamically generate t

Brainhack 1 Nov 19, 2021
A Notifier Program that Notifies you to relax your eyes Every 15 Minutes👀

Every 15 Minutes is an application that is used to Notify you to Relax your eyes Every 15 Minutes, This is fully made with Python and also with the us

FSP Gang s' YT 2 Nov 11, 2021
A Github Action for sending messages to a Matrix Room.

matrix-commit A Github Action for sending messages to a Matrix Room. Screenshot: Example Usage: # .github/workflows/matrix-commit.yml on: push:

3 Sep 11, 2022
Some shitty programs just to brush up on my understanding of binary conversions.

Binary Converters Some shitty programs just to brush up on my understanding of binary conversions. Supported conversions formats = "unsigned-binary" |

Tim 2 Jan 09, 2022
How to build an Fahrenheit to Celsius Converter in Python

Generally to measure the temperature we make use of one of these two popular units i.e. Fahrenheit & Celsius.

PyLaboratory 0 Feb 07, 2022
A tool for removing PUPs using signatures

Unwanted program removal tool A tool for removing PUPs using signatures What is the unwanted program removal tool? The unwanted program removal tool i

4 Sep 20, 2022
Data and analysis relating to the 5.8M Melbourne quake of 2021

quake2021 Data and analysis relating to the 5.8M Melbourne quake of 2021 Monash University Woodside Living Lab Building The building is located here T

Colin Caprani 6 May 16, 2022
How did Covid affect businesses?

NYC_Business_Analysis How did Covid affect businesses? COVID's effect on NYC businesses We all know that businesses in NYC have been affected by COVID

AK 1 Jan 15, 2022
This Program Automates The Procces Of Adding Camos On Guns And Saving Them On Modern Warfare Guns

This Program Automates The Procces Of Adding Camos On Guns And Saving Them On Modern Warfare Guns

Flex Tools 6 May 26, 2022
Very efficient backup system based on the git packfile format, providing fast incremental saves and global deduplication

Very efficient backup system based on the git packfile format, providing fast incremental saves and global deduplication (among and within files, including virtual machine images). Current release is

bup 6.9k Dec 27, 2022
API development made easy: a smart Python 3 API framework

appkernel - API development made easy What is Appkernel? A super-easy to use API framework, enabling API creation from zero to production within minut

156 Sep 28, 2022
python for windows extensions

This is the readme for the Python for Win32 (pywin32) extensions source code. See CHANGES.txt for recent changes. 'setup.py' is a standard distutils

27 Dec 08, 2022