diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/.gitignore b/examples/experimental/dagster-airlift/examples/simple-migration/.gitignore deleted file mode 100644 index 082a9f8cc048a..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -.airflow_home -.dagster_home \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/.vscode/settings.json b/examples/experimental/dagster-airlift/examples/simple-migration/.vscode/settings.json deleted file mode 100644 index 2121b578c87a3..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/.vscode/settings.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "python.analysis.userFileIndexingLimit": 5000, - "python.analysis.indexing": true, - "python.analysis.extraPaths": [ - "../../../../../python_modules/dagster", - "../../../../../examples/experimental/dagster-airlift", - "../../../../../examples/experimental/dagster-blueprints", - "../../../../../python_modules/libraries/dagster-dbt" - ], - "python.analysis.diagnosticMode": "openFilesOnly", - "python.analysis.useLibraryCodeForTypes": false, -} \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/Makefile b/examples/experimental/dagster-airlift/examples/simple-migration/Makefile deleted file mode 100644 index 881cfb26660e0..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/Makefile +++ /dev/null @@ -1,48 +0,0 @@ -.PHONY: help - -define GET_MAKEFILE_DIR -$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::') -endef - -MAKEFILE_DIR := $(GET_MAKEFILE_DIR) -export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home -export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home -export DAGSTER_URL := http://localhost:3333 - -help: - @egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' - -dbt_setup: ## Initialize dbt project - dbt seed - -dev_install: - pip install uv && \ - uv pip install -e ../../../dagster-airlift[core,in-airflow] && \ - uv pip install -e . - -# make airflow home and dagster home directories within current directory, set up env vars, and then -# set up airflow environment. -setup_local_env: - make wipe && \ - mkdir -p $$AIRFLOW_HOME && \ - mkdir -p $$DAGSTER_HOME && \ - chmod +x ../../scripts/airflow_setup.sh && \ - ../../scripts/airflow_setup.sh $(MAKEFILE_DIR)/simple_migration/airflow_dags - -run_airflow: - airflow standalone - -run_peer: - dagster dev -m simple_migration.dagster_defs.peer -p 3333 - -run_observe: - dagster dev -m simple_migration.dagster_defs.observe -p 3333 - -run_migrate: - dagster dev -m simple_migration.dagster_defs.migrate -p 3333 - -wipe: ## Wipe out all the files created by the Makefile - rm -rf $$AIRFLOW_HOME $$DAGSTER_HOME - -wipe_dagster: ## Wipe out all the files created by the Makefile - rm -rf $$DAGSTER_HOME diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/README.md b/examples/experimental/dagster-airlift/examples/simple-migration/README.md deleted file mode 100644 index d5522e6a3749b..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/README.md +++ /dev/null @@ -1,38 +0,0 @@ -## Example: Peering, Observing, then Migrating with Airlift - -With no changes to airflow code, and minimal dagster code, `dagster-airlift` allows you to "peer" your -airflow dags into dagster as assets. - -### Try it out - -From the root of the `simple-migration` directory, run the `dev_install` make command to install python dependencies. - -```bash -make dev_install -``` - -Run setup commands, which will scaffold a local airflow, dagster instance, and dbt project. - -```bash -make setup_local_env -``` - -Launch airflow, where we've loaded one dag: - -- `simple`, which has three sequential tasks: `t1` -> `t2` -> `t3`. - -```bash -make run_airflow -``` - -In another shell, we can run dagster at the `peer`, `observe`, or `migrate` steps of the migration using any of the following commands: - -```bash -make `run_peer` -make `run_observe` -make `run_migrate` -``` - -Note that in order to run the observation step with `run_observe`, you must set `migrated` to `False` for each task in the dags. These can be found in `./airflow_dags/migration_state/.yaml`. - -These three steps will show the process of constructing dagster assets from each of these tasks (even when a given task actually produces multiple assets). diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/conftest.py b/examples/experimental/dagster-airlift/examples/simple-migration/conftest.py deleted file mode 100644 index 15102201ce4c1..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/conftest.py +++ /dev/null @@ -1 +0,0 @@ -pytest_plugins = ["dagster_airlift.test.shared_fixtures"] diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/setup.py b/examples/experimental/dagster-airlift/examples/simple-migration/setup.py deleted file mode 100644 index c52a8c7b2016c..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/setup.py +++ /dev/null @@ -1,12 +0,0 @@ -from setuptools import find_packages, setup - -setup( - name="simple-migration", - packages=find_packages(), - install_requires=[ - "dagster", - "dagster-webserver", - "dagster-airlift[core,in-airflow]", - ], - extras_require={"test": ["pytest"]}, -) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/__init__.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/dagster_migration/simple.yaml b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/dagster_migration/simple.yaml deleted file mode 100644 index ab919ee8b6095..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/dagster_migration/simple.yaml +++ /dev/null @@ -1,7 +0,0 @@ -tasks: - -id: t1 - migrated: True - -id: t2 - migrated: False - -id: t3 - migrated: True \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/simple.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/simple.py deleted file mode 100644 index 46582747b8d04..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/airflow_dags/simple.py +++ /dev/null @@ -1,32 +0,0 @@ -from datetime import datetime -from pathlib import Path - -from airflow import DAG -from airflow.operators.python import PythonOperator -from dagster_airlift.in_airflow import mark_as_dagster_migrating -from dagster_airlift.migration_state import load_migration_state_from_yaml -from simple_migration.shared import t1_work, t2_work, t3_work - -MIGRATION_YAML_DIR = Path(__file__).parent / "dagster_migration" - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1), - "retries": 1, -} - -dag = DAG( - "simple", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False -) -t1 = PythonOperator(task_id="t1", python_callable=t1_work, dag=dag) -t2 = PythonOperator(task_id="t2", python_callable=t2_work, dag=dag) -t3 = PythonOperator(task_id="t3", python_callable=t3_work, dag=dag) -t1.set_downstream(t2) -t2.set_downstream(t3) -# Graph looks like this: t1 -> t2 -> t3 -# Uncomment the line below to mark the dag as migrating -mark_as_dagster_migrating( - migration_state=load_migration_state_from_yaml(MIGRATION_YAML_DIR), global_vars=globals() -) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/__init__.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/constants.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/constants.py deleted file mode 100644 index 44184409bf3c0..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/constants.py +++ /dev/null @@ -1,10 +0,0 @@ -from pathlib import Path - -AIRFLOW_BASE_URL = "http://localhost:8080" -AIRFLOW_INSTANCE_NAME = "my_airflow_instance" - -# Authentication credentials (lol) -USERNAME = "admin" -PASSWORD = "admin" - -MIGRATION_STATE_PATH = Path(__file__).parent / "migration" diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/migrate.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/migrate.py deleted file mode 100644 index efad1ac6e2d09..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/migrate.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import Callable, Sequence - -from dagster import AssetSpec -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance -from dagster_airlift.core.dag_defs import TaskDefs, dag_defs, task_defs -from dagster_airlift.core.python_callable import defs_for_python_callable - -from simple_migration.shared import t1_work, t2_work, t3_work - -from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME - -airflow_instance = AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD - ), - name=AIRFLOW_INSTANCE_NAME, -) - -# Asset graph -a1 = AssetSpec(key="a1") -a2 = AssetSpec(key="a2", deps=[a1]) -a3 = AssetSpec(key="a3", deps=[a1]) -a4 = AssetSpec(key="a4", deps=[a2, a3]) - - -def python_callable_defs_for_task( - task_id: str, python_callable: Callable, asset_specs: Sequence[AssetSpec] -) -> TaskDefs: - return task_defs( - task_id, defs_for_python_callable(python_callable=python_callable, asset_specs=asset_specs) - ) - - -defs = build_defs_from_airflow_instance( - airflow_instance=airflow_instance, - defs=dag_defs( - "simple", - python_callable_defs_for_task("t1", t1_work, [a1]), - python_callable_defs_for_task("t2", t2_work, [a2, a3]), - python_callable_defs_for_task("t3", t3_work, [a4]), - ), -) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py deleted file mode 100644 index cd1093902e8b1..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py +++ /dev/null @@ -1,28 +0,0 @@ -from dagster import AssetSpec, Definitions -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance -from dagster_airlift.core.dag_defs import dag_defs, task_defs - -from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME - -airflow_instance = AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD - ), - name=AIRFLOW_INSTANCE_NAME, -) - -# Asset graph -a1 = AssetSpec(key="a1") -a2 = AssetSpec(key="a2", deps=[a1]) -a3 = AssetSpec(key="a3", deps=[a1]) -a4 = AssetSpec(key="a4", deps=[a2, a3]) - -defs = build_defs_from_airflow_instance( - airflow_instance=airflow_instance, - defs=dag_defs( - "simple", - task_defs("t1", Definitions(assets=[a1])), - task_defs("t2", Definitions(assets=[a2, a3])), - task_defs("t3", Definitions(assets=[a4])), - ), -) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/peer.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/peer.py deleted file mode 100644 index 7fe1153e80620..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/peer.py +++ /dev/null @@ -1,13 +0,0 @@ -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance - -from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME - -airflow_instance = AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD - ), - name=AIRFLOW_INSTANCE_NAME, -) - - -defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/shared.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/shared.py deleted file mode 100644 index e717128022e90..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/shared.py +++ /dev/null @@ -1,19 +0,0 @@ -import time - - -def t1_work() -> None: - print("Executing t1_work") # noqa: T201 - time.sleep(1) - print("Finished t1_work") # noqa: T201 - - -def t2_work() -> None: - print("Executing t2_work") # noqa: T201 - time.sleep(1) - print("Finished t2_work") # noqa: T201 - - -def t3_work() -> None: - print("Executing t3_work") # noqa: T201 - time.sleep(1) - print("Finished t3_work") # noqa: T201 diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/__init__.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/conftest.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/conftest.py deleted file mode 100644 index f827fe8aaef5f..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/conftest.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -import subprocess -from pathlib import Path -from typing import Generator - -import pytest -from dagster._core.test_utils import environ - - -@pytest.fixture(name="local_env") -def local_env_fixture() -> Generator[None, None, None]: - makefile_dir = Path(__file__).parent.parent - subprocess.run(["make", "setup_local_env"], cwd=makefile_dir, check=True) - with environ( - { - "AIRFLOW_HOME": str(makefile_dir / ".airflow_home"), - "DAGSTER_HOME": str(makefile_dir / ".dagster_home"), - } - ): - yield - subprocess.run(["make", "wipe"], check=True) - - -@pytest.fixture(name="dags_dir") -def dags_dir_fixture() -> Path: - return Path(__file__).parent.parent / "peering_with_dbt" / "airflow_dags" - - -@pytest.fixture(name="airflow_home") -def airflow_home_fixture(local_env) -> Path: - return Path(os.environ["AIRFLOW_HOME"]) diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/test_load_dagster.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/test_load_dagster.py deleted file mode 100644 index 6f045a524b5ea..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration_tests/test_load_dagster.py +++ /dev/null @@ -1,12 +0,0 @@ -def test_defs_loads(airflow_instance) -> None: - from simple_migration.dagster_defs.migrate import defs - - assert defs - - from simple_migration.dagster_defs.observe import defs - - assert defs - - from simple_migration.dagster_defs.peer import defs - - assert defs diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/tox.ini b/examples/experimental/dagster-airlift/examples/simple-migration/tox.ini deleted file mode 100644 index 7a1f75e455940..0000000000000 --- a/examples/experimental/dagster-airlift/examples/simple-migration/tox.ini +++ /dev/null @@ -1,26 +0,0 @@ -[tox] -skipsdist = true - -[testenv] -download = True -passenv = - CI_* - COVERALLS_REPO_TOKEN - BUILDKITE* -install_command = uv pip install {opts} {packages} -deps = - -e ../../../../../python_modules/dagster[test] - -e ../../../../../python_modules/dagster-webserver - -e ../../../../../python_modules/dagster-test - -e ../../../../../python_modules/dagster-pipes - -e ../../../../../python_modules/dagster-graphql - -e ../../../../../python_modules/libraries/dagster-dbt - -e ../../../dagster-airlift[core,test,in-airflow] - -e . - pandas -allowlist_externals = - /bin/bash - uv -commands = - !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' - pytest -c ../../../../../pyproject.toml ./simple_migration_tests --snapshot-warn-unused -vv {posargs} \ No newline at end of file diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt index 5e590b66c306b..c78b3147dc8da 100644 --- a/pyright/master/requirements-pinned.txt +++ b/pyright/master/requirements-pinned.txt @@ -513,7 +513,6 @@ sentry-sdk==2.13.0 setproctitle==1.3.3 setuptools==70.3.0 shellingham==1.5.4 --e examples/experimental/dagster-airlift/examples/simple-migration simplejson==3.19.3 six==1.16.0 skein==0.8.2 diff --git a/pyright/master/requirements.txt b/pyright/master/requirements.txt index 5265a5f68adff..5a776a4a569df 100644 --- a/pyright/master/requirements.txt +++ b/pyright/master/requirements.txt @@ -131,6 +131,5 @@ pendulum<3 -e examples/experimental/dagster-blueprints -e examples/experimental/dagster-airlift[mwaa,dbt,test] # (includes airflow dependencies) -e examples/experimental/dagster-airlift/examples/dbt-example --e examples/experimental/dagster-airlift/examples/simple-migration -e examples/experimental/dagster-airlift/examples/tutorial-example -e examples/use_case_repository[dev]