From 30c52d7d6347fb4d55f38344ad298034d1c13fd4 Mon Sep 17 00:00:00 2001 From: Alex Langenfeld Date: Wed, 6 Nov 2024 11:44:20 -0600 Subject: [PATCH] uv bump and use on airflow (#25766) * bump `uv` pin * enable `uv` for `airflow` `tox` setups * remove need to import `dagster_airflow_tests` by directly using `pytest.mark` * pin `pendulum` next to existing "temporary" `airflow` pin to < 2.8 --- .../dagster_buildkite/utils.py | 2 +- examples/with_airflow/tox.ini | 4 +--- .../dagster_airflow_tests/marks.py | 5 ----- .../test_dag_run_conf.py | 5 ++--- .../test_load_assets.py | 6 ++---- .../test_load_assets_airflow_2.py | 4 +--- .../test_load_dag_bag.py | 5 ++--- .../test_load_dag_bag_airflow_2.py | 7 +++---- .../test_dagster_operator.py | 4 +--- .../test_dependency_structure_translation.py | 19 +++++++++---------- .../test_load_connections.py | 6 ++---- .../test_load_dag_bag.py | 3 +-- .../test_load_dag_bag_airflow_2.py | 5 ++--- .../test_op_execution.py | 14 +++++++------- .../test_tags.py | 7 +++---- .../test_persistent_airflow_2_db.py | 12 +++++------- .../test_persistent_airflow_db.py | 8 +++----- .../libraries/dagster-airflow/setup.py | 3 ++- .../libraries/dagster-airflow/tox.ini | 4 +--- 19 files changed, 48 insertions(+), 75 deletions(-) delete mode 100644 python_modules/libraries/dagster-airflow/dagster_airflow_tests/marks.py diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py index 485cde291d063..8df9738948e50 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py @@ -100,7 +100,7 @@ class GroupStep(TypedDict): BuildkiteLeafStep = Union[CommandStep, TriggerStep, WaitStep] BuildkiteTopLevelStep = Union[CommandStep, GroupStep] -UV_PIN = "uv==0.4.8" +UV_PIN = "uv==0.4.30" def is_command_step(step: BuildkiteStep) -> TypeGuard[CommandStep]: diff --git a/examples/with_airflow/tox.ini b/examples/with_airflow/tox.ini index 0542546b7d37c..f8f8d007d0391 100644 --- a/examples/with_airflow/tox.ini +++ b/examples/with_airflow/tox.ini @@ -7,9 +7,7 @@ passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE* -; uv has trouble with resolve -; https://buildkite.com/dagster/dagster-dagster/builds/76502#018dd221-e24c-40c2-a459-693bdb456f8f -; install_command = uv pip install {opts} {packages} +install_command = uv pip install {opts} {packages} deps = -e ../../python_modules/dagster[test] -e ../../python_modules/dagster-pipes diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/marks.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/marks.py deleted file mode 100644 index b3e72aa007921..0000000000000 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/marks.py +++ /dev/null @@ -1,5 +0,0 @@ -import pytest - -requires_local_db = pytest.mark.requires_local_db # requires airflow db (but not k8s) -requires_persistent_db = pytest.mark.requires_persistent_db # requires persistent airflow db -requires_no_db = pytest.mark.requires_no_db # requires no database diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_dag_run_conf.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_dag_run_conf.py index 1d6532e6c480f..e28c06677559a 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_dag_run_conf.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_dag_run_conf.py @@ -1,11 +1,10 @@ import os import tempfile +import pytest from airflow.models import DagBag, Variable from dagster_airflow import make_dagster_job_from_airflow_dag, make_ephemeral_airflow_db_resource -from dagster_airflow_tests.marks import requires_local_db - DAG_RUN_CONF_DAG = """ from airflow import models @@ -30,7 +29,7 @@ def test_function(**kwargs): """ -@requires_local_db +@pytest.mark.requires_local_db def test_dag_run_conf_local() -> None: with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets.py index 5d6512c2ef323..bca6dbd64a3a4 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets.py @@ -8,8 +8,6 @@ from dagster._check import CheckError from dagster_airflow import load_assets_from_airflow_dag, make_ephemeral_airflow_db_resource -from dagster_airflow_tests.marks import requires_local_db - ASSET_DAG = """ from airflow import models @@ -39,7 +37,7 @@ @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_local_db +@pytest.mark.requires_local_db def test_load_assets_from_airflow_dag(): with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path: with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f: @@ -82,7 +80,7 @@ def new_upstream_asset(): @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_local_db +@pytest.mark.requires_local_db def test_load_assets_from_airflow_dag_multiple_tasks_per_asset(): with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path: with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets_airflow_2.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets_airflow_2.py index 2ef640364e17c..b947e739f9831 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets_airflow_2.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_assets_airflow_2.py @@ -7,8 +7,6 @@ from dagster import AssetKey, asset, materialize from dagster_airflow import load_assets_from_airflow_dag, make_ephemeral_airflow_db_resource -from dagster_airflow_tests.marks import requires_local_db - ASSET_DAG = """ from airflow import models @@ -39,7 +37,7 @@ @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_local_db +@pytest.mark.requires_local_db def test_load_assets_from_airflow_dag(): with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path: with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag.py index d73162a72662e..90e148f19b7af 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag.py @@ -11,7 +11,6 @@ ) from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs -from dagster_airflow_tests.marks import requires_local_db @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") @@ -104,7 +103,7 @@ def test_make_definitions( "expected_job_names, exclude_from_execution_tests", test_airflow_example_dags_inputs, ) -@requires_local_db +@pytest.mark.requires_local_db def test_airflow_example_dags( expected_job_names, exclude_from_execution_tests, @@ -147,7 +146,7 @@ def test_airflow_example_dags( @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_local_db +@pytest.mark.requires_local_db def test_retry_conversion(): with tempfile.TemporaryDirectory(suffix="retries") as tmpdir_path: with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag_airflow_2.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag_airflow_2.py index e14fe9e8b04d8..f279f4cd910b7 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag_airflow_2.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_job_factory/test_load_dag_bag_airflow_2.py @@ -11,7 +11,6 @@ ) from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs_airflow_2 -from dagster_airflow_tests.marks import requires_local_db, requires_no_db @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") @@ -19,7 +18,7 @@ "path_and_content_tuples, fn_arg_path, expected_job_names", test_make_from_dagbag_inputs_airflow_2, ) -@requires_no_db +@pytest.mark.requires_no_db def test_make_definition( path_and_content_tuples, fn_arg_path, @@ -94,7 +93,7 @@ def get_examples_airflow_repo_params(): "job_name, exclude_from_execution_tests", get_examples_airflow_repo_params(), ) -@requires_local_db +@pytest.mark.requires_local_db def test_airflow_example_dags( airflow_examples_repo, job_name, @@ -132,7 +131,7 @@ def test_airflow_example_dags( @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_local_db +@pytest.mark.requires_local_db def test_retry_conversion(): with tempfile.TemporaryDirectory(suffix="retries") as tmpdir_path: with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_operator.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_operator.py index d0907a86a69c1..c4bdfdb370f71 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_operator.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_operator.py @@ -11,8 +11,6 @@ from airflow.models import Connection, TaskInstance from dagster_airflow import DagsterCloudOperator -from dagster_airflow_tests.marks import requires_local_db - if airflow_version >= "2.0.0": from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType @@ -35,7 +33,7 @@ ) -@requires_local_db +@pytest.mark.requires_local_db class TestDagsterOperator(unittest.TestCase): @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id") @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run") diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_dependency_structure_translation.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_dependency_structure_translation.py index d26504569f429..4f5ac624445f4 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_dependency_structure_translation.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_dependency_structure_translation.py @@ -1,3 +1,4 @@ +import pytest from airflow import __version__ as airflow_version from airflow.models.dag import DAG from airflow.operators.dummy_operator import DummyOperator # type: ignore @@ -13,15 +14,13 @@ from dagster._serdes import serialize_pp from dagster_airflow.dagster_job_factory import make_dagster_job_from_airflow_dag -from dagster_airflow_tests.marks import requires_no_db - default_args = { "owner": "dagster", "start_date": days_ago(1), } -@requires_no_db +@pytest.mark.requires_no_db def test_one_task_dag(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -47,7 +46,7 @@ def test_one_task_dag(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_two_task_dag_no_dep(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -77,7 +76,7 @@ def test_two_task_dag_no_dep(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_two_task_dag_with_dep(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -109,7 +108,7 @@ def test_two_task_dag_with_dep(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_diamond_task_dag(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -151,7 +150,7 @@ def test_diamond_task_dag(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_multi_root_dag(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -193,7 +192,7 @@ def test_multi_root_dag(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_multi_leaf_dag(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -234,7 +233,7 @@ def test_multi_leaf_dag(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_complex_dag(snapshot): if airflow_version >= "2.0.0": dag = DAG( @@ -489,7 +488,7 @@ def test_complex_dag(snapshot): ) -@requires_no_db +@pytest.mark.requires_no_db def test_one_task_dag_to_job(): if airflow_version >= "2.0.0": dag = DAG( diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_connections.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_connections.py index 7dcce4d95f50e..b4f9a1e3627eb 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_connections.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_connections.py @@ -9,8 +9,6 @@ from airflow.models import Connection from dagster_airflow import make_dagster_definitions_from_airflow_dags_path -from dagster_airflow_tests.marks import requires_local_db - LOAD_CONNECTION_DAG_FILE_AIRFLOW_2_CONTENTS = """ import pendulum from airflow import DAG @@ -48,7 +46,7 @@ @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_local_db +@pytest.mark.requires_local_db class TestConnectionsAirflow2(unittest.TestCase): @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id") @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run") @@ -111,7 +109,7 @@ def test_ingest_airflow_dags_with_connections(self, launch_run, wait_for_run): @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_local_db +@pytest.mark.requires_local_db class TestConnectionsAirflow1(unittest.TestCase): @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id") @mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run") diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag.py index d66a5b03e532d..3841fd5fcdb2d 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag.py @@ -9,7 +9,6 @@ ) from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs -from dagster_airflow_tests.marks import requires_local_db @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") @@ -100,7 +99,7 @@ def test_make_repo( "expected_job_names, exclude_from_execution_tests", test_airflow_example_dags_inputs, ) -@requires_local_db +@pytest.mark.requires_local_db def test_airflow_example_dags( expected_job_names, exclude_from_execution_tests, diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag_airflow_2.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag_airflow_2.py index 5b3711a7d8796..10d79b1a92105 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag_airflow_2.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_load_dag_bag_airflow_2.py @@ -9,7 +9,6 @@ ) from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs_airflow_2 -from dagster_airflow_tests.marks import requires_local_db, requires_no_db @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") @@ -17,7 +16,7 @@ "path_and_content_tuples, fn_arg_path, expected_job_names", test_make_from_dagbag_inputs_airflow_2, ) -@requires_no_db +@pytest.mark.requires_no_db def test_make_repo( path_and_content_tuples, fn_arg_path, @@ -93,7 +92,7 @@ def get_examples_airflow_repo_params(): "job_name, exclude_from_execution_tests", get_examples_airflow_repo_params(), ) -@requires_local_db +@pytest.mark.requires_local_db def test_airflow_example_dags( airflow_examples_repo, job_name, diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_op_execution.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_op_execution.py index ea17aed5d0178..9a991e51a7666 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_op_execution.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_op_execution.py @@ -2,6 +2,8 @@ import os from unittest import mock +import pytest + # We ignore type errors in several places because we are importing in such a way as to be # compatible with both versions 1.x and 2.x of airflow. This means importing from places that are # not the blessed API of the latest version, which raises pyright "not exported" errors. @@ -24,8 +26,6 @@ from dagster._time import get_current_datetime from dagster_airflow import make_dagster_job_from_airflow_dag -from dagster_airflow_tests.marks import requires_no_db - default_args = { "owner": "dagster", "start_date": days_ago(1), @@ -35,7 +35,7 @@ # Airflow DAG ids and Task ids allow a larger valid character set (alphanumeric characters, # dashes, dots and underscores) than Dagster's naming conventions (alphanumeric characters, # underscores), so Dagster will strip invalid characters and replace with '_' -@requires_no_db +@pytest.mark.requires_no_db def test_normalize_name(): if airflow_version >= "2.0.0": dag = DAG( @@ -67,7 +67,7 @@ def test_normalize_name(): # Test names with 250 characters, Airflow's max allowed length -@requires_no_db +@pytest.mark.requires_no_db def test_long_name(): dag_name = "dag-with.dot-dash-lo00ong" * 10 if airflow_version >= "2.0.0": @@ -107,7 +107,7 @@ def test_long_name(): ) -@requires_no_db +@pytest.mark.requires_no_db def test_one_task_dag(): if airflow_version >= "2.0.0": dag = DAG( @@ -138,7 +138,7 @@ def normalize_file_content(s): return "\n".join([line for line in s.replace(os.linesep, "\n").split("\n") if line]) -@requires_no_db +@pytest.mark.requires_no_db def test_template_task_dag(tmpdir): if airflow_version >= "2.0.0": dag = DAG( @@ -234,7 +234,7 @@ def intercept_spark_submit(*_args, **_kwargs): return m -@requires_no_db +@pytest.mark.requires_no_db @mock.patch("subprocess.Popen", side_effect=intercept_spark_submit) def test_spark_dag(mock_subproc_popen): # Hack to get around having a Connection diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py index e6e2cd7ad3b22..7e53c419b430f 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py @@ -1,6 +1,7 @@ import datetime import os +import pytest from airflow import __version__ as airflow_version from airflow.models.dag import DAG from airflow.operators.bash_operator import BashOperator # type: ignore @@ -16,8 +17,6 @@ from dagster._time import get_current_datetime from dagster_airflow import make_dagster_job_from_airflow_dag -from dagster_airflow_tests.marks import requires_no_db - default_args = { "owner": "dagster", "start_date": days_ago(10), @@ -87,7 +86,7 @@ def get_dag(): return dag -@requires_no_db +@pytest.mark.requires_no_db def test_job_tags(): dag = get_dag() @@ -108,7 +107,7 @@ def test_job_tags(): check_captured_logs(manager, result, EXECUTION_DATE_MINUS_WEEK.strftime("%Y-%m-%d")) -@requires_no_db +@pytest.mark.requires_no_db def test_job_auto_tag(): dag = get_dag() diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_2_db.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_2_db.py index 6d98c35f3ce7c..b46101d0f746c 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_2_db.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_2_db.py @@ -23,8 +23,6 @@ make_persistent_airflow_db_resource, ) -from dagster_airflow_tests.marks import requires_persistent_db - RETRY_DAG = """ from airflow import models @@ -64,7 +62,7 @@ def reconstruct_retry_job(postgres_airflow_db: str, dags_path: str, *_args) -> J @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_retry_from_failure(instance: DagsterInstance, postgres_airflow_db: str): with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: @@ -126,7 +124,7 @@ def test_function(**kwargs): @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_pools(postgres_airflow_db: str): with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: @@ -179,7 +177,7 @@ def test_function(**kwargs): @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_prev_execution_date(postgres_airflow_db: str): with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: @@ -246,7 +244,7 @@ def get_examples_airflow_repo_params() -> List[ParameterSet]: "job_name, exclude_from_execution_tests", get_examples_airflow_repo_params(), ) -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_airflow_example_dags_persistent_db( airflow_examples_repo: RepositoryDefinition, job_name: str, @@ -291,7 +289,7 @@ def test_function(**kwargs): @pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_dag_run_conf_persistent(postgres_airflow_db: str) -> None: with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_db.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_db.py index fd90da7e814d7..b699761659809 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_db.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_persistent_db/test_persistent_airflow_db.py @@ -19,8 +19,6 @@ make_persistent_airflow_db_resource, ) -from dagster_airflow_tests.marks import requires_persistent_db - RETRY_DAG = """ from airflow import models @@ -62,7 +60,7 @@ def reconstruct_retry_job(postgres_airflow_db: str, dags_path: str, *_args) -> J @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_retry_from_failure(instance: DagsterInstance, postgres_airflow_db: str) -> None: with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: @@ -122,7 +120,7 @@ def test_function(**kwargs): @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_prev_execution_date(postgres_airflow_db: str) -> None: with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: @@ -170,7 +168,7 @@ def test_function(**kwargs): @pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1") -@requires_persistent_db +@pytest.mark.requires_persistent_db def test_dag_run_conf_persistent(postgres_airflow_db: str) -> None: with tempfile.TemporaryDirectory() as dags_path: with open(os.path.join(dags_path, "dag.py"), "wb") as f: diff --git a/python_modules/libraries/dagster-airflow/setup.py b/python_modules/libraries/dagster-airflow/setup.py index 5c9c70585e34d..21ef759388b49 100644 --- a/python_modules/libraries/dagster-airflow/setup.py +++ b/python_modules/libraries/dagster-airflow/setup.py @@ -44,7 +44,8 @@ def get_version() -> str: extras_require={ "kubernetes": ["kubernetes>=3.0.0", "cryptography>=2.0.0"], "test_airflow_2": [ - "apache-airflow>=2.0.0,<2.8", + "apache-airflow>=2.0.0,<2.8", # 2.8+ airflow breaks a bunch of tests + "pendulum<3.0.0", # sub 2.8 blows up on pendulum 3 "boto3>=1.26.7", # Flask-session 0.6 is incompatible with certain airflow-provided test # utilities. diff --git a/python_modules/libraries/dagster-airflow/tox.ini b/python_modules/libraries/dagster-airflow/tox.ini index cd2443edcc0b3..334e27fea41b0 100644 --- a/python_modules/libraries/dagster-airflow/tox.ini +++ b/python_modules/libraries/dagster-airflow/tox.ini @@ -18,9 +18,7 @@ passenv = KUBECONFIG, POSTGRES_TEST_DB_HOST -; mix of issues prevent use of uv -; install_command = uv pip install {opts} {packages} - +install_command = uv pip install {opts} {packages} deps = -e ../../dagster[test] -e ../../dagster-pipes