Skip to content

Commit

Permalink
Merge pull request #69 from OFFIS-DAI/fix-shutdown-processes
Browse files Browse the repository at this point in the history
Fix shutdown processes
  • Loading branch information
rcschrg authored Mar 11, 2024
2 parents 1765d97 + 01adf6a commit 3906405
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 25 deletions.
5 changes: 4 additions & 1 deletion mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,13 @@ async def shutdown(self):
await self._check_inbox_task
except asyncio.CancelledError:
pass

try:
await self._scheduler.stop()
except asyncio.CancelledError:
pass
try:
await self._scheduler.shutdown()
except asyncio.CancelledError:
pass
finally:
logger.info("Agent %s: Shutdown successful", self.aid)
1 change: 1 addition & 0 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This module contains the abstract Container class and the subclasses
TCPContainer and MQTTContainer
"""

import asyncio
import logging
import time
Expand Down
91 changes: 69 additions & 22 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,44 @@
import concurrent.futures
import datetime
from abc import abstractmethod
from multiprocessing import Manager
from multiprocessing import Manager, Event
from typing import Any, List, Tuple
from dataclasses import dataclass
from multiprocessing.synchronize import Event as MultiprocessingEvent

from dateutil.rrule import rrule

from mango.util.clock import AsyncioClock, Clock, ExternalClock
from asyncio import Future


@dataclass
class ScheduledProcessControl:
run_task_event: MultiprocessingEvent
kill_process_event: MultiprocessingEvent

def kill_process(self):
self.kill_process_event.set()

def init_process(self):
self.kill_process_event.clear()

def resume_task(self):
self.run_task_event.set()

def suspend_task(self):
self.run_task_event.clear()


class Suspendable:
"""
Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
"""

def __init__(self, coro, ext_contr_event=None):
def __init__(self, coro, ext_contr_event=None, kill_event=None):
self._coro = coro

self._kill_event = kill_event
if ext_contr_event is not None:
self._can_run = ext_contr_event
else:
Expand All @@ -44,6 +66,9 @@ def __await__(self):
except BaseException as err:
send, message = iter_throw, err

if self._kill_event is not None and self._kill_event.is_set():
return None

try:
# throw error or resume coroutine
signal = send(message)
Expand Down Expand Up @@ -433,16 +458,25 @@ def __init__(
Tuple[ScheduledTask, asyncio.Future, Suspendable, Any]
] = []
self.clock = clock if clock is not None else AsyncioClock()
self._scheduled_process_tasks = []
self._scheduled_process_tasks: List[
Tuple[ScheduledProcessTask, Future, ScheduledProcessControl, Any]
] = []
self._manager = None
self._num_process_parallel = num_process_parallel
self._process_pool_exec = None
self._suspendable = suspendable
self._observable = observable

@staticmethod
def _run_task_in_p_context(task, suspend_event):
def _run_task_in_p_context(
task, scheduled_process_control: ScheduledProcessControl
):
try:
coro = Suspendable(task.run(), ext_contr_event=suspend_event)
coro = Suspendable(
task.run(),
ext_contr_event=scheduled_process_control.run_task_event,
kill_event=scheduled_process_control.kill_process_event,
)

return asyncio.get_event_loop().run_until_complete(coro)
finally:
Expand Down Expand Up @@ -638,17 +672,29 @@ def schedule_process_task(self, task: ScheduledProcessTask, src=None):
initializer=_create_asyncio_context,
)
loop = asyncio.get_running_loop()
manager = Manager()
event = manager.Event()
event.set()
if self._manager is None:
self._manager = Manager()

scheduled_process_control = ScheduledProcessControl(
run_task_event=self._manager.Event(),
kill_process_event=self._manager.Event(),
)
scheduled_process_control.init_process()
scheduled_process_control.resume_task()

l_task = asyncio.ensure_future(
loop.run_in_executor(
self._process_pool_exec, Scheduler._run_task_in_p_context, task, event
self._process_pool_exec,
Scheduler._run_task_in_p_context,
task,
scheduled_process_control,
)
)
l_task.add_done_callback(self._remove_process_task)
l_task.add_done_callback(task.on_stop)
self._scheduled_process_tasks.append((task, l_task, event, src))
self._scheduled_process_tasks.append(
(task, l_task, scheduled_process_control, src)
)
return l_task

def schedule_timestamp_process_task(
Expand Down Expand Up @@ -776,9 +822,9 @@ def suspend(self, given_src):
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.suspend()
for _, _, event, src in self._scheduled_process_tasks:
if src == given_src and event is not None:
event.clear()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.suspend_task()

def resume(self, given_src):
"""Resume a set of tasks triggered by the given src object.
Expand All @@ -792,16 +838,17 @@ def resume(self, given_src):
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.resume()
for _, _, event, src in self._scheduled_process_tasks:
if src == given_src and event is not None:
event.set()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.resume_task()

def _remove_process_task(self, fut=asyncio.Future):
for i in range(len(self._scheduled_process_tasks)):
_, task, event, _ = self._scheduled_process_tasks[i]
_, task, scheduled_process_control, _ = self._scheduled_process_tasks[i]
if task == fut:
scheduled_process_control.resume_task()
scheduled_process_control.kill_process()
del self._scheduled_process_tasks[i]
event.set()
break

# methods for removing tasks, stopping or shutting down
Expand Down Expand Up @@ -860,15 +907,15 @@ async def tasks_complete_or_sleeping(self):
# we need to recognize how many sleeping tasks we have in order to find out if all tasks are done
sleeping_tasks.append(scheduled_task)

def shutdown(self):
async def shutdown(self):
"""
Shutdown internal process executor pool.
"""
# resume all process so they can get shutdown
for _, _, event, _ in self._scheduled_process_tasks:
if event is not None:
event.set()
for _, _, scheduled_process_control, _ in self._scheduled_process_tasks:
scheduled_process_control.kill_process()
for task, _, _, _ in self._scheduled_tasks:
task.close()
await self.stop()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()
4 changes: 2 additions & 2 deletions tests/unit_tests/util/scheduling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def increase_counter():
with pytest.raises(asyncio.exceptions.TimeoutError):
await asyncio.wait_for(t, timeout=0.2)

scheduler.shutdown()
await scheduler.shutdown()

# THEN
assert len(l) == 0
Expand Down Expand Up @@ -370,7 +370,7 @@ async def test_task_as_process_suspend():

scheduler.resume(marker)

scheduler.shutdown()
await scheduler.shutdown()


@pytest.mark.asyncio
Expand Down

0 comments on commit 3906405

Please sign in to comment.