Skip to content

Commit

Permalink
[k8s client] handle terminated init containers (#24485)
Browse files Browse the repository at this point in the history
Following #24432, we started hitting this
exception
```
           container_status = next(
                 s for s in all_statuses if s.name not in exited_containers | ready_initcontainers
            )
           StopIteration
```	   

in our integration tests (which, unfortunately, do not run on external
contributions).

The cause is that init containers which terminate quickly, rather than
continuing to run, were not handled. Update the code to handle that case as well.

## How I Tested These Changes

Added a test covering init containers that terminate before a ready state is ever observed.
  • Loading branch information
alangenfeld authored Sep 13, 2024
1 parent e7c4c96 commit 4cb5897
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
18 changes: 9 additions & 9 deletions python_modules/libraries/dagster-k8s/dagster_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def wait_for_pod(

# A set of container names that have exited.
exited_containers = set()
ready_initcontainers = set()
ready_containers = set()
ignore_containers = ignore_containers or set()
error_logs = []

Expand Down Expand Up @@ -614,7 +614,7 @@ def wait_for_pod(
all_statuses = []
all_statuses.extend(pod.status.init_container_statuses or [])
all_statuses.extend(pod.status.container_statuses or [])
initcontainer_count = len(pod.status.init_container_statuses or [])
initcontainers = set(s.name for s in (pod.status.init_container_statuses or []))

# Filter out ignored containers
all_statuses = [s for s in all_statuses if s.name not in ignore_containers]
Expand All @@ -626,7 +626,7 @@ def wait_for_pod(
# In case we are waiting for the pod to be ready, we will exit after
# the first container in this list is ready.
container_status = next(
s for s in all_statuses if s.name not in exited_containers | ready_initcontainers
s for s in all_statuses if s.name not in exited_containers | ready_containers
)

# State checks below, see:
Expand All @@ -642,12 +642,8 @@ def wait_for_pod(
self.sleeper(wait_time_between_attempts)
continue
else:
if (
initcontainer_count > 0
and len(ready_initcontainers) < initcontainer_count
):
ready_initcontainers.add(container_status.name)
if len(ready_initcontainers) == initcontainer_count:
ready_containers.add(container_status.name)
if initcontainers.issubset(exited_containers | ready_containers):
self.logger(f'Pod "{pod_name}" is ready, done waiting')
break

Expand Down Expand Up @@ -710,6 +706,10 @@ def wait_for_pod(

self.logger(msg)
error_logs.append(msg)
elif container_name in initcontainers:
self.logger(
f"Init container {container_name} in {pod_name} has exited successfully"
)
else:
self.logger(f"Container {container_name} in {pod_name} has exited successfully")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def create_timing_out_timer(num_good_ticks):

def assert_logger_calls(mock_logger, log_messages):
    """Assert that the logger was called exactly once per expected message, in order.

    Args:
        mock_logger: Object exposing a ``mock_calls`` sequence whose entries
            unpack as ``(name, args, kwargs)``.
        log_messages: Expected first positional argument of each recorded
            call, in call order.

    Raises:
        AssertionError: If the number of recorded calls differs from the number
            of expected messages, or if any call's first positional argument
            does not equal the corresponding expected message.
    """
    recorded_calls = mock_logger.mock_calls
    assert len(recorded_calls) == len(log_messages), (
        f"expected {len(log_messages)} logger call(s), got {len(recorded_calls)}"
    )

    # The length check above guarantees zip() does not silently drop entries.
    for index, (mock_call, log_message) in enumerate(zip(recorded_calls, log_messages)):
        _name, args, _kwargs = mock_call
        assert args[0] == log_message, (
            f"logger call #{index}: expected {log_message!r}, got {args[0]!r}"
        )
Expand Down Expand Up @@ -123,6 +122,7 @@ def test_create_job_failure_errors():
mock_client.create_namespaced_job_with_retries(
body=V1Job(metadata=a_job_metadata),
namespace=namespace,
wait_time_between_attempts=0,
)


Expand All @@ -143,6 +143,7 @@ def test_create_job_with_idempotence_api_errors():
mock_client.create_namespaced_job_with_retries(
body=V1Job(metadata=a_job_metadata),
namespace=namespace,
wait_time_between_attempts=0,
)


Expand All @@ -164,7 +165,7 @@ def test_wait_for_job_success_with_api_errors():
completed_job,
]

mock_client.wait_for_job_success(job_name, namespace)
mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0)

# logger should not have been called
assert not mock_client.logger.mock_calls
Expand Down Expand Up @@ -195,7 +196,11 @@ def test_wait_for_job_success_with_api_errors_retry_limit_exceeded():
]

with pytest.raises(DagsterK8sAPIRetryLimitExceeded):
mock_client.wait_for_job_success("a_job", "a_namespace")
mock_client.wait_for_job_success(
"a_job",
"a_namespace",
wait_time_between_attempts=0,
)

# logger should not have been called
assert not mock_client.logger.mock_calls
Expand All @@ -222,7 +227,7 @@ def test_wait_for_job_success_with_unrecoverable_api_errors():
]

with pytest.raises(DagsterK8sUnrecoverableAPIError) as exc_info:
mock_client.wait_for_job_success("a_job", "a_namespace")
mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0)

assert "Unexpected error encountered in Kubernetes API Client." in str(exc_info.value)

Expand Down Expand Up @@ -289,7 +294,7 @@ def test_wait_for_job_with_api_errors():
completed_job = V1Job(metadata=a_job_metadata, status=V1JobStatus(failed=0, succeeded=1))
mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job]

mock_client.wait_for_job_success(job_name, namespace)
mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0)

# 2 attempts with errors + 1 not launched + 1 launched
assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4
Expand Down Expand Up @@ -317,7 +322,7 @@ def test_wait_for_job_with_api_errors_retry_limit_exceeded():
mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job]

with pytest.raises(DagsterK8sAPIRetryLimitExceeded):
mock_client.wait_for_job_success("a_job", "a_namespace")
mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0)

# 4 attempts with errors
assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4
Expand Down Expand Up @@ -957,6 +962,73 @@ def test_wait_for_pod_initialize_with_multiple_init_containers_backwards():
)


# Init containers may terminate quickly, so a ready state is never observed for them.
def test_wait_for_pod_initialize_with_fast_init_containers():
    """Check that wait_for_pod finishes when an init container is seen only as terminated.

    The pod listing is mocked to return, in order:
      1. twice: the "main" container and both init containers ("init_slow",
         "init_fast") waiting with reason PodInitializing;
      2. twice: "main" still waiting, "init_fast" terminated with exit code 0,
         and "init_slow" in the status built by ``_ready_running_status``.

    The test then asserts the exact sequence of logger messages emitted.
    """
    pod_name = "a_pod"
    mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=5))

    def _pod_initializing_status(name):
        # Not-ready status for a container still waiting on pod initialization.
        return _create_status(
            name=name,
            state=V1ContainerState(
                waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing)
            ),
            ready=False,
        )

    # The main container never leaves the waiting state in this scenario.
    waiting_container_status = _pod_initializing_status("main")

    # First observed phase: both init containers are still waiting.
    two_waiting_inits_pod = _pod_list_for_container_status(
        waiting_container_status,
        init_container_statuses=[
            _pod_initializing_status("init_slow"),
            _pod_initializing_status("init_fast"),
        ],
    )

    # Second observed phase: the fast init container has already exited
    # successfully (it is never seen as ready), while the slow one is reported
    # through the ready/running helper.
    terminated_initcontainer_fast_status = _create_status(
        name="init_fast",
        ready=False,
        state=V1ContainerState(terminated=V1ContainerStateTerminated(exit_code=0)),
    )
    ready_initcontainer_slow_status = _ready_running_status(name="init_slow")
    term_and_ready_waiting_pod = _pod_list_for_container_status(
        waiting_container_status,
        init_container_statuses=[
            terminated_initcontainer_fast_status,
            ready_initcontainer_slow_status,
        ],
    )

    mock_client.core_api.list_namespaced_pod.side_effect = [
        two_waiting_inits_pod,
        two_waiting_inits_pod,
        term_and_ready_waiting_pod,
        term_and_ready_waiting_pod,
    ]

    mock_client.wait_for_pod(pod_name=pod_name, namespace="namespace")

    assert_logger_calls(
        mock_client.logger,
        [
            f'Waiting for pod "{pod_name}"',
            f'Waiting for pod "{pod_name}" to initialize...',
            f"Init container init_fast in {pod_name} has exited successfully",
            f'Pod "{pod_name}" is ready, done waiting',
        ],
    )


def test_waiting_for_pod_container_creation():
mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=3))
single_waiting_pod = _pod_list_for_container_status(
Expand Down

0 comments on commit 4cb5897

Please sign in to comment.