A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Python AVL Protocols Server for Codec 8 and Codec 8 Extended Protocols

pycodecs Package provides python AVL Protocols Server for Codec 8 and Codec 8 Extended Protocols This package will parse the AVL Data and log it in hu

Vardharajulu K N 2 Jun 21, 2022
The first Python 1v1.lol triggerbot working with colors !

1v1.lol TriggerBot Afin d'utiliser mon triggerbot, vous devez activer le plein écran sur 1v1.lol sur votre naviguateur (quelque-soit ce dernier). Vous

Venax 5 Jul 25, 2022
Powerful Assistant

Delta-Assistant Hi I'm Phoenix This project is a smart assistant This is the 1.0 version of this project I am currently working on the third version o

1 Nov 17, 2021
A stupid obfuscation thing

StupidObfuscation A stupid obfuscation thing How it works The obfuscator takes a string, splits into pieces of one, then, using the table from letter.

Echo 2 May 03, 2022
This is a simple python script for checking A/L Examination results of srilankan students

AL-Result-Checker This is a simple python script for checking A/L Examination results of srilankan students INSTALLATION [Termux] [Linux] : apt-get up

Razor Kenway 8 Oct 24, 2022
Open Source defrag's mod code

Open Source defrag's mod code Goals: Code & License: Respect FOSS philosophy. Open source and community focus. Eliminate all traces of q3a-sdk licensi

sOkam! 1 Dec 10, 2022
A simple Programming Language

R.S.O.C. A custom built programming language About The Project R.S.O.C. is a custom built programming language very similar to a low-level 8085 progra

Ravi Maurya 17 Sep 13, 2022
An interactive tool with which to explore the possible imaging performance of candidate ngEHT architectures.

ngEHTexplorer An interactive tool with which to explore the possible imaging performance of candidate ngEHT architectures. Welcome! ngEHTexplorer is a

Avery Broderick 7 Jan 28, 2022
Async Python Circuit Breaker implementation

aiocircuitbreaker This is an async Python implementation of the circuitbreaker library. Installation The project is available on PyPI. Simply run: $ p

5 Sep 05, 2022
A python implementation of differentiable quality diversity.

Differentiable Quality Diversity This repository is the official implementation of Differentiable Quality Diversity.

ICAROS 41 Nov 30, 2022
Team collaborative evaluation tracker.

Team collaborative evaluation tracker.

2 Dec 19, 2021
Remove Sheet Protection from .xlsx files. Easily.

🔓 Excel Sheet Unlocker Remove sheet protection from .xlsx files. How to use Run Run the script/packaged executable from the command line. Universal u

Daniel 3 Nov 16, 2022
Automated Content Feed Curator

Gathers posts from content feeds, filters, formats, delivers to you.

Alper S. Soylu 2 Jan 22, 2022
🦕 Compile Deno executables and compress them for all platforms easily

Denoc Compile Deno executables and compress them for all platforms easily. Install You can install denoc from PyPI like any other package: pip install

Eliaz Bobadilla 8 Apr 04, 2022
Null safe support for Python

Null Safe Python Null safe support for Python. Installation pip install nullsafe Quick Start Dummy Class class Dummy: pass Normal Python code: o =

Paaksing 13 Nov 17, 2022
La version open source du bot Discord Sblerboy

Sblerboy-Open-Source La version open source du bot Discord Sblerboy Sblerboy est un bot Discord permettant de jouer à des jeux de Gameboy directement

15 Nov 19, 2022
Types for the Rasterio package

types-rasterio Types for the rasterio package A work in progress Install Not yet published to PyPI pip install types-rasterio These type definitions

Kyle Barron 7 Sep 10, 2021
A corona information module

A corona information module

Fayas Noushad 3 Nov 28, 2021
Prints values and types during compilation!

Compile-Time Printer Compile-Time Printer prints values and types at compile-time in C++. Teaser test.cpp compile-time-printer

43 Dec 26, 2022
This is a vscode extension with a Virtual Assistant that you can play with when you are bored or you need help..

VS Code Virtual Assistant This is a vscode extension with a Virtual Assistant that you can play with when you are bored or you need help. Its currentl

Soham Ghugare 6 Aug 22, 2021