diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py index a4f9d7b6a9612..ee8a93c012ac6 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py @@ -570,7 +570,7 @@ def wait_for_pod( # A set of container names that have exited. exited_containers = set() - ready_initcontainers = set() + ready_containers = set() ignore_containers = ignore_containers or set() error_logs = [] @@ -614,7 +614,7 @@ def wait_for_pod( all_statuses = [] all_statuses.extend(pod.status.init_container_statuses or []) all_statuses.extend(pod.status.container_statuses or []) - initcontainer_count = len(pod.status.init_container_statuses or []) + initcontainers = set(s.name for s in (pod.status.init_container_statuses or [])) # Filter out ignored containers all_statuses = [s for s in all_statuses if s.name not in ignore_containers] @@ -626,7 +626,7 @@ def wait_for_pod( # In case we are waiting for the pod to be ready, we will exit after # the first container in this list is ready. container_status = next( - s for s in all_statuses if s.name not in exited_containers | ready_initcontainers + s for s in all_statuses if s.name not in exited_containers | ready_containers ) # State checks below, see: @@ -642,12 +642,8 @@ def wait_for_pod( self.sleeper(wait_time_between_attempts) continue else: - if ( - initcontainer_count > 0 - and len(ready_initcontainers) < initcontainer_count - ): - ready_initcontainers.add(container_status.name) - if len(ready_initcontainers) == initcontainer_count: + ready_containers.add(container_status.name) + if initcontainers.issubset(exited_containers | ready_containers): self.logger(f'Pod "{pod_name}" is ready, done waiting') break @@ -710,6 +706,10 @@ def wait_for_pod( self.logger(msg) error_logs.append(msg) + elif container_name in initcontainers: + self.logger( + f"Init container {container_name} in {pod_name} has exited successfully" + ) else: self.logger(f"Container {container_name} in {pod_name} has exited successfully") diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py index bdbcf545abb46..ad754885590bf 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py @@ -58,7 +58,6 @@ def create_timing_out_timer(num_good_ticks): def assert_logger_calls(mock_logger, log_messages): assert len(mock_logger.mock_calls) == len(log_messages) - for mock_call, log_message in zip(mock_logger.mock_calls, log_messages): _name, args, _kwargs = mock_call assert args[0] == log_message @@ -123,6 +122,7 @@ def test_create_job_failure_errors(): mock_client.create_namespaced_job_with_retries( body=V1Job(metadata=a_job_metadata), namespace=namespace, + wait_time_between_attempts=0, ) @@ -143,6 +143,7 @@ def test_create_job_with_idempotence_api_errors(): mock_client.create_namespaced_job_with_retries( body=V1Job(metadata=a_job_metadata), namespace=namespace, + wait_time_between_attempts=0, ) @@ -164,7 +165,7 @@ def test_wait_for_job_success_with_api_errors(): completed_job, ] - mock_client.wait_for_job_success(job_name, namespace) + mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0) # logger should not have been called assert not mock_client.logger.mock_calls @@ -195,7 +196,11 @@ def test_wait_for_job_success_with_api_errors_retry_limit_exceeded(): ] with pytest.raises(DagsterK8sAPIRetryLimitExceeded): - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success( + "a_job", + "a_namespace", + wait_time_between_attempts=0, + ) # logger should not have been called assert not mock_client.logger.mock_calls @@ -222,7 +227,7 @@ def test_wait_for_job_success_with_unrecoverable_api_errors(): ] with pytest.raises(DagsterK8sUnrecoverableAPIError) as exc_info: - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0) assert "Unexpected error encountered in Kubernetes API Client." in str(exc_info.value) @@ -289,7 +294,7 @@ def test_wait_for_job_with_api_errors(): completed_job = V1Job(metadata=a_job_metadata, status=V1JobStatus(failed=0, succeeded=1)) mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job] - mock_client.wait_for_job_success(job_name, namespace) + mock_client.wait_for_job_success(job_name, namespace, wait_time_between_attempts=0) # 2 attempts with errors + 1 not launched + 1 launched assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4 @@ -317,7 +322,7 @@ def test_wait_for_job_with_api_errors_retry_limit_exceeded(): mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job] with pytest.raises(DagsterK8sAPIRetryLimitExceeded): - mock_client.wait_for_job_success("a_job", "a_namespace") + mock_client.wait_for_job_success("a_job", "a_namespace", wait_time_between_attempts=0) # 4 attempts with errors assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 4 @@ -957,6 +962,73 @@ def test_wait_for_pod_initialize_with_multiple_init_containers_backwards(): ) +# init containers may terminate quickly, so a ready state is never observed +def test_wait_for_pod_initialize_with_fast_init_containers(): + mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=5)) + waiting_container_status = _create_status( + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + name="main", + ) + waiting_initcontainer_slow_status = _create_status( + name="init_slow", + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + ) + waiting_initcontainer_fast_status = _create_status( + name="init_fast", + state=V1ContainerState( + waiting=V1ContainerStateWaiting(reason=KubernetesWaitingReasons.PodInitializing) + ), + ready=False, + ) + terminated_initcontainer_fast_status = _create_status( + name="init_fast", + ready=False, + state=V1ContainerState(terminated=V1ContainerStateTerminated(exit_code=0)), + ) + ready_initcontainer_slow_status = _ready_running_status(name="init_slow") + + two_waiting_inits_pod = _pod_list_for_container_status( + waiting_container_status, + init_container_statuses=[ + waiting_initcontainer_slow_status, + waiting_initcontainer_fast_status, + ], + ) + term_and_ready_waiting_pod = _pod_list_for_container_status( + waiting_container_status, + init_container_statuses=[ + terminated_initcontainer_fast_status, + ready_initcontainer_slow_status, + ], + ) + + mock_client.core_api.list_namespaced_pod.side_effect = [ + two_waiting_inits_pod, + two_waiting_inits_pod, + term_and_ready_waiting_pod, + term_and_ready_waiting_pod, + ] + + pod_name = "a_pod" + mock_client.wait_for_pod(pod_name=pod_name, namespace="namespace") + + assert_logger_calls( + mock_client.logger, + [ + f'Waiting for pod "{pod_name}"', + f'Waiting for pod "{pod_name}" to initialize...', + "Init container init_fast in a_pod has exited successfully", + f'Pod "{pod_name}" is ready, done waiting', + ], + ) + + def test_waiting_for_pod_container_creation(): mock_client = create_mocked_client(timer=create_timing_out_timer(num_good_ticks=3)) single_waiting_pod = _pod_list_for_container_status(