Skip to content

Commit

Permalink
Callback on quota allocation failure & bump version (#516)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored and Xuye (Chris) Qin committed Jul 5, 2019
1 parent fb3a796 commit 5b240a8
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 17 deletions.
2 changes: 1 addition & 1 deletion mars/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import os
import sys

version_info = (0, 2, 0, 'b1')
version_info = (0, 2, 0, 'b2')
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
Expand Down
23 changes: 23 additions & 0 deletions mars/scheduler/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from mars.scheduler.utils import SchedulerClusterInfoActor
from mars.utils import serialize_graph, get_next_port
from mars.actors import create_actor_pool
from mars.graph import DAG
from mars.tests.core import patch_method


Expand Down Expand Up @@ -176,6 +177,28 @@ def testGraphTermination(self, *_):

self.assertEqual(graph_ref.get_state(), GraphState.FAILED)

def testEmptyGraph(self, *_):
session_id = str(uuid.uuid4())

addr = '127.0.0.1:%d' % get_next_port()
with create_actor_pool(n_process=1, backend='gevent', address=addr) as pool:
pool.create_actor(SchedulerClusterInfoActor, [pool.cluster_info.address],
uid=SchedulerClusterInfoActor.default_uid())
resource_ref = pool.create_actor(ResourceActor, uid=ResourceActor.default_uid())
pool.create_actor(ChunkMetaActor, uid=ChunkMetaActor.default_uid())
pool.create_actor(AssignerActor, uid=AssignerActor.default_uid())

resource_ref.set_worker_meta('localhost:12345', dict(hardware=dict(cpu_total=4)))
resource_ref.set_worker_meta('localhost:23456', dict(hardware=dict(cpu_total=4)))

graph_key = str(uuid.uuid4())
serialized_graph = serialize_graph(DAG())

graph_ref = pool.create_actor(GraphActor, session_id, graph_key, serialized_graph,
uid=GraphActor.gen_uid(session_id, graph_key))
graph_ref.execute_graph()
self.assertEqual(graph_ref.get_state(), GraphState.SUCCEEDED)

def testErrorOnPrepare(self, *_):
session_id = str(uuid.uuid4())

Expand Down
38 changes: 24 additions & 14 deletions mars/worker/quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

import itertools
import logging
import sys
import time
from collections import namedtuple

from .. import resource
from .. import resource, promise
from ..compat import OrderedDict3
from ..utils import log_unhandled
from .utils import WorkerActor
Expand Down Expand Up @@ -62,6 +63,7 @@ def _log_allocate(self, msg, *args, **kwargs):
args += (self._allocated_size, self._total_size)
logger.debug(msg + ' Allocated: %s, Total size: %s', *args, **kwargs)

@promise.reject_on_exception
@log_unhandled
def request_batch_quota(self, batch, callback=None):
"""
Expand Down Expand Up @@ -93,6 +95,7 @@ def request_batch_quota(self, batch, callback=None):
return self._request_quota(keys, values, delta, callback, multiple=True,
make_first=all_allocated)

@promise.reject_on_exception
@log_unhandled
def request_quota(self, key, quota_size, callback=None):
"""
Expand Down Expand Up @@ -339,21 +342,28 @@ def _process_requests(self):
removed = []
for k, req in self._requests.items():
req_size, delta, req_time, multiple, callbacks = req
if self._has_space(delta):
alter_allocation = self.alter_allocations if multiple else self.alter_allocation
alter_allocation(k, req_size, handle_shrink=False)
for cb in callbacks:
self.tell_promise(cb)
if self._status_ref:
self._status_ref.update_mean_stats(
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
_tell=True, _wait=False)
try:
if self._has_space(delta):
alter_allocation = self.alter_allocations if multiple else self.alter_allocation
alter_allocation(k, req_size, handle_shrink=False)
for cb in callbacks:
self.tell_promise(cb)
if self._status_ref:
self._status_ref.update_mean_stats(
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
_tell=True, _wait=False)
removed.append(k)
else:
# Quota left cannot satisfy the next request, we quit
break
except: # noqa: E722
removed.append(k)
else:
# Quota left cannot satisfy the next request, we quit
break
# just in case the quota is allocated
self.release_quota(k)
for cb in callbacks:
self.tell_promise(cb, *sys.exc_info(), **dict(_accept=False))
for k in removed:
del self._requests[k]
self._requests.pop(k, None)


class MemQuotaActor(QuotaActor):
Expand Down
31 changes: 29 additions & 2 deletions mars/worker/tests/test_quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

class Test(WorkerCase):
def testQuota(self):
def _raiser(*_, **__):
raise ValueError

local_pool_addr = 'localhost:%d' % get_next_port()
with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool:
pool.create_actor(WorkerClusterInfoActor, schedulers=[local_pool_addr],
Expand Down Expand Up @@ -68,10 +71,23 @@ def testQuota(self):

self.assertNotIn('2', quota_ref.dump_data().allocations)

ref.cancel_requests(('1',), reject_exc=build_exc_info(ValueError))
with self.assertRaises(ValueError):
ref.cancel_requests(('1',), reject_exc=build_exc_info(OSError))
with self.assertRaises(OSError):
self.get_result(5)

with patch_method(QuotaActor._request_quota, new=_raiser):
ref.request_quota('err_raise', 1, _promise=True) \
.catch(lambda *exc: test_actor.set_result(exc, accept=False))

with self.assertRaises(ValueError):
self.get_result(5)

ref.request_batch_quota({'err_raise': 1}, _promise=True) \
.catch(lambda *exc: test_actor.set_result(exc, accept=False))

with self.assertRaises(ValueError):
self.get_result(5)

self.assertNotIn('1', quota_ref.dump_data().requests)
self.assertIn('2', quota_ref.dump_data().allocations)
self.assertNotIn('3', quota_ref.dump_data().allocations)
Expand All @@ -83,6 +99,17 @@ def testQuota(self):
quota_ref.alter_allocations(['3'], [50])
self.assertIn('4', quota_ref.dump_data().allocations)

with self.run_actor_test(pool) as test_actor:
ref = test_actor.promise_ref(QuotaActor.default_uid())
ref.request_quota('5', 50, _promise=True) \
.catch(lambda *exc: test_actor.set_result(exc, accept=False))

with patch_method(QuotaActor.alter_allocation, new=_raiser):
quota_ref.release_quota('2')

with self.assertRaises(ValueError):
self.get_result(5)

def testQuotaAllocation(self):
local_pool_addr = 'localhost:%d' % get_next_port()
with create_actor_pool(n_process=1, backend='gevent', address=local_pool_addr) as pool:
Expand Down

0 comments on commit 5b240a8

Please sign in to comment.