From 69839c97fbd86e87cbf6ace562d32706f448c99f Mon Sep 17 00:00:00 2001 From: prha Date: Mon, 5 Aug 2024 11:53:59 -0700 Subject: [PATCH] check that compute log manager implements captured log interface --- .../dagster/_core/instance/__init__.py | 11 +-- .../storage_tests/test_compute_log_manager.py | 70 ++----------------- 2 files changed, 6 insertions(+), 75 deletions(-) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 126e07e526f4b..a4b020cec6203 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -74,7 +74,7 @@ from dagster._utils import PrintFn, is_uuid, traced from dagster._utils.error import serializable_error_info_from_exc_info from dagster._utils.merger import merge_dicts -from dagster._utils.warnings import deprecation_warning, experimental_warning +from dagster._utils.warnings import experimental_warning from .config import ( DAGSTER_CONFIG_YAML_FILENAME, @@ -410,7 +410,6 @@ def __init__( from dagster._core.scheduler import Scheduler from dagster._core.secrets import SecretsLoader from dagster._core.storage.captured_log_manager import CapturedLogManager - from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.storage.event_log import EventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import RunStorage @@ -428,14 +427,8 @@ def __init__( if compute_log_manager: self._compute_log_manager = check.inst_param( - compute_log_manager, "compute_log_manager", ComputeLogManager + compute_log_manager, "compute_log_manager", CapturedLogManager ) - if not isinstance(self._compute_log_manager, CapturedLogManager): - deprecation_warning( - "ComputeLogManager", - "1.2.0", - "Implement the CapturedLogManager interface instead.", - ) self._compute_log_manager.register_instance(self) else: check.invariant( diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py index d65f73fb90433..4eace8b4459e5 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py @@ -3,7 +3,6 @@ from typing import IO, Generator, Optional, Sequence import dagster._check as check -import pytest from dagster import job, op from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher import DefaultRunLauncher @@ -110,59 +109,6 @@ def on_unsubscribe(self, subscription): pass -class BrokenComputeLogManager(ComputeLogManager): - def __init__(self, fail_on_setup=False, fail_on_teardown=False): - self._fail_on_setup = check.opt_bool_param(fail_on_setup, "fail_on_setup") - self._fail_on_teardown = check.opt_bool_param(fail_on_teardown, "fail_on_teardown") - - @contextmanager - def _watch_logs(self, pipeline_run, step_key=None): - yield - - def is_watch_completed(self, run_id, key): - return True - - def on_watch_start(self, pipeline_run, step_key): - if self._fail_on_setup: - raise Exception("wahhh") - - def on_watch_finish(self, pipeline_run, step_key): - if self._fail_on_teardown: - raise Exception("blahhh") - - def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType): - pass - - def download_url(self, run_id, key, io_type): - return None - - def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ): - return ComputeLogFileData( - path=f"{key}.{io_type}", data=None, cursor=0, size=0, download_url=None - ) - - def on_subscribe(self, subscription): - pass - - -@contextmanager -def broken_compute_log_manager_instance(fail_on_setup=False, fail_on_teardown=False): - with tempfile.TemporaryDirectory() as temp_dir: - with environ({"DAGSTER_HOME": temp_dir}): - yield DagsterInstance( - instance_type=InstanceType.PERSISTENT, - local_artifact_storage=LocalArtifactStorage(temp_dir), - run_storage=SqliteRunStorage.from_local(temp_dir), - event_storage=SqliteEventLogStorage(temp_dir), - compute_log_manager=BrokenComputeLogManager( - fail_on_setup=fail_on_setup, fail_on_teardown=fail_on_teardown - ), - run_coordinator=DefaultRunCoordinator(), - run_launcher=DefaultRunLauncher(), - ref=InstanceRef.from_dir(temp_dir), - ) - - @contextmanager def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=False): with tempfile.TemporaryDirectory() as temp_dir: @@ -181,14 +127,6 @@ def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=F ) -@pytest.fixture( - name="instance_cm", - params=[broken_compute_log_manager_instance, broken_captured_log_manager_instance], -) -def instance_cm_fixture(request): - return request.param - - def _has_setup_exception(execute_result): return any( [ @@ -231,8 +169,8 @@ def boo_job(): boo() -def test_broken_compute_log_manager(instance_cm): - with instance_cm(fail_on_setup=True) as instance: +def test_broken_compute_log_manager(): + with broken_captured_log_manager_instance(fail_on_setup=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_setup_exception(yay_result) @@ -241,7 +179,7 @@ def test_broken_compute_log_manager(instance_cm): assert not boo_result.success assert _has_setup_exception(boo_result) - with instance_cm(fail_on_teardown=True) as instance: + with broken_captured_log_manager_instance(fail_on_teardown=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_teardown_exception(yay_result) @@ -251,7 +189,7 @@ def test_broken_compute_log_manager(instance_cm): assert not boo_result.success assert _has_teardown_exception(boo_result) - with instance_cm() as instance: + with broken_captured_log_manager_instance() as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert not _has_setup_exception(yay_result)