Skip to content

Commit

Permalink
Merge dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rcschrg committed Feb 24, 2024
2 parents e3898dc + 2c1a149 commit b620476
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
27 changes: 27 additions & 0 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ def coro(self):
# asyncio tasks


def _close_coro(coro):
try:
coro.close()
except:
pass


class ScheduledTask:
"""
Base class for scheduled tasks in mango. Within this class it is possible to
Expand Down Expand Up @@ -136,6 +143,11 @@ def on_stop(self, fut: asyncio.Future = None):
self._on_stop_hook_in(fut)
if self._is_observable:
self._is_done.set_result(True)
self.close()

def close(self):
"""Perform closing actions"""
pass


class TimestampScheduledTask(ScheduledTask):
Expand All @@ -160,6 +172,9 @@ async def run(self):
await self._wait(self._timestamp)
return await self._coro

def close(self):
_close_coro(self._coro)


class AwaitingTask(ScheduledTask):
"""
Expand All @@ -180,6 +195,10 @@ async def run(self):
self.notify_running()
return await self._coroutine

def close(self):
_close_coro(self._awaited_coroutine)
_close_coro(self._coroutine)


class InstantScheduledTask(TimestampScheduledTask):
"""
Expand All @@ -193,6 +212,9 @@ def __init__(self, coroutine, clock: Clock = None, on_stop=None, observable=True
coroutine, clock.time, clock=clock, on_stop=on_stop, observable=observable
)

def close(self):
_close_coro(self._coro)


class PeriodicScheduledTask(ScheduledTask):
"""
Expand Down Expand Up @@ -279,6 +301,9 @@ async def run(self):
self.notify_running()
return await self._coro

def close(self):
_close_coro(self._coro)


# process tasks

Expand Down Expand Up @@ -852,6 +877,8 @@ def shutdown(self):
for _, _, event, _ in self._scheduled_process_tasks:
if event[1] is not None:
event[1].set()
for task, _, _, _ in self._scheduled_tasks:
task.close()
self._process_pool_exec.shutdown(wait=True, cancel_futures=True)
if self._manager is not None:
self._manager.shutdown()
2 changes: 2 additions & 0 deletions tests/unit_tests/util/scheduling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ async def increase_counter():
with pytest.raises(asyncio.exceptions.TimeoutError):
await asyncio.wait_for(t, timeout=0.2)

scheduler.shutdown()

# THEN
assert len(l) == 0

Expand Down

0 comments on commit b620476

Please sign in to comment.