diff --git a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py index 6449ba0eed8b6..3a8626841e750 100644 --- a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py +++ b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py @@ -3,7 +3,7 @@ from dagster import ( AssetsDefinition, - AutoMaterializePolicy, + AutomationCondition, DailyPartitionsDefinition, HourlyPartitionsDefinition, PartitionsDefinition, @@ -11,6 +11,7 @@ asset, repository, ) +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy class AssetLayerConfig(NamedTuple): @@ -22,7 +23,7 @@ class AssetLayerConfig(NamedTuple): def build_assets( id: str, layer_configs: Sequence[AssetLayerConfig], - auto_materialize_policy: AutoMaterializePolicy = AutoMaterializePolicy.eager(), + automation_condition: AutomationCondition, ) -> List[AssetsDefinition]: layers = [] @@ -44,7 +45,7 @@ def build_assets( @asset( partitions_def=layer_config.partitions_def, name=f"{id}_{len(layers)}_{i}", - auto_materialize_policy=auto_materialize_policy, + automation_condition=automation_condition, non_argument_deps=non_argument_deps, ) def _asset(): @@ -74,6 +75,7 @@ def auto_materialize_large_time_graph(): AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily), AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=daily), ], + automation_condition=AutoMaterializePolicy.eager().to_automation_condition(), ) @@ -88,4 +90,5 @@ def auto_materialize_large_static_graph(): AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=static), AssetLayerConfig(n_assets=100, n_upstreams_per_asset=4, partitions_def=None), ], + automation_condition=AutoMaterializePolicy.eager().to_automation_condition(), ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/500_eager_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/500_eager_assets.py new file mode 100644 index 0000000000000..7a44087659f00 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/500_eager_assets.py @@ -0,0 +1,35 @@ +from dagster import ( + AutomationCondition, + AutomationConditionSensorDefinition, + DailyPartitionsDefinition, + Definitions, + HourlyPartitionsDefinition, +) +from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets + + +def get_defs(n: int) -> Definitions: + hourly_partitions_def = HourlyPartitionsDefinition("2020-01-01-00:00") + daily_partitions_def = DailyPartitionsDefinition("2020-01-01") + unit = n // 10 + assets = build_assets( + id="perf_test", + layer_configs=[ + AssetLayerConfig(1 * unit, 0, hourly_partitions_def), + AssetLayerConfig(2 * unit, 2, hourly_partitions_def), + AssetLayerConfig(2 * unit, 4, hourly_partitions_def), + AssetLayerConfig(2 * unit, 4, daily_partitions_def), + AssetLayerConfig(2 * unit, 2, daily_partitions_def), + AssetLayerConfig(1 * unit, 2, daily_partitions_def), + ], + automation_condition=AutomationCondition.eager(), + ) + return Definitions( + assets=assets, + sensors=[ + AutomationConditionSensorDefinition("the_sensor", asset_selection="*", user_code=True) + ], + ) + + +defs = get_defs(500) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index daeae8f397247..229b112fa0b7c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -1,6 +1,7 @@ import datetime import os import sys +import time from contextlib import contextmanager from typing import AbstractSet, Mapping, Sequence, cast @@ -15,7 +16,12 @@ from dagster._core.execution.backfill import PartitionBackfill from dagster._core.remote_representation.external import RemoteSensor from dagster._core.remote_representation.origin import InProcessCodeLocationOrigin -from dagster._core.scheduler.instigation import InstigatorState, SensorInstigatorData +from dagster._core.scheduler.instigation import ( + InstigatorState, + InstigatorTick, + SensorInstigatorData, + TickStatus, +) from dagster._core.storage.dagster_run import DagsterRun from dagster._core.test_utils import ( InProcessTestWorkspaceLoadTarget, @@ -27,6 +33,7 @@ from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.utils import InheritContextThreadPoolExecutor from dagster._core.workspace.context import WorkspaceProcessContext, WorkspaceRequestContext +from dagster._core.workspace.load_target import GrpcServerTarget from dagster._daemon.asset_daemon import ( AssetDaemon, asset_daemon_cursor_from_instigator_serialized_cursor, @@ -34,18 +41,23 @@ from dagster._daemon.backfill import execute_backfill_iteration from dagster._daemon.daemon import get_default_daemon_logger from dagster._daemon.sensor import execute_sensor_iteration +from dagster._grpc.server import GrpcServerProcess from dagster._time import get_current_datetime +def get_loadable_target_origin(filename: str) -> LoadableTargetOrigin: + return LoadableTargetOrigin( + executable_path=sys.executable, + module_name=( + f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}" + ), + working_directory=os.getcwd(), + ) + + def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin: return InProcessCodeLocationOrigin( - loadable_target_origin=LoadableTargetOrigin( - executable_path=sys.executable, - module_name=( - f"dagster_tests.definitions_tests.declarative_automation_tests.daemon_tests.definitions.{filename}" - ), - working_directory=os.getcwd(), - ), + loadable_target_origin=get_loadable_target_origin(filename), location_name=filename, ) @@ -95,6 +107,35 @@ def get_workspace_request_context(filenames: Sequence[str]): yield workspace_context +@contextmanager +def get_grpc_workspace_request_context(filename: str): + with instance_for_test( + overrides={ + "run_launcher": { + "module": "dagster._core.launcher.sync_in_memory_run_launcher", + "class": "SyncInMemoryRunLauncher", + }, + } + ) as instance: + with GrpcServerProcess( + instance_ref=instance.get_ref(), + loadable_target_origin=get_loadable_target_origin(filename), + max_workers=4, + wait_on_exit=False, + ) as server_process: + target = GrpcServerTarget( + host="localhost", + socket=server_process.socket, + port=server_process.port, + location_name="test", + ) + with create_test_daemon_workspace_context( + workspace_load_target=target, instance=instance + ) as workspace_context: + _setup_instance(workspace_context) + yield workspace_context + + @contextmanager def get_threadpool_executor(): with SingleThreadPoolExecutor() as executor: @@ -166,17 +207,23 @@ def _get_latest_evaluation_ids(context: WorkspaceProcessContext) -> AbstractSet[ return {cursor.evaluation_id for cursor in _get_current_cursors(context).values()} -def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]: - ids = [] - request_context = context.create_request_context() - for sensor in _get_automation_sensors(request_context): - ticks = request_context.instance.get_ticks( +def _get_latest_ticks(context: WorkspaceRequestContext) -> Sequence[InstigatorTick]: + latest_ticks = [] + for sensor in _get_automation_sensors(context): + ticks = context.instance.get_ticks( sensor.get_remote_origin_id(), sensor.get_remote_origin().get_selector().get_id(), limit=1, ) - latest_tick = next(iter(ticks), None) - if latest_tick and latest_tick.tick_data: + latest_ticks.extend(ticks) + return latest_ticks + + +def _get_reserved_ids_for_latest_ticks(context: WorkspaceProcessContext) -> Sequence[str]: + ids = [] + request_context = context.create_request_context() + for latest_tick in _get_latest_ticks(request_context): + if latest_tick.tick_data: ids.extend(latest_tick.tick_data.reserved_run_ids or []) return ids @@ -463,8 +510,8 @@ def test_backfill_creation_simple(location: str) -> None: def test_backfill_with_runs_and_checks() -> None: - with get_workspace_request_context( - ["backfill_with_runs_and_checks"] + with get_grpc_workspace_request_context( + "backfill_with_runs_and_checks" ) as context, get_threadpool_executor() as executor: asset_graph = context.create_request_context().asset_graph @@ -524,8 +571,8 @@ def test_backfill_with_runs_and_checks() -> None: def test_custom_condition() -> None: - with get_workspace_request_context( - ["custom_condition"] + with get_grpc_workspace_request_context( + "custom_condition" ) as context, get_threadpool_executor() as executor: time = datetime.datetime(2024, 8, 16, 1, 35) @@ -547,3 +594,30 @@ def test_custom_condition() -> None: _execute_ticks(context, executor) runs = _get_runs_for_latest_ticks(context) assert len(runs) == 0 + + +def test_500_eager_assets_user_code(capsys) -> None: + with get_grpc_workspace_request_context( + "500_eager_assets" + ) as context, get_threadpool_executor() as executor: + freeze_dt = datetime.datetime(2024, 8, 16, 1, 35) + + for _ in range(2): + clock_time = time.time() + with freeze_time(freeze_dt): + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 0 + duration = time.time() - clock_time + assert duration < 40.0 + + freeze_dt += datetime.timedelta(minutes=1) + + latest_ticks = _get_latest_ticks(context.create_request_context()) + assert len(latest_ticks) == 1 + # no failure + assert latest_ticks[0].status == TickStatus.SKIPPED + + # more specific check + for line in capsys.readouterr(): + assert "RESOURCE_EXHAUSTED" not in line diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py index 00214aef6fba8..4d950fea52bf7 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py @@ -23,7 +23,7 @@ def test_eager_perf() -> None: AssetLayerConfig(200, 2, daily_partitions_def), AssetLayerConfig(100, 2, daily_partitions_def), ], - auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), + automation_condition=AutomationCondition.eager(), ) instance = DagsterInstance.ephemeral()