diff --git a/dagster_ray/io_manager.py b/dagster_ray/io_manager.py index 61545cc..0229e30 100644 --- a/dagster_ray/io_manager.py +++ b/dagster_ray/io_manager.py @@ -64,8 +64,8 @@ class RayIOManager(ConfigurableIOManager, ConfigurableResource): def handle_output(self, context: OutputContext, obj): import ray - # if self.address: # TODO: should this really be done here? - # ray.init(self.address, ignore_reinit_error=True) + if self.address: # TODO: should this really be done here? + ray.init(self.address, ignore_reinit_error=True) object_map = RayObjectMap.get_or_create() @@ -83,8 +83,8 @@ def handle_output(self, context: OutputContext, obj): def load_input(self, context: InputContext): import ray - # if self.address: # TODO: should this really be done here? - # ray.init(self.address, ignore_reinit_error=True) + if self.address: # TODO: should this really be done here? + ray.init(self.address, ignore_reinit_error=True) object_map = RayObjectMap.get_or_create() diff --git a/tests/conftest.py b/tests/conftest.py index 1c1f936..71a5ddc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,7 +17,9 @@ def dagster_instance(tmp_path_factory: TempPathFactory) -> DagsterInstance: def local_ray_address() -> Iterator[str]: import ray - context = ray.init(runtime_env={"env_vars": {"RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING": "1"}}) + context = ray.init( + ignore_reinit_error=True, runtime_env={"env_vars": {"RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING": "1"}} + ) yield "auto" diff --git a/tests/test_executor.py b/tests/test_executor.py index 40dd2c4..756182a 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -50,17 +50,6 @@ def my_failing_job(): failed_op() -@pytest.fixture -def local_ray_address() -> Iterator[str]: - import ray - - context = ray.init() - - yield "auto" - - context.disconnect() - - @pytest.fixture def dagster_instance() -> Iterator[DagsterInstance]: with instance_for_test() as instance: