diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py index 27eb75679c01d..81b27d529c49f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py @@ -271,7 +271,7 @@ def __init__(self, tick: InstigatorTick): cursor=tick.cursor, logKey=tick.log_key, endTimestamp=tick.end_timestamp, - autoMaterializeAssetEvaluationId=tick.tick_data.auto_materialize_evaluation_id, + autoMaterializeAssetEvaluationId=tick.automation_condition_evaluation_id, ) def resolve_id(self, _): diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index 79b460170e8ac..0e16cbe682493 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -165,8 +165,7 @@ def test_get_tick_range(self, graphql_context): assert len(result.data["autoMaterializeTicks"]) == 1 tick = result.data["autoMaterializeTicks"][0] assert ( - tick["autoMaterializeAssetEvaluationId"] - == success_2.tick_data.auto_materialize_evaluation_id + tick["autoMaterializeAssetEvaluationId"] == success_2.automation_condition_evaluation_id ) result = execute_dagster_graphql( @@ -186,7 +185,7 @@ def test_get_tick_range(self, graphql_context): assert ticks[0]["timestamp"] == success_1.timestamp assert ( ticks[0]["autoMaterializeAssetEvaluationId"] - == success_1.tick_data.auto_materialize_evaluation_id + == success_1.automation_condition_evaluation_id ) cursor = ticks[0]["id"] diff --git a/python_modules/dagster/dagster/_core/scheduler/instigation.py b/python_modules/dagster/dagster/_core/scheduler/instigation.py index a81d2acba0c73..e147821cdcc89 100644 --- a/python_modules/dagster/dagster/_core/scheduler/instigation.py +++ b/python_modules/dagster/dagster/_core/scheduler/instigation.py @@ -528,6 +528,17 @@ def unsubmitted_run_ids_with_requests(self) -> Sequence[Tuple[str, RunRequest]]: if run_id in unrequested_run_ids ] + @property + def automation_condition_evaluation_id(self) -> int: + """Returns a unique identifier for the current automation condition evaluation. In general, + this will be identical to the current tick id, but in cases where an evaluation needs to + be retried, an override value may be set. + """ + if self.tick_data.auto_materialize_evaluation_id is not None: + return self.tick_data.auto_materialize_evaluation_id + else: + return self.tick_id + @whitelist_for_serdes( old_storage_names={"JobTickData"}, diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 7f183578c54ac..a470d75edb55d 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -330,10 +330,6 @@ def write(self) -> None: class AssetDaemon(DagsterDaemon): def __init__(self, settings: Mapping[str, Any], pre_sensor_interval_seconds: int): - self._initialized_evaluation_id = False - self._evaluation_id_lock = threading.Lock() - self._next_evaluation_id = None - self._pre_sensor_interval_seconds = pre_sensor_interval_seconds self._last_pre_sensor_submit_time = None @@ -360,46 +356,6 @@ def _get_print_sensor_name(self, sensor: Optional[RemoteSensor]) -> str: ) return f" for {sensor.name} in {repo_name}" - def _initialize_evaluation_id( - self, - instance: DagsterInstance, - ): - # Find the largest stored evaluation ID across all auto-materialize cursor - # to initialize the thread-safe evaluation ID counter - with self._evaluation_id_lock: - sensor_states = check.not_none(instance.schedule_storage).all_instigator_state( - instigator_type=InstigatorType.SENSOR - ) - - self._next_evaluation_id = 0 - for sensor_state in sensor_states: - if not ( - sensor_state.sensor_instigator_data - and sensor_state.sensor_instigator_data.sensor_type - and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon - ): - continue - - compressed_cursor = sensor_state.sensor_instigator_data.cursor - if compressed_cursor: - stored_evaluation_id = asset_daemon_cursor_from_instigator_serialized_cursor( - compressed_cursor, None - ).evaluation_id - self._next_evaluation_id = max(self._next_evaluation_id, stored_evaluation_id) - - stored_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None) - self._next_evaluation_id = max(self._next_evaluation_id, stored_cursor.evaluation_id) - - self._initialized_evaluation_id = True - - def _get_next_evaluation_id(self): - # Thread-safe way to generate a new evaluation ID across multiple - # workers running asset policy sensors at once - with self._evaluation_id_lock: - check.invariant(self._initialized_evaluation_id) - self._next_evaluation_id = self._next_evaluation_id + 1 - return self._next_evaluation_id - def core_loop( self, workspace_process_context: IWorkspaceProcessContext, @@ -471,8 +427,6 @@ def _run_iteration_impl( workspace = workspace_process_context.create_request_context() - if not self._initialized_evaluation_id: - self._initialize_evaluation_id(instance) sensors_and_repos: Sequence[Tuple[Optional[RemoteSensor], Optional[RemoteRepository]]] = [] if use_auto_materialize_sensors: @@ -799,18 +753,25 @@ def _process_auto_materialize_tick_generator( # Determine if the most recent tick requires retrying retry_tick: Optional[InstigatorTick] = None - + override_evaluation_id: Optional[int] = None if latest_tick: + can_resume = ( + get_current_timestamp() - latest_tick.timestamp + ) <= MAX_TIME_TO_RESUME_TICK_SECONDS + previous_cursor_written = ( + latest_tick.automation_condition_evaluation_id == stored_cursor.evaluation_id + ) + + if can_resume and not previous_cursor_written: + # if the tick failed before writing a cursor, we don't want to advance the + # evaluation id yet + override_evaluation_id = latest_tick.automation_condition_evaluation_id + # If the previous tick matches the stored cursor's evaluation ID, check if it failed # or crashed partway through execution and needs to be resumed # Don't resume very old ticks though in case the daemon crashed for a long time and # then restarted - if ( - get_current_timestamp() - latest_tick.timestamp - <= MAX_TIME_TO_RESUME_TICK_SECONDS - and latest_tick.tick_data.auto_materialize_evaluation_id - == stored_cursor.evaluation_id - ): + if can_resume and previous_cursor_written: if latest_tick.status == TickStatus.STARTED: self._logger.warn( f"Tick for evaluation {stored_cursor.evaluation_id}{print_group_name} was interrupted part-way through, resuming" @@ -829,8 +790,12 @@ def _process_auto_materialize_tick_generator( error=None, timestamp=evaluation_time.timestamp(), end_timestamp=None, - ), + )._replace( + # make sure to override the evaluation id to stay on the previous value + auto_materialize_evaluation_id=latest_tick.automation_condition_evaluation_id + ) ) + # otherwise, tick completed normally, no need to do anything else: # (The evaluation IDs not matching indicates that the tick failed or crashed before # the cursor could be written, so no runs have been launched and it's safe to @@ -839,7 +804,7 @@ def _process_auto_materialize_tick_generator( # Old tick that won't be resumed - move it into a SKIPPED state so it isn't # left dangling in STARTED self._logger.warn( - f"Moving dangling STARTED tick from evaluation {latest_tick.tick_data.auto_materialize_evaluation_id}{print_group_name} into SKIPPED" + f"Moving dangling STARTED tick from evaluation {latest_tick.automation_condition_evaluation_id}{print_group_name} into SKIPPED" ) latest_tick = latest_tick.with_status(status=TickStatus.SKIPPED) instance.update_tick(latest_tick) @@ -847,10 +812,6 @@ def _process_auto_materialize_tick_generator( if retry_tick: tick = retry_tick else: - # Evaluation ID will always be monotonically increasing, but will not always - # be auto-incrementing by 1 once there are multiple AMP evaluations happening in - # parallel - next_evaluation_id = self._get_next_evaluation_id() tick = instance.create_tick( TickData( instigator_origin_id=instigator_origin_id, @@ -861,7 +822,9 @@ def _process_auto_materialize_tick_generator( status=TickStatus.STARTED, timestamp=evaluation_time.timestamp(), selector_id=instigator_selector_id, - auto_materialize_evaluation_id=next_evaluation_id, + # we only set the auto_materialize_evaluation_id if it is not equal to the + # current tick id + auto_materialize_evaluation_id=override_evaluation_id, ) ) @@ -908,7 +871,7 @@ def _evaluate_auto_materialize_tick( is_retry: bool, instigator_state: Optional[InstigatorState], ): - evaluation_id = check.not_none(tick.tick_data.auto_materialize_evaluation_id) + evaluation_id = tick.automation_condition_evaluation_id instance = workspace_process_context.instance diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py index b285e48e70ac7..e103ed77c769e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py @@ -45,7 +45,7 @@ set_auto_materialize_paused, ) from dagster._serdes.serdes import deserialize_value, serialize_value -from dagster._time import get_current_datetime +from dagster._time import get_current_datetime, get_current_timestamp from dagster_tests.definitions_tests.declarative_automation_tests.legacy_tests.updated_scenarios.basic_scenarios import ( basic_scenarios, @@ -327,7 +327,7 @@ def test_daemon_paused() -> None: assert ticks[0].status == TickStatus.SUCCESS assert ticks[0].timestamp == state.current_time.timestamp() assert ticks[0].tick_data.end_timestamp == state.current_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 state = daemon_scenario.execution_fn(state) ticks = _get_asset_daemon_ticks(instance) @@ -337,7 +337,7 @@ def test_daemon_paused() -> None: assert ticks[-1].status == TickStatus.SKIPPED assert ticks[-1].timestamp == state.current_time.timestamp() assert ticks[-1].tick_data.end_timestamp == state.current_time.timestamp() - assert ticks[-1].tick_data.auto_materialize_evaluation_id == 2 + assert ticks[-1].automation_condition_evaluation_id == 2 three_assets = ScenarioSpec(asset_specs=[AssetSpec("A"), AssetSpec("B"), AssetSpec("C")]) @@ -449,7 +449,19 @@ def test_auto_materialize_sensor_no_transition(): def test_auto_materialize_sensor_transition(): with get_daemon_instance(paused=False) as instance: # Have been using global AMP, so there is a cursor - pre_sensor_evaluation_id = 12345 + pre_sensor_evaluation_id = 4 + for _ in range(pre_sensor_evaluation_id): + # create junk ticks so that the next tick id will be 4 + instance.create_tick( + TickData( + instigator_origin_id="", + instigator_name="", + instigator_type=InstigatorType.SCHEDULE, + status=TickStatus.SUCCESS, + timestamp=get_current_timestamp(), + run_ids=[], + ) + ) assert not get_has_migrated_to_sensors(instance) @@ -578,13 +590,15 @@ def test_auto_materialize_sensor_name_transition() -> None: # skip over the old state for the old name if sensor_state.instigator_name == "default_auto_materialize_sensor": continue - # ensure that we're properly accounting for the old cursor information + # we do not account for the old cursor as it is assumed that the current + # tick id will be strictly larger than the current asset daemon cursor + # value in the real world (as each evaluation creates a new tick) assert ( asset_daemon_cursor_from_instigator_serialized_cursor( cast(SensorInstigatorData, sensor_state.instigator_data).cursor, None, ).evaluation_id - > 2 + > 0 # real world should be larger ) @@ -600,7 +614,19 @@ def test_auto_materialize_sensor_ticks(num_threads): }, ) as instance: with _get_threadpool_executor(instance) as threadpool_executor: - pre_sensor_evaluation_id = 12345 + pre_sensor_evaluation_id = 3 + for _ in range(pre_sensor_evaluation_id): + # create junk ticks so that the next tick id will be 4 + instance.create_tick( + TickData( + instigator_origin_id="", + instigator_name="", + instigator_type=InstigatorType.SCHEDULE, + status=TickStatus.SUCCESS, + timestamp=get_current_timestamp(), + run_ids=[], + ) + ) instance.daemon_cursor_storage.set_cursor_values( { @@ -817,7 +843,7 @@ def test_auto_materialize_sensor_ticks(num_threads): prev_evaluation_id = None for tick in ticks: - evaluation_id = tick.tick_data.auto_materialize_evaluation_id + evaluation_id = tick.automation_condition_evaluation_id assert evaluation_id > pre_sensor_evaluation_id assert evaluation_id not in seen_evaluation_ids seen_evaluation_ids.add(evaluation_id) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py index e8fe8903fc8e1..4477d8c3f1af5 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py @@ -101,7 +101,7 @@ def test_old_tick_not_resumed(daemon_not_paused_instance): assert len(ticks) == 1 assert ticks[0].status == TickStatus.FAILURE - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 assert ticks[0].timestamp == execution_time.timestamp() # advancing past MAX_TIME_TO_RESUME_TICK_SECONDS gives up and advances to a new evaluation @@ -123,7 +123,7 @@ def test_old_tick_not_resumed(daemon_not_paused_instance): assert len(ticks) == 2 assert ticks[0].status == TickStatus.FAILURE - assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 + assert ticks[0].automation_condition_evaluation_id == 2 # advancing less than that retries the same tick execution_time = execution_time + datetime.timedelta( @@ -144,7 +144,7 @@ def test_old_tick_not_resumed(daemon_not_paused_instance): assert len(ticks) == 3 assert ticks[0].status == TickStatus.FAILURE - assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 + assert ticks[0].automation_condition_evaluation_id == 2 @pytest.mark.parametrize( @@ -182,7 +182,7 @@ def test_error_loop_before_cursor_written(daemon_not_paused_instance, crash_loca assert ticks[0].status == TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 # each tick is considered a brand new retry since it happened before the cursor # was written @@ -217,7 +217,7 @@ def test_error_loop_before_cursor_written(daemon_not_paused_instance, crash_loca assert ticks[0].status == TickStatus.SUCCESS assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 # finally finishes + assert ticks[0].automation_condition_evaluation_id == 1 # finally finishes runs = instance.get_runs() assert len(runs) == 5 @@ -263,7 +263,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status == TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 # failure count does not increase since it was a user code error assert ticks[0].tick_data.failure_count == 0 @@ -304,7 +304,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status == TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 # failure count only increases if the cursor was written - otherwise # each tick is considered a brand new retry @@ -342,7 +342,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status == TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 # advances + assert ticks[0].automation_condition_evaluation_id == 5 # advances, skipping a few numbers assert "Oops new tick" in str(ticks[0].tick_data.error) @@ -370,7 +370,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status != TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 # finishes + assert ticks[0].automation_condition_evaluation_id == 5 # finishes spawn_ctx = multiprocessing.get_context("spawn") @@ -438,7 +438,7 @@ def test_asset_daemon_crash_recovery(daemon_not_paused_instance, crash_location) assert not ticks[0].tick_data.end_timestamp == scenario.current_time.timestamp() assert not len(ticks[0].tick_data.run_ids) - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 freeze_datetime = scenario.current_time + datetime.timedelta(seconds=1) @@ -479,7 +479,7 @@ def test_asset_daemon_crash_recovery(daemon_not_paused_instance, crash_location) ) assert ticks[0].tick_data.end_timestamp == freeze_datetime.timestamp() assert len(ticks[0].tick_data.run_ids) == 5 - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 if len(ticks) == 2: # first tick is intercepted and moved into skipped instead of being stuck in STARTED @@ -547,7 +547,7 @@ def test_asset_daemon_exception_recovery(daemon_not_paused_instance, crash_locat assert ticks[0].timestamp == scenario.current_time.timestamp() assert ticks[0].tick_data.end_timestamp == scenario.current_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 tick_data_written = crash_location not in ("EVALUATIONS_FINISHED", "ASSET_EVALUATIONS_ADDED") @@ -592,7 +592,7 @@ def test_asset_daemon_exception_recovery(daemon_not_paused_instance, crash_locat assert ticks[0].timestamp == freeze_datetime.timestamp() assert ticks[0].tick_data.end_timestamp == freeze_datetime.timestamp() assert len(ticks[0].tick_data.run_ids) == 5 - assert ticks[0].tick_data.auto_materialize_evaluation_id == 1 + assert ticks[0].automation_condition_evaluation_id == 1 _assert_run_requests_match(scenario.expected_run_requests, ticks[0].tick_data.run_requests)