Skip to content

Commit

Permalink
[BACKPORT] Callback on quota allocation failure & bump version (#516) (
Browse files Browse the repository at this point in the history
…#518)

* Callback on quota allocation failure & bump version (#516)
* Adopt to latest numpy and pyarrow
  • Loading branch information
wjsi authored and Xuye (Chris) Qin committed Jul 5, 2019
1 parent 2a34b0a commit bc1765e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ matrix:
language: generic
env: PYTHON=3.7

addons:
apt:
update: true
before_install:
- source ./bin/travis-install-conda.sh
- if [[ "$PYTHON" =~ "2" ]]; then
Expand Down
4 changes: 3 additions & 1 deletion bin/travis-install-conda.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if [[ $TRAVIS_OS_NAME == 'osx' ]]; then
ulimit -n 1024
CONDA_OS="MacOSX"
elif [[ $TRAVIS_OS_NAME == 'linux' ]]; then
sudo apt-get install -y liblz4-dev
CONDA_OS="Linux"
fi

Expand All @@ -17,7 +18,8 @@ fi

curl -s -o miniconda.sh https://repo.continuum.io/miniconda/$CONDA_FILE
bash miniconda.sh -b -p $HOME/miniconda && rm miniconda.sh
$HOME/miniconda/bin/conda create --quiet --yes -n test python=$PYTHON virtualenv gevent psutil pyyaml nomkl
$HOME/miniconda/bin/conda create --quiet --yes -n test python=$PYTHON virtualenv gevent psutil \
pyyaml nomkl libopenblas lz4
export PATH="$HOME/miniconda/envs/test/bin:$HOME/miniconda/bin:$PATH"

#check python version
Expand Down
2 changes: 1 addition & 1 deletion mars/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import os
import sys

version_info = (0, 1, 3)
version_info = (0, 1, 4)
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
Expand Down
2 changes: 1 addition & 1 deletion mars/graph.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ cdef class DAG(DirectedGraph):
succ_checker = succ_checker or _default_succ_checker

stack = list((p for p, l in preds.items() if len(l) == 0))
if not stack:
if self._nodes and not stack:
raise GraphContainsCycleError
while stack:
node = stack.pop()
Expand Down
3 changes: 3 additions & 0 deletions mars/scheduler/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ def _detect_cancel(callback=None):
self.state = GraphState.FAILED
raise

if len(self._chunk_graph_cache) == 0:
self.state = GraphState.SUCCEEDED

def stop_graph(self):
"""
Stop graph execution
Expand Down
25 changes: 24 additions & 1 deletion mars/scheduler/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import mars.tensor as mt
from mars.scheduler import GraphActor, ResourceActor, ChunkMetaActor, AssignerActor
from mars.scheduler.utils import SchedulerClusterInfoActor
from mars.scheduler.utils import SchedulerClusterInfoActor, GraphState
from mars.utils import serialize_graph, get_next_port
from mars.actors import create_actor_pool
from mars.graph import DAG


class Test(unittest.TestCase):
Expand Down Expand Up @@ -136,3 +137,25 @@ def testMultipleAdd(self):
a = mt.array(base_arr)
sumv = reduce(operator.add, [a[:10, :10] for _ in range(10)])
self.run_expr_suite(sumv)

def testEmptyGraph(self, *_):
session_id = str(uuid.uuid4())

addr = '127.0.0.1:%d' % get_next_port()
with create_actor_pool(n_process=1, backend='gevent', address=addr) as pool:
pool.create_actor(SchedulerClusterInfoActor, [pool.cluster_info.address],
uid=SchedulerClusterInfoActor.default_uid())
resource_ref = pool.create_actor(ResourceActor, uid=ResourceActor.default_uid())
pool.create_actor(ChunkMetaActor, uid=ChunkMetaActor.default_uid())
pool.create_actor(AssignerActor, uid=AssignerActor.default_uid())

resource_ref.set_worker_meta('localhost:12345', dict(hardware=dict(cpu_total=4)))
resource_ref.set_worker_meta('localhost:23456', dict(hardware=dict(cpu_total=4)))

graph_key = str(uuid.uuid4())
serialized_graph = serialize_graph(DAG())

graph_ref = pool.create_actor(GraphActor, session_id, graph_key, serialized_graph,
uid=GraphActor.gen_uid(session_id, graph_key))
graph_ref.execute_graph()
self.assertEqual(graph_ref.get_state(), GraphState.SUCCEEDED)
36 changes: 23 additions & 13 deletions mars/worker/quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

import logging
import sys
import time

from .. import resource
from .. import resource, promise
from ..compat import six, OrderedDict3
from ..utils import log_unhandled
from .utils import WorkerActor
Expand Down Expand Up @@ -59,6 +60,7 @@ def _log_allocate(self, msg, *args, **kwargs):
args += (self._allocated_size, self._total_size)
logger.debug(msg + ' Allocated: %s, Total size: %s', *args, **kwargs)

@promise.reject_on_exception
@log_unhandled
def request_batch_quota(self, batch, callback):
"""
Expand Down Expand Up @@ -88,6 +90,7 @@ def request_batch_quota(self, batch, callback):
# make allocated requests the highest priority to be allocated
return self._request_quota(keys, values, delta, callback, make_first=all_allocated)

@promise.reject_on_exception
@log_unhandled
def request_quota(self, key, quota_size, callback):
"""
Expand Down Expand Up @@ -295,20 +298,27 @@ def _process_requests(self):
removed = []
for k, req in six.iteritems(self._requests):
req_size, delta, req_time, callbacks = req
if self._has_space(delta):
self.apply_allocation(k, req_size, handle_shrink=False)
for cb in callbacks:
self.tell_promise(cb)
if self._status_ref:
self._status_ref.update_mean_stats(
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
_tell=True, _wait=False)
try:
if self._has_space(delta):
self.apply_allocation(k, req_size, handle_shrink=False)
for cb in callbacks:
self.tell_promise(cb)
if self._status_ref:
self._status_ref.update_mean_stats(
'wait_time.' + self.uid.replace('Actor', ''), time.time() - req_time,
_tell=True, _wait=False)
removed.append(k)
else:
# Quota left cannot satisfy the next request, we quit
break
except: # noqa: E722
removed.append(k)
else:
# Quota left cannot satisfy the next request, we quit
break
# just in case the quota is allocated
self.release_quota(k)
for cb in callbacks:
self.tell_promise(cb, *sys.exc_info(), **dict(_accept=False))
for k in removed:
del self._requests[k]
self._requests.pop(k, None)


class MemQuotaActor(QuotaActor):
Expand Down

0 comments on commit bc1765e

Please sign in to comment.