Skip to content

Commit

Permalink
Smart cancel of failed process tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
rcschrg committed Mar 11, 2024
1 parent 2c26c2e commit 760d01b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
2 changes: 1 addition & 1 deletion mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ async def shutdown(self):
except asyncio.CancelledError:
pass
try:
self._scheduler.shutdown()
await self._scheduler.shutdown()
except asyncio.CancelledError:
pass
finally:
Expand Down
5 changes: 4 additions & 1 deletion mango/util/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ def _remove_process_task(self, fut=asyncio.Future):
for i in range(len(self._scheduled_process_tasks)):
_, task, event, _ = self._scheduled_process_tasks[i]
if task == fut:
event[0].set()
event[1].set()
del self._scheduled_process_tasks[i]
break

Expand Down Expand Up @@ -869,7 +871,7 @@ async def tasks_complete_or_sleeping(self):
# we need to recognize how many sleeping tasks we have in order to find out if all tasks are done
sleeping_tasks.append(scheduled_task)

def shutdown(self):
async def shutdown(self):
"""
Shutdown internal process executor pool.
"""
Expand All @@ -879,5 +881,6 @@ def shutdown(self):
event[1].set()
for task, _, _, _ in self._scheduled_tasks:
task.close()
await self.stop()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()
6 changes: 4 additions & 2 deletions tests/unit_tests/util/scheduling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def increase_counter():
with pytest.raises(asyncio.exceptions.TimeoutError):
await asyncio.wait_for(t, timeout=0.2)

scheduler.shutdown()
await scheduler.shutdown()

# THEN
assert len(l) == 0
Expand Down Expand Up @@ -368,7 +368,9 @@ async def test_task_as_process_suspend():
except asyncio.exceptions.TimeoutError as err:
pass

scheduler.shutdown()
scheduler.resume(marker)

await scheduler.shutdown()


@pytest.mark.asyncio
Expand Down

0 comments on commit 760d01b

Please sign in to comment.