A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
NORETURN is an esoteric programming language, based around the idea of not going back

NORETURN NORETURN is an esoteric programming language, based around the idea of not going back Concept Program coded in noreturn runs over one array,

1 Dec 15, 2021
Calculate the efficient frontier

关于 代码主要参考Fábio Neves的文章,你可以在他的文章中找到一些细节性的解释

Wyman Lin 104 Nov 11, 2022
Tracking development of the Class Schedule Siri Shortcut, an iOS program that checks the type of school day and tells you class scheduling.

Class Schedule Shortcut Tracking development of the Class Schedule Siri Shortcut, an iOS program that checks the type of school day and tells you clas

3 Jun 28, 2022
Allows you to purge all reply comments left by a user on a YouTube channel or video.

YouTube Spammer Purge Allows you to purge all reply comments left by a user on a YouTube channel or video. Purpose Recently, there has been a massive

4.3k Jan 09, 2023
A set of simple functions to upload and fetch pastes on paste.uploadgram.me

pastegram-py A set of simple functions to upload and fetch pastes on paste.uploadgram.me. API Documentation Methods upload_paste(contents: bytes, file

Uploadgram 3 Sep 13, 2022
Blender addons - A collection of Blender tools I've written for myself over the years.

gret A collection of Blender tools I've written for myself over the years. I use these daily so they should be bug-free, mostly. Feel free to take and

217 Jan 08, 2023
An open-source hyper-heuristic framework for multi-objective optimization

MOEA-HH An open-source hyper-heuristic framework for multi-objective optimization. Introduction The multi-objective optimization technique is widely u

Hengzhe Zhang 1 Feb 10, 2022
Singularity Containers on Apple M1 (ARM64)

Singularity Containers on Apple M1 (ARM64) This is a repository containing a ready-to-use environment for singularity in arm64 (M1). It has been prepa

Manuel Parra 4 Nov 14, 2022
A programming language that for tech savvy graphic designers

Microsoft Hackathon - PhoTex Idea A programming language that allows tech savvy graphic designers develop scalable vector graphics using plain text co

Joe Furfaro 5 Nov 14, 2021
Urban Big Data Centre Housing Sensor Project

Housing Sensor Project The Urban Big Data Centre is conducting a study of indoor environmental data in Scottish houses. We are using Raspberry Pi devi

Jeremy Singer 2 Dec 13, 2021
Donatus Prince 6 Feb 25, 2022
A gamey, snakey esoteric programming language

Snak Snak is an esolang based on the classic snake game. Installation You will need python3. To use the visualizer, you will need the curses module. T

David Rutter 3 Oct 10, 2022
A patch and keygen tools for typora.

A patch and keygen tools for typora.

Mason Shi 1.4k Apr 12, 2022
This simple script generates a backup of a given Python and R environment

Python Environment Backup It’s always good to maintain your Python and R Anaconda environment packages properly listed and well-kept in case you have

Andrew Laganaro 1 Jul 13, 2022
How did Covid affect businesses?

NYC_Business_Analysis How did Covid affect businesses? COVID's effect on NYC businesses We all know that businesses in NYC have been affected by COVID

AK 1 Jan 15, 2022
On this repo, you'll find every codes I made during my NSI classes (informatical courses)

👨‍💻 👩‍💻 school-codes On this repo, you'll find every codes I made during my NSI classes (informatical courses) French for now since this repo is d

EDM 1.15 3 Dec 17, 2022
Data wrangling & common calculations for results from qMem measurement software

qMem Datawrangler This script processes output of qMem measurement software into an Origin ® compatible *.csv files and matplotlib graphs to quickly v

Julian 1 Nov 30, 2021
NCAR/UCAR virtual Python Tutorial Seminar Series lesson on MetPy.

The Project Pythia Python Tutorial Seminar Series continues with a lesson on MetPy on Wednesday, 2 February 2022 at 1 PM Mountain Standard Time.

Project Pythia Tutorials 6 Oct 09, 2022
An early stage integration of Hotwire Turbo with Django

Note: This is not ready for production. APIs likely to change dramatically. Please drop by our Slack channel to discuss!

Hotwire for Django 352 Jan 06, 2023
Python / C++ based particle reaction-diffusion simulator

ReaDDy (Reaction Diffusion Dynamics) is an open source particle based reaction-diffusion simulator that can be configured and run via Python. Currentl

ReaDDy 46 Dec 09, 2022