Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dag processor from the scheduler #45729

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions airflow/api/common/airflow_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from typing import Any

from airflow.configuration import conf
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
Expand All @@ -29,13 +28,14 @@

def get_airflow_health() -> dict[str, Any]:
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor")
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
latest_dag_processor_heartbeat = None

scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY
dag_processor_status: str | None = UNHEALTHY

try:
latest_scheduler_job = SchedulerJobRunner.most_recent_job()
Expand All @@ -59,6 +59,18 @@ def get_airflow_health() -> dict[str, Any]:
except Exception:
metadatabase_status = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
Expand All @@ -69,27 +81,10 @@ def get_airflow_health() -> dict[str, Any]:
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
}

if dag_processor_enabled:
latest_dag_processor_heartbeat = None
dag_processor_status: str | None = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status["dag_processor"] = {
"dag_processor": {
"status": dag_processor_status,
"latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
}
},
}

return airflow_health_status
2 changes: 0 additions & 2 deletions airflow/auth/managers/simple/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def login(self):
"""Start login process."""
state_color_mapping = State.state_color.copy()
state_color_mapping["no_status"] = state_color_mapping.pop(None)
standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
return self.render_template(
"airflow/login.html",
disable_nav_bar=True,
Expand All @@ -57,7 +56,6 @@ def login(self):
),
auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
state_color_mapping=state_color_mapping,
standalone_dag_processor=standalone_dag_processor,
)

@expose("/logout", methods=["GET", "POST"])
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,6 @@ class GroupCommand(NamedTuple):
help="Start a scheduler instance",
func=lazy_load_command("airflow.cli.commands.local_commands.scheduler_command.scheduler"),
args=(
ARG_SUBDIR,
ARG_NUM_RUNS,
ARG_PID,
ARG_DAEMON,
Expand Down
3 changes: 0 additions & 3 deletions airflow/cli/commands/local_commands/dag_processor_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
@providers_configuration_loaded
def dag_processor(args):
"""Start Airflow Dag Processor Job."""
if not conf.getboolean("scheduler", "standalone_dag_processor"):
raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.")

job_runner = _create_dag_processor_job_runner(args)

reload_configuration_for_dag_processing()
Expand Down
3 changes: 1 addition & 2 deletions airflow/cli/commands/local_commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from airflow.jobs.job import Job, run_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.cli import process_subdir
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.scheduler_health import serve_health_check
from airflow.utils.usage_data_collection import usage_data_collection
Expand All @@ -39,7 +38,7 @@


def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/commands/local_commands/standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def calculate_env(self):
We override some settings as part of being standalone.
"""
env = dict(os.environ)
env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "True"

# Make sure we're using a local executor flavour
executor_class, _ = ExecutorLoader.import_default_executor_cls()
Expand Down
12 changes: 1 addition & 11 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2447,25 +2447,15 @@ scheduler:
type: string
example: ~
default: "modified_time"
standalone_dag_processor:
description: |
Whether the dag processor runs as a standalone process or as a subprocess of a scheduler
job.
version_added: 2.3.0
type: boolean
example: ~
default: "False"
max_callbacks_per_loop:
description: |
Only applicable if ``[scheduler] standalone_dag_processor`` is true and callbacks are stored
in database. Contains maximum number of callbacks that are fetched during a single loop.
The maximum number of callbacks that are fetched during a single loop.
version_added: 2.3.0
type: integer
example: ~
default: "20"
dag_stale_not_seen_duration:
description: |
Only applicable if ``[scheduler] standalone_dag_processor`` is true.
Time in seconds after which dags that were not updated by the Dag Processor are deactivated.
version_added: 2.4.0
type: integer
Expand Down
Loading