A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Problem statements on System Design and Software Architecture as part of Arpit's System Design Masterclass

Problem statements on System Design and Software Architecture as part of Arpit's System Design Masterclass

Relog 1.1k Jan 04, 2023
Pyhexdmp - Python hex dump module

Pyhexdmp - Python hex dump module

25 Oct 23, 2022
Hopefully it'll become a very annoying desktop pet

AnnoyingPet Basic Tutorial: https://seebass22.github.io/python-desktop-pet-tutorial/ Handling Mouse Input: https://pythonhosted.org/pynput/mouse.html

1 Jun 08, 2022
Just some mtk tool for exploitation, reading/writing flash and doing crazy stuff

Just some mtk tool for exploitation, reading/writing flash and doing crazy stuff. For linux, a patched kernel is needed (see Setup folder) (except for read/write flash). For windows, you need to inst

Bjoern Kerler 1.1k Dec 31, 2022
[CVPR 2020] Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective

Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective [Arxiv] This is PyTorch implementation of th

Abdullah Jamal 22 Nov 19, 2022
Show Public IP Information In Linux Taskbar

IP Information In Linux Taskbar 📍 How Use IP Script? 🤔 Download ip.py script and save somewhere in your system. Add command applet in your taskbar a

HOP 2 Jan 25, 2022
A Desktop application for the signalum python library

Signalum Desktop A Desktop application on the Signalum Python Library/CLI Tool. The Signalum Desktop application is an attempt to develop a single too

BISOHNS 35 Feb 15, 2021
Data and analysis relating to the 5.8M Melbourne quake of 2021

quake2021 Data and analysis relating to the 5.8M Melbourne quake of 2021 Monash University Woodside Living Lab Building The building is located here T

Colin Caprani 6 May 16, 2022
Игра реализована с помощью языке python3.9, библиотеки pygame

Игра в танки Игра реализована с помощью языке python3.9, библиотеки pygame. Игра имеет несколько уровней. Правила: есть танки, которые стреляют, есть

1 Jan 01, 2022
A supercharged version of paperless: scan, index and archive all your physical documents

Paperless-ng Paperless (click me) is an application by Daniel Quinn and contributors that indexes your scanned documents and allows you to easily sear

Jonas Winkler 5.3k Jan 09, 2023
A minimalist starknet amm adapted from StarkWare's amm.

viscus • A minimalist starknet amm adapted from StarkWare's amm. Directory Structure contracts

Alucard 4 Dec 27, 2021
Python programs, usually short, of considerable difficulty, to perfect particular skills.

Peter Norvig MIT License 2015-2020 pytudes "An étude (a French word meaning study) is an instrumental musical composition, usually short, of considera

Peter Norvig 19.9k Dec 27, 2022
Hoopoe - Get notified of important stuff, right away.

Hoopoe - Get notified of important stuff, right away. Report a Bug · Request a Feature . Ask a Question Table of Contents About Getting Started Prereq

Vahid Al 8 Nov 12, 2022
A simple spyware in python.

Spyware-Python- Dependencies: Python 3.x OpenCV PyAutoGUI PyMongo (for mongodb connection) Flask (Web Server) Ngrok (helps us push our fla

Abubakar 3 Sep 07, 2022
Improving Representations via Similarities

embetter warning I like to build in public, but please don't expect anything yet. This is alpha stuff! notes Improving Representations via Similaritie

vincent d warmerdam 229 Jan 08, 2023
gwcheck is a tool to check .gnu.warning.* sections in ELF object files and display their content.

gwcheck Description gwcheck is a tool to check .gnu.warning.* sections in ELF object files and display their content. For an introduction to .gnu.warn

Frederic Cambus 11 Oct 28, 2022
Whole-day timezone comparison

Timezone Converter Compare a full day of your local timezone with foreign ones $ timezone-converter tijuana --zone $ timezone-converter tijuana new_yo

Iago Alonso 12 Nov 24, 2022
Cardano SundaeSwap ISO SPO vote ranking script

Cardano SundaeSwap ISO SPOs vote ranking This Python 3 script uses the database populated by cardano-db-sync from the Cardano blockchain to generate a

SM₳UG 1 Nov 17, 2021
A step-by-step tutorial for how to work with some of the most basic features of Nav2 using a Jupyter Notebook in a warehouse environment to create a basic application.

This project has a step-by-step tutorial for how to work with some of the most basic features of Nav2 using a Jupyter Notebook in a warehouse environment to create a basic application.

Steve Macenski 49 Dec 22, 2022
A beacon generator using Cobalt Strike and a variety of tools.

Beaconator is an aggressor script for Cobalt Strike used to generate either staged or stageless shellcode and packing the generated shellcode using your tool of choice.

Capt. Meelo 441 Dec 17, 2022