Skip to content

Commit

Permalink
Rename processor_poll_interval to scheduler_idle_sleep_time (#…
Browse files Browse the repository at this point in the history
…18704)

`[scheduler] processor_poll_interval` setting in `airflow.cfg` has been renamed to `[scheduler] scheduler_idle_sleep_time`
for better understanding.
  • Loading branch information
kaxil authored Oct 5, 2021
1 parent 57bb47f commit 4e10c25
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 14 deletions.
21 changes: 21 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,27 @@ with DAG(dag_id="task_concurrency_example"):
BashOperator(task_id="t1", max_active_tis_per_dag=2, bash_command="echo Hi")
```
### `processor_poll_interval` config have been renamed to `scheduler_idle_sleep_time`
`[scheduler] processor_poll_interval` setting in `airflow.cfg` has been renamed to `[scheduler] scheduler_idle_sleep_time`
for better understanding.
It controls the 'time to sleep' at the end of the Scheduler loop if nothing was scheduled inside `SchedulerJob`.
**Before**:
```ini
[scheduler]
processor_poll_interval = 16
```
**Now**:
```ini
[scheduler]
scheduler_idle_sleep_time = 16
```
### Marking success/failed automatically clears failed downstream tasks
When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.
Expand Down
7 changes: 3 additions & 4 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1739,13 +1739,12 @@
type: string
example: ~
default: "-1"
- name: processor_poll_interval
- name: scheduler_idle_sleep_time
description: |
Controls how long the scheduler will sleep between loops, but if there was nothing to do
in the loop. i.e. if it scheduled something then it will start the next loop
iteration straight away. This parameter is badly named (historical reasons) and it will be
renamed in the future with deprecation of the current name.
version_added: 1.10.6
iteration straight away.
version_added: 2.2.0
type: string
example: ~
default: "1"
Expand Down
5 changes: 2 additions & 3 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -870,9 +870,8 @@ num_runs = -1

# Controls how long the scheduler will sleep between loops, but if there was nothing to do
# in the loop. i.e. if it scheduled something then it will start the next loop
# iteration straight away. This parameter is badly named (historical reasons) and it will be
# renamed in the future with deprecation of the current name.
processor_poll_interval = 1
# iteration straight away.
scheduler_idle_sleep_time = 1

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
Expand Down
1 change: 1 addition & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class AirflowConfigParser(ConfigParser):
('metrics', 'statsd_datadog_tags'): ('scheduler', 'statsd_datadog_tags', '2.0.0'),
('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path', '2.0.0'),
('scheduler', 'parsing_processes'): ('scheduler', 'max_threads', '1.10.14'),
('scheduler', 'scheduler_idle_sleep_time'): ('scheduler', 'processor_poll_interval', '2.2.0'),
('operators', 'default_queue'): ('celery', 'default_queue', '2.1.0'),
('core', 'hide_sensitive_var_conn_fields'): ('admin', 'hide_sensitive_variable_fields', '2.1.0'),
('core', 'sensitive_var_conn_names'): ('admin', 'sensitive_variable_fields', '2.1.0'),
Expand Down
21 changes: 16 additions & 5 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import signal
import sys
import time
import warnings
from collections import defaultdict
from datetime import timedelta
from typing import Collection, DefaultDict, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -86,9 +87,9 @@ class SchedulerJob(BaseJob):
:param num_times_parse_dags: The number of times to try to parse each DAG file.
-1 for unlimited times.
:type num_times_parse_dags: int
:param processor_poll_interval: The number of seconds to wait between
:param scheduler_idle_sleep_time: The number of seconds to wait between
polls of running processors
:type processor_poll_interval: int
:type scheduler_idle_sleep_time: int
:param do_pickle: once a DAG object is obtained by executing the Python
file, whether to serialize the DAG object to the DB
:type do_pickle: bool
Expand All @@ -104,9 +105,10 @@ def __init__(
subdir: str = settings.DAGS_FOLDER,
num_runs: int = conf.getint('scheduler', 'num_runs'),
num_times_parse_dags: int = -1,
processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
scheduler_idle_sleep_time: float = conf.getfloat('scheduler', 'scheduler_idle_sleep_time'),
do_pickle: bool = False,
log: logging.Logger = None,
processor_poll_interval: Optional[float] = None,
*args,
**kwargs,
):
Expand All @@ -117,7 +119,16 @@ def __init__(
# number of times. This is only to support testing, and isn't something a user is likely to want to
# configure -- they'll want num_runs
self.num_times_parse_dags = num_times_parse_dags
self._processor_poll_interval = processor_poll_interval
if processor_poll_interval:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The 'processor_poll_interval' parameter is deprecated. "
"Please use 'scheduler_idle_sleep_time'.",
DeprecationWarning,
stacklevel=2,
)
scheduler_idle_sleep_time = processor_poll_interval
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time

self.do_pickle = do_pickle
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -676,7 +687,7 @@ def _run_scheduler_loop(self) -> None:
# If the scheduler is doing things, don't sleep. This means when there is work to do, the
# scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
# usage when "idle"
time.sleep(min(self._processor_poll_interval, next_event))
time.sleep(min(self._scheduler_idle_sleep_time, next_event))

if loop_count >= self.num_runs > 0:
self.log.info(
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/best-practices.rst
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ In case you see long delays between updating it and the time it is ready to be t
at the following configuration parameters and fine tune them according your needs (see details of
each parameter by following the links):

* :ref:`config:scheduler__processor_poll_interval`
* :ref:`config:scheduler__scheduler_idle_sleep_time`
* :ref:`config:scheduler__min_file_process_interval`
* :ref:`config:scheduler__dag_dir_list_interval`
* :ref:`config:scheduler__parsing_processes`
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/concepts/scheduler.rst
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ However you can also look at other non-performance-related scheduler configurati
The scheduler can run multiple processes in parallel to parse DAG files. This defines
how many processes will run.

- :ref:`config:scheduler__processor_poll_interval`
- :ref:`config:scheduler__scheduler_idle_sleep_time`
Controls how long the scheduler will sleep between loops, but if there was nothing to do
in the loop. i.e. if it scheduled something then it will start the next loop
iteration straight away. This parameter is badly named (historical reasons) and it will be
Expand Down

0 comments on commit 4e10c25

Please sign in to comment.