Skip to content

Commit

Permalink
Fix using the stop_timeout argument in DockerRunLauncher.container_kw…
Browse files Browse the repository at this point in the history
…args (#25423)

## Summary & Motivation
container_kwargs.stop_timeout should work out of the box, but does not
due to a longstanding docker-py bug. Move it into the terminate() method
instead

## How I Tested These Changes
New test case

## Changelog
[dagster-docker] `container_kwargs.stop_timeout` can now be set when
using the `DockerRunLauncher` or `docker_executor` to configure the
amount of time that Docker will wait when terminating a run for it to
clean up before forcibly stopping it with a SIGKILL signal.
  • Loading branch information
gibsondan authored Oct 22, 2024
1 parent 05740a4 commit 3a4f8da
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.output import Out
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.plan.objects import StepSuccessData
from dagster._core.test_utils import nesting_graph, poll_for_step_start
from dagster._utils import segfault
Expand Down Expand Up @@ -175,10 +176,18 @@ def demo_slow_graph():
count_letters(multiply_the_word_slow())


@op
def hanging_op(_):
while True:
time.sleep(0.1)
@op(config_schema={"cleanup_delay": Field(int, is_required=False)})
def hanging_op(context):
try:
while True:
time.sleep(0.1)
except DagsterExecutionInterruptedError:
cleanup_delay = context.op_config.get("cleanup_delay")
if cleanup_delay:
context.log.info(f"Delaying cleanup for {cleanup_delay} seconds")
time.sleep(cleanup_delay)
context.log.info("Done cleaning up")
raise


@op(config_schema={"looking_for": str})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ def _create_step_container(
assert len(step_keys_to_execute) == 1, "Launching multiple steps is not currently supported"
step_key = step_keys_to_execute[0]

container_kwargs = {**container_context.container_kwargs}
if "stop_timeout" in container_kwargs:
# This should work, but does not due to https://github.com/docker/docker-py/issues/3168
# Pull it out and apply it in the terminate() method instead
del container_kwargs["stop_timeout"]

env_vars = dict([parse_env_var(env_var) for env_var in container_context.env_vars])
env_vars["DAGSTER_RUN_JOB_NAME"] = step_handler_context.dagster_run.job_name
env_vars["DAGSTER_RUN_STEP_KEY"] = step_key
Expand All @@ -213,7 +219,7 @@ def _create_step_container(
network=container_context.networks[0] if len(container_context.networks) else None,
command=execute_step_args.get_command_args(),
environment=env_vars,
**container_context.container_kwargs,
**container_kwargs,
)

def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[DagsterEvent]:
Expand Down Expand Up @@ -311,4 +317,6 @@ def terminate_step(self, step_handler_context: StepHandlerContext) -> Iterator[D

container = client.containers.get(container_name)

container.stop()
stop_timeout = container_context.container_kwargs.get("stop_timeout")

container.stop(timeout=stop_timeout)
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def _launch_container_with_command(self, run, docker_image, command):

container_kwargs = {**container_context.container_kwargs}
labels = container_kwargs.pop("labels", {})

if "stop_timeout" in container_kwargs:
# This should work, but does not due to https://github.com/docker/docker-py/issues/3168
# Pull it out and apply it in the terminate() method instead
del container_kwargs["stop_timeout"]

if isinstance(labels, list):
labels = {key: "" for key in labels}

Expand Down Expand Up @@ -208,6 +214,9 @@ def terminate(self, run_id):

container = self._get_container(run)

container_context = self.get_container_context(run)
stop_timeout = container_context.container_kwargs.get("stop_timeout")

if not container:
self._instance.report_engine_event(
message="Unable to get docker container to send termination request to.",
Expand All @@ -216,7 +225,7 @@ def terminate(self, run_id):
)
return False

container.stop()
container.stop(timeout=stop_timeout)

return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,22 @@ def test_terminate_launched_docker_run(aws_env):
launcher_config = {
"env_vars": aws_env,
"network": "container:test-postgres-db-docker",
"container_kwargs": {"stop_timeout": 15},
}

if IS_BUILDKITE:
launcher_config["registry"] = get_buildkite_registry_config()
else:
find_local_test_image(docker_image)

run_config = merge_yamls(
[
os.path.join(get_test_project_environments_path(), "env_s3.yaml"),
]
)
run_config = {
**merge_yamls(
[
os.path.join(get_test_project_environments_path(), "env_s3.yaml"),
]
),
"ops": {"hanging_op": {"config": {"cleanup_delay": 10}}},
}

with docker_postgres_instance(
overrides={
Expand Down

0 comments on commit 3a4f8da

Please sign in to comment.