Skip to content

Commit

Permalink
Merge pull request #140 from OFFIS-DAI/allow_setting_mp_method
Browse files Browse the repository at this point in the history
Add option to set multiprocessing method through mp_method param in container
  • Loading branch information
rcschrg authored Nov 25, 2024
2 parents ba0a697 + f7ef90c commit e23d197
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
5 changes: 4 additions & 1 deletion mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
clock: Clock,
copy_internal_messages=False,
mirror_data=None,
mp_method="spawn",
**kwargs,
):
self.name: str = name
Expand Down Expand Up @@ -64,7 +65,9 @@ def __init__(
self, self._mirror_data
)
else:
self._container_process_manager = MainContainerProcessManager(self)
self._container_process_manager = MainContainerProcessManager(
self, mp_method
)

def _all_aids(self):
all_aids = list(self._agents.keys()) + self._container_process_manager.aids
Expand Down
15 changes: 13 additions & 2 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import os
import warnings
from collections.abc import Callable
from dataclasses import dataclass
from multiprocessing import get_context
Expand Down Expand Up @@ -362,11 +363,12 @@ class MainContainerProcessManager(BaseContainerProcessManager):
def __init__(
self,
container,
mp_method: str = "spawn",
) -> None:
self._active = False
self._container = container
self._mp_enabled = False
self._ctx = get_context("spawn")
self._ctx = get_context(mp_method)
self._agent_process_init_list = []
self._started = False

Expand Down Expand Up @@ -510,7 +512,16 @@ def _create_agent_process_bytes(
from_pipe_message, to_pipe_message = aioduplex(self._ctx)
from_pipe, to_pipe = aioduplex(self._ctx)
process_initialized = self._ctx.Event()
with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message:
with (
warnings.catch_warnings(),
to_pipe.detach() as to_pipe,
to_pipe_message.detach() as to_pipe_message,
):
warnings.filterwarnings(
"ignore",
message=r".*This process .* is multi-threaded, use of fork\(\) may lead to deadlocks.*",
category=DeprecationWarning,
)
agent_process = self._ctx.Process(
target=create_agent_process_environment,
args=(
Expand Down

0 comments on commit e23d197

Please sign in to comment.