diff --git a/mango/util/distributed_clock.py b/mango/util/distributed_clock.py index b088265..b47a3be 100644 --- a/mango/util/distributed_clock.py +++ b/mango/util/distributed_clock.py @@ -121,11 +121,11 @@ async def get_next_event(self): if self.schedules: next_event = min(self.schedules) else: - logger.warning("%s: no new events, time stands still", self.aid) + logger.info("%s: no new events, time stands still", self.aid) next_event = self.scheduler.clock.time if next_event < self.scheduler.clock.time: - logger.warning("%s: got old event, time stands still", self.aid) + logger.info("%s: got old event, time stands still", self.aid) next_event = self.scheduler.clock.time logger.debug("next event at %s", next_event) return next_event diff --git a/mango/util/scheduling.py b/mango/util/scheduling.py index 8ec62fd..4810643 100644 --- a/mango/util/scheduling.py +++ b/mango/util/scheduling.py @@ -5,6 +5,7 @@ import asyncio import concurrent.futures import datetime +import logging from abc import abstractmethod from asyncio import Future from dataclasses import dataclass @@ -16,6 +17,20 @@ from mango.util.clock import AsyncioClock, Clock, ExternalClock +logger = logging.getLogger(__name__) + + +def _raise_exceptions(fut: asyncio.Future): + """ + Inline function used as a callback to raise exceptions + :param fut: The Future object of the task + """ + if fut.exception() is not None: + try: + raise fut.exception() + except Exception: + logger.exception("got exception in scheduled event") + @dataclass class ScheduledProcessControl: @@ -507,6 +522,7 @@ def schedule_task(self, task: ScheduledTask, src=None) -> asyncio.Task: coro = task.run() l_task = asyncio.create_task(coro) l_task.add_done_callback(task.on_stop) + l_task.add_done_callback(_raise_exceptions) l_task.add_done_callback(self._remove_task) self._scheduled_tasks.append((task, l_task, coro, src)) return l_task @@ -703,6 +719,7 @@ def schedule_process_task(self, task: ScheduledProcessTask, src=None): ) l_task.add_done_callback(self._remove_process_task) l_task.add_done_callback(task.on_stop) + l_task.add_done_callback(_raise_exceptions) self._scheduled_process_tasks.append( (task, l_task, scheduled_process_control, src) )