diff --git a/mango/agent/core.py b/mango/agent/core.py index 98775eb..e3beb0f 100644 --- a/mango/agent/core.py +++ b/mango/agent/core.py @@ -487,10 +487,6 @@ async def shutdown(self): await self._check_inbox_task except asyncio.CancelledError: pass - try: - await self.scheduler.stop() - except asyncio.CancelledError: - pass try: await self.scheduler.shutdown() except asyncio.CancelledError: diff --git a/mango/util/scheduling.py b/mango/util/scheduling.py index b9a921d..10c05df 100644 --- a/mango/util/scheduling.py +++ b/mango/util/scheduling.py @@ -894,13 +894,21 @@ def _remove_generic_task(self, target_list, fut=asyncio.Future): del target_list[i] break + async def stop_tasks(self, task_list): + for i in range(len(task_list) - 1, -1, -1): + _, task, _, _ = task_list[i] + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + async def stop(self): """ Cancel all not finished scheduled tasks """ - for _, task, _, _ in self._scheduled_tasks + self._scheduled_process_tasks: - task.cancel() - await task + await self.stop_tasks(self._scheduled_tasks) + await self.stop_tasks(self._scheduled_process_tasks) async def tasks_complete(self, timeout=1, recursive=False): """Finish all pending tasks using a timeout. @@ -945,8 +953,13 @@ async def shutdown(self): # resume all process so they can get shutdown for _, _, scheduled_process_control, _ in self._scheduled_process_tasks: scheduled_process_control.kill_process() + if len(self._scheduled_tasks) > 0: + logger.debug( + "There are still scheduled tasks running on shutdown %s", + self._scheduled_tasks, + ) + await self.stop() for task, _, _, _ in self._scheduled_tasks: task.close() - await self.stop() if self._process_pool_exec is not None: self._process_pool_exec.shutdown() diff --git a/tests/unit_tests/core/test_agent.py b/tests/unit_tests/core/test_agent.py index 1dd6f1e..e1b7840 100644 --- a/tests/unit_tests/core/test_agent.py +++ b/tests/unit_tests/core/test_agent.py @@ -136,3 +136,24 @@ async def run_this(c): assert agent2.test_counter == 1 asyncio.run(run_this(c)) + + +async def do_weird_stuff(): + fut = asyncio.Future() + await fut + + +@pytest.mark.asyncio +async def test_agent_with_deadlock_task(): + # GIVEN + c = create_tcp_container(addr=("127.0.0.1", 5555)) + agent = c.register(MyAgent()) + + async with activate(c) as c: + t = agent.schedule_instant_task(do_weird_stuff()) + t = agent.schedule_instant_task(do_weird_stuff()) + t = agent.schedule_instant_task(do_weird_stuff()) + t = agent.schedule_instant_task(do_weird_stuff()) + + # THEN + assert len(agent.scheduler._scheduled_tasks) == 0