diff --git a/python_modules/dagster-test/dagster_test/test_project/test_jobs/repo.py b/python_modules/dagster-test/dagster_test/test_project/test_jobs/repo.py index 4307a3c237fe2..9440bee4d18c8 100644 --- a/python_modules/dagster-test/dagster_test/test_project/test_jobs/repo.py +++ b/python_modules/dagster-test/dagster_test/test_project/test_jobs/repo.py @@ -31,6 +31,7 @@ from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.output import Out from dagster._core.definitions.resource_definition import ResourceDefinition +from dagster._core.errors import DagsterExecutionInterruptedError from dagster._core.execution.plan.objects import StepSuccessData from dagster._core.test_utils import nesting_graph, poll_for_step_start from dagster._utils import segfault @@ -175,10 +176,18 @@ def demo_slow_graph(): count_letters(multiply_the_word_slow()) -@op -def hanging_op(_): - while True: - time.sleep(0.1) +@op(config_schema={"cleanup_delay": Field(int, is_required=False)}) +def hanging_op(context): + try: + while True: + time.sleep(0.1) + except DagsterExecutionInterruptedError: + cleanup_delay = context.op_config.get("cleanup_delay") + if cleanup_delay: + context.log.info(f"Delaying cleanup for {cleanup_delay} seconds") + time.sleep(cleanup_delay) + context.log.info("Done cleaning up") + raise @op(config_schema={"looking_for": str}) diff --git a/python_modules/libraries/dagster-docker/dagster_docker/docker_executor.py b/python_modules/libraries/dagster-docker/dagster_docker/docker_executor.py index 608ede06330f7..ab60303b0f8a7 100644 --- a/python_modules/libraries/dagster-docker/dagster_docker/docker_executor.py +++ b/python_modules/libraries/dagster-docker/dagster_docker/docker_executor.py @@ -203,6 +203,12 @@ def _create_step_container( assert len(step_keys_to_execute) == 1, "Launching multiple steps is not currently supported" step_key = step_keys_to_execute[0] + container_kwargs = {**container_context.container_kwargs} + if "stop_timeout" in container_kwargs: + # This should work, but does not due to https://github.com/docker/docker-py/issues/3168 + # Pull it out and apply it in the terminate() method instead + del container_kwargs["stop_timeout"] + env_vars = dict([parse_env_var(env_var) for env_var in container_context.env_vars]) env_vars["DAGSTER_RUN_JOB_NAME"] = step_handler_context.dagster_run.job_name env_vars["DAGSTER_RUN_STEP_KEY"] = step_key @@ -213,7 +219,7 @@ def _create_step_container( network=container_context.networks[0] if len(container_context.networks) else None, command=execute_step_args.get_command_args(), environment=env_vars, - **container_context.container_kwargs, + **container_kwargs, ) def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[DagsterEvent]: @@ -311,4 +317,6 @@ def terminate_step(self, step_handler_context: StepHandlerContext) -> Iterator[D container = client.containers.get(container_name) - container.stop() + stop_timeout = container_context.container_kwargs.get("stop_timeout") + + container.stop(timeout=stop_timeout) diff --git a/python_modules/libraries/dagster-docker/dagster_docker/docker_run_launcher.py b/python_modules/libraries/dagster-docker/dagster_docker/docker_run_launcher.py index 64630843fb12f..ad2b5ca3f822b 100644 --- a/python_modules/libraries/dagster-docker/dagster_docker/docker_run_launcher.py +++ b/python_modules/libraries/dagster-docker/dagster_docker/docker_run_launcher.py @@ -105,6 +105,12 @@ def _launch_container_with_command(self, run, docker_image, command): container_kwargs = {**container_context.container_kwargs} labels = container_kwargs.pop("labels", {}) + + if "stop_timeout" in container_kwargs: + # This should work, but does not due to https://github.com/docker/docker-py/issues/3168 + # Pull it out and apply it in the terminate() method instead + del container_kwargs["stop_timeout"] + if isinstance(labels, list): labels = {key: "" for key in labels} @@ -208,6 +214,9 @@ def terminate(self, run_id): container = self._get_container(run) + container_context = self.get_container_context(run) + stop_timeout = container_context.container_kwargs.get("stop_timeout") + if not container: self._instance.report_engine_event( message="Unable to get docker container to send termination request to.", @@ -216,7 +225,7 @@ def terminate(self, run_id): ) return False - container.stop() + container.stop(timeout=stop_timeout) return True diff --git a/python_modules/libraries/dagster-docker/dagster_docker_tests/test_launch_docker.py b/python_modules/libraries/dagster-docker/dagster_docker_tests/test_launch_docker.py index c5c780beef93d..d73208450dd98 100644 --- a/python_modules/libraries/dagster-docker/dagster_docker_tests/test_launch_docker.py +++ b/python_modules/libraries/dagster-docker/dagster_docker_tests/test_launch_docker.py @@ -192,6 +192,7 @@ def test_terminate_launched_docker_run(aws_env): launcher_config = { "env_vars": aws_env, "network": "container:test-postgres-db-docker", + "container_kwargs": {"stop_timeout": 15}, } if IS_BUILDKITE: @@ -199,11 +200,14 @@ def test_terminate_launched_docker_run(aws_env): else: find_local_test_image(docker_image) - run_config = merge_yamls( - [ - os.path.join(get_test_project_environments_path(), "env_s3.yaml"), - ] - ) + run_config = { + **merge_yamls( + [ + os.path.join(get_test_project_environments_path(), "env_s3.yaml"), + ] + ), + "ops": {"hanging_op": {"config": {"cleanup_delay": 10}}}, + } with docker_postgres_instance( overrides={