Implementation of taskvine allpairs/map/reduce #4011

Merged
143 changes: 134 additions & 9 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
@@ -1,7 +1,6 @@

from . import cvine
import hashlib
from collections import deque
from collections import deque, namedtuple
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
@@ -11,7 +10,6 @@
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple
from .task import (
PythonTask,
FunctionCall,
@@ -25,6 +23,8 @@
import os
import time
import textwrap
import inspect
from functools import partial

RESULT_PENDING = 'result_pending'

@@ -142,13 +142,52 @@ def _iterator():
return _iterator()


def run_iterable(fn, iterable, dimensions=1):
"""Apply fn across iterable, descending `dimensions` levels of nesting.

A non-iterable (or string) input is passed to fn directly. At the innermost
level, an element that is itself a non-string iterable is unpacked into the
call, so (a, b) becomes fn(a, b).
"""
if not ((hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str)):
return fn(iterable)
if dimensions < 1:
return None
result = []
if dimensions == 1:
for element in iterable:
if (hasattr(element, '__iter__') or hasattr(element, '__getitem__')) and not isinstance(element, str):
result.append(fn(*element))
else:
result.append(fn(element))
else:
for inner_iterable in iterable:
result.append(run_iterable(fn, inner_iterable, dimensions - 1))
return result
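
# Example of run_iterable (illustrative only, not part of the module's API):
#
#     run_iterable(lambda a, b: a + b, [(1, 2), (3, 4)])     # -> [3, 7]
#     run_iterable(abs, [[-1, -2], [3, -4]], dimensions=2)   # -> [[1, 2], [3, 4]]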


def reduction_tree(fn, *args):
"""Repeatedly apply fn to groups of values from args until a single value is left.

Each call consumes as many values as fn has parameters and pushes its result
back onto the working queue; when fewer values remain than fn needs, the first
remaining value is returned. VineFuture arguments are resolved before the
reduction starts, and an iterable result is flattened back into the queue.
"""
minimum_parameters = len(inspect.signature(fn).parameters)
curr_size = len(args)
entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args])
while curr_size >= minimum_parameters:
parameters = []
for _ in range(minimum_parameters):
parameters.append(entries.popleft())
new_result = fn(*parameters)
if (hasattr(new_result, '__getitem__') or hasattr(new_result, '__iter__')) and not isinstance(new_result, str):
for result in new_result:
entries.appendleft(result)
else:
entries.appendleft(new_result)
curr_size = len(entries)
return entries[0]
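
# Example of reduction_tree (illustrative only, not part of the module's API):
# with a two-argument function, values are consumed pairwise until one remains:
#
#     reduction_tree(lambda x, y: x + y, 1, 2, 3, 4)   # -> 10
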
##
# \class FuturesExecutor
#
# TaskVine FuturesExecutor object
#
# This class acts as an interface for the creation of Futures
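#
# Example (an illustrative sketch; assumes that workers are available to the
# executor's manager and that submit() wraps a plain function in a FuturePythonTask):
#
#     def double(x):
#         return 2 * x
#
#     executor = FuturesExecutor(port=9123)
#     future = executor.submit(double, 21)
#     print(future.result())   # 42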


class FuturesExecutor(Executor):

def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
self.manager = Manager(port=port)
self.port = self.manager.port
@@ -173,6 +212,93 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
else:
self.factory = None

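# Map applies fn to every element of iterable and returns a single future whose
# result is the list of outputs, in order. The work is split into tasks of
# chunk_size elements each; see the example below the parameter list.
#
# parameters:
# - fn, the function to apply
# - iterable, a sequence of arguments; an element that is itself a non-string
#   iterable is unpacked into the call, so (a, b) becomes fn(a, b)
# - library_name, the library to use when method is "FutureFunctionCall"
# - method, "FutureFunctionCall" (currently not working, per the note in the
#   code below) or the default FuturePythonTask path
# - chunk_size, the number of elements evaluated in a single task
#
# Example (an illustrative sketch, assuming a FuturesExecutor instance named
# executor; not taken from the PR itself):
#
#     future = executor.map(lambda x: x * x, [1, 2, 3, 4], chunk_size=2)
#     print(future.result())   # [1, 4, 9, 16]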
def map(self, fn, iterable, library_name="Some_Library", method=None, chunk_size=1):
def wait_for_map_resolution(*futures_batch):
result = []
for computed_result in futures_batch:
result.extend(computed_result)
return result
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
tasks = []
if method == "FutureFunctionCall": # this currently does not work (error described in PR)
partial_obj = partial(run_iterable, fn)

# wrap the partial object in a plain named function so that it has a regular
# function name and can be packaged into the library
def partial_func(*args, **kwargs):
return partial_obj(*args, **kwargs)
libtask = self.create_library_from_functions(library_name, partial_func)
self.install_library(libtask)
for i in range(0, len(iterable), chunk_size):
future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i + chunk_size]))
tasks.append(future_batch_task)
else:
for i in range(0, len(iterable), chunk_size):
future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size])
tasks.append(future_batch_task)
future = self.submit(wait_for_map_resolution, *tasks)
else:
raise TypeError('map() requires an iterable of function arguments')
return future

# Reduce performs a tree reduction over the iterable and currently returns a
# future for a single reduced value; see the example below the parameter list.
#
# parameters:
# - fn, the reduction function
# - iterable, the values to be reduced
# - library_name, the library to use when method is "FutureFunctionCall"
# - method, "FutureFunctionCall" or the default FuturePythonTask path
# - chunk_size, the number of reductions performed in a single task: if fn takes
#   n arguments and c is chunk_size, a single task reduces c(n - 1) + 1 values to 1
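#
# Example (an illustrative sketch, assuming a FuturesExecutor instance named
# executor; not taken from the PR itself):
#
#     future = executor.reduce(lambda x, y: x + y, [1, 2, 3, 4, 5], chunk_size=2)
#     print(future.result())   # 15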

def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1):
# The comprehensions below are just the identity: when a future is pickled as a
# task argument it becomes a file, so it is evaluated immediately/sent to the queue.
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
sub_futures = [iterable]
num_parameters = len(inspect.signature(fn).parameters)
reduction_size = chunk_size * (num_parameters - 1)
# keep reducing until the most recent layer holds a single future; the second
# condition guarantees at least one pass over the initial values
while len(sub_futures[-1]) > 1 or len(sub_futures) == 1:
layer = []
for i in range(0, len(sub_futures[-1]), reduction_size):
if method == "FutureFunctionCall":
future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]])
layer.append(future_batch_task)
sub_futures.append(layer)
future = sub_futures[-1][0]
# iterable is actually a single, non-iterable value; reduce it directly in one task
else:
if method == "FutureFunctionCall":
future = self.submit(self.future_funcall(library_name, reduction_tree, fn, iterable))
else:
future = self.submit(reduction_tree, fn, iterable)
return future

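# Allpairs computes fn(a, b) for every pairing of elements from iterable_a and
# iterable_b and returns a future whose result is a 2-D list with one row per
# element of iterable_a and one column per element of iterable_b.
#
# parameters mirror map(): fn, the two iterables, an optional library_name and
# method ("FutureFunctionCall" or the default FuturePythonTask path), and a
# chunk_size giving the number of pairs evaluated in a single task.
#
# Example (an illustrative sketch, assuming a FuturesExecutor instance named
# executor; not taken from the PR itself):
#
#     future = executor.allpairs(lambda a, b: a * b, [1, 2, 3], [10, 20, 30])
#     print(future.result())   # [[10, 20, 30], [20, 40, 60], [30, 60, 90]]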
def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1):
def wait_for_allpairs_resolution(row_size, *futures_batch):
result = []
for computed_result in futures_batch:
result.extend(computed_result)
processed_result = []
for i in range(len(result) // row_size):
row = result[i * row_size:i * row_size + row_size]
processed_result.append(row)
return processed_result
# enumerate pairs with iterable_a as the outer loop so that each row of length
# len(iterable_b) in the final result corresponds to one element of iterable_a
iterable = [(a, b) for a in iterable_a for b in iterable_b]
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
tasks = []
for i in range(0, len(iterable), chunk_size):
if method == "FutureFunctionCall":
future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i + chunk_size]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size])
tasks.append(future_batch_task)
future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks)
else:
if method == "FutureFunctionCall":
future = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable))
else: # Method is FuturePythonTask
future = self.submit(run_iterable, fn, iterable)
return future

def submit(self, fn, *args, **kwargs):
if isinstance(fn, FuturePythonTask):
self.manager.submit(fn)
@@ -246,15 +372,15 @@ def cancelled(self):
return False

def running(self):
state = self._task.state
if state == "RUNNING":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_RUNNING:
return True
else:
return False

def done(self):
state = self._task.state
if state == "DONE" or state == "RETRIEVED":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_DONE:
return True
else:
return False
@@ -307,7 +433,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
self.manager = manager
self.library_name = library_name
self._envs = []

self._future = VineFuture(self)
self._has_retrieved = False

@@ -332,7 +457,6 @@ def output(self, timeout="wait_forever"):
self._saved_output = output['Result']
else:
self._saved_output = FunctionCallNoResult(output['Reason'])

except Exception as e:
self._saved_output = e
else:
@@ -406,6 +530,7 @@ def output(self, timeout="wait_forever"):
# task or the exception object of a failed task.
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
print(self._output_file.contents())
# handle output file fetch/deserialization failures
self._output = e
self._output_loaded = True