Skip to content

Commit

Permalink
Merge pull request #66 from OFFIS-DAI/enhancement-coro-close
Browse files Browse the repository at this point in the history
Closing all external coroutines when the scheduler shuts down or the …
  • Loading branch information
jsagerOffis authored Feb 22, 2024
2 parents 1abaf78 + 2514b7b commit 2c1a149
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
28 changes: 28 additions & 0 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Module for commonly used time based scheduled task executed inside one agent.
"""

import asyncio
import concurrent.futures
import datetime
Expand Down Expand Up @@ -91,6 +92,13 @@ def coro(self):
# asyncio tasks


def _close_coro(coro):
try:
coro.close()
except:
pass


class ScheduledTask:
"""
Base class for scheduled tasks in mango. Within this class it is possible to
Expand Down Expand Up @@ -131,6 +139,11 @@ def on_stop(self, fut: asyncio.Future = None):
self._on_stop_hook_in(fut)
if self._is_observable:
self._is_done.set_result(True)
self.close()

def close(self):
"""Perform closing actions"""
pass


class TimestampScheduledTask(ScheduledTask):
Expand All @@ -155,6 +168,9 @@ async def run(self):
await self._wait(self._timestamp)
return await self._coro

def close(self):
_close_coro(self._coro)


class AwaitingTask(ScheduledTask):
"""
Expand All @@ -175,6 +191,10 @@ async def run(self):
self.notify_running()
return await self._coroutine

def close(self):
_close_coro(self._awaited_coroutine)
_close_coro(self._coroutine)


class InstantScheduledTask(TimestampScheduledTask):
"""
Expand All @@ -188,6 +208,9 @@ def __init__(self, coroutine, clock: Clock = None, on_stop=None, observable=True
coroutine, clock.time, clock=clock, on_stop=on_stop, observable=observable
)

def close(self):
_close_coro(self._coro)


class PeriodicScheduledTask(ScheduledTask):
"""
Expand Down Expand Up @@ -274,6 +297,9 @@ async def run(self):
self.notify_running()
return await self._coro

def close(self):
_close_coro(self._coro)


# process tasks

Expand Down Expand Up @@ -838,4 +864,6 @@ def shutdown(self):
for _, _, event, _ in self._scheduled_process_tasks:
if event is not None:
event.set()
for task, _, _, _ in self._scheduled_tasks:
task.close()
self._process_pool_exec.shutdown()
16 changes: 11 additions & 5 deletions tests/unit_tests/util/scheduling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ async def increase_counter():
)

# WHEN
t = scheduler.schedule_task(RecurrentScheduledTask(increase_counter, recurrency, clock))
t = scheduler.schedule_task(
RecurrentScheduledTask(increase_counter, recurrency, clock)
)
try:
new_time = start + datetime.timedelta(days=1)
clock.set_time(new_time.timestamp())
Expand Down Expand Up @@ -73,6 +75,7 @@ async def increase_counter():
assert task._is_done.done()
assert len(l) == 2


@pytest.mark.asyncio
async def test_recurrent_wait():
# GIVEN
Expand All @@ -84,14 +87,15 @@ async def test_recurrent_wait():

async def increase_counter():
l.append(clock._time)

tomorrow = start + datetime.timedelta(days=1)
aftertomorrow = start + datetime.timedelta(days=2)
recurrency = rrule.rrule(
rrule.DAILY, interval=1, dtstart=tomorrow, until=end
)
recurrency = rrule.rrule(rrule.DAILY, interval=1, dtstart=tomorrow, until=end)

# WHEN
t = scheduler.schedule_task(RecurrentScheduledTask(increase_counter, recurrency, clock))
t = scheduler.schedule_task(
RecurrentScheduledTask(increase_counter, recurrency, clock)
)
task = scheduler._scheduled_tasks[0][0]
try:
clock.set_time(start.timestamp())
Expand Down Expand Up @@ -175,6 +179,8 @@ async def increase_counter():
with pytest.raises(asyncio.exceptions.TimeoutError):
await asyncio.wait_for(t, timeout=0.2)

scheduler.shutdown()

# THEN
assert len(l) == 0

Expand Down

0 comments on commit 2c1a149

Please sign in to comment.