Skip to content

Commit

Permalink
[WIP] Update evaluation id logic
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 22, 2024
1 parent 0d80191 commit c044439
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def __init__(self, tick: InstigatorTick):
cursor=tick.cursor,
logKey=tick.log_key,
endTimestamp=tick.end_timestamp,
autoMaterializeAssetEvaluationId=tick.tick_data.auto_materialize_evaluation_id,
autoMaterializeAssetEvaluationId=tick.automation_condition_evaluation_id,
)

def resolve_id(self, _):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ def test_get_tick_range(self, graphql_context):
assert len(result.data["autoMaterializeTicks"]) == 1
tick = result.data["autoMaterializeTicks"][0]
assert (
tick["autoMaterializeAssetEvaluationId"]
== success_2.tick_data.auto_materialize_evaluation_id
tick["autoMaterializeAssetEvaluationId"] == success_2.automation_condition_evaluation_id
)

result = execute_dagster_graphql(
Expand All @@ -186,7 +185,7 @@ def test_get_tick_range(self, graphql_context):
assert ticks[0]["timestamp"] == success_1.timestamp
assert (
ticks[0]["autoMaterializeAssetEvaluationId"]
== success_1.tick_data.auto_materialize_evaluation_id
== success_1.automation_condition_evaluation_id
)

cursor = ticks[0]["id"]
Expand Down
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/_core/scheduler/instigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,17 @@ def unsubmitted_run_ids_with_requests(self) -> Sequence[Tuple[str, RunRequest]]:
if run_id in unrequested_run_ids
]

@property
def automation_condition_evaluation_id(self) -> int:
"""Returns a unique identifier for the current automation condition evaluation. In general,
this will be identical to the current tick id, but in cases where an evaluation needs to
be retried, an override value may be set.
"""
if self.tick_data.auto_materialize_evaluation_id is not None:
return self.tick_data.auto_materialize_evaluation_id
else:
return self.tick_id


@whitelist_for_serdes(
old_storage_names={"JobTickData"},
Expand Down
92 changes: 28 additions & 64 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,6 @@ def write(self) -> None:

class AssetDaemon(DagsterDaemon):
def __init__(self, settings: Mapping[str, Any], pre_sensor_interval_seconds: int):
self._initialized_evaluation_id = False
self._evaluation_id_lock = threading.Lock()
self._next_evaluation_id = None

self._pre_sensor_interval_seconds = pre_sensor_interval_seconds
self._last_pre_sensor_submit_time = None

Expand All @@ -360,46 +356,6 @@ def _get_print_sensor_name(self, sensor: Optional[RemoteSensor]) -> str:
)
return f" for {sensor.name} in {repo_name}"

def _initialize_evaluation_id(
self,
instance: DagsterInstance,
):
# Find the largest stored evaluation ID across all auto-materialize cursor
# to initialize the thread-safe evaluation ID counter
with self._evaluation_id_lock:
sensor_states = check.not_none(instance.schedule_storage).all_instigator_state(
instigator_type=InstigatorType.SENSOR
)

self._next_evaluation_id = 0
for sensor_state in sensor_states:
if not (
sensor_state.sensor_instigator_data
and sensor_state.sensor_instigator_data.sensor_type
and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
):
continue

compressed_cursor = sensor_state.sensor_instigator_data.cursor
if compressed_cursor:
stored_evaluation_id = asset_daemon_cursor_from_instigator_serialized_cursor(
compressed_cursor, None
).evaluation_id
self._next_evaluation_id = max(self._next_evaluation_id, stored_evaluation_id)

stored_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None)
self._next_evaluation_id = max(self._next_evaluation_id, stored_cursor.evaluation_id)

self._initialized_evaluation_id = True

def _get_next_evaluation_id(self):
# Thread-safe way to generate a new evaluation ID across multiple
# workers running asset policy sensors at once
with self._evaluation_id_lock:
check.invariant(self._initialized_evaluation_id)
self._next_evaluation_id = self._next_evaluation_id + 1
return self._next_evaluation_id

def core_loop(
self,
workspace_process_context: IWorkspaceProcessContext,
Expand Down Expand Up @@ -471,8 +427,6 @@ def _run_iteration_impl(

workspace = workspace_process_context.create_request_context()

if not self._initialized_evaluation_id:
self._initialize_evaluation_id(instance)
sensors_and_repos: Sequence[Tuple[Optional[RemoteSensor], Optional[RemoteRepository]]] = []

if use_auto_materialize_sensors:
Expand Down Expand Up @@ -799,18 +753,29 @@ def _process_auto_materialize_tick_generator(

# Determine if the most recent tick requires retrying
retry_tick: Optional[InstigatorTick] = None

override_evaluation_id: Optional[int] = None
if latest_tick:
can_resume = (
get_current_timestamp() - latest_tick.timestamp
) <= MAX_TIME_TO_RESUME_TICK_SECONDS

# the evaluation ids not matching indicates that the tick failed or crashed before
# the cursor could be written, so no new runs could have been launched and it's
# safe to re-evaluate things from scratch in a new tick without retrying anything
previous_cursor_written = (
latest_tick.automation_condition_evaluation_id == stored_cursor.evaluation_id
)

if can_resume and not previous_cursor_written:
# if the tick failed before writing a cursor, we don't want to advance the
# evaluation id yet
override_evaluation_id = latest_tick.automation_condition_evaluation_id

# If the previous tick matches the stored cursor's evaluation ID, check if it failed
# or crashed partway through execution and needs to be resumed
# Don't resume very old ticks though in case the daemon crashed for a long time and
# then restarted
if (
get_current_timestamp() - latest_tick.timestamp
<= MAX_TIME_TO_RESUME_TICK_SECONDS
and latest_tick.tick_data.auto_materialize_evaluation_id
== stored_cursor.evaluation_id
):
if can_resume and previous_cursor_written:
if latest_tick.status == TickStatus.STARTED:
self._logger.warn(
f"Tick for evaluation {stored_cursor.evaluation_id}{print_group_name} was interrupted part-way through, resuming"
Expand All @@ -829,28 +794,25 @@ def _process_auto_materialize_tick_generator(
error=None,
timestamp=evaluation_time.timestamp(),
end_timestamp=None,
),
)._replace(
# make sure to override the evaluation id to stay on the previous value
auto_materialize_evaluation_id=latest_tick.automation_condition_evaluation_id
)
)
# otherwise, tick completed normally, no need to do anything
else:
# (The evaluation IDs not matching indicates that the tick failed or crashed before
# the cursor could be written, so no runs have been launched and it's safe to
# re-evaluate things from scratch in a new tick without retrying anything)
if latest_tick.status == TickStatus.STARTED:
# Old tick that won't be resumed - move it into a SKIPPED state so it isn't
# left dangling in STARTED
self._logger.warn(
f"Moving dangling STARTED tick from evaluation {latest_tick.tick_data.auto_materialize_evaluation_id}{print_group_name} into SKIPPED"
f"Moving dangling STARTED tick from evaluation {latest_tick.automation_condition_evaluation_id}{print_group_name} into SKIPPED"
)
latest_tick = latest_tick.with_status(status=TickStatus.SKIPPED)
instance.update_tick(latest_tick)

if retry_tick:
tick = retry_tick
else:
# Evaluation ID will always be monotonically increasing, but will not always
# be auto-incrementing by 1 once there are multiple AMP evaluations happening in
# parallel
next_evaluation_id = self._get_next_evaluation_id()
tick = instance.create_tick(
TickData(
instigator_origin_id=instigator_origin_id,
Expand All @@ -861,7 +823,9 @@ def _process_auto_materialize_tick_generator(
status=TickStatus.STARTED,
timestamp=evaluation_time.timestamp(),
selector_id=instigator_selector_id,
auto_materialize_evaluation_id=next_evaluation_id,
# we only set the auto_materialize_evaluation_id if it is not equal to the
# current tick id
auto_materialize_evaluation_id=override_evaluation_id,
)
)

Expand Down Expand Up @@ -908,7 +872,7 @@ def _evaluate_auto_materialize_tick(
is_retry: bool,
instigator_state: Optional[InstigatorState],
):
evaluation_id = check.not_none(tick.tick_data.auto_materialize_evaluation_id)
evaluation_id = tick.automation_condition_evaluation_id

instance = workspace_process_context.instance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
set_auto_materialize_paused,
)
from dagster._serdes.serdes import deserialize_value, serialize_value
from dagster._time import get_current_datetime
from dagster._time import get_current_datetime, get_current_timestamp

from dagster_tests.definitions_tests.declarative_automation_tests.legacy_tests.updated_scenarios.basic_scenarios import (
basic_scenarios,
Expand Down Expand Up @@ -327,7 +327,7 @@ def test_daemon_paused() -> None:
assert ticks[0].status == TickStatus.SUCCESS
assert ticks[0].timestamp == state.current_time.timestamp()
assert ticks[0].tick_data.end_timestamp == state.current_time.timestamp()
assert ticks[0].tick_data.auto_materialize_evaluation_id == 1
assert ticks[0].automation_condition_evaluation_id == 1

state = daemon_scenario.execution_fn(state)
ticks = _get_asset_daemon_ticks(instance)
Expand All @@ -337,7 +337,7 @@ def test_daemon_paused() -> None:
assert ticks[-1].status == TickStatus.SKIPPED
assert ticks[-1].timestamp == state.current_time.timestamp()
assert ticks[-1].tick_data.end_timestamp == state.current_time.timestamp()
assert ticks[-1].tick_data.auto_materialize_evaluation_id == 2
assert ticks[-1].automation_condition_evaluation_id == 2


three_assets = ScenarioSpec(asset_specs=[AssetSpec("A"), AssetSpec("B"), AssetSpec("C")])
Expand Down Expand Up @@ -449,7 +449,19 @@ def test_auto_materialize_sensor_no_transition():
def test_auto_materialize_sensor_transition():
with get_daemon_instance(paused=False) as instance:
# Have been using global AMP, so there is a cursor
pre_sensor_evaluation_id = 12345
pre_sensor_evaluation_id = 4
for _ in range(pre_sensor_evaluation_id):
# create junk ticks so that the next tick id will be 4
instance.create_tick(
TickData(
instigator_origin_id="",
instigator_name="",
instigator_type=InstigatorType.SCHEDULE,
status=TickStatus.SUCCESS,
timestamp=get_current_timestamp(),
run_ids=[],
)
)

assert not get_has_migrated_to_sensors(instance)

Expand Down Expand Up @@ -578,13 +590,15 @@ def test_auto_materialize_sensor_name_transition() -> None:
# skip over the old state for the old name
if sensor_state.instigator_name == "default_auto_materialize_sensor":
continue
# ensure that we're properly accounting for the old cursor information
# we do not account for the old cursor as it is assumed that the current
# tick id will be strictly larger than the current asset daemon cursor
# value in the real world (as each evaluation creates a new tick)
assert (
asset_daemon_cursor_from_instigator_serialized_cursor(
cast(SensorInstigatorData, sensor_state.instigator_data).cursor,
None,
).evaluation_id
> 2
> 0 # real world should be larger
)


Expand All @@ -600,7 +614,19 @@ def test_auto_materialize_sensor_ticks(num_threads):
},
) as instance:
with _get_threadpool_executor(instance) as threadpool_executor:
pre_sensor_evaluation_id = 12345
pre_sensor_evaluation_id = 3
for _ in range(pre_sensor_evaluation_id):
# create junk ticks so that the next tick id will be 4
instance.create_tick(
TickData(
instigator_origin_id="",
instigator_name="",
instigator_type=InstigatorType.SCHEDULE,
status=TickStatus.SUCCESS,
timestamp=get_current_timestamp(),
run_ids=[],
)
)

instance.daemon_cursor_storage.set_cursor_values(
{
Expand Down Expand Up @@ -817,7 +843,7 @@ def test_auto_materialize_sensor_ticks(num_threads):

prev_evaluation_id = None
for tick in ticks:
evaluation_id = tick.tick_data.auto_materialize_evaluation_id
evaluation_id = tick.automation_condition_evaluation_id
assert evaluation_id > pre_sensor_evaluation_id
assert evaluation_id not in seen_evaluation_ids
seen_evaluation_ids.add(evaluation_id)
Expand Down
Loading

0 comments on commit c044439

Please sign in to comment.