Skip to content

Commit

Permalink
Accept KeyboardInterruptError in multiprocess executor termination lo…
Browse files Browse the repository at this point in the history
…gic (#23408)

Summary:
The dagster signal handler raises interrupts as
DagsterExecutionInterruptedError, but sometimes ops can run code that
installs its own signal handler, causing the interrupt to be raised as a
KeyboardInterrupt (for example, the snowflake python connector does
this:
https://github.com/snowflakedb/snowflake-connector-python/blob/main/src/snowflake/connector/cursor.py#L663-L684).
Handle both in the logic that decides if a run terminated cleanly or
not.

Resolves #23406.

Test Plan: BK

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan authored Aug 5, 2024
1 parent a11dd7f commit fedc34d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ def execute(
and (not active_iters)
and all(
[
err_info.cls_name == "DagsterExecutionInterruptedError"
err_info.cls_name
in {"DagsterExecutionInterruptedError", "KeyboardInterrupt"}
for err_info in errs.values()
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import pytest
from dagster import (
Config,
DagsterEvent,
DagsterEventType,
DefaultRunLauncher,
Expand All @@ -18,7 +19,7 @@
repository,
)
from dagster._core.definitions import op
from dagster._core.errors import DagsterLaunchFailedError
from dagster._core.errors import DagsterExecutionInterruptedError, DagsterLaunchFailedError
from dagster._core.execution.plan.objects import StepSuccessData
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus
Expand Down Expand Up @@ -70,10 +71,23 @@ def exity_job():
exity_op()


class SleepyOpConfig(Config):
raise_keyboard_interrupt: bool = False


@op
def sleepy_op(_):
def sleepy_op(config: SleepyOpConfig):
while True:
time.sleep(0.1)
try:
time.sleep(0.1)

except DagsterExecutionInterruptedError:
if config.raise_keyboard_interrupt:
# simulates a custom signal handler that has overridden ours
# to raise a normal KeyboardInterrupt
raise KeyboardInterrupt
else:
raise


@job
Expand Down Expand Up @@ -440,7 +454,11 @@ def test_exity_run(

@pytest.mark.parametrize(
"run_config",
run_configs(),
[
None, # multiprocess
{"execution": {"config": {"in_process": {}}}}, # in-process
{"ops": {"sleepy_op": {"config": {"raise_keyboard_interrupt": True}}}},
],
)
def test_terminated_run(
instance: DagsterInstance,
Expand Down

0 comments on commit fedc34d

Please sign in to comment.