Skip to content

Commit

Permalink
Include full docker container state in failure message when a run worker fails (#24502)
Browse files Browse the repository at this point in the history

Summary:
This will help diagnose transient issues where the docker api appears to
be incorrectly returning that a container is running when it is not.

Test Plan: Added a new automated test case; also ran a crashing Docker op
locally and checked the run logs.

## Summary & Motivation

## How I Tested These Changes

## Changelog

Insert changelog entry or "NOCHANGELOG" here.

- `NEW` [dagster-docker] Added additional information about the
container to the event log when a run using the `DockerRunLauncher`
fails due to the container unexpectedly exiting.
  • Loading branch information
gibsondan authored Sep 17, 2024
1 parent a795498 commit 3f4f9d5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Any, Mapping, Optional

import dagster._check as check
Expand Down Expand Up @@ -214,11 +215,22 @@ def supports_check_run_worker_health(self):
return True

def check_run_worker_health(self, run: DagsterRun):
    """Report the health of the Docker container backing ``run``.

    The container ID is read from the run's ``DOCKER_CONTAINER_ID_TAG`` tag and
    the container itself is looked up through ``self._get_container``.

    Args:
        run: The run whose worker container should be inspected.

    Returns:
        A ``CheckRunHealthResult`` with status:

        * ``WorkerStatus.NOT_FOUND`` when the run carries no container ID tag,
          or when ``self._get_container`` returns ``None``;
        * ``WorkerStatus.RUNNING`` when the container's status is ``"running"``;
        * ``WorkerStatus.FAILED`` otherwise. The message contains the container
          status and, when the container attrs have a non-empty ``"State"``
          entry, that entry serialized as JSON.
    """
    container_id = run.tags.get(DOCKER_CONTAINER_ID_TAG)

    if not container_id:
        return CheckRunHealthResult(WorkerStatus.NOT_FOUND, msg="No container ID tag for run.")

    container = self._get_container(run)
    if container is None:
        return CheckRunHealthResult(
            WorkerStatus.NOT_FOUND, msg=f"Could not find container with ID {container_id}."
        )

    if container.status == "running":
        return CheckRunHealthResult(WorkerStatus.RUNNING)

    # Include the full state blob so that a status the Docker API reported
    # incorrectly can be diagnosed from the event log afterwards.
    container_state = container.attrs.get("State")
    failure_string = f"Container status is {container.status}." + (
        f" Container state: {json.dumps(container_state)}" if container_state else ""
    )

    return CheckRunHealthResult(WorkerStatus.FAILED, msg=failure_string)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint doesn't know about pytest fixtures


import json
import os
import re
import time
Expand All @@ -10,7 +11,11 @@
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._core.test_utils import environ, poll_for_finished_run, poll_for_step_start
from dagster._utils.yaml_utils import merge_yamls
from dagster_docker.docker_run_launcher import DOCKER_CONTAINER_ID_TAG, DOCKER_IMAGE_TAG
from dagster_docker.docker_run_launcher import (
DOCKER_CONTAINER_ID_TAG,
DOCKER_IMAGE_TAG,
DockerRunLauncher,
)
from dagster_test.test_project import (
ReOriginatedExternalJobForTest,
find_local_test_image,
Expand Down Expand Up @@ -364,6 +369,66 @@ def test_cant_combine_network_and_networks(aws_env):
print(instance.run_launcher) # noqa: T201


from unittest import mock

from dagster._core.launcher.base import WorkerStatus
from dagster._core.test_utils import create_run_for_test, instance_for_test


def test_check_run_health():
    """A non-running container yields FAILED with its state embedded in the message.

    The Docker client is replaced by a mock whose ``containers.get`` returns a
    container with status ``"exited"``; the health check message is expected to
    contain the JSON-serialized ``State`` entry of the container attrs.
    """
    exited_state = {
        "Status": "exited",
        "Running": False,
        "Paused": False,
        "Restarting": False,
        "OOMKilled": True,
        "Dead": False,
        "Pid": 0,
        "ExitCode": 1,
        "Error": "Out of memory",
        "StartedAt": "2024-09-16T12:49:45.539998202Z",
        "FinishedAt": "2024-09-16T13:00:00.000000000Z",
    }
    instance_overrides = {
        "run_launcher": {
            "class": "DockerRunLauncher",
            "module": "dagster_docker",
            "config": {},
        },
    }
    expected_msg = (
        f"Container status is exited. Container state: {json.dumps(exited_state)}"
    )

    with instance_for_test(instance_overrides) as instance, mock.patch(
        "docker.client.from_env"
    ) as from_env_patch:
        # Fake container as it would be returned by the Docker client.
        fake_container = mock.Mock()
        fake_container.status = "exited"
        fake_container.attrs = {"State": exited_state, "Status": "exited"}

        # Wire the fake client so that containers.get hands back the fake container.
        fake_client = mock.MagicMock()
        fake_client.containers = mock.MagicMock()
        fake_client.containers.get.return_value = fake_container
        from_env_patch.return_value = fake_client

        run_launcher = DockerRunLauncher()

        run = create_run_for_test(
            instance,
            "test_job",
            status=DagsterRunStatus.STARTED,
            tags={DOCKER_CONTAINER_ID_TAG: "12345"},
        )

        result = run_launcher.check_run_worker_health(run)
        assert result.status == WorkerStatus.FAILED
        assert result.msg == expected_msg


@pytest.mark.integration
def test_terminate(aws_env):
docker_image = get_test_project_docker_image()
Expand Down

0 comments on commit 3f4f9d5

Please sign in to comment.