Store relative fileloc on DagModel and SerializedDAG (apache#46029)
Store the relative fileloc at parse time so that, when we queue the task for running, we can pass the real relative fileloc along with the bundle name and version, letting the task run with the right file / bundle / version.

---------

Co-authored-by: Jed Cunningham <[email protected]>
dstandish and jedcunningham authored Jan 30, 2025
1 parent 63d3602 commit 97ebffe
Showing 23 changed files with 352 additions and 262 deletions.
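In rough terms, the flow described in the commit message looks like the sketch below. This is an illustration only, not code from the commit: compute_relative_fileloc is a hypothetical helper, while bundle_path, relative_fileloc, and ExecuteTask.make(ti, dag_rel_path=...) mirror the diffs that follow.

from pathlib import Path

def compute_relative_fileloc(fileloc: str, bundle_path: Path | None) -> str:
    # At parse time the DAG processor knows which bundle a file came from,
    # so it can record a path relative to that bundle root.
    if bundle_path:
        return str(Path(fileloc).relative_to(bundle_path))
    # No bundle known: fall back to the absolute path.
    return fileloc

# At run time, the stored relative path is handed to the executor together
# with the bundle name and version recorded on the DAG, so the worker can
# check out the right bundle version and locate the file inside it.
rel = compute_relative_fileloc("/bundles/my_bundle/dags/example.py", Path("/bundles/my_bundle"))
assert rel == "dags/example.py"
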
5 changes: 4 additions & 1 deletion airflow/cli/commands/remote_commands/task_command.py
@@ -28,6 +28,7 @@
import textwrap
from collections.abc import Generator
from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast

import pendulum
@@ -299,7 +300,9 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
from airflow.executors import workloads

workload = workloads.ExecuteTask.make(ti, dag_rel_path=dag.relative_fileloc)
if TYPE_CHECKING:
assert dag.relative_fileloc
workload = workloads.ExecuteTask.make(ti, dag_rel_path=Path(dag.relative_fileloc))
with create_session() as session:
executor.queue_workload(workload, session)
else:
1 change: 1 addition & 0 deletions airflow/dag_processing/collection.py
@@ -422,6 +422,7 @@ def update_dags(
for dag_id, dm in sorted(orm_dags.items()):
dag = self.dags[dag_id]
dm.fileloc = dag.fileloc
dm.relative_fileloc = dag.relative_fileloc
dm.owners = dag.owner or conf.get("operators", "default_owner")
dm.is_active = True
dm.has_import_errors = False
27 changes: 16 additions & 11 deletions airflow/dag_processing/manager.py
@@ -32,10 +32,11 @@
import zipfile
from collections import defaultdict, deque
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple
from typing import TYPE_CHECKING, Any, NamedTuple, cast

import attrs
import structlog
@@ -97,11 +98,13 @@ class DagFileStat:
log = logging.getLogger("airflow.processor_manager")


class DagFileInfo(NamedTuple):
@dataclass(frozen=True)
class DagFileInfo:
"""Information about a DAG file."""

path: str # absolute path of the file
bundle_name: str
bundle_path: Path | None = field(compare=False, default=None)


def _config_int_factory(section: str, key: str):
@@ -240,23 +243,23 @@ def _scan_stale_dags(self):
last_parsed = {
fp: stat.last_finish_time for fp, stat in self._file_stats.items() if stat.last_finish_time
}
self.deactivate_stale_dags(
last_parsed=last_parsed,
stale_dag_threshold=self.stale_dag_threshold,
)
self.deactivate_stale_dags(last_parsed=last_parsed)
self._last_deactivate_stale_dags_time = time.monotonic()

@provide_session
def deactivate_stale_dags(
self,
last_parsed: dict[DagFileInfo, datetime | None],
stale_dag_threshold: int,
session: Session = NEW_SESSION,
):
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
query = select(
DagModel.dag_id, DagModel.bundle_name, DagModel.fileloc, DagModel.last_parsed_time
DagModel.dag_id,
DagModel.bundle_name,
DagModel.fileloc,
DagModel.last_parsed_time,
DagModel.relative_fileloc,
).where(DagModel.is_active)
# TODO: AIP-66 by bundle!
dags_parsed = session.execute(query)
@@ -269,7 +272,7 @@ def deactivate_stale_dags(
dag_file_path = DagFileInfo(path=dag.fileloc, bundle_name=dag.bundle_name)
if (
dag_file_path in last_parsed
and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold))
and (dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold))
< last_parsed[dag_file_path]
):
self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
@@ -484,7 +487,8 @@ def _refresh_dag_bundles(self):

new_file_paths = [f for f in self._file_paths if f.bundle_name != bundle.name]
new_file_paths.extend(
DagFileInfo(path=path, bundle_name=bundle.name) for path in bundle_file_paths
DagFileInfo(path=path, bundle_path=bundle.path, bundle_name=bundle.name)
for path in bundle_file_paths
)
self.set_file_paths(new_file_paths)

@@ -765,6 +769,7 @@ def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
return DagFileProcessorProcess.start(
id=id,
path=dag_file.path,
bundle_path=cast(Path, dag_file.bundle_path),
callbacks=callback_to_execute_for_file,
selector=self.selector,
logger=self._get_logger_for_dag_file(dag_file),
@@ -841,7 +846,7 @@ def prepare_file_path_queue(self):
if is_mtime_mode:
file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
elif list_mode == "alphabetical":
file_paths.sort()
file_paths.sort(key=lambda f: f.path)
elif list_mode == "random_seeded_by_host":
# Shuffle the list seeded by hostname so multiple DAG processors can work on different
# set of files. Since we set the seed, the sort order will remain same per host
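A note on the NamedTuple-to-frozen-dataclass change above: DagFileInfo is used as a dictionary key (for example in _file_stats and last_parsed), so it must stay hashable, and field(compare=False) keeps the new bundle_path out of equality and hashing. That is what lets deactivate_stale_dags build a lookup key without a bundle_path and still match entries created with one. A minimal stand-alone sketch of that behaviour:

from dataclasses import dataclass, field
from pathlib import Path

@dataclass(frozen=True)
class DagFileInfo:
    """Information about a DAG file."""

    path: str  # absolute path of the file
    bundle_name: str
    bundle_path: Path | None = field(compare=False, default=None)

with_path = DagFileInfo(path="/b/dags/x.py", bundle_name="b", bundle_path=Path("/b"))
without_path = DagFileInfo(path="/b/dags/x.py", bundle_name="b")

# bundle_path is excluded from __eq__ and __hash__, so both objects
# address the same dictionary entry.
assert with_path == without_path
stats = {with_path: "parsed"}
assert stats[without_path] == "parsed"
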
17 changes: 15 additions & 2 deletions airflow/dag_processing/processor.py
@@ -19,6 +19,7 @@
import os
import sys
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union

import attrs
@@ -73,6 +74,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP
# TODO: Set known_pool names on DagBag!
bag = DagBag(
dag_folder=msg.file,
bundle_path=msg.bundle_path,
include_examples=False,
safe_mode=True,
load_op_links=False,
@@ -159,6 +161,10 @@ class DagFileParseRequest(BaseModel):
"""

file: str

bundle_path: Path
"""Passing bundle path around lets us figure out relative file path."""

requests_fd: int
callback_requests: list[CallbackRequest] = Field(default_factory=list)
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
@@ -205,17 +211,24 @@ def start( # type: ignore[override]
cls,
*,
path: str | os.PathLike[str],
bundle_path: Path,
callbacks: list[CallbackRequest],
target: Callable[[], None] = _parse_file_entrypoint,
**kwargs,
) -> Self:
proc: Self = super().start(target=target, **kwargs)
proc._on_child_started(callbacks, path)
proc._on_child_started(callbacks, path, bundle_path)
return proc

def _on_child_started(self, callbacks: list[CallbackRequest], path: str | os.PathLike[str]) -> None:
def _on_child_started(
self,
callbacks: list[CallbackRequest],
path: str | os.PathLike[str],
bundle_path: Path,
) -> None:
msg = DagFileParseRequest(
file=os.fspath(path),
bundle_path=bundle_path,
requests_fd=self._requests_fd,
callback_requests=callbacks,
)
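With the new field, a parse request identifies both the file and its bundle root, and the child process feeds that into DagBag(bundle_path=...) as shown above. A hypothetical request built by hand (in practice _on_child_started constructs it; the paths and file descriptor here are made up):

from pathlib import Path

from airflow.dag_processing.processor import DagFileParseRequest

req = DagFileParseRequest(
    file="/bundles/example/dags/etl.py",
    bundle_path=Path("/bundles/example"),
    requests_fd=3,  # fd of the comms channel back to the supervising process
)
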
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add relative fileloc column.
Revision ID: 8ea135928435
Revises: e39a26ac59f6
Create Date: 2025-01-24 13:17:13.444341
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

revision = "8ea135928435"
down_revision = "e39a26ac59f6"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Add relative fileloc column."""
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("relative_fileloc", sa.String(length=2000), nullable=True))


def downgrade():
"""Unapply Add relative fileloc column."""
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_column("relative_fileloc")
28 changes: 1 addition & 27 deletions airflow/models/dag.py
@@ -21,7 +21,6 @@
import copy
import functools
import logging
import pathlib
import sys
import time
from collections import defaultdict
@@ -735,20 +734,6 @@ def dag_id(self, value: str) -> None:
def timetable_summary(self) -> str:
return self.timetable.summary

@property
def relative_fileloc(self) -> pathlib.Path:
"""File location of the importable dag 'file' relative to the configured DAGs folder."""
path = pathlib.Path(self.fileloc)
try:
rel_path = path.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
if rel_path == pathlib.Path("."):
return path
else:
return rel_path
except ValueError:
# Not relative to DAGS_FOLDER.
return path

@provide_session
def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
"""Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached."""
@@ -2045,6 +2030,7 @@ class DagModel(Base):
# packaged DAG, it will point to the subpath of the DAG within the
# associated zip.
fileloc = Column(String(2000))
relative_fileloc = Column(String(2000))
bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True)
# The version of the bundle the last time the DAG was processed
bundle_version = Column(String(200), nullable=True)
@@ -2214,18 +2200,6 @@ def get_default_view(self) -> str:
def safe_dag_id(self):
return self.dag_id.replace(".", "__dot__")

@property
def relative_fileloc(self) -> pathlib.Path | None:
"""File location of the importable dag 'file' relative to the configured DAGs folder."""
if self.fileloc is None:
return None
path = pathlib.Path(self.fileloc)
try:
return path.relative_to(settings.DAGS_FOLDER)
except ValueError:
# Not relative to DAGS_FOLDER.
return path

@provide_session
def set_is_paused(self, is_paused: bool, session=NEW_SESSION) -> None:
"""
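With relative_fileloc now stored as a column on DagModel (rather than computed against DAGS_FOLDER by the removed properties), consumers can read the bundle-relative path straight from the database. A minimal illustrative query, assuming an existing SQLAlchemy session; the column may be NULL for rows written before the migration above runs and the DAG is re-parsed:

from sqlalchemy import select

from airflow.models.dag import DagModel

def get_bundle_relative_path(session, dag_id: str) -> str | None:
    # Populated at parse time by the DAG processor.
    return session.scalar(select(DagModel.relative_fileloc).where(DagModel.dag_id == dag_id))
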
7 changes: 6 additions & 1 deletion airflow/models/dagbag.py
@@ -126,9 +126,10 @@ def __init__(
load_op_links: bool = True,
collect_dags: bool = True,
known_pools: set[str] | None = None,
bundle_path: Path | None = None,
):
super().__init__()

self.bundle_path: Path | None = bundle_path
include_examples = (
include_examples
if isinstance(include_examples, bool)
@@ -482,6 +483,10 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):

for dag, mod in top_level_dags:
dag.fileloc = mod.__file__
if self.bundle_path:
dag.relative_fileloc = str(Path(mod.__file__).relative_to(self.bundle_path))
else:
dag.relative_fileloc = dag.fileloc
try:
dag.validate()
self.bag_dag(dag=dag)
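Path.relative_to raises ValueError for a path outside the given root, which is what the removed DAG/DagModel properties guarded against when resolving relative to DAGS_FOLDER. The new _process_modules logic instead assumes a bundle's files live under bundle_path and only falls back to the absolute fileloc when no bundle path is known. A small illustration with hypothetical paths:

from pathlib import Path

bundle_path = Path("/opt/airflow/bundles/example_bundle")

# A file inside the bundle: store the path relative to the bundle root.
inside = Path("/opt/airflow/bundles/example_bundle/dags/etl.py")
print(inside.relative_to(bundle_path))  # dags/etl.py

# A file outside the root: relative_to() refuses, so code that still needs
# a value (as the old DAGS_FOLDER-based properties did) must catch
# ValueError and keep the absolute path.
outside = Path("/tmp/adhoc_dag.py")
try:
    print(outside.relative_to(bundle_path))
except ValueError:
    print(outside)  # /tmp/adhoc_dag.py
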
5 changes: 4 additions & 1 deletion airflow/models/taskinstance.py
@@ -32,6 +32,7 @@
from datetime import timedelta
from enum import Enum
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
from urllib.parse import quote

@@ -1946,7 +1947,9 @@ def _command_as_list(
if dag is None:
raise ValueError("DagModel is empty")

path = dag.relative_fileloc
path = None
if dag.relative_fileloc:
path = Path(dag.relative_fileloc)

if path:
if not path.is_absolute():
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
@@ -165,6 +165,7 @@
},
"catchup": { "type": "boolean" },
"fileloc": { "type" : "string"},
"relative_fileloc": { "type" : "string"},
"_processor_dags_folder": {
"anyOf": [
{ "type": "null" },
2 changes: 1 addition & 1 deletion airflow/utils/db.py
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "e39a26ac59f6",
"3.0.0": "8ea135928435",
}


2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
eb25e0718c9382cdbb02368c9c3e29c90da06ddaba8e8e92d9fc53417b714039
ff7265e5bc09d6b46d8e95f0c247b3dc5b1262451ab128c711888ffafa21c9db