Skip to content

Commit

Permalink
feat: time out STARTING jobs with new START_TIMEOUT RunFailureReason (#22684)
Browse files Browse the repository at this point in the history

## Summary & Motivation
Adds a new `START_TIMEOUT` RunFailureReason, and updates run monitoring so
that jobs which time out while STARTING are failed with this reason.

Enables additional retry behaviors per discussion in
#22635

## How I Tested These Changes
Tested in ECS. Any existing integration tests for `report_run_failed` or
`run_monitoring` should hopefully cover this change. I'm not sure how we'd
simulate the time calculation in order to add integration tests.

---------

Co-authored-by: Jobi Carter <[email protected]>
  • Loading branch information
jobicarter and Jobi Carter authored Jun 26, 2024
1 parent 17d860c commit 2953fa1
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 5 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ class RunFailureReason(Enum):
RUN_EXCEPTION = "RUN_EXCEPTION"
STEP_FAILURE = "STEP_FAILURE"
JOB_INITIALIZATION_FAILURE = "JOB_INITIALIZATION_FAILURE"
START_TIMEOUT = "START_TIMEOUT"
UNKNOWN = "UNKNOWN"


Expand Down
7 changes: 6 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
DagsterEventBatchMetadata,
DagsterEventType,
EngineEventData,
JobFailureData,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
Expand Down Expand Up @@ -2541,7 +2542,10 @@ def report_run_canceled(
return dagster_event

def report_run_failed(
self, dagster_run: DagsterRun, message: Optional[str] = None
self,
dagster_run: DagsterRun,
message: Optional[str] = None,
job_failure_data: Optional["JobFailureData"] = None,
) -> "DagsterEvent":
from dagster._core.events import DagsterEvent, DagsterEventType

Expand All @@ -2557,6 +2561,7 @@ def report_run_failed(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
job_name=dagster_run.job_name,
message=message,
event_specific_data=job_failure_data,
)
self.report_dagster_event(dagster_event, run_id=dagster_run.run_id, log_level=logging.ERROR)
return dagster_event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
DagsterInstance,
_check as check,
)
from dagster._core.events import DagsterEventType, EngineEventData
from dagster._core.events import DagsterEventType, EngineEventData, JobFailureData, RunFailureReason
from dagster._core.launcher import WorkerStatus
from dagster._core.storage.dagster_run import (
IN_PROGRESS_RUN_STATUSES,
Expand Down Expand Up @@ -53,7 +53,9 @@ def monitor_starting_run(

logger.info(msg)

instance.report_run_failed(run, msg)
instance.report_run_failed(
run, msg, JobFailureData(error=None, failure_reason=RunFailureReason.START_TIMEOUT)
)


def monitor_canceling_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

import dagster._check as check
import pytest
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events import DagsterEvent, DagsterEventType, RunFailureReason
from dagster._core.events.log import EventLogEntry
from dagster._core.instance import DagsterInstance
from dagster._core.launcher import CheckRunHealthResult, RunLauncher, WorkerStatus
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._core.storage.tags import MAX_RUNTIME_SECONDS_TAG
from dagster._core.storage.tags import MAX_RUNTIME_SECONDS_TAG, RUN_FAILURE_REASON_TAG
from dagster._core.test_utils import (
create_run_for_test,
create_test_daemon_workspace_context,
Expand Down Expand Up @@ -201,6 +201,7 @@ def test_monitor_starting(instance: DagsterInstance, logger: Logger):
run = instance.get_run_by_id(run.run_id)
assert run
assert run.status == DagsterRunStatus.FAILURE
assert run.tags[RUN_FAILURE_REASON_TAG] == RunFailureReason.START_TIMEOUT.value


def test_monitor_canceling(instance: DagsterInstance, logger: Logger):
Expand Down

0 comments on commit 2953fa1

Please sign in to comment.