diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index a92499fc81e49..24b9c1be0d339 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3196,14 +3196,20 @@ components: description: > User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents. - schedule_interval: - $ref: "#/components/schemas/ScheduleInterval" + timetable_summary: + type: string + readOnly: true + nullable: true + description: | + Timetable summary. + + *New in version 3.0.0* timetable_description: type: string readOnly: true nullable: true description: | - Timetable/Schedule Interval description. + Timetable description. *New in version 2.3.0* tags: @@ -5190,19 +5196,6 @@ components: *New in version 3.0.0* # Common data type - ScheduleInterval: - description: | - Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's - execution_date to figure out the next schedule. - nullable: true - readOnly: true - anyOf: - - $ref: "#/components/schemas/TimeDelta" - - $ref: "#/components/schemas/RelativeDelta" - - $ref: "#/components/schemas/CronExpression" - discriminator: - propertyName: __type - TimeDelta: description: Time delta type: object diff --git a/airflow/api_connexion/schemas/common_schema.py b/airflow/api_connexion/schemas/common_schema.py index e91c7b23d8855..569a745a62f52 100644 --- a/airflow/api_connexion/schemas/common_schema.py +++ b/airflow/api_connexion/schemas/common_schema.py @@ -24,7 +24,6 @@ import marshmallow from dateutil import relativedelta from marshmallow import Schema, fields, validate -from marshmallow_oneofschema import OneOfSchema from airflow.models.mappedoperator import MappedOperator from airflow.serialization.serialized_objects import SerializedBaseOperator @@ -90,42 +89,6 @@ def make_cron_expression(self, data, **kwargs): return CronExpression(data["value"]) -class ScheduleIntervalSchema(OneOfSchema): - """ - Schedule interval. - - It supports the following types: - - * TimeDelta - * RelativeDelta - * CronExpression - """ - - type_field = "__type" - type_schemas = { - "TimeDelta": TimeDeltaSchema, - "RelativeDelta": RelativeDeltaSchema, - "CronExpression": CronExpressionSchema, - } - - def _dump(self, obj, update_fields=True, **kwargs): - if isinstance(obj, str): - obj = CronExpression(obj) - - return super()._dump(obj, update_fields=update_fields, **kwargs) - - def get_obj_type(self, obj): - """Select schema based on object type.""" - if isinstance(obj, datetime.timedelta): - return "TimeDelta" - elif isinstance(obj, relativedelta.relativedelta): - return "RelativeDelta" - elif isinstance(obj, CronExpression): - return "CronExpression" - else: - raise TypeError(f"Unknown object type: {obj.__class__.__name__}") - - class ColorField(fields.String): """Schema for color property.""" diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index 32eca2f0b8903..61046a0f4ab8e 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -22,7 +22,7 @@ from marshmallow import Schema, fields from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field -from airflow.api_connexion.schemas.common_schema import ScheduleIntervalSchema, TimeDeltaSchema, TimezoneField +from airflow.api_connexion.schemas.common_schema import TimeDeltaSchema, TimezoneField from airflow.configuration import conf from airflow.models.dag import DagModel, DagTag @@ -63,7 +63,7 @@ class Meta: file_token = fields.Method("get_token", dump_only=True) owners = fields.Method("get_owners", dump_only=True) description = auto_field(dump_only=True) - schedule_interval = fields.Nested(ScheduleIntervalSchema) + timetable_summary = auto_field(dump_only=True) timetable_description = auto_field(dump_only=True) tags = fields.List(fields.Nested(DagTagSchema), dump_only=True) max_active_tasks = auto_field(dump_only=True) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 2f300ebef2144..d89be8b589f73 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -345,7 +345,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict: "file_token": None, "owners": dag.owner, "description": dag.description, - "schedule_interval": dag.schedule_interval, + "timetable_summary": dag.timetable.summary, "timetable_description": dag.timetable.description, "tags": dag.tags, "max_active_tasks": dag.max_active_tasks, diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 70859aebcd594..a458105d28798 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2521,7 +2521,7 @@ scheduler: allow_trigger_in_future: description: | Allow externally triggered DagRuns for Execution Dates in the future - Only has effect if schedule_interval is set to None in DAG + Only has effect if schedule is set to None in DAG version_added: 1.10.8 type: boolean example: ~ diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 3831d909fc272..55dd02fdae313 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -199,10 +199,6 @@ def __str__(self) -> str: return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}" -class AirflowDagInconsistent(AirflowException): - """Raise when a DAG has inconsistent attributes.""" - - class AirflowClusterPolicyViolation(AirflowException): """Raise when there is a violation of a Cluster Policy in DAG definition.""" diff --git a/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py b/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py new file mode 100644 index 0000000000000..b4434a65f3f48 --- /dev/null +++ b/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Rename DagModel schedule_interval to timetable_summary. + +Revision ID: 0bfc26bc256e +Revises: d0f1c55954fa +Create Date: 2024-08-15 06:24:14.363316 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0bfc26bc256e" +down_revision = "d0f1c55954fa" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Rename DagModel schedule_interval to timetable_summary.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "schedule_interval", + new_column_name="timetable_summary", + type_=sa.Text, + nullable=True, + ) + + +def downgrade(): + """Rename timetable_summary back to schedule_interval.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "timetable_summary", + new_column_name="schedule_interval", + type_=sa.Text, + nullable=True, + ) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index ea100cd4e2abf..99ea294e1ec55 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -50,7 +50,6 @@ import attr import pendulum -from dateutil.relativedelta import relativedelta from sqlalchemy import select from sqlalchemy.orm.exc import NoResultFound @@ -120,8 +119,6 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.types import ArgNotSet -ScheduleInterval = Union[str, timedelta, relativedelta] - TaskPreExecuteHook = Callable[[Context], None] TaskPostExecuteHook = Callable[[Context, Any], None] @@ -612,10 +609,10 @@ class derived from this one results in the creation of a task object, :param start_date: The ``start_date`` for the task, determines the ``execution_date`` for the first task instance. The best practice is to have the start_date rounded - to your DAG's ``schedule_interval``. Daily jobs have their start_date + to your DAG's schedule. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest - ``execution_date`` and adds the ``schedule_interval`` to determine + ``execution_date`` and adds the schedule to determine the next ``execution_date``. It is also very important to note that different tasks' dependencies need to line up in time. If task A depends on task B and their diff --git a/airflow/models/dag.py b/airflow/models/dag.py index bdc508951a19c..93ff43711eff9 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -86,7 +86,6 @@ from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll from airflow.datasets.manager import dataset_manager from airflow.exceptions import ( - AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, FailStopDagInvalidTriggerRule, @@ -134,19 +133,13 @@ from airflow.utils import timezone from airflow.utils.dag_cycle_tester import check_cycle from airflow.utils.decorators import fixup_decorator_warning_stack -from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key +from airflow.utils.helpers import exactly_one, validate_instance_args, validate_key from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import ( - Interval, - UtcDateTime, - lock_rows, - tuple_in_condition, - with_row_locks, -) +from airflow.utils.sqlalchemy import UtcDateTime, lock_rows, tuple_in_condition, with_row_locks from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType +from airflow.utils.types import NOTSET, DagRunType, EdgeInfoType if TYPE_CHECKING: from types import ModuleType @@ -174,20 +167,15 @@ DagStateChangeCallback = Callable[[Context], None] ScheduleInterval = Union[None, str, timedelta, relativedelta] -# FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval], -# but Mypy cannot handle that right now. Track progress of PEP 661 for progress. -# See also: https://discuss.python.org/t/9126/7 -ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] ScheduleArg = Union[ - ArgNotSet, ScheduleInterval, Timetable, BaseDataset, Collection[Union["Dataset", "DatasetAlias"]] + ScheduleInterval, + Timetable, + BaseDataset, + Collection[Union["Dataset", "DatasetAlias"]], ] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] -# Backward compatibility: If neither schedule_interval nor timetable is -# *provided by the user*, default to a one-day interval. -DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1) - class InconsistentDataInterval(AirflowException): """ @@ -228,10 +216,8 @@ def _get_model_data_interval( return DataInterval(start, end) -def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable: - """Create a Timetable instance from a ``schedule_interval`` argument.""" - if interval is NOTSET: - return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL) +def create_timetable(interval: ScheduleInterval, timezone: Timezone | FixedTimezone) -> Timetable: + """Create a Timetable instance from a plain ``schedule`` value.""" if interval is None: return NullTimetable() if interval == "@once": @@ -245,7 +231,7 @@ def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTi return CronDataIntervalTimetable(interval, timezone) else: return CronTriggerTimetable(interval, timezone=timezone) - raise ValueError(f"{interval!r} is not a valid schedule_interval.") + raise ValueError(f"{interval!r} is not a valid schedule.") def get_last_dagrun(dag_id, session, include_externally_triggered=False): @@ -398,21 +384,22 @@ class DAG(LoggingMixin): The *schedule* argument to specify either time-based scheduling logic (timetable), or dataset-driven triggers. - .. deprecated:: 2.4 - The arguments *schedule_interval* and *timetable*. Their functionalities - are merged into the new *schedule* argument. + .. versionchanged:: 3.0 + The default value of *schedule* has been changed to *None* (no schedule). + The previous default was ``timedelta(days=1)``. :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :param description: The description for the DAG to e.g. be shown on the webserver - :param schedule: Defines the rules according to which DAG runs are scheduled. Can - accept cron string, timedelta object, Timetable, or list of Dataset objects. - If this is not provided, the DAG will be set to the default - schedule ``timedelta(days=1)``. See also :doc:`/howto/timetable`. + :param schedule: If provided, this defines the rules according to which DAG + runs are scheduled. Possible values include a cron expression string, + timedelta object, Timetable, or list of Dataset objects. + See also :doc:`/howto/timetable`. :param start_date: The timestamp from which the scheduler will - attempt to backfill + attempt to backfill. If this is not provided, backfilling must be done + manually with an explicit time range. :param end_date: A date beyond which your DAG won't run, leave to None - for open-ended scheduling + for open-ended scheduling. :param template_searchpath: This list of folders (non-relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by @@ -500,7 +487,6 @@ class DAG(LoggingMixin): "task_ids", "start_date", "end_date", - "schedule_interval", "fileloc", "template_searchpath", "last_loaded", @@ -522,9 +508,7 @@ def __init__( self, dag_id: str, description: str | None = None, - schedule: ScheduleArg = NOTSET, - schedule_interval: ScheduleIntervalArg = NOTSET, - timetable: Timetable | None = None, + schedule: ScheduleArg = None, start_date: datetime | None = None, end_date: datetime | None = None, full_filepath: str | None = None, @@ -636,63 +620,20 @@ def __init__( if "end_date" in self.default_args: self.default_args["end_date"] = timezone.convert_to_utc(self.default_args["end_date"]) - # sort out DAG's scheduling behavior - scheduling_args = [schedule_interval, timetable, schedule] - - has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args) - has_empty_start_date = not ("start_date" in self.default_args or self.start_date) - - if has_scheduling_args and has_empty_start_date: - raise ValueError("DAG is missing the start_date parameter") - - if not at_most_one(*scheduling_args): - raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.") - if schedule_interval is not NOTSET: - warnings.warn( - "Param `schedule_interval` is deprecated and will be removed in a future release. " - "Please use `schedule` instead. ", - RemovedInAirflow3Warning, - stacklevel=2, - ) - if timetable is not None: - warnings.warn( - "Param `timetable` is deprecated and will be removed in a future release. " - "Please use `schedule` instead. ", - RemovedInAirflow3Warning, - stacklevel=2, - ) - - if timetable is not None: - schedule = timetable - elif schedule_interval is not NOTSET: - schedule = schedule_interval - - # Kept for compatibility. Do not use in new code. - self.schedule_interval: ScheduleInterval - if isinstance(schedule, Timetable): self.timetable = schedule - self.schedule_interval = schedule.summary elif isinstance(schedule, BaseDataset): self.timetable = DatasetTriggeredTimetable(schedule) - self.schedule_interval = self.timetable.summary elif isinstance(schedule, Collection) and not isinstance(schedule, str): if not all(isinstance(x, (Dataset, DatasetAlias)) for x in schedule): raise ValueError("All elements in 'schedule' should be datasets or dataset aliases") self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule)) - self.schedule_interval = self.timetable.summary - elif isinstance(schedule, ArgNotSet): - warnings.warn( - "Creating a DAG with an implicit schedule is deprecated, and will stop working " - "in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - self.timetable = create_timetable(schedule, self.timezone) - self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL else: self.timetable = create_timetable(schedule, self.timezone) - self.schedule_interval = schedule + + requires_automatic_backfilling = self.timetable.can_be_scheduled and catchup + if requires_automatic_backfilling and not ("start_date" in self.default_args or self.start_date): + raise ValueError("start_date is required when catchup=True") if isinstance(template_searchpath, str): template_searchpath = [template_searchpath] @@ -796,46 +737,12 @@ def get_doc_md(self, doc_md: str | None) -> str | None: return doc_md - def _check_schedule_interval_matches_timetable(self) -> bool: - """ - Check ``schedule_interval`` and ``timetable`` match. - - This is done as a part of the DAG validation done before it's bagged, to - guard against the DAG's ``timetable`` (or ``schedule_interval``) from - being changed after it's created, e.g. - - .. code-block:: python - - dag1 = DAG("d1", timetable=MyTimetable()) - dag1.schedule_interval = "@once" - - dag2 = DAG("d2", schedule="@once") - dag2.timetable = MyTimetable() - - Validation is done by creating a timetable and check its summary matches - ``schedule_interval``. The logic is not bullet-proof, especially if a - custom timetable does not provide a useful ``summary``. But this is the - best we can do. - """ - if self.schedule_interval == self.timetable.summary: - return True - try: - timetable = create_timetable(self.schedule_interval, self.timezone) - except ValueError: - return False - return timetable.summary == self.timetable.summary - def validate(self): """ Validate the DAG has a coherent setup. This is called by the DAG bag before bagging the DAG. """ - if not self._check_schedule_interval_matches_timetable(): - raise AirflowDagInconsistent( - f"inconsistent schedule: timetable {self.timetable.summary!r} " - f"does not match schedule_interval {self.schedule_interval!r}", - ) self.validate_executor_field() self.validate_schedule_and_params() self.timetable.validate() @@ -1238,6 +1145,10 @@ def dag_id(self) -> str: def dag_id(self, value: str) -> None: self._dag_id = value + @property + def timetable_summary(self) -> str: + return self.timetable.summary + @property def max_active_tasks(self) -> int: return self._max_active_tasks @@ -2942,7 +2853,7 @@ def bulk_write_to_db( t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks ) - orm_dag.schedule_interval = dag.schedule_interval + orm_dag.timetable_summary = dag.timetable.summary orm_dag.timetable_description = dag.timetable.description orm_dag.dataset_expression = dag.timetable.dataset_condition.as_expression() @@ -3448,9 +3359,9 @@ class DagModel(Base): description = Column(Text) # Default view of the DAG inside the webserver default_view = Column(String(25)) - # Schedule interval - schedule_interval = Column(Interval) - # Timetable/Schedule Interval description + # Timetable summary + timetable_summary = Column(Text, nullable=True) + # Timetable description timetable_description = Column(String(1000), nullable=True) # Dataset expression based on dataset triggers dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) @@ -3795,7 +3706,7 @@ def calculate_dagrun_date_fields( @provide_session def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None: - if self.schedule_interval != "Dataset": + if self.dataset_expression is None: return None return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id] @@ -3805,9 +3716,7 @@ def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[st def dag( dag_id: str = "", description: str | None = None, - schedule: ScheduleArg = NOTSET, - schedule_interval: ScheduleIntervalArg = NOTSET, - timetable: Timetable | None = None, + schedule: ScheduleArg = None, start_date: datetime | None = None, end_date: datetime | None = None, full_filepath: str | None = None, @@ -3864,8 +3773,6 @@ def factory(*args, **kwargs): with DAG( dag_id or f.__name__, description=description, - schedule_interval=schedule_interval, - timetable=timetable, start_date=start_date, end_date=end_date, full_filepath=full_filepath, diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 26b70555708d6..316666f2c009f 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -244,7 +244,16 @@ def _include_fields(self): class DagInfo(InfoJsonEncodable): """Defines encoding DAG object to JSON.""" - includes = ["dag_id", "description", "fileloc", "owner", "schedule_interval", "start_date", "tags"] + includes = [ + "dag_id", + "description", + "fileloc", + "owner", + "schedule_interval", # For Airflow 2. + "timetable_summary", # For Airflow 3. + "start_date", + "tags", + ] casts = {"timetable": lambda dag: dag.timetable.serialize() if getattr(dag, "timetable", None) else None} renames = {"_dag_id": "dag_id"} diff --git a/airflow/serialization/pydantic/dag.py b/airflow/serialization/pydantic/dag.py index a1fea6384aade..b916a0e580efd 100644 --- a/airflow/serialization/pydantic/dag.py +++ b/airflow/serialization/pydantic/dag.py @@ -17,10 +17,9 @@ from __future__ import annotations import pathlib -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, List, Optional -from dateutil import relativedelta from typing_extensions import Annotated from airflow import DAG, settings @@ -32,37 +31,6 @@ PlainValidator, ValidationInfo, ) -from airflow.utils.sqlalchemy import Interval - - -def serialize_interval(value: Interval) -> Interval: - interval = Interval() - return interval.process_bind_param(value, None) - - -def validate_interval(value: Interval | Any, _info: ValidationInfo) -> Any: - if ( - isinstance(value, Interval) - or isinstance(value, timedelta) - or isinstance(value, relativedelta.relativedelta) - ): - return value - interval = Interval() - try: - return interval.process_result_value(value, None) - except ValueError as e: - # Interval may be provided in string format (cron), - # so it must be returned as valid value. - if isinstance(value, str): - return value - raise e - - -PydanticInterval = Annotated[ - Interval, - PlainValidator(validate_interval), - PlainSerializer(serialize_interval, return_type=Interval), -] def serialize_operator(x: DAG) -> dict: @@ -121,7 +89,7 @@ class DagModelPydantic(BaseModelPydantic): owners: Optional[str] description: Optional[str] default_view: Optional[str] - schedule_interval: Optional[PydanticInterval] + timetable_summary: Optional[str] timetable_description: Optional[str] tags: List[DagTagPydantic] # noqa: UP006 dag_owner_links: List[DagOwnerAttributesPydantic] # noqa: UP006 diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index d76bfcb1a40bd..63cdf67b7d702 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -140,14 +140,6 @@ "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, - "schedule_interval": { - "anyOf": [ - { "type": "null" }, - { "type": "string" }, - { "$ref": "#/definitions/typed_timedelta" }, - { "$ref": "#/definitions/typed_relativedelta" } - ] - }, "owner_links": { "type": "object" }, "timetable": { "type": "object", diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9adb9f7334ba0..54542a79c6a9c 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -49,7 +49,7 @@ from airflow.models import Trigger from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection -from airflow.models.dag import DAG, DagModel, create_timetable +from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun from airflow.models.expandinput import EXPAND_INPUT_EMPTY, create_expand_input, get_map_type_key from airflow.models.mappedoperator import MappedOperator @@ -1579,7 +1579,7 @@ class SerializedDAG(DAG, BaseSerialization): not pickle-able. SerializedDAG works for all DAGs. """ - _decorated_fields = {"schedule_interval", "default_args", "_access_control"} + _decorated_fields = {"default_args", "_access_control"} @staticmethod def __get_constructor_defaults(): @@ -1606,16 +1606,7 @@ def serialize_dag(cls, dag: DAG) -> dict: """Serialize a DAG into a JSON object.""" try: serialized_dag = cls.serialize_to_json(dag, cls._decorated_fields) - serialized_dag["_processor_dags_folder"] = DAGS_FOLDER - - # If schedule_interval is backed by timetable, serialize only - # timetable; vice versa for a timetable backed by schedule_interval. - if dag.timetable.summary == dag.schedule_interval: - del serialized_dag["schedule_interval"] - else: - del serialized_dag["timetable"] - serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()] dag_deps = [ @@ -1682,14 +1673,6 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG: setattr(dag, k, v) - # A DAG is always serialized with only one of schedule_interval and - # timetable. This back-populates the other to ensure the two attributes - # line up correctly on the DAG instance. - if "timetable" in encoded_dag: - dag.schedule_interval = dag.timetable.summary - else: - dag.timetable = create_timetable(dag.schedule_interval, dag.timezone) - # Set _task_group if "_task_group" in encoded_dag: dag._task_group = TaskGroupSerialization.deserialize_task_group( diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py index b7a23e91e27e6..da9d88a2ec061 100644 --- a/airflow/ti_deps/deps/runnable_exec_date_dep.py +++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py @@ -33,7 +33,7 @@ def _get_dep_statuses(self, ti, session, dep_context): cur_date = timezone.utcnow() # don't consider runs that are executed in the future unless - # specified by config and schedule_interval is None + # specified by config and schedule is None logical_date = ti.get_dagrun(session).execution_date if logical_date > cur_date and not ti.task.dag.allow_future_exec_dates: yield self._failing_status( diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py index 9e173656f1141..ce701794a4c63 100644 --- a/airflow/timetables/base.py +++ b/airflow/timetables/base.py @@ -17,7 +17,6 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Sequence -from warnings import warn from airflow.datasets import BaseDataset from airflow.typing_compat import Protocol, runtime_checkable @@ -163,25 +162,14 @@ class Timetable(Protocol): like ``schedule=None`` and ``"@once"`` set it to *False*. """ - _can_be_scheduled: bool = True - - @property - def can_be_scheduled(self): - """ - Whether this timetable can actually schedule runs in an automated manner. + can_be_scheduled: bool = True + """ + Whether this timetable can actually schedule runs in an automated manner. - This defaults to and should generally be *True* (including non periodic - execution types like *@once* and data triggered tables), but - ``NullTimetable`` sets this to *False*. - """ - if hasattr(self, "can_run"): - warn( - 'can_run class variable is deprecated. Use "can_be_scheduled" instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.can_run - return self._can_be_scheduled + This defaults to and should generally be *True* (including non periodic + execution types like *@once* and data triggered tables), but + ``NullTimetable`` sets this to *False*. + """ run_ordering: Sequence[str] = ("data_interval_end", "execution_date") """How runs triggered from this timetable should be ordered in UI. diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py index 4c27f39b266b3..05db0d66cc2df 100644 --- a/airflow/timetables/datasets.py +++ b/airflow/timetables/datasets.py @@ -50,7 +50,7 @@ def __init__( self.description = f"Triggered by datasets or {timetable.description}" self.periodic = timetable.periodic - self._can_be_scheduled = timetable._can_be_scheduled + self.can_be_scheduled = timetable.can_be_scheduled self.active_runs_limit = timetable.active_runs_limit @classmethod diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b963eef99a230..5e15e59d3a3ef 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol): _REVISION_HEADS_MAP = { "2.10.0": "22ed7efa9da2", - "3.0.0": "d0f1c55954fa", + "3.0.0": "0bfc26bc256e", } diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index b73757c9875aa..e7e44ef254725 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -20,12 +20,10 @@ import contextlib import copy import datetime -import json import logging from importlib import metadata from typing import TYPE_CHECKING, Any, Generator, Iterable, overload -from dateutil import relativedelta from packaging import version from sqlalchemy import TIMESTAMP, PickleType, event, nullsfirst, tuple_ from sqlalchemy.dialects import mysql @@ -293,50 +291,6 @@ def compare_values(self, x, y): return False -class Interval(TypeDecorator): - """Base class representing a time interval.""" - - impl = Text - - cache_ok = True - - attr_keys = { - datetime.timedelta: ("days", "seconds", "microseconds"), - relativedelta.relativedelta: ( - "years", - "months", - "days", - "leapdays", - "hours", - "minutes", - "seconds", - "microseconds", - "year", - "month", - "day", - "hour", - "minute", - "second", - "microsecond", - ), - } - - def process_bind_param(self, value, dialect): - if isinstance(value, tuple(self.attr_keys)): - attrs = {key: getattr(value, key) for key in self.attr_keys[type(value)]} - return json.dumps({"type": type(value).__name__, "attrs": attrs}) - return json.dumps(value) - - def process_result_value(self, value, dialect): - if not value: - return value - data = json.loads(value) - if isinstance(data, dict): - type_map = {key.__name__: key for key in self.attr_keys} - return type_map[data["type"]](**data["attrs"]) - return data - - def nulls_first(col, session: Session) -> dict[str, Any]: """ Specify *NULLS FIRST* to the column ordering. diff --git a/airflow/www/static/js/dag/details/dag/Dag.tsx b/airflow/www/static/js/dag/details/dag/Dag.tsx index 7cfd36f516418..9643064d2a3b1 100644 --- a/airflow/www/static/js/dag/details/dag/Dag.tsx +++ b/airflow/www/static/js/dag/details/dag/Dag.tsx @@ -70,7 +70,7 @@ const Dag = () => { const dagDataExcludeFields = [ "defaultView", "fileToken", - "scheduleInterval", + "timetableSummary", "tags", "owners", "params", @@ -298,20 +298,9 @@ const Dag = () => { - Schedule interval + Timetable - {dagDetailsData.scheduleInterval?.type === - "CronExpression" ? ( - {dagDetailsData.scheduleInterval?.value} - ) : ( - // for TimeDelta and RelativeDelta - - )} + {dagDetailsData.timetableSummary || ""} diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 1897fd90ce3b3..15de3545b6bb2 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1088,9 +1088,14 @@ export interface components { owners?: string[]; /** @description User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents. */ description?: string | null; - schedule_interval?: components["schemas"]["ScheduleInterval"]; /** - * @description Timetable/Schedule Interval description. + * @description Timetable summary. + * + * *New in version 3.0.0* + */ + timetable_summary?: string | null; + /** + * @description Timetable description. * * *New in version 2.3.0* */ @@ -2315,15 +2320,6 @@ export interface components { */ order_by?: string; }; - /** - * @description Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's - * execution_date to figure out the next schedule. - */ - ScheduleInterval: - | (Partial & - Partial & - Partial) - | null; /** @description Time delta */ TimeDelta: { __type: string; @@ -5686,9 +5682,6 @@ export type ListDagRunsForm = CamelCasedPropertiesDeep< export type ListTaskInstanceForm = CamelCasedPropertiesDeep< components["schemas"]["ListTaskInstanceForm"] >; -export type ScheduleInterval = CamelCasedPropertiesDeep< - components["schemas"]["ScheduleInterval"] ->; export type TimeDelta = CamelCasedPropertiesDeep< components["schemas"]["TimeDelta"] >; diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 973ca812e3fb2..1fc711d993ebd 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -138,17 +138,17 @@

- Schedule: {{ dag_model is defined and dag_model and dag_model.schedule_interval }} + Schedule: {{ dag_model is defined and dag_model and dag_model.timetable_summary }} {% if dag_model is defined and dag_model and dag_model.timetable_description %} {% endif %} - {% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.schedule_interval != 'Dataset' %} + {% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.dataset_expression != None %}

Next Run ID:

{% endif %} - {% if dag_model is defined and dag_model.schedule_interval is defined and dag_model.schedule_interval == 'Dataset' %} + {% if dag_model is defined and dag_model.dataset_expression is defined and dag_model.dataset_expression != None %} {%- with ds_info = dag_model.get_dataset_triggered_next_run_info() -%} {{ page_title }}

- {{ dag.schedule_interval }} + {{ dag.timetable_summary }} {% if dag is defined and dag.timetable_description %}