From 51de49f4dee16cff1216d09b73901c00c8ae2337 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Thu, 10 Oct 2024 14:13:56 -0700 Subject: [PATCH] Create default_automation_condition_sensor in Definitions object --- .../dagster_graphql_tests/graphql/repo.py | 17 ++- .../graphql/test_sensors.py | 10 +- .../automation_condition_sensor_definition.py | 4 +- .../_core/definitions/definitions_class.py | 16 ++- .../dagster/_core/definitions/utils.py | 97 ++++++++++++++- .../_core/remote_representation/external.py | 117 ++++++------------ .../dagster/dagster/_daemon/sensor.py | 12 +- .../test_default_auto_materialize_sensors.py | 9 +- .../definitions/old_code_server_simulation.py | 34 +++++ .../daemon_tests/test_e2e.py | 37 +++++- 10 files changed, 257 insertions(+), 96 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index 5d42a80d7b3a4..e3fe41facd37e 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -101,7 +101,12 @@ from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition from dagster._core.definitions.partition import PartitionedConfig from dagster._core.definitions.reconstruct import ReconstructableRepository -from dagster._core.definitions.sensor_definition import RunRequest, SensorDefinition, SkipReason +from dagster._core.definitions.sensor_definition import ( + RunRequest, + SensorDefinition, + SensorType, + SkipReason, +) from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition from dagster._core.errors import DagsterInvalidDefinitionError from dagster._core.log_manager import coerce_valid_log_level @@ -1250,7 +1255,9 @@ def jobless_sensor(_): auto_materialize_sensor = AutomationConditionSensorDefinition( "my_auto_materialize_sensor", asset_selection=AssetSelection.assets( - "fresh_diamond_bottom", "asset_with_automation_condition" + "fresh_diamond_bottom", + "asset_with_automation_condition", + "asset_with_custom_automation_condition", ), ) @@ -2166,6 +2173,12 @@ def define_asset_checks(): def _targets_asset_job(instigator: Union[ScheduleDefinition, SensorDefinition]) -> bool: + if isinstance(instigator, SensorDefinition) and instigator.sensor_type in ( + # these contain asset selections, which are invalid with the dict repo + SensorType.AUTOMATION, + SensorType.AUTO_MATERIALIZE, + ): + return True try: return instigator.job_name in asset_job_names or instigator.has_anonymous_job except DagsterInvalidDefinitionError: # thrown when `job_name` is invalid diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py index a29a4528a31ec..d191fad0990ed 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py @@ -1656,10 +1656,11 @@ def test_asset_selection(graphql_context): assert ( result.data["sensorOrError"]["assetSelection"]["assetSelectionString"] - == "fresh_diamond_bottom or asset_with_automation_condition" + == "fresh_diamond_bottom or asset_with_automation_condition or asset_with_custom_automation_condition" ) assert result.data["sensorOrError"]["assetSelection"]["assetKeys"] == [ {"path": ["asset_with_automation_condition"]}, + {"path": ["asset_with_custom_automation_condition"]}, {"path": ["fresh_diamond_bottom"]}, ] assert result.data["sensorOrError"]["assetSelection"]["assets"] == [ @@ -1667,6 +1668,10 @@ def test_asset_selection(graphql_context): "key": {"path": ["asset_with_automation_condition"]}, "definition": {"assetKey": {"path": ["asset_with_automation_condition"]}}, }, + { + "key": {"path": ["asset_with_custom_automation_condition"]}, + "definition": {"assetKey": {"path": ["asset_with_custom_automation_condition"]}}, + }, { "key": {"path": ["fresh_diamond_bottom"]}, "definition": {"assetKey": {"path": ["fresh_diamond_bottom"]}}, @@ -1676,6 +1681,9 @@ def test_asset_selection(graphql_context): { "key": {"path": ["asset_with_automation_condition"]}, }, + { + "key": {"path": ["asset_with_custom_automation_condition"]}, + }, { "key": {"path": ["fresh_diamond_bottom"]}, }, diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index 720bd48f41a66..70a07601abcf3 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -18,8 +18,9 @@ from dagster._core.errors import DagsterInvalidInvocationError from dagster._utils.tags import normalize_tags -EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills" MAX_ENTITIES = 500 +EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills" +DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME = "default_automation_condition_sensor" def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext): @@ -57,6 +58,7 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor "or split this sensor into multiple AutomationConditionSensorDefinitions with AssetSelections that target fewer " "assets or checks." ) + run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate() return SensorResult( diff --git a/python_modules/dagster/dagster/_core/definitions/definitions_class.py b/python_modules/dagster/dagster/_core/definitions/definitions_class.py index 535844f07c666..a889119fc71a5 100644 --- a/python_modules/dagster/dagster/_core/definitions/definitions_class.py +++ b/python_modules/dagster/dagster/_core/definitions/definitions_class.py @@ -43,7 +43,10 @@ from dagster._core.definitions.schedule_definition import ScheduleDefinition from dagster._core.definitions.sensor_definition import SensorDefinition from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition -from dagster._core.definitions.utils import dedupe_object_refs +from dagster._core.definitions.utils import ( + add_default_automation_condition_sensor, + dedupe_object_refs, +) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.build_resources import wrap_resources_for_execution from dagster._core.execution.with_resources import with_resources @@ -268,10 +271,17 @@ def _create_repository_using_definitions_args( # First, dedupe all definition types. sensors = dedupe_object_refs(sensors) jobs = dedupe_object_refs(jobs) - assets = dedupe_object_refs(assets) + assets = _canonicalize_specs_to_assets_defs(dedupe_object_refs(assets)) schedules = dedupe_object_refs(schedules) asset_checks = dedupe_object_refs(asset_checks) + # add in a default automation condition sensor definition if required + sensors = add_default_automation_condition_sensor( + sensors, + [asset for asset in assets if not isinstance(asset, CacheableAssetsDefinition)], + asset_checks or [], + ) + executor_def = ( executor if isinstance(executor, ExecutorDefinition) or executor is None @@ -296,7 +306,7 @@ def _create_repository_using_definitions_args( ) def created_repo(): return [ - *with_resources(_canonicalize_specs_to_assets_defs(assets or []), resource_defs), + *with_resources(assets, resource_defs), *with_resources(asset_checks or [], resource_defs), *(schedules_with_resources), *(sensors_with_resources), diff --git a/python_modules/dagster/dagster/_core/definitions/utils.py b/python_modules/dagster/dagster/_core/definitions/utils.py index 7a177be59b511..389302e27f310 100644 --- a/python_modules/dagster/dagster/_core/definitions/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/utils.py @@ -11,17 +11,20 @@ Mapping, Optional, Sequence, + Set, Tuple, TypeVar, + Union, cast, ) import yaml import dagster._check as check +from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._core.utils import is_valid_email -from dagster._utils.warnings import deprecation_warning +from dagster._utils.warnings import deprecation_warning, disable_dagster_warnings from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls DEFAULT_OUTPUT = "result" @@ -59,10 +62,16 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_key import AssetKey + from dagster._core.definitions.asset_selection import AssetSelection + from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) + from dagster._core.definitions.sensor_definition import SensorDefinition + from dagster._core.definitions.source_asset import SourceAsset + from dagster._core.remote_representation.external import RemoteSensor class NoValueSentinel: @@ -311,3 +320,89 @@ def resolve_automation_condition( def dedupe_object_refs(objects: Optional[Iterable[T]]) -> Sequence[T]: """Dedupe definitions by reference equality.""" return list({id(obj): obj for obj in objects}.values()) if objects is not None else [] + + +def add_default_automation_condition_sensor( + sensors: Sequence["SensorDefinition"], + assets: Iterable[Union["AssetsDefinition", "SourceAsset"]], + asset_checks: Iterable["AssetsDefinition"], +) -> Sequence["SensorDefinition"]: + """Adds a default automation condition sensor if the provided sensors do not already handle all + provided assets. + """ + from dagster._core.definitions.asset_graph import AssetGraph + from dagster._core.definitions.automation_condition_sensor_definition import ( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, + AutomationConditionSensorDefinition, + ) + + with disable_dagster_warnings(): + asset_graph = AssetGraph.from_assets([*assets, *asset_checks]) + sensor_selection = get_default_automation_condition_sensor_selection(sensors, asset_graph) + if sensor_selection: + default_sensor = AutomationConditionSensorDefinition( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, asset_selection=sensor_selection + ) + sensors = [*sensors, default_sensor] + + return sensors + + +def get_default_automation_condition_sensor_selection( + sensors: Sequence[Union["SensorDefinition", "RemoteSensor"]], asset_graph: "BaseAssetGraph" +) -> Optional["AssetSelection"]: + from dagster._core.definitions.asset_selection import AssetSelection + from dagster._core.definitions.sensor_definition import SensorType + + automation_condition_sensors = sorted( + ( + s + for s in sensors + if s.sensor_type in (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION) + ), + key=lambda s: s.name, + ) + + automation_condition_keys = set() + for k in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys: + if asset_graph.get(k).automation_condition is not None: + automation_condition_keys.add(k) + + has_auto_observe_keys = False + for k in asset_graph.observable_asset_keys: + if ( + # for backcompat, treat auto-observe assets as if they have a condition + asset_graph.get(k).automation_condition is not None + or asset_graph.get(k).auto_observe_interval_minutes is not None + ): + has_auto_observe_keys = True + automation_condition_keys.add(k) + + # get the set of keys that are handled by an existing sensor + covered_keys: Set[EntityKey] = set() + for sensor in automation_condition_sensors: + selection = check.not_none(sensor.asset_selection) + covered_keys = covered_keys.union( + selection.resolve(asset_graph) | selection.resolve_checks(asset_graph) + ) + + default_sensor_keys = automation_condition_keys - covered_keys + if len(default_sensor_keys) > 0: + # Use AssetSelection.all if the default sensor is the only sensor - otherwise + # enumerate the assets that are not already included in some other + # non-default sensor + default_sensor_asset_selection = AssetSelection.all(include_sources=has_auto_observe_keys) + + # if there are any asset checks, include checks in the selection + if any(isinstance(k, AssetCheckKey) for k in default_sensor_keys): + default_sensor_asset_selection |= AssetSelection.all_asset_checks() + + # remove any selections that are already covered + for sensor in automation_condition_sensors: + default_sensor_asset_selection = default_sensor_asset_selection - check.not_none( + sensor.asset_selection + ) + return default_sensor_asset_selection + # no additional sensor required + else: + return None diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index e1b271153c1ed..2b4779d8fc96e 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -23,6 +23,9 @@ from dagster._config.snap import ConfigFieldSnap, ConfigSchemaSnapshot from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME +from dagster._core.definitions.automation_condition_sensor_definition import ( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, +) from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataValue @@ -40,6 +43,7 @@ DefaultSensorStatus, SensorType, ) +from dagster._core.definitions.utils import get_default_automation_condition_sensor_selection from dagster._core.errors import DagsterError from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle from dagster._core.instance import DagsterInstance @@ -86,7 +90,6 @@ from dagster._utils.schedules import schedule_execution_time_iterator if TYPE_CHECKING: - from dagster._core.definitions.asset_key import EntityKey from dagster._core.definitions.remote_asset_graph import RemoteRepositoryAssetGraph from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap @@ -191,9 +194,6 @@ def _utilized_env_vars(self) -> Mapping[str, Sequence[EnvVarConsumer]]: def get_utilized_env_vars(self) -> Mapping[str, Sequence[EnvVarConsumer]]: return self._utilized_env_vars - def get_default_auto_materialize_sensor_name(self) -> str: - return "default_automation_condition_sensor" - @property @cached_method def _sensors(self) -> Dict[str, "RemoteSensor"]: @@ -202,81 +202,28 @@ def _sensors(self) -> Dict[str, "RemoteSensor"]: for sensor_snap in self.repository_snap.sensors } - if self._instance.auto_materialize_use_sensors: - asset_graph = self.asset_graph - - has_any_auto_observe_source_assets = False - - existing_automation_condition_sensors = { - sensor_name: sensor - for sensor_name, sensor in sensor_datas.items() - if sensor.sensor_type in (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION) - } - - covered_entity_keys: Set[EntityKey] = set() - for sensor in existing_automation_condition_sensors.values(): - selection = check.not_none(sensor.asset_selection) - covered_entity_keys = covered_entity_keys.union( - # for now, all asset checks are handled by the same asset as their asset - selection.resolve(asset_graph) | selection.resolve_checks(asset_graph) - ) - - default_sensor_entity_keys = set() - for entity_key in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys: - if not asset_graph.get(entity_key).automation_condition: - continue - - if entity_key not in covered_entity_keys: - default_sensor_entity_keys.add(entity_key) - - for asset_key in asset_graph.observable_asset_keys: - if ( - asset_graph.get(asset_key).auto_observe_interval_minutes is None - and asset_graph.get(asset_key).automation_condition is None - ): - continue - - has_any_auto_observe_source_assets = True - - if asset_key not in covered_entity_keys: - default_sensor_entity_keys.add(asset_key) - - if default_sensor_entity_keys: - default_sensor_asset_check_keys = { - key for key in default_sensor_entity_keys if isinstance(key, AssetCheckKey) - } - # Use AssetSelection.all if the default sensor is the only sensor - otherwise - # enumerate the assets that are not already included in some other - # non-default sensor - default_sensor_asset_selection = AssetSelection.all( - include_sources=has_any_auto_observe_source_assets - ) - # if there are any asset checks, include them - if default_sensor_asset_check_keys: - default_sensor_asset_selection |= AssetSelection.all_asset_checks() - - for sensor in existing_automation_condition_sensors.values(): - default_sensor_asset_selection = ( - default_sensor_asset_selection - check.not_none(sensor.asset_selection) - ) - - default_sensor_data = SensorSnap( - name=self.get_default_auto_materialize_sensor_name(), - job_name=None, - op_selection=None, - asset_selection=default_sensor_asset_selection, - mode=None, - min_interval=30, - description=None, - target_dict={}, - metadata=None, - default_status=None, - sensor_type=SensorType.AUTO_MATERIALIZE, - run_tags=None, - ) - sensor_datas[default_sensor_data.name] = RemoteSensor( - default_sensor_data, self._handle - ) + # if necessary, create a default automation condition sensor + # NOTE: if a user's code location is at a version >= 1.9, then this step should + # never be necessary, as this will be added in Definitions construction process + default_sensor_selection = get_default_automation_condition_sensor_selection( + sensors=[data for data in sensor_datas.values()], asset_graph=self.asset_graph + ) + if default_sensor_selection is not None: + default_sensor_data = SensorSnap( + name=DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, + job_name=None, + op_selection=None, + asset_selection=default_sensor_selection, + mode=None, + min_interval=30, + description=None, + target_dict={}, + metadata=None, + default_status=None, + sensor_type=SensorType.AUTO_MATERIALIZE, + run_tags=None, + ) + sensor_datas[default_sensor_data.name] = RemoteSensor(default_sensor_data, self._handle) return sensor_datas @@ -466,6 +413,18 @@ def _sensor_mappings( try: keys = sensor.asset_selection.resolve(self.asset_graph) for key in keys: + # only count an asset as targeted by an automation condition sensor if it + # has an automation condition + if sensor.sensor_type in ( + SensorType.AUTO_MATERIALIZE, + SensorType.AUTOMATION, + ): + node_snap = self.get_asset_node_snap(key) + if not node_snap or not ( + node_snap.automation_condition + or node_snap.automation_condition_snapshot + ): + continue asset_key_mapping[key].append(sensor) except DagsterError: pass diff --git a/python_modules/dagster/dagster/_daemon/sensor.py b/python_modules/dagster/dagster/_daemon/sensor.py index 7881698ddac22..f9daf37f65a3b 100644 --- a/python_modules/dagster/dagster/_daemon/sensor.py +++ b/python_modules/dagster/dagster/_daemon/sensor.py @@ -37,10 +37,11 @@ ) from dagster._core.definitions.run_request import DagsterRunReaction, InstigatorType, RunRequest from dagster._core.definitions.selector import JobSubsetSelector -from dagster._core.definitions.sensor_definition import DefaultSensorStatus +from dagster._core.definitions.sensor_definition import DefaultSensorStatus, SensorType from dagster._core.errors import ( DagsterCodeLocationLoadError, DagsterError, + DagsterInvalidInvocationError, DagsterUserCodeUnreachableError, ) from dagster._core.execution.backfill import PartitionBackfill @@ -786,6 +787,15 @@ def _evaluate_sensor( sensor_debug_crash_flags: Optional[SingleInstigatorDebugCrashFlags] = None, ): instance = workspace_process_context.instance + if ( + remote_sensor.sensor_type == SensorType.AUTOMATION + and not instance.auto_materialize_use_sensors + ): + raise DagsterInvalidInvocationError( + "Cannot evaluate an AutomationConditionSensorDefinition if the instance setting " + "`auto_materialize: use_sensors` is set to False. Update your configuration to prevent this error.", + ) + context.logger.info(f"Checking for new runs for sensor: {remote_sensor.name}") code_location = _get_code_location_for_sensor(workspace_process_context, remote_sensor) repository_handle = remote_sensor.handle.repository_handle diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py index 7d285b2bbc518..5a0784bbf92de 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py @@ -150,13 +150,13 @@ def test_default_auto_materialize_sensors_without_observable( assert auto_materialize_sensor.asset_selection == AssetSelection.all(include_sources=False) -def test_no_default_auto_materialize_sensors(instance_without_auto_materialize_sensors): +def test_opt_out_default_auto_materialize_sensors(instance_without_auto_materialize_sensors): repo_handle = RepositoryHandle.for_test( location_name="foo_location", repository_name="bar_repo", ) - # If not opted in, no default sensors are created + # If opted out, we still do create default auto materialize sensors remote_repo = RemoteRepository( RepositorySnap.from_def( defs.get_repository_def(), @@ -165,8 +165,9 @@ def test_no_default_auto_materialize_sensors(instance_without_auto_materialize_s instance=instance_without_auto_materialize_sensors, ) sensors = remote_repo.get_sensors() - assert len(sensors) == 1 - assert sensors[0].name == "normal_sensor" + assert len(sensors) == 2 + assert sensors[0].name == "default_automation_condition_sensor" + assert sensors[1].name == "normal_sensor" def test_combine_default_sensors_with_non_default_sensors(instance_with_auto_materialize_sensors): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py new file mode 100644 index 0000000000000..1fcb97e22114e --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py @@ -0,0 +1,34 @@ +import dagster as dg +from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME, build_asset_job +from dagster._core.definitions.repository_definition.repository_data import CachingRepositoryData +from dagster._core.definitions.repository_definition.repository_definition import ( + RepositoryDefinition, +) + + +@dg.asset(automation_condition=dg.AutomationCondition.initial_evaluation()) +def old_asset() -> None: ... + + +# directly construct so we can simulate an older repo without auto-constructed sensors +repo_data = CachingRepositoryData( + jobs={ + # implicit asset job to make this valid repo data + IMPLICIT_ASSET_JOB_NAME: build_asset_job( + IMPLICIT_ASSET_JOB_NAME, + AssetGraph.from_assets([old_asset]), + allow_different_partitions_defs=True, + ) + }, + schedules={}, + sensors={}, + source_assets_by_key={}, + assets_defs_by_key={old_asset.key: old_asset}, + asset_checks_defs_by_key={}, + top_level_resources={}, + utilized_env_vars={}, + unresolved_partitioned_asset_schedules={}, +) +repo = RepositoryDefinition("repo", repository_data=repo_data) +assert repo.schedule_defs == [] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index a5cbdfa25ee5f..7cce4f8f2d523 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -3,7 +3,7 @@ import sys import time from contextlib import contextmanager -from typing import AbstractSet, Mapping, Optional, Sequence, cast +from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence, cast import dagster._check as check import pytest @@ -90,13 +90,16 @@ def _setup_instance(context: WorkspaceProcessContext) -> None: @contextmanager -def get_workspace_request_context(filenames: Sequence[str]): +def get_workspace_request_context( + filenames: Sequence[str], overrides: Optional[Dict[str, Any]] = None +): with instance_for_test( overrides={ "run_launcher": { "module": "dagster._core.launcher.sync_in_memory_run_launcher", "class": "SyncInMemoryRunLauncher", }, + **(overrides or {}), } ) as instance: target = InProcessTestWorkspaceLoadTarget( @@ -672,5 +675,31 @@ def test_500_eager_assets_user_code(capsys) -> None: assert latest_ticks[0].status == TickStatus.SKIPPED # more specific check - for line in capsys.readouterr(): - assert "RESOURCE_EXHAUSTED" not in line + assert "RESOURCE_EXHAUSTED" not in capsys.readouterr().out + + +def test_fail_if_not_use_sensors(capsys) -> None: + with get_workspace_request_context( + ["simple_user_code"], overrides={"auto_materialize": {"use_sensors": False}} + ) as context, get_threadpool_executor() as executor: + _execute_ticks(context, executor) + latest_ticks = _get_latest_ticks(context.create_request_context()) + assert len(latest_ticks) == 1 + # no failure + assert latest_ticks[0].status == TickStatus.FAILURE + assert ( + "Cannot evaluate an AutomationConditionSensorDefinition if the instance setting `auto_materialize: use_sensors` is set to False" + in capsys.readouterr().out + ) + + +def test_simple_old_code_server() -> None: + with get_grpc_workspace_request_context( + "old_code_server_simulation" + ) as context, get_threadpool_executor() as executor: + time = datetime.datetime(2024, 8, 16, 1, 35) + with freeze_time(time): + # initial evaluation + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 1