Skip to content

Commit

Permalink
Add error boundaries to prevent write stream errors in InstigationLog…
Browse files Browse the repository at this point in the history
…ger from failing schedule/sensor ticks (#26609)

This prevents issues with the compute log manager from failing a
schedule or sensor tick in which the logging call is happening.

## Summary & Motivation

## How I Tested These Changes
BK

## Changelog
Exceptions that are raised when a schedule or sensor are writing logs
will now write an error message to stdout instead of failing the tick.

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan authored Dec 23, 2024
1 parent 3ea9100 commit 9ce5277
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import sys
import threading
import traceback
from contextlib import ExitStack
Expand All @@ -10,6 +11,7 @@
from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR
from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogManager
from dagster._core.utils import coerce_valid_log_level
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.log import create_console_logger


Expand Down Expand Up @@ -66,7 +68,12 @@ def emit(self, record: logging.LogRecord):
if exc_info:
record_dict["exc_info"] = "".join(traceback.format_exception(*exc_info))

self._write_stream.write(_seven.json.dumps(record_dict) + "\n")
try:
self._write_stream.write(_seven.json.dumps(record_dict) + "\n")
except Exception:
sys.stderr.write(
f"Exception writing to logger event stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
)


class InstigationLogger(logging.Logger):
Expand Down Expand Up @@ -107,18 +114,30 @@ def __enter__(self):
and self._instance
and isinstance(self._instance.compute_log_manager, ComputeLogManager)
):
write_stream = self._exit_stack.enter_context(
self._instance.compute_log_manager.open_log_stream(
self._log_key, ComputeIOType.STDERR
try:
write_stream = self._exit_stack.enter_context(
self._instance.compute_log_manager.open_log_stream(
self._log_key, ComputeIOType.STDERR
)
)
)
except Exception:
sys.stderr.write(
f"Exception initializing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
)
write_stream = None

if write_stream:
self._capture_handler = CapturedLogHandler(write_stream)
self.addHandler(self._capture_handler)
return self

def __exit__(self, _exception_type, _exception_value, _traceback):
self._exit_stack.close()
try:
self._exit_stack.close()
except Exception:
sys.stderr.write(
f"Exception closing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
)

def _annotate_record(self, record: logging.LogRecord) -> logging.LogRecord:
if self._repository_name and self._instigator_name:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from contextlib import contextmanager
from unittest import mock

from dagster._core.definitions.instigation_logger import InstigationLogger
from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager
from dagster._core.test_utils import instance_for_test


def test_gets_correct_logger():
Expand All @@ -9,3 +14,63 @@ def test_gets_correct_logger():

instigation_logger = InstigationLogger(logger_name=custom_logger_name)
assert instigation_logger.name == custom_logger_name


class CrashyStartupComputeLogManager(NoOpComputeLogManager):
@contextmanager
def open_log_stream(self, log_key, io_type):
raise Exception("OOPS")
yield None


class MockLogStreamComputeLogManager(NoOpComputeLogManager):
@contextmanager
def open_log_stream(self, log_key, io_type):
yield mock.MagicMock()
raise Exception("OOPS ON EXIT")


def test_instigation_logger_start_failure(capsys):
with instance_for_test(
overrides={
"compute_logs": {
"module": "dagster_tests.storage_tests.test_instigation_logger",
"class": "CrashyStartupComputeLogManager",
}
}
) as instance:
with InstigationLogger(log_key="foo", instance=instance) as logger:
captured = capsys.readouterr()
assert (
captured.err.count("Exception initializing logger write stream: Exception: OOPS")
== 1
)
logger.info("I can log without failing")


def test_instigation_logger_log_failure(capsys):
with instance_for_test(
overrides={
"compute_logs": {
"module": "dagster_tests.storage_tests.test_instigation_logger",
"class": "MockLogStreamComputeLogManager",
}
}
) as instance:
with InstigationLogger(log_key="foo", instance=instance) as logger:
mock_write_stream = logger._capture_handler._write_stream # type: ignore # noqa
mock_write_stream.write.side_effect = Exception("OOPS")

logger.info("HELLO")
captured = capsys.readouterr()

assert (
captured.err.count("Exception writing to logger event stream: Exception: OOPS") == 1
)

captured = capsys.readouterr()

assert (
captured.err.count("Exception closing logger write stream: Exception: OOPS ON EXIT")
== 1
)

0 comments on commit 9ce5277

Please sign in to comment.