diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
index 86cc9b968c846..5263da03a5fcf 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
@@ -289,6 +289,14 @@ def config(self) -> Dict[str, Any]:
     def start_date(self) -> float:
         return AirflowInstance.timestamp_from_airflow_date(self.metadata["start_date"])
 
+    @property
+    def start_datetime(self) -> datetime.datetime:
+        return datetime.datetime.strptime(self.metadata["start_date"], "%Y-%m-%dT%H:%M:%S+00:00")
+
     @property
     def end_date(self) -> float:
         return AirflowInstance.timestamp_from_airflow_date(self.metadata["end_date"])
+
+    @property
+    def end_datetime(self) -> datetime.datetime:
+        return datetime.datetime.strptime(self.metadata["end_date"], "%Y-%m-%dT%H:%M:%S+00:00")
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/test/__init__.py
index a1445209a3821..5a050707b8c6e 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/test/__init__.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/test/__init__.py
@@ -3,6 +3,7 @@
     DummyAuthBackend as DummyAuthBackend,
     make_dag_info as make_dag_info,
     make_dag_run as make_dag_run,
+    make_instance as make_instance,
     make_task_info as make_task_info,
     make_task_instance as make_task_instance,
 )
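A minimal sketch (not part of the diff) of what the new start_datetime/end_datetime properties do, assuming the Airflow REST API returns dates in exactly the shape the format string implies. Note that strptime matches "+00:00" as literal characters, so the result is a naive datetime:

import datetime

# Sketch only: "raw" is a hypothetical value of self.metadata["start_date"].
raw = "2024-01-01T00:00:00+00:00"
parsed = datetime.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S+00:00")
assert parsed == datetime.datetime(2024, 1, 1)  # naive: the +00:00 offset is matched, not parsed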
"""Constructs DagInfo, TaskInfo, and TaskInstance objects from provided data. + + Args: + dag_and_task_structure: A dictionary mapping dag_id to a list of task_ids. + dag_runs: A list of DagRun objects to include in the instance. A TaskInstance object will be + created for each task_id in the dag, for each DagRun in dag_runs pertaining to a particular dag. + """ + dag_infos = [] + task_infos = [] + for dag_id, task_ids in dag_and_task_structure.items(): + dag_info = make_dag_info(dag_id=dag_id, file_token=dag_id) + dag_infos.append(dag_info) + task_infos.extend([make_task_info(dag_id=dag_id, task_id=task_id) for task_id in task_ids]) + task_instances = [] + for dag_run in dag_runs: + task_instances.extend( + [ + make_task_instance( + dag_id=dag_run.dag_id, + task_id=task_id, + run_id=dag_run.run_id, + start_date=dag_run.start_datetime, + end_date=dag_run.end_datetime + - timedelta( + seconds=1 + ), # Ensure that the task ends before the full "dag" completes. + ) + for task_id in dag_and_task_structure[dag_run.dag_id] + ] + ) + return AirflowInstanceFake( + dag_infos=dag_infos, + task_infos=task_infos, + task_instances=task_instances, + dag_runs=dag_runs, + ) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py index 56bc3c3175079..b9dbda59dfe1d 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py @@ -1,210 +1,75 @@ +from collections import defaultdict from datetime import datetime, timedelta -from typing import Dict, List, Optional, Sequence +from typing import Dict, List, Sequence, Tuple, Union -import requests -from dagster import ( - AssetsDefinition, - AssetSpec, - SensorResult, - asset, - build_sensor_context, - multi_asset, - repository, -) +from dagster import AssetObservation, AssetSpec, Definitions, SensorResult, build_sensor_context +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.events import AssetMaterialization from dagster._core.definitions.repository_definition.repository_definition import ( RepositoryDefinition, ) from dagster._time import get_current_datetime -from dagster_airlift.core import AirflowInstance -from dagster_airlift.core.airflow_instance import DagRun, TaskInfo, TaskInstance -from dagster_airlift.core.basic_auth import AirflowAuthBackend -from dagster_airlift.core.sensor import build_airflow_polling_sensor +from dagster_airlift.core import build_defs_from_airflow_instance +from dagster_airlift.test import make_dag_run, make_instance def strip_to_first_of_month(dt: datetime) -> datetime: return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) -class DummyAuthBackend(AirflowAuthBackend): - def get_session(self) -> requests.Session: - raise NotImplementedError("This shouldn't be called from this mock context.") - - def get_webserver_url(self) -> str: - return "http://dummy.domain" - - -class DummyInstance(AirflowInstance): - """A dummy instance that returns a single dag run and task instance for each call. - Designed in such a way that timestamps mirror the task_id, so that we can easily test ordering. - If you want some task to complete after a different task, you can simply set the task_id to a higher number. 
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
index 56bc3c3175079..b9dbda59dfe1d 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py
@@ -1,210 +1,75 @@
+from collections import defaultdict
 from datetime import datetime, timedelta
-from typing import Dict, List, Optional, Sequence
+from typing import Dict, List, Sequence, Tuple, Union
 
-import requests
-from dagster import (
-    AssetsDefinition,
-    AssetSpec,
-    SensorResult,
-    asset,
-    build_sensor_context,
-    multi_asset,
-    repository,
-)
+from dagster import AssetObservation, AssetSpec, Definitions, SensorResult, build_sensor_context
+from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
 from dagster._core.definitions.events import AssetMaterialization
 from dagster._core.definitions.repository_definition.repository_definition import (
     RepositoryDefinition,
 )
 from dagster._time import get_current_datetime
-from dagster_airlift.core import AirflowInstance
-from dagster_airlift.core.airflow_instance import DagRun, TaskInfo, TaskInstance
-from dagster_airlift.core.basic_auth import AirflowAuthBackend
-from dagster_airlift.core.sensor import build_airflow_polling_sensor
+from dagster_airlift.core import build_defs_from_airflow_instance
+from dagster_airlift.test import make_dag_run, make_instance
 
 
 def strip_to_first_of_month(dt: datetime) -> datetime:
     return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
 
 
-class DummyAuthBackend(AirflowAuthBackend):
-    def get_session(self) -> requests.Session:
-        raise NotImplementedError("This shouldn't be called from this mock context.")
-
-    def get_webserver_url(self) -> str:
-        return "http://dummy.domain"
-
-
-class DummyInstance(AirflowInstance):
-    """A dummy instance that returns a single dag run and task instance for each call.
-    Designed in such a way that timestamps mirror the task_id, so that we can easily test ordering.
-    If you want some task to complete after a different task, you can simply set the task_id to a higher number.
-    The dag id should be a number higher than any task id it contains, so that it will complete after all constituent tasks.
-    This instance is designed to be used with "frozen" time, so that a baseline can be established for testing.
-    """
-
-    def __init__(self) -> None:
-        super().__init__(
-            auth_backend=DummyAuthBackend(),
-            name="dummy_instance",
-        )
-
-    def get_dag_runs(self, dag_id: str, start_date: datetime, end_date: datetime) -> List[DagRun]:
-        """Return a single dag run that started and finished within the given range."""
-        cur_date = strip_to_first_of_month(get_current_datetime())
-        return [
-            make_dag_run(cur_date, cur_date + timedelta(days=int(dag_id) + 1), dag_id),
-        ]
-
-    def get_task_instance(self, dag_id: str, task_id: str, run_id: str) -> TaskInstance:
-        """Return a task instance that started and finished within the given range. Expects that time has been frozen."""
-        cur_date = strip_to_first_of_month(get_current_datetime())
-        return make_task_instance(
-            dag_id,
-            task_id,
-            cur_date + timedelta(days=int(task_id)),
-            cur_date + timedelta(days=int(task_id) + 1),
-        )
-
-    def get_task_info(self, dag_id, task_id) -> TaskInfo:
-        return TaskInfo(
-            webserver_url="http://localhost:8080", dag_id=dag_id, task_id=task_id, metadata={}
-        )
-
-    def get_dag_source_code(self, file_token: str) -> str:
-        return "source code"
-
-
-def make_dag_run(dag_start: datetime, dag_end: datetime, dag_id: str) -> DagRun:
-    return DagRun(
-        metadata={
-            "run_type": "manual",
-            "conf": {},
-            "start_date": dag_start.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
-            "end_date": dag_end.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
-            "state": "success",
-        },
-        dag_id=dag_id,
-        run_id="run",
-        webserver_url="http://localhost:8080",
+def build_defs_from_airflow_asset_graph(
+    assets_per_task: Dict[str, Dict[str, List[Tuple[str, List[str]]]]],
+    additional_defs: Definitions = Definitions(),
+) -> RepositoryDefinition:
+    specs = []
+    dag_and_task_structure = defaultdict(list)
+    for dag_id, task_structure in assets_per_task.items():
+        for task_id, asset_structure in task_structure.items():
+            dag_and_task_structure[dag_id].append(task_id)
+            for asset_key, deps in asset_structure:
+                specs.append(
+                    AssetSpec(
+                        asset_key,
+                        deps=deps,
+                        tags={"airlift/dag_id": dag_id, "airlift/task_id": task_id},
+                    )
+                )
+    instance = make_instance(
+        dag_and_task_structure=dag_and_task_structure,
+        dag_runs=[
+            make_dag_run(
+                dag_id=dag_id,
+                run_id=f"run-{dag_id}",
+                start_date=get_current_datetime() - timedelta(minutes=10),
+                end_date=get_current_datetime(),
+            )
+            for dag_id in dag_and_task_structure.keys()
+        ],
     )
-
-
-def make_task_instance(
-    dag_id: str, task_id: str, task_start: datetime, task_end: datetime
-) -> TaskInstance:
-    return TaskInstance(
-        metadata={
-            "note": "note",
-            "start_date": task_start.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
-            "end_date": task_end.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
-            "state": "success",
-        },
-        dag_id=dag_id,
-        task_id=task_id,
-        webserver_url="http://localhost:8080",
-        run_id="run",
+    defs = Definitions.merge(
+        additional_defs,
+        Definitions(assets=specs),
     )
-
-
-def build_task_asset(
-    deps_graph: Dict[str, List[str]],
-    task_id: str,
-    dag_id: str,
-) -> AssetsDefinition:
-    asset_specs = [AssetSpec(key=key, deps=deps) for key, deps in deps_graph.items()]
-
-    @multi_asset(specs=asset_specs, op_tags={"airlift/task_id": task_id, "airlift/dag_id": dag_id})
-    def asset_fn():
-        pass
-
-    return asset_fn
-
-
-def build_dag_asset(
-    dag_id: str,
-) -> AssetsDefinition:
-    @asset(op_tags={"airlift/dag_id": dag_id}, key=dag_id)
-    def asset_fn():
-        pass
-
-    return asset_fn
-
-
-def make_test_instance(
-    get_task_instance_override=None, get_dag_runs_override=None, list_dags_override=None
-) -> DummyInstance:
-    klass_to_instantiate = DummyInstance
-    if get_task_instance_override:
-
-        class TaskInstanceOverride(klass_to_instantiate):
-            def get_task_instance(self, dag_id: str, task_id: str, run_id: str) -> TaskInstance:
-                return get_task_instance_override(self, dag_id, task_id, run_id)
-
-        klass_to_instantiate = TaskInstanceOverride
-
-    if get_dag_runs_override:
-
-        class DagRunsOverride(klass_to_instantiate):  # type: ignore
-            def get_dag_runs(
-                self, dag_id: str, start_date: datetime, end_date: datetime
-            ) -> List[DagRun]:
-                return get_dag_runs_override(self, dag_id, start_date, end_date)
-
-        klass_to_instantiate = DagRunsOverride
-
-    if list_dags_override:
-
-        class ListDagsOverride(klass_to_instantiate):  # type: ignore
-            def list_dags(self):
-                return list_dags_override(self)
-
-        klass_to_instantiate = ListDagsOverride
-
-    return klass_to_instantiate()
-
-
-def repo_from_defs(assets_defs: List[AssetsDefinition]) -> RepositoryDefinition:
-    @repository
-    def repo():
-        return assets_defs
-
-    return repo
+    repo_def = build_defs_from_airflow_instance(instance, defs=defs).get_repository_def()
+    repo_def.load_all_definitions()
+    return repo_def
 
 
 def build_and_invoke_sensor(
-    instance: AirflowInstance,
-    defs: List[AssetsDefinition],
+    assets_per_task: Dict[str, Dict[str, List[Tuple[str, List[str]]]]],
+    additional_defs: Definitions = Definitions(),
 ) -> SensorResult:
-    sensor = build_airflow_polling_sensor(instance)
-    context = build_sensor_context(repository_def=repo_from_defs(defs))
+    repo_def = build_defs_from_airflow_asset_graph(assets_per_task, additional_defs=additional_defs)
+    sensor = next(iter(repo_def.sensor_defs))
+    context = build_sensor_context(repository_def=repo_def)
     result = sensor(context)
     assert isinstance(result, SensorResult)
     return result
 
 
-def build_dag_assets(
-    tasks_to_asset_deps_graph: Dict[str, Dict[str, List[str]]],
-    dag_id: Optional[str] = None,
-) -> List[AssetsDefinition]:
-    resolved_dag_id = dag_id or str(
-        max(int(task_id) for task_id in tasks_to_asset_deps_graph.keys()) + 1
-    )
-    assets = []
-    for task_id, deps_graph in tasks_to_asset_deps_graph.items():
-        assets.append(build_task_asset(deps_graph, task_id, resolved_dag_id))
-    assets.append(build_dag_asset(resolved_dag_id))
-    return assets
-
-
 def assert_expected_key_order(
-    mats: Sequence[AssetMaterialization], expected_key_order: Sequence[str]
+    mats: Sequence[Union[AssetMaterialization, AssetObservation, AssetCheckEvaluation]],
+    expected_key_order: Sequence[str],
 ) -> None:
+    assert all(isinstance(mat, AssetMaterialization) for mat in mats)
     assert [mat.asset_key.to_user_string() for mat in mats] == expected_key_order
-
-
-def make_asset(key, deps):
-    @asset(key=key, deps=deps)
-    def the_asset():
-        pass
-
-    return the_asset
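For readers of the tests that follow, a sketch of the assets_per_task shape the new conftest helpers consume (hypothetical keys, not part of the diff):

# dag_id -> task_id -> [(asset_key, upstream_asset_deps)]
assets_per_task = {
    "dag": {
        "task1": [("a", []), ("b", ["a"])],  # task1 materializes a and b; b depends on a
        "task2": [("c", ["b"])],  # task2 materializes c, downstream of b
    },
}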
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py
index 5d514c0904178..afa99bc8777dd 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_build_defs.py
@@ -13,9 +13,7 @@
 )
 from dagster._core.definitions.assets import unique_id_from_asset_and_check_keys
 from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
-from dagster_airlift.core.airflow_instance import DagInfo
-
-from .conftest import make_test_instance
+from dagster_airlift.test import make_instance
 
 
 @executor
@@ -68,16 +66,8 @@ def the_job():
 
 def test_defs_passthrough() -> None:
     """Test that passed-through definitions are present in the final definitions."""
-
-    def list_dags(self):
-        return [
-            DagInfo(
-                webserver_url="http://localhost:8080", dag_id="dag", metadata={"file_token": "blah"}
-            ),
-        ]
-
     defs = build_defs_from_airflow_instance(
-        airflow_instance=make_test_instance(list_dags_override=list_dags),
+        airflow_instance=make_instance({"dag": ["task"]}),
         defs=Definitions(
             assets=[a, b_spec],
             asset_checks=[a_check],
@@ -115,16 +105,12 @@ def list_dags(self):
 
 
 def test_coerce_specs() -> None:
-    def list_dags(self):
-        return [
-            DagInfo(
-                webserver_url="http://localhost:8080", dag_id="dag", metadata={"file_token": "blah"}
-            ),
-        ]
-
+    """Test that asset specs are properly coerced into asset keys."""
+    # Initialize an airflow instance with a dag "dag", which contains a task "task".
+    # There are no task instances or runs.
     spec = AssetSpec(key="a", tags={"airlift/dag_id": "dag", "airlift/task_id": "task"})
     defs = build_defs_from_airflow_instance(
-        airflow_instance=make_test_instance(list_dags_override=list_dags),
+        airflow_instance=make_instance({"dag": ["task"]}),
         defs=Definitions(
             assets=[spec],
         ),
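The mapping exercised by test_coerce_specs above, restated as a sketch (hypothetical asset key, not part of the diff):

from dagster import AssetSpec

# An AssetSpec is attached to an Airflow task purely via these two tags.
mapped_spec = AssetSpec(
    key="some_table",
    tags={"airlift/dag_id": "dag", "airlift/task_id": "task"},
)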
@@ -139,22 +125,12 @@ def list_dags(self):
 
 def test_invalid_dagster_named_tasks_and_dags() -> None:
     """Test that invalid dagster names are converted to valid names."""
-
-    def list_dags(self):
-        return [
-            DagInfo(
-                webserver_url="http://localhost:8080",
-                dag_id="dag-with-hyphens",
-                metadata={"file_token": "blah"},
-            ),
-        ]
-
     a = AssetKey("a")
     spec = AssetSpec(
         key=a, tags={"airlift/dag_id": "dag-with-hyphens", "airlift/task_id": "task-with-hyphens"}
     )
     defs = build_defs_from_airflow_instance(
-        airflow_instance=make_test_instance(list_dags_override=list_dags),
+        airflow_instance=make_instance({"dag-with-hyphens": ["task-with-hyphens"]}),
         defs=Definitions(
             assets=[spec],
         ),
@@ -177,20 +153,10 @@ def test_unique_node_names_from_specs() -> None:
     Non-unique name issues manifest as input-output connection issues deep in the stack, so by
     loading the cacheable assets, we can check to make sure that inputs/outputs are properly
     hooked up.
     """
-
-    def list_dags(self):
-        return [
-            DagInfo(
-                webserver_url="http://localhost:8080",
-                dag_id="somedag",
-                metadata={"file_token": "blah"},
-            ),
-        ]
-
     abc = AssetKey(["a", "b", "c"])
     defg = AssetKey(["d", "e", "f", "g"])
     defs = build_defs_from_airflow_instance(
-        airflow_instance=make_test_instance(list_dags_override=list_dags),
+        airflow_instance=make_instance({"somedag": ["sometask"]}),
         defs=dag_defs(
             "somedag",
             task_defs(
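The shared pattern in the rewritten tests, as a sketch (assuming an empty Definitions is acceptable for defs; not part of the diff):

from dagster import Definitions
from dagster_airlift.core import build_defs_from_airflow_instance
from dagster_airlift.test import make_instance

# Each former list_dags override collapses to a one-line structure declaration.
defs = build_defs_from_airflow_instance(
    airflow_instance=make_instance({"dag": ["task"]}),
    defs=Definitions(),
)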
""" - - def list_dags(self): - return [ - DagInfo( - webserver_url="http://localhost:8080", - dag_id="somedag", - metadata={"file_token": "blah"}, - ), - ] - abc = AssetKey(["a", "b", "c"]) defg = AssetKey(["d", "e", "f", "g"]) defs = build_defs_from_airflow_instance( - airflow_instance=make_test_instance(list_dags_override=list_dags), + airflow_instance=make_instance({"somedag": ["sometask"]}), defs=dag_defs( "somedag", task_defs( diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py index 91581a168ec21..41c6fc98d84ac 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dbt_defs_migrated.py @@ -1,8 +1,6 @@ from pathlib import Path -from typing import List from dagster._core.definitions.definitions_class import Definitions -from dagster_airlift.core.airflow_instance import DagInfo from dagster_airlift.core.dag_defs import dag_defs, task_defs from dagster_airlift.core.defs_from_airflow import build_defs_from_airflow_instance from dagster_airlift.dbt import dbt_defs @@ -11,10 +9,9 @@ DagMigrationState, TaskMigrationState, ) +from dagster_airlift.test import make_instance from dagster_dbt.dbt_project import DbtProject -from .conftest import make_test_instance - def get_dbt_project_path() -> Path: return Path(__file__).parent.parent / "integration_tests" / "dbt_project" @@ -34,21 +31,9 @@ def test_dbt_defs() -> None: assert isinstance(dbt_defs_inst, Definitions) - def list_dags(self) -> List[DagInfo]: - return [ - DagInfo( - webserver_url="http://localhost:8080", - dag_id="dag_one", - metadata={"file_token": "blah"}, - ), - DagInfo( - webserver_url="http://localhost:8080", - dag_id="dag_two", - metadata={"file_token": "blah"}, - ), - ] - - test_airflow_instance = make_test_instance(list_dags_override=list_dags) + test_airflow_instance = make_instance( + dag_and_task_structure={"dag_one": ["task_one"], "dag_two": ["task_two"]} + ) initial_defs = Definitions.merge( dag_defs( diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py index 9709f8f955c24..8b755d0e82082 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py @@ -1,30 +1,24 @@ from datetime import datetime, timedelta -from dagster import AssetCheckKey, AssetKey, asset_check +from dagster import AssetCheckKey, AssetKey, AssetSpec, Definitions, asset_check from dagster._core.definitions.events import AssetMaterialization from dagster._core.test_utils import freeze_time -from .conftest import ( - assert_expected_key_order, - build_and_invoke_sensor, - build_dag_assets, - make_asset, - make_test_instance, -) +from .conftest import assert_expected_key_order, build_and_invoke_sensor def test_dag_and_task_metadata() -> None: """Test the metadata produced by a sensor for a single dag and task.""" freeze_datetime = datetime(2021, 1, 1) - instance = make_test_instance() with freeze_time(freeze_datetime): result = build_and_invoke_sensor( - instance=instance, - defs=build_dag_assets(tasks_to_asset_deps_graph={"1": {"a": []}}), # dag will have id 2 + assets_per_task={ + "dag": {"task": [("a", 
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py
index 9709f8f955c24..8b755d0e82082 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_sensor.py
@@ -1,30 +1,24 @@
 from datetime import datetime, timedelta
 
-from dagster import AssetCheckKey, AssetKey, asset_check
+from dagster import AssetCheckKey, AssetKey, AssetSpec, Definitions, asset_check
 from dagster._core.definitions.events import AssetMaterialization
 from dagster._core.test_utils import freeze_time
 
-from .conftest import (
-    assert_expected_key_order,
-    build_and_invoke_sensor,
-    build_dag_assets,
-    make_asset,
-    make_test_instance,
-)
+from .conftest import assert_expected_key_order, build_and_invoke_sensor
 
 
 def test_dag_and_task_metadata() -> None:
     """Test the metadata produced by a sensor for a single dag and task."""
     freeze_datetime = datetime(2021, 1, 1)
-    instance = make_test_instance()
     with freeze_time(freeze_datetime):
         result = build_and_invoke_sensor(
-            instance=instance,
-            defs=build_dag_assets(tasks_to_asset_deps_graph={"1": {"a": []}}),  # dag will have id 2
+            assets_per_task={
+                "dag": {"task": [("a", [])]},
+            },
         )
         assert len(result.asset_events) == 2
-        assert_expected_key_order(result.asset_events, ["a", "2"])  # type: ignore
+        assert_expected_key_order(result.asset_events, ["a", "airflow_instance/dag/dag"])
         dag_mat = result.asset_events[1]
         expected_dag_metadata_keys = {
             "Airflow Run ID",
@@ -51,64 +45,67 @@ def test_dag_and_task_metadata() -> None:
         }
         assert set(task_mat.metadata.keys()) == expected_task_metadata_keys
 
-        assert task_mat.metadata["Airflow Run ID"].value == "run"
+        assert task_mat.metadata["Airflow Run ID"].value == "run-dag"
         assert (
             task_mat.metadata["Start Date"].value
-            == (freeze_datetime + timedelta(days=1)).timestamp()
+            == (freeze_datetime - timedelta(minutes=10)).timestamp()
         )
         assert (
-            task_mat.metadata["End Date"].value == (freeze_datetime + timedelta(days=2)).timestamp()
+            task_mat.metadata["End Date"].value
+            == (freeze_datetime - timedelta(seconds=1)).timestamp()
         )
         assert task_mat.metadata["Creation Timestamp"].value == freeze_datetime.timestamp()
         assert (
             task_mat.metadata["Run Details"].value
-            == "[View Run](http://localhost:8080/dags/2/grid?dag_run_id=run&task_id=1)"
+            == "[View Run](http://dummy.domain/dags/dag/grid?dag_run_id=run-dag&task_id=task)"
         )
         assert (
             task_mat.metadata["Task Logs"].value
-            == "[View Logs](http://localhost:8080/dags/2/grid?dag_run_id=run&task_id=1&tab=logs)"
+            == "[View Logs](http://dummy.domain/dags/dag/grid?dag_run_id=run-dag&task_id=task&tab=logs)"
        )
         assert task_mat.metadata["Airflow Config"].value == {}
         assert task_mat.metadata["Run Type"].value == "manual"
 
-        assert dag_mat.metadata["Airflow Run ID"].value == "run"
-        assert dag_mat.metadata["Start Date"].value == freeze_datetime.timestamp()
+        assert dag_mat.metadata["Airflow Run ID"].value == "run-dag"
         assert (
-            dag_mat.metadata["End Date"].value == (freeze_datetime + timedelta(days=3)).timestamp()
+            dag_mat.metadata["Start Date"].value
+            == (freeze_datetime - timedelta(minutes=10)).timestamp()
         )
+        assert dag_mat.metadata["End Date"].value == freeze_datetime.timestamp()
         assert dag_mat.metadata["Creation Timestamp"].value == freeze_datetime.timestamp()
         assert (
             dag_mat.metadata["Run Details"].value
-            == "[View Run](http://localhost:8080/dags/2/grid?dag_run_id=run&tab=details)"
+            == "[View Run](http://dummy.domain/dags/dag/grid?dag_run_id=run-dag&tab=details)"
         )
         assert dag_mat.metadata["Airflow Config"].value == {}
         assert dag_mat.metadata["Run Type"].value == "manual"
 
 
 def test_interleaved_exeutions() -> None:
-    """Test that the when task / dag completion is interleaved, the correct ordering is preserved."""
+    """Test that when task / dag completion is interleaved, the correct ordering is preserved."""
     # Asset graph structure:
     #   a -> b where a and b are each in their own airflow tasks.
     #   c -> d where c and d are each in their own airflow tasks, in a different dag.
     freeze_datetime = datetime(2021, 1, 1)
-
-    instance = make_test_instance()
     with freeze_time(freeze_datetime):
         result = build_and_invoke_sensor(
-            instance=instance,
-            defs=[
-                *build_dag_assets(
-                    tasks_to_asset_deps_graph={"1": {"a": []}, "2": {"b": ["a"]}}, dag_id="5"
-                ),
-                *build_dag_assets(
-                    tasks_to_asset_deps_graph={"3": {"c": []}, "4": {"d": ["c"]}}, dag_id="6"
-                ),
-            ],
+            assets_per_task={
+                "dag1": {"task1": [("a", [])], "task2": [("b", ["a"])]},
+                "dag2": {"task1": [("c", [])], "task2": [("d", ["c"])]},
+            },
         )
         # We expect one asset materialization per asset.
         assert len(result.asset_events) == 6
         assert all(isinstance(event, AssetMaterialization) for event in result.asset_events)
-        assert_expected_key_order(result.asset_events, ["a", "b", "c", "d", "5", "6"])  # type: ignore
+
+        mats_order = [mat.asset_key.to_user_string() for mat in result.asset_events]
+        # a should be before b
+        assert mats_order.index("a") < mats_order.index("b")
+        # c should be before d
+        assert mats_order.index("c") < mats_order.index("d")
+        # dag1 and dag2 should be after all task-mapped assets
+        assert mats_order.index("airflow_instance/dag/dag1") >= 4
+        assert mats_order.index("airflow_instance/dag/dag2") >= 4
 
 
 def test_dependencies_within_tasks() -> None:
@@ -124,57 +121,46 @@ def test_dependencies_within_tasks() -> None:
     #   / \
     #  e   f
     freeze_datetime = datetime(2021, 1, 1)
-
-    instance = make_test_instance()
     with freeze_time(freeze_datetime):
         result = build_and_invoke_sensor(
-            instance=instance,
-            defs=build_dag_assets(
-                tasks_to_asset_deps_graph={
-                    "1": {"a": [], "b": ["a"], "c": ["a"]},
-                    "2": {"d": ["b", "c"], "e": ["d"], "f": ["d"]},
+            assets_per_task={
+                "dag": {
+                    "task1": [("a", []), ("b", ["a"]), ("c", ["a"])],
+                    "task2": [("d", ["b", "c"]), ("e", ["d"]), ("f", ["d"])],
                 },
-            ),
+            },
         )
         assert len(result.asset_events) == 7
-        assert all(isinstance(event, AssetMaterialization) for event in result.asset_events)
-        assert_expected_key_order(result.asset_events, ["a", "b", "c", "d", "e", "f", "3"])  # type: ignore
+        assert_expected_key_order(
+            result.asset_events, ["a", "b", "c", "d", "e", "f", "airflow_instance/dag/dag"]
+        )
 
 
 def test_outside_of_dag_dependency() -> None:
     """Test that if an asset has a transitive dependency on another asset within the same task, ordering is respected."""
     # a -> b -> c where a and c are in the same task, and b is not in any dag.
     freeze_datetime = datetime(2021, 1, 1)
-
-    instance = make_test_instance()
     with freeze_time(freeze_datetime):
         result = build_and_invoke_sensor(
-            instance=instance,
-            defs=[
-                *build_dag_assets(
-                    tasks_to_asset_deps_graph={
-                        "1": {"a": [], "c": ["b"]},
-                    },
-                ),
-                make_asset("b", ["a"]),
-            ],
+            assets_per_task={
+                "dag": {"task": [("a", []), ("c", ["b"])]},
+            },
+            additional_defs=Definitions(assets=[AssetSpec(key="b", deps=["a"])]),
        )
         assert len(result.asset_events) == 3
         assert all(isinstance(event, AssetMaterialization) for event in result.asset_events)
-        assert_expected_key_order(result.asset_events, ["a", "c", "2"])  # type: ignore
+        assert_expected_key_order(result.asset_events, ["a", "c", "airflow_instance/dag/dag"])
 
 
 def test_request_asset_checks() -> None:
     """Test that when a new dag or task run is detected, a new check run is requested for all checks which may target that dag/task."""
     freeze_datetime = datetime(2021, 1, 1)
-    instance = make_test_instance()
 
     @asset_check(asset="a")
     def check_task_asset():
         pass
 
-    @asset_check(asset="2")
+    @asset_check(asset=["airflow_instance", "dag", "dag"])
     def check_dag_asset():
         pass
 
@@ -184,18 +170,13 @@ def check_unrelated_asset():
 
     with freeze_time(freeze_datetime):
         result = build_and_invoke_sensor(
-            instance=instance,
-            defs=[
-                *build_dag_assets(
-                    tasks_to_asset_deps_graph={
-                        "1": {"a": [], "b": []},
-                    },
-                ),
-                make_asset("c", []),
-                check_dag_asset,
-                check_task_asset,
-                check_unrelated_asset,
-            ],
+            assets_per_task={
+                "dag": {"task": [("a", []), ("b", ["a"])]},
+            },
+            additional_defs=Definitions(
+                asset_checks=[check_task_asset, check_dag_asset, check_unrelated_asset],
+                assets=[AssetSpec(key="c")],
+            ),
         )
 
         assert len(result.asset_events) == 3
@@ -205,5 +186,7 @@ def check_unrelated_asset():
         assert run_request.asset_check_keys
         assert set(run_request.asset_check_keys) == {
             AssetCheckKey(name="check_task_asset", asset_key=AssetKey(["a"])),
-            AssetCheckKey(name="check_dag_asset", asset_key=AssetKey(["2"])),
+            AssetCheckKey(
+                name="check_dag_asset", asset_key=AssetKey(["airflow_instance", "dag", "dag"])
+            ),
         }
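A sketch of the timing convention those metadata assertions depend on, with values taken from the conftest and make_instance changes above (not part of the diff):

from datetime import datetime, timedelta

now = datetime(2021, 1, 1)  # the frozen time in these tests
run_start = now - timedelta(minutes=10)  # "Start Date" for both task and dag
task_end = now - timedelta(seconds=1)  # task "End Date": one second before the dag's
dag_end = now  # dag "End Date"
assert run_start < task_end < dag_end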
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_test_utils.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_test_utils.py
index a323d4aedfdb2..c6d612ec74dc2 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_test_utils.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_test_utils.py
@@ -11,10 +11,16 @@
 
 
 def test_test_instance() -> None:
-    """Test the AirflowTestInstance class."""
+    """Test the AirflowInstanceFake class."""
     dag_info = make_dag_info(dag_id="test_dag", file_token="test_file_token")
     task_info = make_task_info(dag_id="test_dag", task_id="test_task")
-    task_instance = make_task_instance(dag_id="test_dag", task_id="test_task", run_id="test_run_id")
+    task_instance = make_task_instance(
+        dag_id="test_dag",
+        task_id="test_task",
+        run_id="test_run_id",
+        start_date=datetime(2022, 1, 1),
+        end_date=datetime(2022, 1, 2),
+    )
     dag_run = make_dag_run(
         dag_id="test_dag",
         run_id="test_run_id",
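A sketch mirroring the call above — the expanded make_task_instance fills in a successful state between the given dates (not part of the diff):

from datetime import datetime

from dagster_airlift.test import make_task_instance

ti = make_task_instance(
    dag_id="test_dag",
    task_id="test_task",
    run_id="test_run_id",
    start_date=datetime(2022, 1, 1),
    end_date=datetime(2022, 1, 2),
)
assert ti.metadata["state"] == "success"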
@@ -61,6 +67,7 @@ def test_test_instance() -> None:
             dag_id="nonexistent_dag", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 1, 2)
         )
 
+    assert test_instance.list_dags() == [dag_info]
     assert test_instance.get_dag_info(dag_id="test_dag") == dag_info
     # Matching range
     assert test_instance.get_dag_runs(