Skip to content

Commit

Permalink
[dagster-airlift][rfc] perf harness (#24277)
Browse files Browse the repository at this point in the history
Create a harness for easily testing performance of airlift.
- shared file controls the number of dags and observe/migrated pieces
created.
- run make commands to run various perf scenarios.
## Example incantations
- make run_perf_10_10 (10 tasks with 10 dags)
- make run_perf_150_1 (150 dags, 1 task)
## Changelog
`NOCHANGELOG`

## How I tested this
An admittedly very slow unit test that runs live airflow, and flits
between peer observe and migrate steps, ensuring each can successfully
receive a materialization.
  • Loading branch information
dpeng817 authored Sep 13, 2024
1 parent 7c61ee0 commit 19f73c2
Show file tree
Hide file tree
Showing 33 changed files with 994 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
AvailablePythonVersion.V3_12,
],
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/perf-harness",
unsupported_python_versions=[
AvailablePythonVersion.V3_12,
],
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/tutorial-example",
unsupported_python_versions=[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import json
import time
from abc import ABC
from functools import cached_property
from typing import Any, Dict, List, Sequence
Expand Down Expand Up @@ -28,6 +29,7 @@
# This corresponds directly to the page_limit parameter on airflow's batch dag runs rest API.
# Airflow dag run batch API: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs_batch
DEFAULT_BATCH_DAG_RUNS_LIMIT = 100
SLEEP_SECONDS = 1


class AirflowAuthBackend(ABC):
Expand Down Expand Up @@ -278,8 +280,14 @@ def wait_for_run_completion(self, dag_id: str, run_id: str, timeout: int = 30) -
dag_run = self.get_dag_run(dag_id, run_id)
if dag_run.finished:
return
time.sleep(
SLEEP_SECONDS
) # Sleep for a second before checking again. This way we don't flood the rest API with requests.
raise DagsterError(f"Timed out waiting for airflow run {run_id} to finish.")

def get_run_state(self, dag_id: str, run_id: str) -> str:
return self.get_dag_run(dag_id, run_id).state

@staticmethod
def timestamp_from_airflow_date(airflow_date: str) -> float:
try:
Expand All @@ -293,6 +301,16 @@ def timestamp_from_airflow_date(airflow_date: str) -> float:
def airflow_date_from_datetime(datetime: datetime.datetime) -> str:
return datetime.strftime("%Y-%m-%dT%H:%M:%S+00:00")

def delete_run(self, dag_id: str, run_id: str) -> None:
response = self.auth_backend.get_session().delete(
f"{self.get_api_url()}/dags/{dag_id}/dagRuns/{run_id}"
)
if response.status_code != 204:
raise DagsterError(
f"Failed to delete run for {dag_id}/{run_id}. Status code: {response.status_code}, Message: {response.text}"
)
return None


@record
class DagInfo:
Expand Down Expand Up @@ -385,7 +403,11 @@ def success(self) -> bool:

@property
def finished(self) -> bool:
return self.metadata["state"] in TERMINAL_STATES
return self.state in TERMINAL_STATES

@property
def state(self) -> str:
return self.metadata["state"]

@property
def run_type(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from dagster import Definitions
from dagster._utils.warnings import suppress_dagster_warnings

from dagster_airlift.core.sensor import (
DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
Expand All @@ -12,6 +13,7 @@
from .airflow_instance import AirflowInstance


@suppress_dagster_warnings
def build_defs_from_airflow_instance(
airflow_instance: AirflowInstance,
cache_polling_interval: int = DEFAULT_POLL_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def set_migration_state_for_dag_if_changed(


def get_migration_var_for_dag(dag_id: str) -> Optional[DagMigrationState]:
migration_var = Variable.get(f"{dag_id}_dagster_migration_state")
migration_var = Variable.get(f"{dag_id}_dagster_migration_state", None)
if not migration_var:
return None
return DagMigrationState.from_dict(json.loads(migration_var))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import signal
import subprocess
import time
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Callable, Generator, Optional
from typing import Any, Callable, Generator, List, Optional

import mock
import pytest
Expand All @@ -18,7 +19,7 @@
# Sets up the airflow environment for testing. Running at localhost:8080.
# Callsites are expected to provide implementations for dags_dir fixture.
####################################################################################################
def _airflow_is_ready():
def _airflow_is_ready() -> bool:
try:
response = requests.get("http://localhost:8080")
return response.status_code == 200
Expand Down Expand Up @@ -58,13 +59,20 @@ def _reserialize_dags() -> None:
return _reserialize_dags


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]:
@contextmanager
def stand_up_airflow(
env: Any = {},
airflow_cmd: List[str] = ["airflow", "standalone"],
cwd: Optional[Path] = None,
stdout_channel: Optional[int] = None,
) -> Generator[subprocess.Popen, None, None]:
process = subprocess.Popen(
["airflow", "standalone"],
env=os.environ, # since we have some temp vars in the env right now
airflow_cmd,
cwd=cwd,
env=env, # since we have some temp vars in the env right now
shell=False,
preexec_fn=os.setsid, # noqa # fuck it we ball
stdout=stdout_channel,
)
# Give airflow a second to stand up
time.sleep(5)
Expand All @@ -83,6 +91,12 @@ def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, N
os.killpg(process.pid, signal.SIGKILL)


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[subprocess.Popen, None, None]:
with stand_up_airflow(env=os.environ) as process:
yield process


####################################################################################################
# DAGSTER SETUP FIXTURES
# Sets up the dagster environment for testing. Running at localhost:3333.
Expand All @@ -104,13 +118,21 @@ def setup_dagster_home() -> Generator[str, None, None]:
yield tmpdir


@pytest.fixture(name="dagster_defs_type")
def dagster_defs_file_type() -> str:
"""Return the type of file that contains the dagster definitions."""
return "-f"


@pytest.fixture(name="dagster_dev")
def setup_dagster(dagster_home: str, dagster_defs_path: str) -> Generator[Any, None, None]:
def setup_dagster(
dagster_home: str, dagster_defs_path: str, dagster_defs_type: str
) -> Generator[Any, None, None]:
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
by a fixture included in the callsite.
"""
process = subprocess.Popen(
["dagster", "dev", "-f", dagster_defs_path, "-p", "3333"],
["dagster", "dev", dagster_defs_type, dagster_defs_path, "-p", "3333"],
env=os.environ.copy(),
shell=False,
preexec_fn=os.setsid, # noqa
Expand Down Expand Up @@ -139,8 +161,8 @@ def setup_dagster(dagster_home: str, dagster_defs_path: str) -> Generator[Any, N
VAR_DICT = {}


def dummy_get_var(key: str) -> Optional[str]:
return VAR_DICT.get(key)
def dummy_get_var(key: str, default: Any) -> Optional[str]:
return VAR_DICT.get(key, default)


def dummy_set_var(key: str, value: str, session: Any) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def dags_dir() -> Path:


@pytest.fixture(name="dagster_defs_path")
def dagster_defs_path() -> Path:
return Path(__file__).parent / "operator_test_project" / "dagster_defs.py"
def dagster_defs_path_fixture() -> str:
return str(Path(__file__).parent / "operator_test_project" / "dagster_defs.py")


def test_dagster_operator(airflow_instance: None, dagster_dev: None, dagster_home: str) -> None:
Expand Down

This file was deleted.

189 changes: 189 additions & 0 deletions examples/experimental/dagster-airlift/examples/perf-harness/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
.airflow_home
.dagster_home
customers.csv

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
mlruns/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env
.envrc

# virtualenv
.direnv/
.venv
venv/
ENV/
Pipfile
Pipfile.lock

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# ruff
.ruff_cache/

# mypy
.mypy_cache/

tags
!python_modules/dagster/dagster/_core/definitions/tags

.pytest_cache
.DS_Store

docs/_build
python_modules/dagster/docs/_build

dagit_run_logs

python_modules/libraries/dagster-aws/dagster_aws/ecs/config.yaml

python_modules/dagster-webserver/node_modules/
python_modules/dagster-webserver/yarn.lock

# old dagit stuff
python_modules/dagit/node_modules/
python_modules/dagit/yarn.lock
js_modules/dagit

# Gatsby stuff
docs/gatsby/**/node_modules/
docs/gatsby/**/_build
docs/gatsby/**/public
# Next stuff
docs/next/.mdx-data
docs/next/public/sitemap.xml
# Data
data
# Don't ignore data folders in examples
!examples/*/data
!examples/**/**/data

# Dask
dask-worker-space

# PyCharm IDE Config files
.idea/

# Codemod bookmarks
.codemod.bookmark

# Examples outputs
examples/docs_snippets/docs_snippets/**/**/output/
examples/docs_snippets/docs_snippets/**/**/output/
examples/**/**/example.db

# Telemetry instance id
.telemetry

test_results.xml

# GitHub Codespaces
pythonenv*/

# Vim project-local settings
.vim

# DuckDB
*.duckdb

# PyRight config
pyrightconfig*

# Scripts working directory
scripts/.build

# dbt .user files
.user.yml

# output files
**/**.o

perf_harness/airflow_dags/migration_state/*
**/**perf_output.txt
Loading

0 comments on commit 19f73c2

Please sign in to comment.