diff --git a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py index 6449ba0eed8b6..3a8626841e750 100644 --- a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py +++ b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py @@ -3,7 +3,7 @@ from dagster import ( AssetsDefinition, - AutoMaterializePolicy, + AutomationCondition, DailyPartitionsDefinition, HourlyPartitionsDefinition, PartitionsDefinition, @@ -11,6 +11,7 @@ asset, repository, ) +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy class AssetLayerConfig(NamedTuple): @@ -22,7 +23,7 @@ class AssetLayerConfig(NamedTuple): def build_assets( id: str, layer_configs: Sequence[AssetLayerConfig], - auto_materialize_policy: AutoMaterializePolicy = AutoMaterializePolicy.eager(), + automation_condition: AutomationCondition, ) -> List[AssetsDefinition]: layers = [] @@ -44,7 +45,7 @@ def build_assets( @asset( partitions_def=layer_config.partitions_def, name=f"{id}_{len(layers)}_{i}", - auto_materialize_policy=auto_materialize_policy, + automation_condition=automation_condition, non_argument_deps=non_argument_deps, ) def _asset(): @@ -74,6 +75,7 @@ def auto_materialize_large_time_graph(): AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily), AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily), ], + automation_condition=AutoMaterializePolicy.eager().to_automation_condition(), ) @@ -88,4 +90,5 @@ def auto_materialize_large_static_graph(): AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=static), AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=None), ], + automation_condition=AutoMaterializePolicy.eager().to_automation_condition(), ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py new file mode 100644 index 0000000000000..0818b233f31a2 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/massive_user_code.py @@ -0,0 +1,35 @@ +from dagster import ( + AutomationCondition, + AutomationConditionSensorDefinition, + DailyPartitionsDefinition, + Definitions, + HourlyPartitionsDefinition, +) +from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets + + +def get_defs(n: int) -> Definitions: + hourly_partitions_def = HourlyPartitionsDefinition("2020-01-01-00:00") + daily_partitions_def = DailyPartitionsDefinition("2020-01-01") + unit = n // 10 + assets = build_assets( + id="perf_test", + layer_configs=[ + AssetLayerConfig(1 * unit, 0, hourly_partitions_def), + AssetLayerConfig(2 * unit, 2, hourly_partitions_def), + AssetLayerConfig(2 * unit, 4, hourly_partitions_def), + AssetLayerConfig(2 * unit, 4, daily_partitions_def), + AssetLayerConfig(2 * unit, 2, daily_partitions_def), + AssetLayerConfig(1 * unit, 2, daily_partitions_def), + ], + automation_condition=AutomationCondition.eager(), + ) + return Definitions( + assets=assets, + sensors=[ + AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + ], + ) + + +defs = get_defs(700) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index ccb6e334b5904..d6e1730ca41c5 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -1,6 +1,7 @@ import datetime import os import sys +import time from contextlib import contextmanager from typing import AbstractSet, Mapping, Sequence, cast @@ -27,6 +28,7 @@ from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.utils import InheritContextThreadPoolExecutor from dagster._core.workspace.context import WorkspaceProcessContext, WorkspaceRequestContext +from dagster._core.workspace.load_target import GrpcServerTarget from dagster._daemon.asset_daemon import ( AssetDaemon, asset_daemon_cursor_from_instigator_serialized_cursor, @@ -34,18 +36,23 @@ from dagster._daemon.backfill import execute_backfill_iteration from dagster._daemon.daemon import get_default_daemon_logger from dagster._daemon.sensor import execute_sensor_iteration +from dagster._grpc.server import GrpcServerProcess from dagster._time import get_current_datetime +def get_loadable_target_origin(filename: str) -> LoadableTargetOrigin: + return LoadableTargetOrigin( + executable_path=sys.executable, + module_name=( + f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}" + ), + working_directory=os.getcwd(), + ) + + def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin: return InProcessCodeLocationOrigin( - loadable_target_origin=LoadableTargetOrigin( - executable_path=sys.executable, - module_name=( - f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}" - ), - working_directory=os.getcwd(), - ), + loadable_target_origin=get_loadable_target_origin(filename), location_name=filename, ) @@ -95,6 +102,35 @@ def get_workspace_request_context(filenames: Sequence[str]): yield workspace_context +@contextmanager +def get_grpc_workspace_request_context(filename: str): + with instance_for_test( + overrides={ + "run_launcher": { + "module": "dagster._core.launcher.sync_in_memory_run_launcher", + "class": "SyncInMemoryRunLauncher", + }, + } + ) as instance: + with GrpcServerProcess( + instance_ref=instance.get_ref(), + loadable_target_origin=get_loadable_target_origin(filename), + max_workers=4, + wait_on_exit=False, + ) as server_process: + target = GrpcServerTarget( + host="localhost", + socket=server_process.socket, + port=server_process.port, + location_name="test", + ) + with create_test_daemon_workspace_context( + workspace_load_target=target, instance=instance + ) as workspace_context: + _setup_instance(workspace_context) + yield workspace_context + + @contextmanager def get_threadpool_executor(): with SingleThreadPoolExecutor() as executor: @@ -463,8 +499,8 @@ def test_backfill_creation_simple(location: str) -> None: def test_backfill_with_runs_and_checks() -> None: - with get_workspace_request_context( - ["backfill_with_runs_and_checks"] + with get_grpc_workspace_request_context( + "backfill_with_runs_and_checks" ) as context, get_threadpool_executor() as executor: asset_graph = context.create_request_context().asset_graph @@ -524,8 +560,8 @@ def test_backfill_with_runs_and_checks() -> None: def test_custom_condition() -> None: - with get_workspace_request_context( - ["custom_condition"] + with get_grpc_workspace_request_context( + "custom_condition" ) as context, get_threadpool_executor() as executor: time = datetime.datetime(2024, 8, 16, 1, 35) @@ -547,3 +583,23 @@ def test_custom_condition() -> None: _execute_ticks(context, executor) runs = _get_runs_for_latest_ticks(context) assert len(runs) == 0 + + +def test_massive_user_code(capsys) -> None: + with get_grpc_workspace_request_context( + "massive_user_code" + ) as context, get_threadpool_executor() as executor: + freeze_dt = datetime.datetime(2024, 8, 16, 1, 35) + + for _ in range(2): + clock_time = time.time() + with freeze_time(freeze_dt): + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 0 + assert time.time() - clock_time < 20.0 + + freeze_dt += datetime.timedelta(minutes=1) + + for line in capsys.readouterr(): + assert "RESOURCE_EXHAUSTED" not in line diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py index 00214aef6fba8..4d950fea52bf7 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py @@ -23,7 +23,7 @@ def test_eager_perf() -> None: AssetLayerConfig(200, 2, daily_partitions_def), AssetLayerConfig(100, 2, daily_partitions_def), ], - auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), + automation_condition=AutomationCondition.eager(), ) instance = DagsterInstance.ephemeral()