From 5b240a88cb67c87bb390636c1ffee6a63ff79835 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Fri, 5 Jul 2019 21:23:38 +0800 Subject: [PATCH] Callback on quota allocation failure & bump version (#516) --- mars/_version.py | 2 +- mars/scheduler/tests/test_graph.py | 23 ++++++++++++++++++ mars/worker/quota.py | 38 +++++++++++++++++++----------- mars/worker/tests/test_quota.py | 31 ++++++++++++++++++++++-- 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/mars/_version.py b/mars/_version.py index b463441e1d..0108e490b1 100644 --- a/mars/_version.py +++ b/mars/_version.py @@ -16,7 +16,7 @@ import os import sys -version_info = (0, 2, 0, 'b1') +version_info = (0, 2, 0, 'b2') _num_index = max(idx if isinstance(v, int) else 0 for idx, v in enumerate(version_info)) __version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \ diff --git a/mars/scheduler/tests/test_graph.py b/mars/scheduler/tests/test_graph.py index 0415e41a0a..6f62c6a86e 100644 --- a/mars/scheduler/tests/test_graph.py +++ b/mars/scheduler/tests/test_graph.py @@ -23,6 +23,7 @@ from mars.scheduler.utils import SchedulerClusterInfoActor from mars.utils import serialize_graph, get_next_port from mars.actors import create_actor_pool +from mars.graph import DAG from mars.tests.core import patch_method @@ -176,6 +177,28 @@ def testGraphTermination(self, *_): self.assertEqual(graph_ref.get_state(), GraphState.FAILED) + def testEmptyGraph(self, *_): + session_id = str(uuid.uuid4()) + + addr = '127.0.0.1:%d' % get_next_port() + with create_actor_pool(n_process=1, backend='gevent', address=addr) as pool: + pool.create_actor(SchedulerClusterInfoActor, [pool.cluster_info.address], + uid=SchedulerClusterInfoActor.default_uid()) + resource_ref = pool.create_actor(ResourceActor, uid=ResourceActor.default_uid()) + pool.create_actor(ChunkMetaActor, uid=ChunkMetaActor.default_uid()) + pool.create_actor(AssignerActor, uid=AssignerActor.default_uid()) + + resource_ref.set_worker_meta('localhost:12345', dict(hardware=dict(cpu_total=4))) + resource_ref.set_worker_meta('localhost:23456', dict(hardware=dict(cpu_total=4))) + + graph_key = str(uuid.uuid4()) + serialized_graph = serialize_graph(DAG()) + + graph_ref = pool.create_actor(GraphActor, session_id, graph_key, serialized_graph, + uid=GraphActor.gen_uid(session_id, graph_key)) + graph_ref.execute_graph() + self.assertEqual(graph_ref.get_state(), GraphState.SUCCEEDED) + def testErrorOnPrepare(self, *_): session_id = str(uuid.uuid4()) diff --git a/mars/worker/quota.py b/mars/worker/quota.py index 4871e728a6..69d466bf20 100644 --- a/mars/worker/quota.py +++ b/mars/worker/quota.py @@ -14,10 +14,11 @@ import itertools import logging +import sys import time from collections import namedtuple -from .. import resource +from .. import resource, promise from ..compat import OrderedDict3 from ..utils import log_unhandled from .utils import WorkerActor @@ -62,6 +63,7 @@ def _log_allocate(self, msg, *args, **kwargs): args += (self._allocated_size, self._total_size) logger.debug(msg + ' Allocated: %s, Total size: %s', *args, **kwargs) + @promise.reject_on_exception @log_unhandled def request_batch_quota(self, batch, callback=None): """ @@ -93,6 +95,7 @@ def request_batch_quota(self, batch, callback=None): return self._request_quota(keys, values, delta, callback, multiple=True, make_first=all_allocated) + @promise.reject_on_exception @log_unhandled def request_quota(self, key, quota_size, callback=None): """ @@ -339,21 +342,28 @@ def _process_requests(self): removed = [] for k, req in self._requests.items(): req_size, delta, req_time, multiple, callbacks = req - if self._has_space(delta): - alter_allocation = self.alter_allocations if multiple else self.alter_allocation - alter_allocation(k, req_size, handle_shrink=False) - for cb in callbacks: - self.tell_promise(cb) - if self._status_ref: - self._status_ref.update_mean_stats( - 'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time, - _tell=True, _wait=False) + try: + if self._has_space(delta): + alter_allocation = self.alter_allocations if multiple else self.alter_allocation + alter_allocation(k, req_size, handle_shrink=False) + for cb in callbacks: + self.tell_promise(cb) + if self._status_ref: + self._status_ref.update_mean_stats( + 'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time, + _tell=True, _wait=False) + removed.append(k) + else: + # Quota left cannot satisfy the next request, we quit + break + except: # noqa: E722 removed.append(k) - else: - # Quota left cannot satisfy the next request, we quit - break + # just in case the quota is allocated + self.release_quota(k) + for cb in callbacks: + self.tell_promise(cb, *sys.exc_info(), **dict(_accept=False)) for k in removed: - del self._requests[k] + self._requests.pop(k, None) class MemQuotaActor(QuotaActor): diff --git a/mars/worker/tests/test_quota.py b/mars/worker/tests/test_quota.py index f61588286e..a3cf931bb5 100644 --- a/mars/worker/tests/test_quota.py +++ b/mars/worker/tests/test_quota.py @@ -26,6 +26,9 @@ class Test(WorkerCase): def testQuota(self): + def _raiser(*_, **__): + raise ValueError + local_pool_addr = 'localhost:%d' % get_next_port() with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool: pool.create_actor(WorkerClusterInfoActor, schedulers=[local_pool_addr], @@ -68,10 +71,23 @@ def testQuota(self): self.assertNotIn('2', quota_ref.dump_data().allocations) - ref.cancel_requests(('1',), reject_exc=build_exc_info(ValueError)) - with self.assertRaises(ValueError): + ref.cancel_requests(('1',), reject_exc=build_exc_info(OSError)) + with self.assertRaises(OSError): self.get_result(5) + with patch_method(QuotaActor._request_quota, new=_raiser): + ref.request_quota('err_raise', 1, _promise=True) \ + .catch(lambda *exc: test_actor.set_result(exc, accept=False)) + + with self.assertRaises(ValueError): + self.get_result(5) + + ref.request_batch_quota({'err_raise': 1}, _promise=True) \ + .catch(lambda *exc: test_actor.set_result(exc, accept=False)) + + with self.assertRaises(ValueError): + self.get_result(5) + self.assertNotIn('1', quota_ref.dump_data().requests) self.assertIn('2', quota_ref.dump_data().allocations) self.assertNotIn('3', quota_ref.dump_data().allocations) @@ -83,6 +99,17 @@ def testQuota(self): quota_ref.alter_allocations(['3'], [50]) self.assertIn('4', quota_ref.dump_data().allocations) + with self.run_actor_test(pool) as test_actor: + ref = test_actor.promise_ref(QuotaActor.default_uid()) + ref.request_quota('5', 50, _promise=True) \ + .catch(lambda *exc: test_actor.set_result(exc, accept=False)) + + with patch_method(QuotaActor.alter_allocation, new=_raiser): + quota_ref.release_quota('2') + + with self.assertRaises(ValueError): + self.get_result(5) + def testQuotaAllocation(self): local_pool_addr = 'localhost:%d' % get_next_port() with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool: