From 4cb5897632ad2d1d6de308e378cc75bd6da0e131 Mon Sep 17 00:00:00 2001 From: Alex Langenfeld Date: Fri, 13 Sep 2024 16:15:29 -0500 Subject: [PATCH] [k8s client] handle terminated init containers (#24485) From https://github.com/dagster-io/dagster/pull/24432 - hitting this exception ``` container_status = next( s for s in all_statuses if s.name not in exited_containers | ready_initcontainers ) StopIteration ``` in our integration tests (that unfortunately do not run on external contributions). This is due to not handling init containers that terminate quickly instead of continuing to run. Update the code to handle that case as well. ## How I Tested These Changes added test --- .../dagster-k8s/dagster_k8s/client.py | 18 ++-- .../unit_tests/test_client.py | 84 +++++++++++++++++-- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py index a4f9d7b6a9612..ee8a93c012ac6 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py @@ -570,7 +570,7 @@ def wait_for_pod( # A set of container names that have exited. exited_containers = set() - ready_initcontainers = set() + ready_containers = set() ignore_containers = ignore_containers or set() error_logs = [] @@ -614,7 +614,7 @@ def wait_for_pod( all_statuses = [] all_statuses.extend(pod.status.init_container_statuses or []) all_statuses.extend(pod.status.container_statuses or []) - initcontainer_count = len(pod.status.init_container_statuses or []) + initcontainers = set(s.name for s in (pod.status.init_container_statuses or [])) # Filter out ignored containers all_statuses = [s for s in all_statuses if s.name not in ignore_containers] @@ -626,7 +626,7 @@ def wait_for_pod( # In case we are waiting for the pod to be ready, we will exit after # the first container in this list is ready. 
container_status = next( - s for s in all_statuses if s.name not in exited_containers | ready_initcontainers + s for s in all_statuses if s.name not in exited_containers | ready_containers ) # State checks below, see: @@ -642,12 +642,8 @@ def wait_for_pod( self.sleeper(wait_time_between_attempts) continue else: - if ( - initcontainer_count > 0 - and len(ready_initcontainers) < initcontainer_count - ): - ready_initcontainers.add(container_status.name) - if len(ready_initcontainers) == initcontainer_count: + ready_containers.add(container_status.name) + if initcontainers.issubset(exited_containers | ready_containers): self.logger(f'Pod "{pod_name}" is ready, done waiting') break @@ -710,6 +706,10 @@ def wait_for_pod( self.logger(msg) error_logs.append(msg) + elif container_name in initcontainers: + self.logger( + f"Init container {container_name} in {pod_name} has exited successfully" + ) else: self.logger(f"Container {container_name} in {pod_name} has exited successfully") diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py index bdbcf545abb46..ad754885590bf 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py @@ -58,7 +58,6 @@ def create_timing_out_timer(num_good_ticks): def assert_logger_calls(mock_logger, log_messages): assert len(mock_logger.mock_calls) == len(log_messages) - for mock_call, log_message in zip(mock_logger.mock_calls, log_messages): _name, args, _kwargs = mock_call assert args[0] == log_message @@ -123,6 +122,7 @@ def test_create_job_failure_errors(): mock_client.create_namespaced_job_with_retries( body=V1Job(metadata=a_job_metadata), namespace=namespace, + wait_time_between_attempts=0, ) @@ -143,6 +143,7 @@ def test_create_job_with_idempotence_api_errors(): 
mock_client.create_namespaced_job_with_retries( body=V1Job(metadata=a_job_metadata), namespace=namespace, + wait_time_between_attempts=0, ) @@ -164,7 +165,7 @@ def test_wait_for_job_success_with_api_errors(): completed_job, ] - mock_client.wait_for_job_success(job_name, namespace) + mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0) # logger should not have been called assert not mock_client.logger.mock_calls @@ -195,7 +196,11 @@ def test_wait_for_job_success_with_api_errors_retry_limit_exceeded(): ] with pytest.raises(DagsterK8sAPIRetryLimitExceeded): - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success( + "a_job", + "a_namespace", + wait_time_between_attempts=0, + ) # logger should not have been called assert not mock_client.logger.mock_calls @@ -222,7 +227,7 @@ def test_wait_for_job_success_with_unrecoverable_api_errors(): ] with pytest.raises(DagsterK8sUnrecoverableAPIError) as exc_info: - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0) assert "Unexpected error encountered in Kubernetes API Client." 
in str(exc_info.value) @@ -289,7 +294,7 @@ def test_wait_for_job_with_api_errors(): completed_job = V1Job(metadata=a_job_metadata, status=V1JobStatus(failed=0, succeeded=1)) mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job] - mock_client.wait_for_job_success(job_name, namespace) + mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0) # 2 attempts with errors + 1 not launched + 1 launched assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4 @@ -317,7 +322,7 @@ def test_wait_for_job_with_api_errors_retry_limit_exceeded(): mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job] with pytest.raises(DagsterK8sAPIRetryLimitExceeded): - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0) # 4 attempts with errors assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4 @@ -957,6 +962,73 @@ def test_wait_for_pod_initialize_with_multiple_init_containers_backwards(): ) +# init containers may terminate quickly, so a ready state is never observed +def test_wait_for_pod_initialize_with_fast_init_containers(): + mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=5)) + waiting_container_status = _create_status( + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + name="main", + ) + waiting_initcontainer_slow_status = _create_status( + name="init_slow", + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + ) + waiting_initcontainer_fast_status = _create_status( + name="init_fast", + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + ) + terminated_initcontainer_fast_status = _create_status( + name="init_fast", + 
ready=False, + state=V1ContainerState(terminated=V1ContainerStateTerminated(exit_code=0)), + ) + ready_initcontainer_slow_status = _ready_running_status(name="init_slow") + + two_waiting_inits_pod = _pod_list_for_container_status( + waiting_container_status, + init_container_statuses=[ + waiting_initcontainer_slow_status, + waiting_initcontainer_fast_status, + ], + ) + term_and_ready_waiting_pod = _pod_list_for_container_status( + waiting_container_status, + init_container_statuses=[ + terminated_initcontainer_fast_status, + ready_initcontainer_slow_status, + ], + ) + + mock_client.core_api.list_namespaced_pod.side_effect = [ + two_waiting_inits_pod, + two_waiting_inits_pod, + term_and_ready_waiting_pod, + term_and_ready_waiting_pod, + ] + + pod_name = "a_pod" + mock_client.wait_for_pod(pod_name=pod_name, namespace="namespace") + + assert_logger_calls( + mock_client.logger, + [ + f'Waiting for pod "{pod_name}"', + f'Waiting for pod "{pod_name}" to initialize...', + "Init container init_fast in a_pod has exited successfully", + f'Pod "{pod_name}" is ready, done waiting', + ], + ) + + def test_waiting_for_pod_container_creation(): mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=3)) single_waiting_pod = _pod_list_for_container_status(