Skip to content

Commit

Permalink
check that compute log manager implements captured log interface
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Aug 5, 2024
1 parent fedc34d commit 69839c9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 75 deletions.
11 changes: 2 additions & 9 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
from dagster._utils import PrintFn, is_uuid, traced
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import deprecation_warning, experimental_warning
from dagster._utils.warnings import experimental_warning

from .config import (
DAGSTER_CONFIG_YAML_FILENAME,
Expand Down Expand Up @@ -410,7 +410,6 @@ def __init__(
from dagster._core.scheduler import Scheduler
from dagster._core.secrets import SecretsLoader
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
Expand All @@ -428,14 +427,8 @@ def __init__(

if compute_log_manager:
self._compute_log_manager = check.inst_param(
compute_log_manager, "compute_log_manager", ComputeLogManager
compute_log_manager, "compute_log_manager", CapturedLogManager
)
if not isinstance(self._compute_log_manager, CapturedLogManager):
deprecation_warning(
"ComputeLogManager",
"1.2.0",
"Implement the CapturedLogManager interface instead.",
)
self._compute_log_manager.register_instance(self)
else:
check.invariant(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import IO, Generator, Optional, Sequence

import dagster._check as check
import pytest
from dagster import job, op
from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType
from dagster._core.launcher import DefaultRunLauncher
Expand Down Expand Up @@ -110,59 +109,6 @@ def on_unsubscribe(self, subscription):
pass


class BrokenComputeLogManager(ComputeLogManager):
def __init__(self, fail_on_setup=False, fail_on_teardown=False):
self._fail_on_setup = check.opt_bool_param(fail_on_setup, "fail_on_setup")
self._fail_on_teardown = check.opt_bool_param(fail_on_teardown, "fail_on_teardown")

@contextmanager
def _watch_logs(self, pipeline_run, step_key=None):
yield

def is_watch_completed(self, run_id, key):
return True

def on_watch_start(self, pipeline_run, step_key):
if self._fail_on_setup:
raise Exception("wahhh")

def on_watch_finish(self, pipeline_run, step_key):
if self._fail_on_teardown:
raise Exception("blahhh")

def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType):
pass

def download_url(self, run_id, key, io_type):
return None

def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ):
return ComputeLogFileData(
path=f"{key}.{io_type}", data=None, cursor=0, size=0, download_url=None
)

def on_subscribe(self, subscription):
pass


@contextmanager
def broken_compute_log_manager_instance(fail_on_setup=False, fail_on_teardown=False):
with tempfile.TemporaryDirectory() as temp_dir:
with environ({"DAGSTER_HOME": temp_dir}):
yield DagsterInstance(
instance_type=InstanceType.PERSISTENT,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=SqliteRunStorage.from_local(temp_dir),
event_storage=SqliteEventLogStorage(temp_dir),
compute_log_manager=BrokenComputeLogManager(
fail_on_setup=fail_on_setup, fail_on_teardown=fail_on_teardown
),
run_coordinator=DefaultRunCoordinator(),
run_launcher=DefaultRunLauncher(),
ref=InstanceRef.from_dir(temp_dir),
)


@contextmanager
def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=False):
with tempfile.TemporaryDirectory() as temp_dir:
Expand All @@ -181,14 +127,6 @@ def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=F
)


@pytest.fixture(
name="instance_cm",
params=[broken_compute_log_manager_instance, broken_captured_log_manager_instance],
)
def instance_cm_fixture(request):
return request.param


def _has_setup_exception(execute_result):
return any(
[
Expand Down Expand Up @@ -231,8 +169,8 @@ def boo_job():
boo()


def test_broken_compute_log_manager(instance_cm):
with instance_cm(fail_on_setup=True) as instance:
def test_broken_compute_log_manager():
with broken_captured_log_manager_instance(fail_on_setup=True) as instance:
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert _has_setup_exception(yay_result)
Expand All @@ -241,7 +179,7 @@ def test_broken_compute_log_manager(instance_cm):
assert not boo_result.success
assert _has_setup_exception(boo_result)

with instance_cm(fail_on_teardown=True) as instance:
with broken_captured_log_manager_instance(fail_on_teardown=True) as instance:
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert _has_teardown_exception(yay_result)
Expand All @@ -251,7 +189,7 @@ def test_broken_compute_log_manager(instance_cm):
assert not boo_result.success
assert _has_teardown_exception(boo_result)

with instance_cm() as instance:
with broken_captured_log_manager_instance() as instance:
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert not _has_setup_exception(yay_result)
Expand Down

0 comments on commit 69839c9

Please sign in to comment.