From 2953fa1224569e26f9b573a8b731f0d0339d6f7f Mon Sep 17 00:00:00 2001 From: jobicarter Date: Wed, 26 Jun 2024 07:10:10 -0600 Subject: [PATCH] feat: time out STARTING jobs with new START_TIMEOUT RunFailureReason (#22684) ## Summary & Motivation Adds a new `START_TIMEOUT` RunFailureReason, and updates monitoring to fail jobs that time out with this reason. Enables additional retry behaviors per discussion in https://github.com/dagster-io/dagster/pull/22635 ## How I Tested These Changes Tested in ECS; any existing integration tests for `report_run_failed` or `run_monitoring` should hopefully cover this change. Not sure how we'd simulate the time calculation in order to add integration tests. --------- Co-authored-by: Jobi Carter --- python_modules/dagster/dagster/_core/events/__init__.py | 1 + python_modules/dagster/dagster/_core/instance/__init__.py | 7 ++++++- .../dagster/dagster/_daemon/monitoring/run_monitoring.py | 6 ++++-- .../dagster_tests/daemon_tests/test_monitoring_daemon.py | 5 +++-- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index ca107f00710a0..f09e4977b7110 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -271,6 +271,7 @@ class RunFailureReason(Enum): RUN_EXCEPTION = "RUN_EXCEPTION" STEP_FAILURE = "STEP_FAILURE" JOB_INITIALIZATION_FAILURE = "JOB_INITIALIZATION_FAILURE" + START_TIMEOUT = "START_TIMEOUT" UNKNOWN = "UNKNOWN" diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 6ee516d165418..16b58d7684c3e 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -119,6 +119,7 @@ DagsterEventBatchMetadata, DagsterEventType, EngineEventData, + JobFailureData, ) from dagster._core.events.log import 
EventLogEntry from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill @@ -2541,7 +2542,10 @@ def report_run_canceled( return dagster_event def report_run_failed( - self, dagster_run: DagsterRun, message: Optional[str] = None + self, + dagster_run: DagsterRun, + message: Optional[str] = None, + job_failure_data: Optional["JobFailureData"] = None, ) -> "DagsterEvent": from dagster._core.events import DagsterEvent, DagsterEventType @@ -2557,6 +2561,7 @@ def report_run_failed( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, job_name=dagster_run.job_name, message=message, + event_specific_data=job_failure_data, ) self.report_dagster_event(dagster_event, run_id=dagster_run.run_id, log_level=logging.ERROR) return dagster_event diff --git a/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py b/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py index b0ebd9a8b8bee..ebf74f4544f9a 100644 --- a/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py +++ b/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py @@ -9,7 +9,7 @@ DagsterInstance, _check as check, ) -from dagster._core.events import DagsterEventType, EngineEventData +from dagster._core.events import DagsterEventType, EngineEventData, JobFailureData, RunFailureReason from dagster._core.launcher import WorkerStatus from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, @@ -53,7 +53,9 @@ def monitor_starting_run( logger.info(msg) - instance.report_run_failed(run, msg) + instance.report_run_failed( + run, msg, JobFailureData(error=None, failure_reason=RunFailureReason.START_TIMEOUT) + ) def monitor_canceling_run( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py b/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py index 050270dbe9438..613853f35cd49 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py +++ 
b/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py @@ -7,12 +7,12 @@ import dagster._check as check import pytest -from dagster._core.events import DagsterEvent, DagsterEventType +from dagster._core.events import DagsterEvent, DagsterEventType, RunFailureReason from dagster._core.events.log import EventLogEntry from dagster._core.instance import DagsterInstance from dagster._core.launcher import CheckRunHealthResult, RunLauncher, WorkerStatus from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus -from dagster._core.storage.tags import MAX_RUNTIME_SECONDS_TAG +from dagster._core.storage.tags import MAX_RUNTIME_SECONDS_TAG, RUN_FAILURE_REASON_TAG from dagster._core.test_utils import ( create_run_for_test, create_test_daemon_workspace_context, @@ -201,6 +201,7 @@ def test_monitor_starting(instance: DagsterInstance, logger: Logger): run = instance.get_run_by_id(run.run_id) assert run assert run.status == DagsterRunStatus.FAILURE + assert run.tags[RUN_FAILURE_REASON_TAG] == RunFailureReason.START_TIMEOUT.value def test_monitor_canceling(instance: DagsterInstance, logger: Logger):