From 19f73c214ba96e591d88953586fa618e7602095f Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis <cdguitar817@gmail.com>
Date: Fri, 13 Sep 2024 09:44:18 -0700
Subject: [PATCH] [dagster-airlift][rfc] perf harness (#24277)

Create a harness for easily testing the performance of airlift.
- A shared file controls the number of dags and the number of observed/migrated
pieces created (see the snippet below).
- Run make commands to exercise the various perf scenarios.
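
The shared constants file (`perf_harness/shared/constants.txt`) is a pair of
whitespace-separated entries:

```
NUM_DAGS 10
NUM_TASKS 10
```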
## Example incantations
- make run_perf_10_10 (10 dags, 10 tasks each)
- make run_perf_150_1 (150 dags, 1 task each)
## Changelog
`NOCHANGELOG`

## How I tested this
An admittedly very slow unit test that runs live Airflow and steps
through the peer, observe, and migrate stages, ensuring each can
successfully receive a materialization.
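
Locally, the suite can be reproduced (a sketch, assuming the package is
installed with its `test` extra) with:

```bash
pytest perf_harness_tests -vv
```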
---
 .../dagster_buildkite/steps/packages.py       |   6 +
 .../dagster_airlift/core/airflow_instance.py  |  24 +-
 .../dagster_airlift/core/defs_from_airflow.py |   2 +
 .../in_airflow/mark_as_migrating.py           |   2 +-
 .../dagster_airlift/test/shared_fixtures.py   |  42 +++-
 .../test_dagster_operator.py                  |   4 +-
 .../.airflow_home/logs/scheduler/latest       |   1 -
 .../examples/perf-harness/.gitignore          | 189 +++++++++++++++
 .../examples/perf-harness/Makefile            |  67 +++++
 .../examples/perf-harness/README.md           |  24 ++
 .../examples/perf-harness/conftest.py         |   1 +
 .../airflow.db => perf_harness/__init__.py}   |   0
 .../perf_harness/airflow_dags/__init__.py     |   0
 .../perf_harness/airflow_dags/dags.py         |  60 +++++
 .../examples/perf-harness/perf_harness/cli.py | 229 ++++++++++++++++++
 .../perf_harness/dagster_defs/__init__.py     |   0
 .../perf_harness/dagster_defs/constants.py    |   9 +
 .../perf_harness/dagster_defs/migrate.py      |  44 ++++
 .../perf_harness/dagster_defs/observe.py      |  37 +++
 .../perf_harness/dagster_defs/peer.py         |  13 +
 .../perf_harness/shared/constants.py          |  33 +++
 .../perf_harness/shared/constants.txt         |   2 +
 .../perf-harness/perf_harness/shared/utils.py |  22 ++
 .../perf_harness_tests/__init__.py            |   0
 .../perf_harness_tests/conftest.py            |  31 +++
 .../perf_harness_tests/test_cli.py            |  38 +++
 .../perf_harness_tests/test_e2e.py            |  56 +++++
 .../examples/perf-harness/pyproject.toml      |   6 +
 .../scripts/generate_yaml_files.py            |  17 ++
 .../examples/perf-harness/setup.py            |  19 ++
 .../examples/perf-harness/tox.ini             |  29 +++
 pyright/master/requirements-pinned.txt        |   1 +
 pyright/master/requirements.txt               |   1 +
 33 files changed, 994 insertions(+), 15 deletions(-)
 delete mode 120000 examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/logs/scheduler/latest
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/.gitignore
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/Makefile
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/README.md
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/conftest.py
 rename examples/experimental/dagster-airlift/examples/perf-harness/{.airflow_home/airflow.db => perf_harness/__init__.py} (100%)
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/__init__.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/dags.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/cli.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/__init__.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/constants.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/migrate.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/observe.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/peer.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.txt
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/utils.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/__init__.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/conftest.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_cli.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/pyproject.toml
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/scripts/generate_yaml_files.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/setup.py
 create mode 100644 examples/experimental/dagster-airlift/examples/perf-harness/tox.ini

diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
index e2d7b7136153f..7b793a55483e3 100644
--- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
+++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
@@ -366,6 +366,12 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
             AvailablePythonVersion.V3_12,
         ],
     ),
+    PackageSpec(
+        "examples/experimental/dagster-airlift/examples/perf-harness",
+        unsupported_python_versions=[
+            AvailablePythonVersion.V3_12,
+        ],
+    ),
     PackageSpec(
         "examples/experimental/dagster-airlift/examples/tutorial-example",
         unsupported_python_versions=[
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
index ff4606884db24..d6d9ddf3f1fdb 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py
@@ -1,5 +1,6 @@
 import datetime
 import json
+import time
 from abc import ABC
 from functools import cached_property
 from typing import Any, Dict, List, Sequence
@@ -28,6 +29,7 @@
 # This corresponds directly to the page_limit parameter on airflow's batch dag runs rest API.
 # Airflow dag run batch API: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs_batch
 DEFAULT_BATCH_DAG_RUNS_LIMIT = 100
+SLEEP_SECONDS = 1
 
 
 class AirflowAuthBackend(ABC):
@@ -278,8 +280,14 @@ def wait_for_run_completion(self, dag_id: str, run_id: str, timeout: int = 30) -
             dag_run = self.get_dag_run(dag_id, run_id)
             if dag_run.finished:
                 return
+            time.sleep(
+                SLEEP_SECONDS
+            )  # Sleep briefly before checking again so that we don't flood the REST API with requests.
         raise DagsterError(f"Timed out waiting for airflow run {run_id} to finish.")
 
+    def get_run_state(self, dag_id: str, run_id: str) -> str:
+        return self.get_dag_run(dag_id, run_id).state
+
     @staticmethod
     def timestamp_from_airflow_date(airflow_date: str) -> float:
         try:
@@ -293,6 +301,16 @@ def timestamp_from_airflow_date(airflow_date: str) -> float:
     def airflow_date_from_datetime(datetime: datetime.datetime) -> str:
         return datetime.strftime("%Y-%m-%dT%H:%M:%S+00:00")
 
+    def delete_run(self, dag_id: str, run_id: str) -> None:
+        response = self.auth_backend.get_session().delete(
+            f"{self.get_api_url()}/dags/{dag_id}/dagRuns/{run_id}"
+        )
+        if response.status_code != 204:
+            raise DagsterError(
+                f"Failed to delete run for {dag_id}/{run_id}. Status code: {response.status_code}, Message: {response.text}"
+            )
+        return None
+
 
 @record
 class DagInfo:
@@ -385,7 +403,11 @@ def success(self) -> bool:
 
     @property
     def finished(self) -> bool:
-        return self.metadata["state"] in TERMINAL_STATES
+        return self.state in TERMINAL_STATES
+
+    @property
+    def state(self) -> str:
+        return self.metadata["state"]
 
     @property
     def run_type(self) -> str:
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py
index 5a13a73f766fa..2586834e598c8 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py
@@ -1,6 +1,7 @@
 from typing import Optional
 
 from dagster import Definitions
+from dagster._utils.warnings import suppress_dagster_warnings
 
 from dagster_airlift.core.sensor import (
     DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
@@ -12,6 +13,7 @@
 from .airflow_instance import AirflowInstance
 
 
+@suppress_dagster_warnings
 def build_defs_from_airflow_instance(
     airflow_instance: AirflowInstance,
     cache_polling_interval: int = DEFAULT_POLL_INTERVAL,
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/mark_as_migrating.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/mark_as_migrating.py
index d85bac2ce3662..e4ef079927e1d 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/mark_as_migrating.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/mark_as_migrating.py
@@ -130,7 +130,7 @@ def set_migration_state_for_dag_if_changed(
 
 
 def get_migration_var_for_dag(dag_id: str) -> Optional[DagMigrationState]:
-    migration_var = Variable.get(f"{dag_id}_dagster_migration_state")
+    migration_var = Variable.get(f"{dag_id}_dagster_migration_state", None)
     if not migration_var:
         return None
     return DagMigrationState.from_dict(json.loads(migration_var))
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
index 1dd8ca3469254..56cafcc6d0029 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/test/shared_fixtures.py
@@ -2,9 +2,10 @@
 import signal
 import subprocess
 import time
+from contextlib import contextmanager
 from pathlib import Path
 from tempfile import TemporaryDirectory
-from typing import Any, Callable, Generator, Optional
+from typing import Any, Callable, Generator, List, Optional
 
 import mock
 import pytest
@@ -18,7 +19,7 @@
 # Sets up the airflow environment for testing. Running at localhost:8080.
 # Callsites are expected to provide implementations for dags_dir fixture.
 ####################################################################################################
-def _airflow_is_ready():
+def _airflow_is_ready() -> bool:
     try:
         response = requests.get("http://localhost:8080")
         return response.status_code == 200
@@ -58,13 +59,20 @@ def _reserialize_dags() -> None:
     return _reserialize_dags
 
 
-@pytest.fixture(name="airflow_instance")
-def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]:
+@contextmanager
+def stand_up_airflow(
+    env: Any = {},
+    airflow_cmd: List[str] = ["airflow", "standalone"],
+    cwd: Optional[Path] = None,
+    stdout_channel: Optional[int] = None,
+) -> Generator[subprocess.Popen, None, None]:
     process = subprocess.Popen(
-        ["airflow", "standalone"],
-        env=os.environ,  # since we have some temp vars in the env right now
+        airflow_cmd,
+        cwd=cwd,
+        env=env,  # since we have some temp vars in the env right now
         shell=False,
         preexec_fn=os.setsid,  # noqa  # fuck it we ball
+        stdout=stdout_channel,
     )
     # Give airflow a second to stand up
     time.sleep(5)
@@ -83,6 +91,12 @@ def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, N
     os.killpg(process.pid, signal.SIGKILL)
 
 
+@pytest.fixture(name="airflow_instance")
+def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]:
+    with stand_up_airflow(env=os.environ) as process:
+        yield process
+
+
 ####################################################################################################
 # DAGSTER SETUP FIXTURES
 # Sets up the dagster environment for testing. Running at localhost:3333.
@@ -104,13 +118,21 @@ def setup_dagster_home() -> Generator[str, None, None]:
             yield tmpdir
 
 
+@pytest.fixture(name="dagster_defs_type")
+def dagster_defs_file_type() -> str:
+    """Return the type of file that contains the dagster definitions."""
+    return "-f"
+
+
 @pytest.fixture(name="dagster_dev")
-def setup_dagster(dagster_home: str, dagster_defs_path: str) -> Generator[Any, None, None]:
+def setup_dagster(
+    dagster_home: str, dagster_defs_path: str, dagster_defs_type: str
+) -> Generator[Any, None, None]:
     """Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
     by a fixture included in the callsite.
     """
     process = subprocess.Popen(
-        ["dagster", "dev", "-f", dagster_defs_path, "-p", "3333"],
+        ["dagster", "dev", dagster_defs_type, dagster_defs_path, "-p", "3333"],
         env=os.environ.copy(),
         shell=False,
         preexec_fn=os.setsid,  # noqa
@@ -139,8 +161,8 @@ def setup_dagster(dagster_home: str, dagster_defs_path: str) -> Generator[Any, N
 VAR_DICT = {}
 
 
-def dummy_get_var(key: str) -> Optional[str]:
-    return VAR_DICT.get(key)
+def dummy_get_var(key: str, default: Any) -> Optional[str]:
+    return VAR_DICT.get(key, default)
 
 
 def dummy_set_var(key: str, value: str, session: Any) -> None:
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py
index c3e201b53807c..837157133134f 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_dagster_operator.py
@@ -14,8 +14,8 @@ def dags_dir() -> Path:
 
 
 @pytest.fixture(name="dagster_defs_path")
-def dagster_defs_path() -> Path:
-    return Path(__file__).parent / "operator_test_project" / "dagster_defs.py"
+def dagster_defs_path_fixture() -> str:
+    return str(Path(__file__).parent / "operator_test_project" / "dagster_defs.py")
 
 
 def test_dagster_operator(airflow_instance: None, dagster_dev: None, dagster_home: str) -> None:
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/logs/scheduler/latest b/examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/logs/scheduler/latest
deleted file mode 120000
index e76beaed29510..0000000000000
--- a/examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/logs/scheduler/latest
+++ /dev/null
@@ -1 +0,0 @@
-/Users/christopherdecarolis/dagster/examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/logs/scheduler/2024-09-09
\ No newline at end of file
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/.gitignore b/examples/experimental/dagster-airlift/examples/perf-harness/.gitignore
new file mode 100644
index 0000000000000..fe8d5a85d787b
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/.gitignore
@@ -0,0 +1,189 @@
+.airflow_home
+.dagster_home
+customers.csv
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+mlruns/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+
+# Flask stuff:
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# dotenv
+.env
+.envrc
+
+# virtualenv
+.direnv/
+.venv
+venv/
+ENV/
+Pipfile
+Pipfile.lock
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# ruff
+.ruff_cache/
+
+# mypy
+.mypy_cache/
+
+tags
+!python_modules/dagster/dagster/_core/definitions/tags
+
+.pytest_cache
+.DS_Store
+
+docs/_build
+python_modules/dagster/docs/_build
+
+dagit_run_logs
+
+python_modules/libraries/dagster-aws/dagster_aws/ecs/config.yaml
+
+python_modules/dagster-webserver/node_modules/
+python_modules/dagster-webserver/yarn.lock
+
+# old dagit stuff
+python_modules/dagit/node_modules/
+python_modules/dagit/yarn.lock
+js_modules/dagit
+
+# Gatsby stuff
+docs/gatsby/**/node_modules/
+docs/gatsby/**/_build
+docs/gatsby/**/public
+# Next stuff
+docs/next/.mdx-data
+docs/next/public/sitemap.xml
+# Data
+data
+# Don't ignore data folders in examples
+!examples/*/data
+!examples/**/**/data
+
+# Dask
+dask-worker-space
+
+# PyCharm IDE Config files
+.idea/
+
+# Codemod bookmarks
+.codemod.bookmark
+
+# Examples outputs
+examples/docs_snippets/docs_snippets/**/**/output/
+examples/docs_snippets/docs_snippets/**/**/output/
+examples/**/**/example.db
+
+# Telemetry instance id
+.telemetry
+
+test_results.xml
+
+# GitHub Codespaces
+pythonenv*/
+
+# Vim project-local settings
+.vim
+
+# DuckDB
+*.duckdb
+
+# PyRight config
+pyrightconfig*
+
+# Scripts working directory
+scripts/.build
+
+# dbt .user files
+.user.yml
+
+# output files
+**/**.o
+
+perf_harness/airflow_dags/migration_state/*
+**/**perf_output.txt
\ No newline at end of file
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/Makefile b/examples/experimental/dagster-airlift/examples/perf-harness/Makefile
new file mode 100644
index 0000000000000..b073c057c21c7
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/Makefile
@@ -0,0 +1,67 @@
+.PHONY: help dev_install run_airflow wipe run_peer rm_migration_state scaffold_observe run_observe scaffold_migrate run_migrate setup_local_env run_perf_scenarios_test run_perf_10_10 run_perf_150_1 run_perf_150_5
+
+define GET_MAKEFILE_DIR
+$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::')
+endef
+
+MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
+export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
+export DAGSTER_AIRLIFT_MIGRATION_STATE_DIR := $(MAKEFILE_DIR)/perf_harness/airflow_dags/migration_state
+export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
+export DAGSTER_URL := http://localhost:3333
+
+help:
+	@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+
+dev_install:
+	pip install uv && \
+	uv pip install -e ../../../dagster-airlift
+	uv pip install -e .
+
+run_airflow: 
+	airflow standalone
+
+wipe: ## Wipe out all the files created by the Makefile
+	rm -rf $$AIRFLOW_HOME $$DAGSTER_HOME
+
+wipe_dagster: ## Wipe out just the Dagster files created by the Makefile
+	rm -rf $$DAGSTER_HOME
+
+run_peer:
+	dagster dev -m perf_harness.dagster_defs.peer
+
+rm_migration_state:
+	rm -rf $(MAKEFILE_DIR)/perf_harness/airflow_dags/migration_state
+
+scaffold_observe: rm_migration_state
+	python $(MAKEFILE_DIR)/scripts/generate_yaml_files.py False
+
+run_observe: scaffold_observe
+	dagster dev -m perf_harness.dagster_defs.observe
+
+scaffold_migrate: rm_migration_state
+	python $(MAKEFILE_DIR)/scripts/generate_yaml_files.py True
+
+run_migrate: scaffold_migrate
+	dagster dev -m perf_harness.dagster_defs.migrate
+
+# make airflow home and dagster home directories within current directory, set up env vars, and then
+# set up airflow environment.
+setup_local_env: scaffold_observe
+	make wipe && \
+	mkdir -p $$AIRFLOW_HOME && \
+	mkdir -p $$DAGSTER_HOME && \
+	chmod +x ../../scripts/airflow_setup.sh && \
+	../../scripts/airflow_setup.sh $(MAKEFILE_DIR)/perf_harness/airflow_dags
+
+run_perf_scenarios_test:
+	perf-harness 1 1
+
+run_perf_10_10:
+	perf-harness 10 10
+
+run_perf_150_1:
+	perf-harness 150 1
+
+run_perf_150_5:
+	perf-harness 150 5
\ No newline at end of file
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/README.md b/examples/experimental/dagster-airlift/examples/perf-harness/README.md
new file mode 100644
index 0000000000000..490da599f56ac
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/README.md
@@ -0,0 +1,24 @@
+## Perf harness example
+
+This example is used to time various airlift components.
+
+## Installation
+
+From the root of the Dagster repository:
+
+```bash
+cd examples/experimental/dagster-airlift/examples/perf-harness
+pip install uv
+uv pip install -e .
+```
+
+Run a sanity check:
+
+```bash
+perf-harness 1 1
+```
+
+Then check `examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared`. You should see a file
+`1_dags_1_tasks_perf_output.txt` with airlift timings.
+
+The first argument to `perf-harness` is the number of dags, the second is the number of tasks.
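+
+For larger scenarios, the Makefile wraps the same CLI (the `run_perf_*` targets are defined in this example's Makefile), e.g.:
+
+```bash
+make run_perf_10_10  # equivalent to `perf-harness 10 10`
+```
+
+Each run writes a `<num_dags>_dags_<num_tasks>_tasks_perf_output.txt` file to the same `shared` directory.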
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/conftest.py b/examples/experimental/dagster-airlift/examples/perf-harness/conftest.py
new file mode 100644
index 0000000000000..15102201ce4c1
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/conftest.py
@@ -0,0 +1 @@
+pytest_plugins = ["dagster_airlift.test.shared_fixtures"]
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/airflow.db b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/__init__.py
similarity index 100%
rename from examples/experimental/dagster-airlift/examples/perf-harness/.airflow_home/airflow.db
rename to examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/__init__.py
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/__init__.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/dags.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/dags.py
new file mode 100644
index 0000000000000..6662000260403
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/airflow_dags/dags.py
@@ -0,0 +1,60 @@
+# ruff: noqa: T201
+import time
+
+# Start timing for imports
+import_start_time = time.time()
+from pathlib import Path
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from dagster._time import get_current_datetime
+from dagster_airlift.in_airflow import mark_as_dagster_migrating
+from dagster_airlift.migration_state import load_migration_state_from_yaml
+
+from perf_harness.shared.constants import get_num_dags, get_num_tasks
+
+# End timing for imports
+import_end_time = time.time()
+import_time = import_end_time - import_start_time
+
+# Start timing for DAG creation
+dag_creation_start_time = time.time()
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": get_current_datetime(),
+}
+global_vars = globals()
+for i in range(get_num_dags()):
+    dag = DAG(
+        dag_id=f"dag_{i}",
+        default_args=default_args,
+        is_paused_upon_creation=False,
+    )
+    for j in range(get_num_tasks()):
+        global_vars[f"task_{i}_{j}"] = PythonOperator(
+            python_callable=lambda i=i, j=j: print(f"Task {i}_{j}"),  # bind loop vars; a plain lambda would capture the final i/j
+            task_id=f"task_{i}_{j}",
+            dag=dag,
+        )
+    global_vars[f"dag_{i}"] = dag
+
+# End timing for DAG creation
+dag_creation_end_time = time.time()
+dag_creation_time = dag_creation_end_time - dag_creation_start_time
+
+# Start timing for mark_as_dagster_migrating
+mark_as_dagster_start_time = time.time()
+
+mark_as_dagster_migrating(
+    global_vars=globals(),
+    migration_state=load_migration_state_from_yaml(Path(__file__).parent / "migration_state"),
+)
+
+# End timing for mark_as_dagster_migrating
+mark_as_dagster_end_time = time.time()
+mark_as_dagster_time = mark_as_dagster_end_time - mark_as_dagster_start_time
+
+# Calculate total time
+total_time = mark_as_dagster_end_time - import_start_time
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/cli.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/cli.py
new file mode 100644
index 0000000000000..7dd3a2dc38180
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/cli.py
@@ -0,0 +1,229 @@
+# ruff: noqa: T201
+import argparse
+import os
+import subprocess
+import time
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Dict, Generator, List, Tuple
+
+import dagster._check as check
+from dagster import Definitions, RepositoryDefinition, build_sensor_context
+from dagster._core.test_utils import environ
+from dagster_airlift.core.airflow_instance import AirflowInstance
+from dagster_airlift.test.shared_fixtures import stand_up_airflow
+
+from perf_harness.shared.constants import CONSTANTS_FILE, get_perf_output_file
+from perf_harness.shared.utils import scaffold_migration_state
+
+MAKEFILE_DIR = Path(__file__).parent.parent
+DAGSTER_HOME = MAKEFILE_DIR / ".dagster_home"
+
+
+@contextmanager
+def modify_constants(num_dags: int, num_tasks: int) -> Generator[None, None, None]:
+    # Read the original content
+    with open(CONSTANTS_FILE, "r") as f:
+        original_content = f.read()
+
+    # Build the new constants content
+    modified_content = f"NUM_DAGS {num_dags}\nNUM_TASKS {num_tasks}\n"
+
+    # Write the modified content
+    with open(CONSTANTS_FILE, "w") as f:
+        f.write(modified_content)
+
+    try:
+        # Yield control back to the caller
+        yield
+    finally:
+        # Restore the original content
+        with open(CONSTANTS_FILE, "w") as f:
+            f.write(original_content)
+
+
+def main() -> None:
+    lines = []
+    parser = argparse.ArgumentParser(description="Performance scenario testing for airlift")
+    parser.add_argument("num_dags", type=int, help="Number of DAGs to generate")
+    parser.add_argument("num_tasks", type=int, help="Number of tasks per DAG")
+
+    args = parser.parse_args()
+
+    num_dags = check.int_param(args.num_dags, "num_dags")
+    num_tasks = check.int_param(args.num_tasks, "num_tasks")
+
+    with modify_constants(num_dags, num_tasks), environ({"DAGSTER_HOME": str(DAGSTER_HOME)}):
+        print("Scaffolding migration state...")
+        scaffold_migration_state(num_dags=num_dags, num_tasks=num_tasks, migration_state=True)
+
+        print("Importing airflow defs...")
+        from perf_harness.airflow_dags.dags import (
+            import_time,
+            mark_as_dagster_time,
+            total_time as total_load_time,
+        )
+
+        lines.append(f"Total airflow dags load time: {total_load_time:.4f} seconds\n")
+        lines.append(f"Airflow defs import time: {import_time:.4f} seconds\n")
+        lines.append(f"Mark as dagster migrating time: {mark_as_dagster_time:.4f} seconds\n")
+
+        print("Initializing airflow...")
+        # Take in as arguments the number of dags and the number of tasks per dag.
+        # Create an ephemeral file where the results will be written, exposed via shared/constants.py.
+        # Stand up airflow.
+        # Stand up dagster at each stage (peer, observe, migrate), wiping .dagster_home in between.
+        # At each stage, time how long the initial load takes,
+        # then time how long a subsequent load takes.
+        # Run sensor ticks and time how long they take.
+        # Write all results to the ephemeral file.
+
+        airflow_setup_time = time.time()
+        subprocess.run(
+            ["make", "setup_local_env"], check=True, cwd=MAKEFILE_DIR, stdout=subprocess.DEVNULL
+        )
+        airflow_setup_completion_time = time.time()
+        lines.append(
+            f"Airflow setup time: {airflow_setup_completion_time - airflow_setup_time:.4f} seconds\n"
+        )
+
+        print("Standing up airflow...")
+        start_airflow_standup_time = time.time()
+        with stand_up_airflow(
+            env=os.environ,
+            airflow_cmd=["make", "run_airflow"],
+            cwd=MAKEFILE_DIR,
+            stdout_channel=subprocess.DEVNULL,
+        ):
+            module_name_to_defs_and_instance = {}
+            finished_airflow_standup_time = time.time()
+            lines.append(
+                f"Airflow standup time: {finished_airflow_standup_time - start_airflow_standup_time:.4f} seconds\n"
+            )
+            print("Loading peering defs...")
+            peered_defs_import_start_time = time.time()
+            from perf_harness.dagster_defs.peer import defs
+
+            peered_defs_import_end_time = time.time()
+            peered_defs_import_time = peered_defs_import_end_time - peered_defs_import_start_time
+            lines.append(f"Peered defs import time: {peered_defs_import_time:.4f} seconds\n")
+            from perf_harness.dagster_defs.peer import airflow_instance
+
+            module_name_to_defs_and_instance["peer"] = (defs, airflow_instance)
+
+            print("Loading observing defs...")
+            observe_defs_import_start_time = time.time()
+            from perf_harness.dagster_defs.observe import defs
+
+            observe_defs_import_end_time = time.time()
+            observe_defs_import_time = observe_defs_import_end_time - observe_defs_import_start_time
+            lines.append(f"Observed defs import time: {observe_defs_import_time:.4f} seconds\n")
+            from perf_harness.dagster_defs.observe import airflow_instance
+
+            module_name_to_defs_and_instance["observe"] = (defs, airflow_instance)
+
+            print("Loading migrating defs...")
+            migrate_defs_import_start_time = time.time()
+            from perf_harness.dagster_defs.migrate import defs
+
+            migrate_defs_import_end_time = time.time()
+            migrate_defs_import_time = migrate_defs_import_end_time - migrate_defs_import_start_time
+            lines.append(f"Migrate defs import time: {migrate_defs_import_time:.4f} seconds\n")
+            from perf_harness.dagster_defs.migrate import airflow_instance
+
+            module_name_to_defs_and_instance["migrate"] = (defs, airflow_instance)
+
+            print("Running performance tests across modules...")
+            lines.extend(
+                run_suite_for_defs(
+                    module_name_to_defs_and_instance=module_name_to_defs_and_instance,
+                    num_dags=num_dags,
+                )
+            )
+
+        with open(get_perf_output_file(), "w") as f:
+            f.writelines(lines)
+    print("Performance harness completed.")
+
+
+def run_suite_for_defs(
+    *,
+    module_name_to_defs_and_instance: Dict[str, Tuple[Definitions, AirflowInstance]],
+    num_dags: int,
+) -> List[str]:
+    lines = []
+    module_name_to_repo_def_and_instance: Dict[
+        str, Tuple[RepositoryDefinition, AirflowInstance]
+    ] = {}
+    for module_name, (defs, af_instance) in module_name_to_defs_and_instance.items():
+        defs_initial_load_time = time.time()
+        print(f"Loading cacheable assets for {module_name} defs...")
+        repo_def = defs.get_repository_def()
+        defs_initial_load_completion_time = time.time()
+        lines.append(
+            f"{module_name} defs initial load time: {defs_initial_load_completion_time - defs_initial_load_time:.4f} seconds\n"
+        )
+        module_name_to_repo_def_and_instance[module_name] = (repo_def, af_instance)
+
+    print("Running sensor ticks with no runs...")
+    for module_name, (repo_def, af_instance) in module_name_to_repo_def_and_instance.items():
+        sensor_def = repo_def.get_sensor_def("airflow_dag_status_sensor")
+        print(f"Running {module_name} sensor ticks...")
+        sensor_tick_no_runs_start_time = time.time()
+        sensor_def(build_sensor_context(repository_def=repo_def))
+        sensor_tick_no_runs_end_time = time.time()
+
+        lines.append(
+            f"{module_name} sensor tick with no runs time: {sensor_tick_no_runs_end_time - sensor_tick_no_runs_start_time:.4f} seconds\n"
+        )
+
+    print("Running dag once...")
+    run_id = af_instance.trigger_dag("dag_0")
+    af_instance.wait_for_run_completion("dag_0", run_id, timeout=400)
+    print("Dag run completed.")
+    print("Running sensor ticks with single run...")
+    for module_name, (repo_def, af_instance) in module_name_to_repo_def_and_instance.items():
+        sensor_def = repo_def.get_sensor_def("airflow_dag_status_sensor")
+        sensor_tick_with_single_run_start_time = time.time()
+        sensor_def(build_sensor_context(repository_def=repo_def))
+        sensor_tick_with_single_run_end_time = time.time()
+        lines.append(
+            f"{module_name} sensor tick with single run time: {sensor_tick_with_single_run_end_time - sensor_tick_with_single_run_start_time:.4f} seconds\n"
+        )
+    print("Deleting run...")
+    # Delete that run. Then add a run to every dag.
+    af_instance.delete_run("dag_0", run_id)
+    print("Running every dag...")
+    newly_added_runs = {}
+    for i in range(num_dags):
+        newly_added_runs[f"dag_{i}"] = af_instance.trigger_dag(f"dag_{i}")
+    completed_runs = []
+    while len(completed_runs) < num_dags:
+        for dag_id, run_id in newly_added_runs.items():
+            if run_id in completed_runs:
+                continue
+            run_state = af_instance.get_run_state(dag_id, run_id)
+            if run_state == "success":
+                completed_runs.append(run_id)
+                continue
+            if run_state == "failed":
+                raise Exception(f"Run {run_id} for dag {dag_id} failed.")
+        time.sleep(1)  # Back off between polling passes so we don't flood the REST API.
+    print("All runs completed.")
+    print("Running sensor ticks with all runs...")
+    for module_name, (repo_def, af_instance) in module_name_to_repo_def_and_instance.items():
+        sensor_def = repo_def.get_sensor_def("airflow_dag_status_sensor")
+        sensor_tick_with_all_runs_start_time = time.time()
+        sensor_def(build_sensor_context(repository_def=repo_def))
+        sensor_tick_with_all_runs_end_time = time.time()
+        lines.append(
+            f"{module_name} sensor tick with {num_dags} runs time: {sensor_tick_with_all_runs_end_time - sensor_tick_with_all_runs_start_time:.4f} seconds\n"
+        )
+    # Delete all runs.
+    for dag_id, run_id in newly_added_runs.items():
+        af_instance.delete_run(dag_id, run_id)
+    print("Deleted all runs.")
+    return lines
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/__init__.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/constants.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/constants.py
new file mode 100644
index 0000000000000..0432c2f8ec72c
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/constants.py
@@ -0,0 +1,9 @@
+# Airflow instance running at localhost:8080
+
+
+AIRFLOW_BASE_URL = "http://localhost:8080"
+AIRFLOW_INSTANCE_NAME = "my_airflow_instance"
+
+# Default admin credentials for the local airflow standalone instance
+USERNAME = "admin"
+PASSWORD = "admin"
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/migrate.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/migrate.py
new file mode 100644
index 0000000000000..ee9458494011c
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/migrate.py
@@ -0,0 +1,44 @@
+from dagster import AssetsDefinition, Definitions, asset
+from dagster_airlift.core import (
+    AirflowInstance,
+    BasicAuthBackend,
+    build_defs_from_airflow_instance,
+    dag_defs,
+    task_defs,
+)
+
+from perf_harness.shared.constants import get_num_dags, get_num_tasks
+
+from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
+
+airflow_instance = AirflowInstance(
+    auth_backend=BasicAuthBackend(
+        webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
+    ),
+    name=AIRFLOW_INSTANCE_NAME,
+)
+
+
+def build_asset(key: str) -> AssetsDefinition:
+    @asset(key=key)
+    def asset_fn(_):
+        return key
+
+    return asset_fn
+
+
+defs = build_defs_from_airflow_instance(
+    airflow_instance=airflow_instance,
+    defs=Definitions.merge(
+        *[
+            dag_defs(
+                f"dag_{i}",
+                *[
+                    task_defs(f"task_{i}_{j}", Definitions(assets=[build_asset(f"asset_{i}_{j}")]))
+                    for j in range(get_num_tasks())
+                ],
+            )
+            for i in range(get_num_dags())
+        ]
+    ),
+)
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/observe.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/observe.py
new file mode 100644
index 0000000000000..b30422fd8e1e0
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/observe.py
@@ -0,0 +1,37 @@
+from dagster import AssetSpec
+from dagster._core.definitions.definitions_class import Definitions
+from dagster_airlift.core import (
+    AirflowInstance,
+    BasicAuthBackend,
+    build_defs_from_airflow_instance,
+    dag_defs,
+    task_defs,
+)
+
+from perf_harness.shared.constants import get_num_dags, get_num_tasks
+
+from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
+
+airflow_instance = AirflowInstance(
+    auth_backend=BasicAuthBackend(
+        webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
+    ),
+    name=AIRFLOW_INSTANCE_NAME,
+)
+
+
+defs = build_defs_from_airflow_instance(
+    airflow_instance=airflow_instance,
+    defs=Definitions.merge(
+        *[
+            dag_defs(
+                f"dag_{i}",
+                *[
+                    task_defs(f"task_{i}_{j}", Definitions(assets=[AssetSpec(f"asset_{i}_{j}")]))
+                    for j in range(get_num_tasks())
+                ],
+            )
+            for i in range(get_num_dags())
+        ]
+    ),
+)
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/peer.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/peer.py
new file mode 100644
index 0000000000000..7fe1153e80620
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/dagster_defs/peer.py
@@ -0,0 +1,13 @@
+from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance
+
+from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
+
+airflow_instance = AirflowInstance(
+    auth_backend=BasicAuthBackend(
+        webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
+    ),
+    name=AIRFLOW_INSTANCE_NAME,
+)
+
+
+defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.py
new file mode 100644
index 0000000000000..bcda8cf15df7b
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.py
@@ -0,0 +1,33 @@
+from pathlib import Path
+from typing import Dict
+
+CONSTANTS_FILE = Path(__file__).parent / "constants.txt"
+
+
+def read_constants() -> Dict[str, int]:
+    constants = {}
+    with open(CONSTANTS_FILE, "r") as f:
+        for line in f.readlines():
+            key, value = line.strip().split(None, 1)
+            constants[key] = int(value)
+    return constants
+
+
+def get_num_dags() -> int:
+    constants = read_constants()
+    if "NUM_DAGS" not in constants:
+        raise ValueError("NUM_DAGS not found in constants file")
+    return constants["NUM_DAGS"]
+
+
+def get_num_tasks() -> int:
+    constants = read_constants()
+    if "NUM_TASKS" not in constants:
+        raise ValueError("NUM_TASKS not found in constants file")
+    return constants["NUM_TASKS"]
+
+
+def get_perf_output_file() -> Path:
+    pth = Path(__file__).parent / f"{get_num_dags()}_dags_{get_num_tasks()}_tasks_perf_output.txt"
+    pth.parent.mkdir(parents=True, exist_ok=True)
+    return pth
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.txt b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.txt
new file mode 100644
index 0000000000000..39bc0f8d7f5a5
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/constants.txt
@@ -0,0 +1,2 @@
+NUM_DAGS 10
+NUM_TASKS 10
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/utils.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/utils.py
new file mode 100644
index 0000000000000..c10cb27d2ffe8
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness/shared/utils.py
@@ -0,0 +1,22 @@
+from pathlib import Path
+
+import yaml
+
+target_dir = Path(__file__).parent.parent / "airflow_dags" / "migration_state"
+# Ensure the target directory exists
+target_dir.mkdir(parents=True, exist_ok=True)
+
+
+def scaffold_migration_state(num_dags: int, num_tasks: int, migration_state: bool):
+    # Ensure the target directory exists
+    target_dir.mkdir(parents=True, exist_ok=True)
+
+    for i in range(num_dags):
+        yaml_dict = {
+            "tasks": [
+                {"id": f"task_{i}_{j}", "migrated": migration_state} for j in range(num_tasks)
+            ],
+        }
+        # Write to a file dag_{i}.yaml
+        with open(target_dir / f"dag_{i}.yaml", "w") as f:
+            yaml.dump(yaml_dict, f)
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/__init__.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/conftest.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/conftest.py
new file mode 100644
index 0000000000000..f33a6ce0bdfd4
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/conftest.py
@@ -0,0 +1,31 @@
+import os
+import subprocess
+from pathlib import Path
+from typing import Generator
+
+import pytest
+from dagster._core.test_utils import environ
+
+
+@pytest.fixture(name="local_env")
+def local_env_fixture() -> Generator[None, None, None]:
+    makefile_dir = Path(__file__).parent.parent
+    subprocess.run(["make", "setup_local_env"], cwd=makefile_dir, check=True)
+    with environ(
+        {
+            "AIRFLOW_HOME": str(makefile_dir / ".airflow_home"),
+            "DAGSTER_HOME": str(makefile_dir / ".dagster_home"),
+        }
+    ):
+        yield
+    subprocess.run(["make", "wipe"], cwd=makefile_dir, check=True)
+
+
+@pytest.fixture(name="dags_dir")
+def dags_dir_fixture() -> Path:
+    return Path(__file__).parent.parent / "perf_harness" / "airflow_dags"
+
+
+@pytest.fixture(name="airflow_home")
+def airflow_home_fixture(local_env: None) -> Path:
+    return Path(os.environ["AIRFLOW_HOME"])
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_cli.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_cli.py
new file mode 100644
index 0000000000000..dc23f8f63d335
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_cli.py
@@ -0,0 +1,38 @@
+import subprocess
+from pathlib import Path
+
+expected_lines = [
+    "Total airflow dags load time",
+    "Airflow defs import time",
+    "Mark as dagster migrating time",
+    "Airflow setup time",
+    "Airflow standup time",
+    "Peered defs import time",
+    "Observed defs import time",
+    "Migrate defs import time",
+    "peer defs initial load time",
+    "observe defs initial load time",
+    "migrate defs initial load time",
+    "peer sensor tick with no runs time",
+    "observe sensor tick with no runs time",
+    "migrate sensor tick with no runs time",
+    "peer sensor tick with single run time",
+    "observe sensor tick with single run time",
+    "migrate sensor tick with single run time",
+    "peer sensor tick with 1 runs time",
+    "observe sensor tick with 1 runs time",
+    "migrate sensor tick with 1 runs time",
+]
+
+makefile_dir = Path(__file__).parent.parent
+expected_file = makefile_dir / "perf_harness" / "shared" / "1_dags_1_tasks_perf_output.txt"
+
+
+def test_cli() -> None:
+    """Test that the CLI can be run, and produces expected output."""
+    subprocess.call(
+        ["make", "run_perf_scenarios_test"], cwd=makefile_dir, stdout=subprocess.DEVNULL
+    )
+    assert expected_file.exists()
+    for i, line in enumerate(expected_file.read_text().split("\n")[:-1]):  # last line is empty
+        assert expected_lines[i] in line
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py
new file mode 100644
index 0000000000000..f378b778e13e7
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/perf_harness_tests/test_e2e.py
@@ -0,0 +1,56 @@
+import os
+import time
+from datetime import timedelta
+
+import pytest
+from dagster import AssetKey, DagsterInstance
+from dagster._time import get_current_datetime
+from perf_harness.dagster_defs.peer import airflow_instance as af_instance
+
+
+@pytest.fixture(name="dagster_home")
+def dagster_home_fixture(local_env: None) -> str:
+    return os.environ["DAGSTER_HOME"]
+
+
+@pytest.fixture(name="dagster_defs_path")
+def dagster_defs_path_fixture(request) -> str:
+    return request.param
+
+
+@pytest.fixture(name="dagster_defs_type")
+def dagster_defs_type_fixture() -> str:
+    return "-m"
+
+
+@pytest.mark.parametrize(
+    "dagster_defs_path",
+    [
+        "perf_harness.dagster_defs.peer",
+        "perf_harness.dagster_defs.observe",
+        "perf_harness.dagster_defs.migrate",
+    ],
+    ids=["peer", "observe", "migrate"],
+    indirect=True,
+)
+def test_dagster_materializes(
+    airflow_instance: None, dagster_dev: None, dagster_home: str, dagster_defs_path: str
+) -> None:
+    """Test that assets can load properly, and that materializations register."""
+    run_id = af_instance.trigger_dag("dag_0")
+    af_instance.wait_for_run_completion(dag_id="dag_0", run_id=run_id)
+    dagster_instance = DagsterInstance.get()
+    start_time = get_current_datetime()
+    while get_current_datetime() - start_time < timedelta(seconds=30):
+        asset_materialization = dagster_instance.get_latest_materialization_event(
+            asset_key=AssetKey(["airflow_instance", "dag", "dag_0"])
+        )
+        if asset_materialization:
+            break
+        time.sleep(0.5)  # Poll the event log at a modest interval rather than spinning.
+
+    assert asset_materialization
+
+    if dagster_defs_path.endswith("observe") or dagster_defs_path.endswith("migrate"):
+        asset_materialization = dagster_instance.get_latest_materialization_event(
+            asset_key=AssetKey(["asset_0_0"])
+        )
+        assert asset_materialization
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/pyproject.toml b/examples/experimental/dagster-airlift/examples/perf-harness/pyproject.toml
new file mode 100644
index 0000000000000..6c1b81442f94d
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/pyproject.toml
@@ -0,0 +1,6 @@
+[build-system]
+requires = ["setuptools"]
+build-backend = "setuptools.build_meta"
+
+[tool.dagster]
+module_name = "perf_harness.dagster_defs"
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/scripts/generate_yaml_files.py b/examples/experimental/dagster-airlift/examples/perf-harness/scripts/generate_yaml_files.py
new file mode 100644
index 0000000000000..2e2a48b00d6d5
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/scripts/generate_yaml_files.py
@@ -0,0 +1,17 @@
+import argparse
+
+from perf_harness.shared.constants import get_num_dags, get_num_tasks
+from perf_harness.shared.utils import scaffold_migration_state
+
+# Set up argument parser
+parser = argparse.ArgumentParser(
+    description="Generate YAML files with a specified migration state."
+)
+parser.add_argument(
+    "migration_state", type=str, help="The migration state to use in the YAML files"
+)
+
+# Parse arguments
+args = parser.parse_args()
+
+scaffold_migration_state(get_num_dags(), get_num_tasks(), args.migration_state == "True")
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/setup.py b/examples/experimental/dagster-airlift/examples/perf-harness/setup.py
new file mode 100644
index 0000000000000..2fc60b1b2aa91
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/setup.py
@@ -0,0 +1,19 @@
+from setuptools import find_packages, setup
+
+setup(
+    name="perf-harness",
+    packages=find_packages(),
+    install_requires=[
+        "dagster",
+        "dagster-webserver",
+        "dagster-airlift[dbt,core,in-airflow]",
+        "dbt-duckdb",
+        "pandas",
+    ],
+    extras_require={"test": ["pytest"]},
+    entry_points={
+        "console_scripts": [
+            "perf-harness=perf_harness.cli:main",
+        ],
+    },
+)
diff --git a/examples/experimental/dagster-airlift/examples/perf-harness/tox.ini b/examples/experimental/dagster-airlift/examples/perf-harness/tox.ini
new file mode 100644
index 0000000000000..7a9f981e5c434
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/perf-harness/tox.ini
@@ -0,0 +1,29 @@
+[tox]
+skipsdist = true
+
+[testenv]
+download = True
+passenv =
+    CI_*
+    COVERALLS_REPO_TOKEN
+    BUILDKITE*
+install_command = uv pip install {opts} {packages}
+deps =
+  -e ../../../../../python_modules/dagster[test]
+  -e ../../../../../python_modules/dagster-webserver
+  -e ../../../../../python_modules/dagster-test
+  -e ../../../../../python_modules/dagster-pipes
+  -e ../../../../../python_modules/dagster-graphql
+  -e ../../../../../python_modules/libraries/dagster-dbt
+  -e ../../../dagster-airlift[core,dbt,test,in-airflow]
+  -e .
+  pandas
+allowlist_externals =
+  /bin/bash
+  uv
+  make
+commands =
+  # We need to rebuild the UI to ensure that the dagster-webserver can run
+  make -C ../../../../.. rebuild_ui
+  !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
+  pytest -c ../../../../../pyproject.toml ./perf_harness_tests --snapshot-warn-unused -vv {posargs}
diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt
index 405db28745440..3a510e95af024 100644
--- a/pyright/master/requirements-pinned.txt
+++ b/pyright/master/requirements-pinned.txt
@@ -182,6 +182,7 @@ dbt-common==1.3.0
 dbt-core==1.8.6
 dbt-duckdb==1.8.3
 -e examples/experimental/dagster-airlift/examples/dbt-example
+-e examples/experimental/dagster-airlift/examples/perf-harness
 dbt-extractor==0.5.1
 dbt-semantic-interfaces==0.5.1
 debugpy==1.8.5
diff --git a/pyright/master/requirements.txt b/pyright/master/requirements.txt
index 067db0e6d6ad0..48f70970da1eb 100644
--- a/pyright/master/requirements.txt
+++ b/pyright/master/requirements.txt
@@ -132,5 +132,6 @@ pendulum<3
 -e examples/experimental/dagster-blueprints
 -e examples/experimental/dagster-airlift[mwaa,dbt,test] # (includes airflow dependencies)
 -e examples/experimental/dagster-airlift/examples/dbt-example
+-e examples/experimental/dagster-airlift/examples/perf-harness
 -e examples/experimental/dagster-airlift/examples/tutorial-example
 -e examples/use_case_repository[dev]