diff --git a/examples/experimental/dagster-airlift/dagster_airlift/constants.py b/examples/experimental/dagster-airlift/dagster_airlift/constants.py
index 170b8902d50a8..9905d5577c591 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/constants.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/constants.py
@@ -4,7 +4,11 @@
 TASK_MAPPING_METADATA_KEY = "dagster-airlift/task-mapping"
 AUTOMAPPED_TASK_METADATA_KEY = "dagster-airlift/automapped-task"
 # This represents the timestamp used in ordering the materializatons.
-EFFECTIVE_TIMESTAMP_METADATA_KEY = "dagster-airlift/effective_timestamp"
+EFFECTIVE_TIMESTAMP_METADATA_KEY = "dagster-airlift/effective-timestamp"
+AIRFLOW_TASK_INSTANCE_LOGICAL_DATE_METADATA_KEY = (
+    "dagster-airlift/airflow-task-instance-logical-date"
+)
+AIRFLOW_RUN_ID_METADATA_KEY = "dagster-airlift/airflow-run-id"
 DAG_RUN_ID_TAG_KEY = "dagster-airlift/airflow-dag-run-id"
 DAG_ID_TAG_KEY = "dagster-airlift/airflow-dag-id"
 TASK_ID_TAG_KEY = "dagster-airlift/airflow-task-id"
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py
index 09985e6f2da7b..2d9ccaff3e52a 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py
@@ -2,7 +2,7 @@
 from functools import cached_property
 from typing import AbstractSet, Mapping, Set

-from dagster import AssetKey, Definitions
+from dagster import AssetKey, AssetSpec, Definitions
 from dagster._record import record

 from dagster_airlift.core.airflow_instance import AirflowInstance
@@ -31,6 +31,10 @@ def instance_name(self) -> str:
     def mapping_info(self) -> AirliftMetadataMappingInfo:
         return AirliftMetadataMappingInfo(asset_specs=list(self.mapped_defs.get_all_asset_specs()))

+    @cached_property
+    def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]:
+        return {spec.key: spec for spec in self.mapped_defs.get_all_asset_specs()}
+
     def task_ids_in_dag(self, dag_id: str) -> Set[str]:
         return self.mapping_info.task_id_map[dag_id]

diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py
index 8dc3d0b66585c..86b816fbef4c7 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py
@@ -12,7 +12,10 @@
 from dagster._utils.warnings import suppress_dagster_warnings

 from dagster_airlift.core.airflow_instance import AirflowInstance
-from dagster_airlift.core.sensor.event_translation import DagsterEventTransformerFn
+from dagster_airlift.core.sensor.event_translation import (
+    DagsterEventTransformerFn,
+    default_event_transformer,
+)
 from dagster_airlift.core.sensor.sensor_builder import (
     DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
     build_airflow_polling_sensor_defs,
@@ -68,7 +71,7 @@ def build_defs_from_airflow_instance(
     airflow_instance: AirflowInstance,
     defs: Optional[Definitions] = None,
     sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
-    event_transformer_fn: Optional[DagsterEventTransformerFn] = None,
+    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
 ) -> Definitions:
     mapped_defs = build_airflow_mapped_defs(airflow_instance=airflow_instance, defs=defs)
     return Definitions.merge(
@@ -123,7 +126,6 @@ def build_full_automapped_dags_from_airflow_instance(
             minimum_interval_seconds=sensor_minimum_interval_seconds,
             mapped_defs=resolved_defs,
             airflow_instance=airflow_instance,
-            event_transformer_fn=None,
         ),
     )
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py
index 17d003fa639fb..b5c246bf190b6 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py
@@ -1,4 +1,5 @@
-from typing import AbstractSet, Any, Callable, Iterable, Mapping, Sequence, Union
+from collections import defaultdict
+from typing import AbstractSet, Any, Callable, Iterable, Mapping, Sequence, Union, cast

 from dagster import (
     AssetMaterialization,
@@ -11,9 +12,14 @@
 )
 from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
 from dagster._core.definitions.asset_key import AssetKey
-from dagster._time import get_current_timestamp
+from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
+from dagster._time import datetime_from_timestamp, get_current_timestamp

-from dagster_airlift.constants import EFFECTIVE_TIMESTAMP_METADATA_KEY
+from dagster_airlift.constants import (
+    AIRFLOW_RUN_ID_METADATA_KEY,
+    AIRFLOW_TASK_INSTANCE_LOGICAL_DATE_METADATA_KEY,
+    EFFECTIVE_TIMESTAMP_METADATA_KEY,
+)
 from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData
 from dagster_airlift.core.airflow_instance import DagRun, TaskInstance
 from dagster_airlift.core.serialization.serialized_data import DagHandle
@@ -25,6 +31,66 @@
 ]


+def default_event_transformer(
+    context: SensorEvaluationContext,
+    airflow_data: AirflowDefinitionsData,
+    materializations: Sequence[AssetMaterialization],
+) -> Iterable[AssetEvent]:
+    """The default event transformer, which attaches a partition key to materializations of time-window-partitioned assets."""
+    cached_partition_calculations = defaultdict(dict)
+    for mat in materializations:
+        asset_spec = airflow_data.all_asset_specs_by_key[mat.asset_key]
+        if not asset_spec.partitions_def or not isinstance(
+            asset_spec.partitions_def, TimeWindowPartitionsDefinition
+        ):
+            yield mat
+            continue
+        airflow_logical_date_timestamp: float = cast(
+            TimestampMetadataValue, mat.metadata[AIRFLOW_TASK_INSTANCE_LOGICAL_DATE_METADATA_KEY]
+        ).value
+        partitions_def = cast(TimeWindowPartitionsDefinition, asset_spec.partitions_def)
+        calcs_for_def = cached_partition_calculations[partitions_def]
+        if airflow_logical_date_timestamp not in calcs_for_def:
+            cached_partition_calculations[partitions_def][airflow_logical_date_timestamp] = (
+                get_partition_key_from_timestamp(
+                    partitions_def=partitions_def,
+                    timestamp=airflow_logical_date_timestamp,
+                )
+            )
+        partition = cached_partition_calculations[partitions_def][airflow_logical_date_timestamp]
+        partitioned_mat = mat._replace(partition=partition)
+        yield partitioned_mat
+
+
+def get_partition_key_from_timestamp(
+    partitions_def: TimeWindowPartitionsDefinition,
+    timestamp: float,
+) -> str:
+    datetime_in_tz = datetime_from_timestamp(timestamp, partitions_def.timezone)
+    # Assuming that "logical_date" lies on a partition boundary, the previous partition window
+    # (whose upper bound may be the passed-in date itself, which is why we set respect_bounds=False)
+    # will end on the logical date.
+    # If so, there is a partition corresponding to the logical date.
+    partition_window = check.not_none(
+        partitions_def.get_prev_partition_window(datetime_in_tz, respect_bounds=False),
+        f"Could not find partition for Airflow logical date {datetime_in_tz.isoformat()}. This likely means that your partition range is too small to cover the logical date.",
+    )
+    check.invariant(
+        datetime_in_tz.timestamp() == partition_window.end.timestamp(),
+        (
+            "Expected logical date to match a partition in the partitions definition. This likely means that "
+            "the partition range is not aligned with the scheduling interval in Airflow."
+        ),
+    )
+    check.invariant(
+        datetime_in_tz.timestamp() >= partitions_def.start.timestamp(),
+        (
+            "Provided date is before the start of the partitions definition. "
+            f"Ensure that the start date of your PartitionsDefinition is early enough to capture the provided date {datetime_in_tz.isoformat()}."
+        ),
+    )
+    return partitions_def.get_partition_key_for_timestamp(timestamp)
+
+
 def get_timestamp_from_materialization(event: AssetEvent) -> float:
     return check.float_param(
         event.metadata[EFFECTIVE_TIMESTAMP_METADATA_KEY].value,
@@ -71,6 +137,7 @@ def get_dag_run_metadata(dag_run: DagRun) -> Mapping[str, Any]:
 def get_common_metadata(dag_run: DagRun) -> Mapping[str, Any]:
     return {
         "Airflow Run ID": dag_run.run_id,
+        AIRFLOW_RUN_ID_METADATA_KEY: dag_run.run_id,
         "Run Metadata (raw)": JsonMetadataValue(dag_run.metadata),
         "Run Type": dag_run.run_type,
         "Airflow Config": JsonMetadataValue(dag_run.config),
@@ -88,6 +155,9 @@ def get_task_instance_metadata(dag_run: DagRun, task_instance: TaskInstance) ->
         EFFECTIVE_TIMESTAMP_METADATA_KEY: TimestampMetadataValue(
             task_instance.end_date.timestamp()
         ),
+        AIRFLOW_TASK_INSTANCE_LOGICAL_DATE_METADATA_KEY: TimestampMetadataValue(
+            task_instance.logical_date.timestamp()
+        ),
     }

diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py
index 7248ea358c112..f22c953fa8dea 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py
@@ -42,6 +42,7 @@
 from dagster_airlift.core.sensor.event_translation import (
     AssetEvent,
     DagsterEventTransformerFn,
+    default_event_transformer,
     get_timestamp_from_materialization,
     synthetic_mats_for_mapped_asset_keys,
     synthetic_mats_for_mapped_dag_asset_keys,
@@ -81,7 +82,7 @@ def build_airflow_polling_sensor_defs(
     *,
     mapped_defs: Definitions,
     airflow_instance: AirflowInstance,
-    event_transformer_fn: Optional[DagsterEventTransformerFn],
+    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
     minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
 ) -> Definitions:
     """The constructed sensor polls the Airflow instance for activity, and inserts asset events into Dagster's event log.
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/airflow_test_instance.py b/examples/experimental/dagster-airlift/dagster_airlift/test/airflow_test_instance.py
index bcf50e43a2b8c..271d426d70296 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/test/airflow_test_instance.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/test/airflow_test_instance.py
@@ -245,6 +245,7 @@ def make_instance(
                 - timedelta(
                     seconds=1
                 ),  # Ensure that the task ends before the full "dag" completes.
+                logical_date=dag_run.logical_date,
             )
             for task_id in dag_and_task_structure[dag_run.dag_id]
         ]
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
index 1b96bedf1bf8c..0359e95bacc7d 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
@@ -28,7 +28,10 @@
 from dagster_airlift.core import (
     build_defs_from_airflow_instance as build_defs_from_airflow_instance,
 )
-from dagster_airlift.core.sensor.event_translation import DagsterEventTransformerFn
+from dagster_airlift.core.sensor.event_translation import (
+    DagsterEventTransformerFn,
+    default_event_transformer,
+)
 from dagster_airlift.core.utils import metadata_for_dag_mapping, metadata_for_task_mapping
 from dagster_airlift.test import make_dag_run, make_instance

@@ -42,7 +45,7 @@ def fully_loaded_repo_from_airflow_asset_graph(
     additional_defs: Definitions = Definitions(),
     create_runs: bool = True,
     dag_level_asset_overrides: Optional[Dict[str, List[str]]] = None,
-    event_transformer_fn: Optional[DagsterEventTransformerFn] = None,
+    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
 ) -> RepositoryDefinition:
     defs = load_definitions_airflow_asset_graph(
         assets_per_task,
@@ -62,7 +65,7 @@ def load_definitions_airflow_asset_graph(
     create_runs: bool = True,
     create_assets_defs: bool = True,
     dag_level_asset_overrides: Optional[Dict[str, List[str]]] = None,
-    event_transformer_fn: Optional[DagsterEventTransformerFn] = None,
+    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
 ) -> Definitions:
     assets = []
     dag_and_task_structure = defaultdict(list)
@@ -133,7 +136,7 @@ def build_and_invoke_sensor(
     instance: DagsterInstance,
     additional_defs: Definitions = Definitions(),
     dag_level_asset_overrides: Optional[Dict[str, List[str]]] = None,
-    event_transformer_fn: Optional[DagsterEventTransformerFn] = None,
+    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
 ) -> Tuple[SensorResult, SensorEvaluationContext]:
     repo_def = fully_loaded_repo_from_airflow_asset_graph(
         assets_per_task,
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py
index bcfb2c56976ac..dea8027df83a4 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py
@@ -20,6 +20,7 @@
 from dagster._core.definitions.materialize import materialize
 from dagster._core.definitions.metadata.metadata_value import TimestampMetadataValue
 from dagster._core.definitions.sensor_definition import SensorEvaluationContext
+from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
 from dagster._core.errors import DagsterInvariantViolationError
 from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult
 from dagster._core.test_utils import freeze_time
@@ -83,7 +84,8 @@ def test_dag_and_task_metadata(init_load_context: None, instance: DagsterInstanc
         "Run Details",
         "Airflow Config",
         "Run Type",
-        "dagster-airlift/effective_timestamp",
+        "dagster-airlift/effective-timestamp",
+        "dagster-airlift/airflow-run-id",
     }
     assert set(dag_mat.metadata.keys()) == expected_dag_metadata_keys
     task_mat = result.asset_events[0]
@@ -97,7 +99,9 @@ def test_dag_and_task_metadata(init_load_context: None, instance: DagsterInstanc
         "Task Logs",
         "Airflow Config",
         "Run Type",
-        "dagster-airlift/effective_timestamp",
+        "dagster-airlift/effective-timestamp",
+        "dagster-airlift/airflow-run-id",
+        "dagster-airlift/airflow-task-instance-logical-date",
     }
     assert set(task_mat.metadata.keys()) == expected_task_metadata_keys
@@ -789,3 +793,230 @@ def explicit_asset2() -> None:
     assert isinstance(result, SensorResult)
     assert len(result.asset_events) == 1
     assert result.asset_events[0].asset_key.to_user_string() == make_dag_key_str(dag_id)
+
+
+def test_default_time_partitioned_asset(init_load_context: None, instance: DagsterInstance) -> None:
+    """Test that a task instance for a time-partitioned asset is correctly ingested."""
+    defs = build_defs_from_airflow_instance(
+        airflow_instance=make_instance(
+            dag_and_task_structure={
+                "dag": ["task"],
+            },
+            dag_runs=[
+                make_dag_run(
+                    dag_id="dag",
+                    run_id="run-dag",
+                    start_date=datetime(2021, 1, 2, 5, tzinfo=timezone.utc),
+                    end_date=datetime(2021, 1, 2, 6, tzinfo=timezone.utc),
+                    logical_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+                )
+            ],
+        ),
+        defs=dag_defs(
+            "dag",
+            task_defs(
+                "task",
+                Definitions(
+                    assets=[
+                        AssetSpec(
+                            key="a",
+                            partitions_def=DailyPartitionsDefinition(
+                                start_date=datetime(2021, 1, 1, tzinfo=timezone.utc)
+                            ),
+                        )
+                    ],
+                ),
+            ),
+        ),
+    )
+    assert defs.sensors
+    sensor = next(iter(defs.sensors))
+    with freeze_time(datetime(2021, 1, 2, 6, tzinfo=timezone.utc)):
+        sensor_context = build_sensor_context(
+            repository_def=defs.get_repository_def(), instance=instance
+        )
+        result = sensor(sensor_context)
+    assert isinstance(result, SensorResult)
+    assert len(result.asset_events) == 2
+    assert_expected_key_order(result.asset_events, ["a", "test_instance/dag/dag"])
+    a_asset_mat = result.asset_events[0]
+    assert isinstance(a_asset_mat, AssetMaterialization)
+    # We expect the partition to match the logical date.
+    assert a_asset_mat.partition == "2021-01-01"
+
+
+def test_before_start_of_partitioned_asset(
+    init_load_context: None, instance: DagsterInstance
+) -> None:
+    """We expect an error to be thrown when the logical date falls before the start of the partitions definition."""
+    defs = build_defs_from_airflow_instance(
+        airflow_instance=make_instance(
+            dag_and_task_structure={
+                "dag": ["task"],
+            },
+            dag_runs=[
+                make_dag_run(
+                    dag_id="dag",
+                    run_id="run-dag",
+                    start_date=datetime(2021, 1, 1, 5, tzinfo=timezone.utc),
+                    end_date=datetime(2021, 1, 1, 6, tzinfo=timezone.utc),
+                    logical_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+                )
+            ],
+        ),
+        defs=dag_defs(
+            "dag",
+            task_defs(
+                "task",
+                Definitions(
+                    assets=[
+                        AssetSpec(
+                            key="a",
+                            partitions_def=DailyPartitionsDefinition(
+                                # Partitions definition starts after the logical date.
+                                start_date=datetime(2021, 1, 2, tzinfo=timezone.utc)
+                            ),
+                        )
+                    ],
+                ),
+            ),
+        ),
+    )
+    assert defs.sensors
+    sensor = next(iter(defs.sensors))
+    with freeze_time(datetime(2021, 1, 1, 6, tzinfo=timezone.utc)):
+        sensor_context = build_sensor_context(
+            repository_def=defs.get_repository_def(), instance=instance
+        )
+        with pytest.raises(AirliftSensorEventTransformerError):
+            sensor(sensor_context)
+
+
+def test_logical_date_mismatch(init_load_context: None, instance: DagsterInstance) -> None:
+    """Test a logical date that does not align with the partitions definition."""
+    defs = build_defs_from_airflow_instance(
+        airflow_instance=make_instance(
+            dag_and_task_structure={
+                "dag": ["task"],
+            },
+            dag_runs=[
+                make_dag_run(
+                    dag_id="dag",
+                    run_id="run-dag",
+                    start_date=datetime(2021, 1, 1, 5, tzinfo=timezone.utc),
+                    end_date=datetime(2021, 1, 1, 6, tzinfo=timezone.utc),
+                    # Logical date isn't aligned with midnight.
+                    logical_date=datetime(2021, 1, 1, 3, tzinfo=timezone.utc),
+                )
+            ],
+        ),
+        defs=dag_defs(
+            "dag",
+            task_defs(
+                "task",
+                Definitions(
+                    assets=[
+                        AssetSpec(
+                            key="a",
+                            partitions_def=DailyPartitionsDefinition(
+                                start_date=datetime(2021, 1, 1, tzinfo=timezone.utc)
+                            ),
+                        )
+                    ],
+                ),
+            ),
+        ),
+    )
+    assert defs.sensors
+    sensor = next(iter(defs.sensors))
+    with freeze_time(datetime(2021, 1, 1, 6, tzinfo=timezone.utc)):
+        sensor_context = build_sensor_context(
+            repository_def=defs.get_repository_def(), instance=instance
+        )
+        with pytest.raises(AirliftSensorEventTransformerError):
+            sensor(sensor_context)
+
+
+def test_partition_offset_mismatch(init_load_context: None, instance: DagsterInstance) -> None:
+    """Test that partition offsets are respected when determining the partition corresponding to a logical date."""
+    airflow_instance = make_instance(
+        dag_and_task_structure={
+            "dag": ["task"],
+        },
+        dag_runs=[
+            make_dag_run(
+                dag_id="dag",
+                run_id="run-dag",
+                start_date=datetime(2021, 1, 1, 5, tzinfo=timezone.utc),
+                end_date=datetime(2021, 1, 1, 6, tzinfo=timezone.utc),
+                # Logical date is 3 AM.
+                logical_date=datetime(2021, 1, 1, 3, tzinfo=timezone.utc),
+            )
+        ],
+    )
+
+    defs = build_defs_from_airflow_instance(
+        airflow_instance=airflow_instance,
+        defs=dag_defs(
+            "dag",
+            task_defs(
+                "task",
+                Definitions(
+                    assets=[
+                        AssetSpec(
+                            key="a",
+                            partitions_def=DailyPartitionsDefinition(
+                                start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+                                hour_offset=6,
+                            ),
+                        )
+                    ],
+                ),
+            ),
+        ),
+    )
+    # Because the hour offsets differ, we expect an error to be thrown.
+    assert defs.sensors
+    sensor = next(iter(defs.sensors))
+    with freeze_time(datetime(2021, 1, 1, 6, tzinfo=timezone.utc)):
+        sensor_context = build_sensor_context(
+            repository_def=defs.get_repository_def(), instance=instance
+        )
+        with pytest.raises(AirliftSensorEventTransformerError):
+            sensor(sensor_context)
+
+    # Now, align the offset and expect success.
+    defs = build_defs_from_airflow_instance(
+        airflow_instance=airflow_instance,
+        defs=dag_defs(
+            "dag",
+            task_defs(
+                "task",
+                Definitions(
+                    assets=[
+                        AssetSpec(
+                            key="a",
+                            partitions_def=DailyPartitionsDefinition(
+                                start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+                                hour_offset=3,
+                            ),
+                        )
+                    ],
+                ),
+            ),
+        ),
+    )
+    assert defs.sensors
+    sensor = next(iter(defs.sensors))
+    with freeze_time(datetime(2021, 1, 1, 6, tzinfo=timezone.utc)):
+        sensor_context = build_sensor_context(
+            repository_def=defs.get_repository_def(), instance=instance
+        )
+        result = sensor(sensor_context)
+    assert isinstance(result, SensorResult)
+    assert len(result.asset_events) == 2
+    assert_expected_key_order(result.asset_events, ["a", "test_instance/dag/dag"])
+    a_asset_mat = result.asset_events[0]
+    assert isinstance(a_asset_mat, AssetMaterialization)
+    # We expect the partition to match the logical date.
+    assert a_asset_mat.partition == "2021-01-01"
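
For reviewers, a condensed usage sketch of the behavior this diff adds, adapted from test_default_time_partitioned_asset above. It assumes dag_defs and task_defs are importable from dagster_airlift.core (their exact export location is not shown in this diff); make_instance and make_dag_run come from dagster_airlift.test, as in conftest.py. Because event_transformer_fn now defaults to default_event_transformer, a time-window partitioned asset mapped to an Airflow task has its partition key resolved from the task instance's logical date with no custom transformer:

    from datetime import datetime, timezone

    from dagster import AssetSpec, DailyPartitionsDefinition, Definitions
    # dag_defs/task_defs import path is assumed here; not shown in this diff.
    from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
    from dagster_airlift.test import make_dag_run, make_instance

    # A fake Airflow instance with a single dag/task and one completed run whose
    # logical date is 2021-01-01 (mirrors the new test above).
    airflow_instance = make_instance(
        dag_and_task_structure={"dag": ["task"]},
        dag_runs=[
            make_dag_run(
                dag_id="dag",
                run_id="run-dag",
                start_date=datetime(2021, 1, 2, 5, tzinfo=timezone.utc),
                end_date=datetime(2021, 1, 2, 6, tzinfo=timezone.utc),
                logical_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
            )
        ],
    )

    # No event_transformer_fn is passed: default_event_transformer is now the
    # default, and resolves the run's logical date to the "2021-01-01" partition.
    defs = build_defs_from_airflow_instance(
        airflow_instance=airflow_instance,
        defs=dag_defs(
            "dag",
            task_defs(
                "task",
                Definitions(
                    assets=[
                        AssetSpec(
                            key="a",
                            partitions_def=DailyPartitionsDefinition(
                                start_date=datetime(2021, 1, 1, tzinfo=timezone.utc)
                            ),
                        )
                    ],
                ),
            ),
        ),
    )

    # defs.sensors now contains the polling sensor; when it observes the completed
    # run, the materialization for asset "a" carries partition "2021-01-01".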