Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of taskvine allpairs/map/reduce #4011

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 137 additions & 22 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@

from . import cvine
import hashlib
from collections import deque
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures._base import PENDING
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from collections import deque, namedtuple
from concurrent.futures import (
Executor,
Future,
FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
)
from concurrent.futures._base import PENDING, CANCELLED, FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple

from .task import (
PythonTask,
FunctionCall,
FunctionCallNoResult,
)

from .manager import (
Factory,
Manager,
)

import math
import os
import time
import textwrap
from functools import partial
from collections.abc import Sequence

RESULT_PENDING = 'result_pending'

Expand Down Expand Up @@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
f.module_manager.submit(f._task)

start = time.perf_counter()
result_timeout = min(timeout, 5) if timeout is not None else 5
result_timeout = max(1, min(timeout, 5)) if timeout else 5

def _iterator():
# iterate of queue of futures, yeilding completed futures and
Expand All @@ -133,22 +136,39 @@ def _iterator():
assert result != RESULT_PENDING
yield f

if (
fs and timeout is not None
and time.perf_counter() - start > timeout
):
if fs and timeout and time.perf_counter() - start > timeout:
raise TimeoutError()

return _iterator()


def run_iterable(fn, *args):
return list(map(fn, args))


def reduction_tree(fn, *args, n=2):
# n is the arity of the reduction function fn
# if less than 2, we have an infinite loop
assert n > 1
entries = [f.result() if isinstance(f, VineFuture) else f for f in args]
if len(entries) < 2:
return entries[0]

len_multiple = int(math.ceil(len(entries) / n) * n)
new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)])

return reduction_tree(fn, *new_args, n=n)

##
# \class FuturesExecutor
#
# TaskVine FuturesExecutor object
#
# This class acts as an interface for the creation of Futures


class FuturesExecutor(Executor):

def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
self.manager = Manager(port=port)
self.port = self.manager.port
Expand All @@ -173,6 +193,102 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
else:
self.factory = None

def map(self, fn, iterable, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable, Sequence)

def wait_for_map_resolution(*futures_batch):
result = []
for f in futures_batch:
result.extend(f.result() if isinstance(f, VineFuture) else f)
return result

tasks = []
fn_wrapped = partial(run_iterable, fn)
if library_name:
libtask = self.create_library_from_functions(library_name, fn_wrapped)
self.install_library(libtask)

while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]

if library_name:
future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads))
else:
future_batch_task = self.submit(self.future_task(fn_wrapped, *heads))

tasks.append(future_batch_task)

return self.submit(self.future_task(wait_for_map_resolution, *tasks))

# Reduce performs a reduction tree on the iterable and currently returns a single value
#
# parameters:
# - Function
# - a function that receives fn_arity arguments
# - A sequence of parameters that function will take
# - a chunk_size to group elements in sequence to dispatch to a single task
# - arity of the function, elements of a chunk are reduce arity-wise.
# - an optional library_name for a library function call
def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2):
assert chunk_size > 1
assert fn_arity > 1
assert isinstance(iterable, Sequence)
chunk_size = max(fn_arity, chunk_size)

new_iterable = []
while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
heads = [f.result() if isinstance(f, VineFuture) else f for f in heads]
if library_name:
future_batch_task = self.submit(
self.future_funcall(
library_name, reduction_tree, fn, *heads, n=fn_arity
)
)
else:
future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity))

new_iterable.append(future_batch_task)

if len(new_iterable) > 1:
return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity)
else:
return new_iterable[0]

def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable_rows, Sequence)
assert isinstance(iterable_cols, Sequence)

def wait_for_allpairs_resolution(row_size, col_size, mapped):
result = []
for _ in range(row_size):
result.append([0] * col_size)

mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped
for p in mapped:
(i, j, r) = p.result() if isinstance(p, VineFuture) else p
result[i][j] = r

return result

def wrap_idx(args):
i, j, a, b = args
return (i, j, fn(a, b))

iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)]
mapped = self.map(wrap_idx, iterable, library_name, chunk_size)

return self.submit(
self.future_task(
wait_for_allpairs_resolution,
len(iterable_rows),
len(iterable_cols),
mapped,
)
)

def submit(self, fn, *args, **kwargs):
if isinstance(fn, FuturePythonTask):
self.manager.submit(fn)
Expand Down Expand Up @@ -246,15 +362,15 @@ def cancelled(self):
return False

def running(self):
state = self._task.state
if state == "RUNNING":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_RUNNING:
return True
else:
return False

def done(self):
state = self._task.state
if state == "DONE" or state == "RETRIEVED":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_DONE:
return True
else:
return False
Expand Down Expand Up @@ -307,7 +423,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
self.manager = manager
self.library_name = library_name
self._envs = []

self._future = VineFuture(self)
self._has_retrieved = False

Expand All @@ -332,7 +447,6 @@ def output(self, timeout="wait_forever"):
self._saved_output = output['Result']
else:
self._saved_output = FunctionCallNoResult(output['Reason'])

except Exception as e:
self._saved_output = e
else:
Expand Down Expand Up @@ -406,6 +520,7 @@ def output(self, timeout="wait_forever"):
# task or the exception object of a failed task.
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
print(self._output_file.contents())
# handle output file fetch/deserialization failures
self._output = e
self._output_loaded = True
Expand Down
84 changes: 84 additions & 0 deletions taskvine/test/TR_vine_python_future_hof.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/sh

set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

STATUS_FILE=vine.status
PORT_FILE=vine.port

check_needed()
{
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1

# Poncho currently requires ast.unparse to serialize the function,
# which only became available in Python 3.9. Some older platforms
# (e.g. almalinux8) will not have this natively.
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1

# In some limited build circumstances (e.g. macos build on github),
# poncho doesn't work due to lack of conda-pack or cloudpickle
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1

return 0
}

prepare()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
return 0
}

run()
{
( ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_future_hof.py $PORT_FILE; echo $? > $STATUS_FILE ) &

# wait at most 15 seconds for vine to find a port.
wait_for_file_creation $PORT_FILE 15

run_taskvine_worker $PORT_FILE worker.log --cores 2 --memory 2000 --disk 2000

# wait for vine to exit.
wait_for_file_creation $STATUS_FILE 15

# retrieve exit status
status=$(cat $STATUS_FILE)
if [ $status -ne 0 ]
then
# display log files in case of failure.
logfile=$(latest_vine_debug_log)
if [ -f ${logfile} ]
then
echo "master log:"
cat ${logfile}
fi

if [ -f worker.log ]
then
echo "worker log:"
cat worker.log
fi

exit 1
fi

exit 0
}

clean()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
rm -rf vine-run-info
exit 0
}


dispatch "$@"
60 changes: 60 additions & 0 deletions taskvine/test/vine_python_future_hof.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#! /usr/bin/env python

import sys
import ndcctools.taskvine as vine

port_file = None
try:
port_file = sys.argv[1]
except IndexError:
sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
raise

def main():
executor = vine.FuturesExecutor(
port=[9123, 9129], manager_name="vine_hof_test", factory=False
)

print("listening on port {}".format(executor.manager.port))
with open(port_file, "w") as f:
f.write(str(executor.manager.port))

nums = list(range(101))

rows = 3
mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result()
assert sum(mult_table[1]) == sum(nums)
assert sum(sum(r) for r in mult_table) == sum(sum(nums) * n for n in range(rows))

doubles = executor.map(lambda x: 2*x, nums, chunk_size=10).result()
assert sum(doubles) == sum(nums)*2

doubles = executor.map(lambda x: 2*x, nums, chunk_size=13).result()
assert sum(doubles) == sum(nums)*2

maximum = executor.reduce(max, nums, fn_arity=2).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=25).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=1000).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result()
assert maximum == 100

minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result()
assert minimum == 0

total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result()
assert total == sum(nums)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests for libraries/function calls here as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that library calls don't work with futures right now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, no, the test for funcall future calls works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, the issue is that the auxiliary functions to run chunks will not be in the library constructed by the user. I guess we can always add them to all libraries that are created?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tphung3 libraries cannot be used right now. The issue is that we cannot compose functions in a library and these high order functions would need to construct the event structure to pass the correct arguments for remote execution.
We could make it work for this case, but I think we would be better if we come up with something more general.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with that as long as we have a safeguard of not using functions atm. I see that the code raises error if library is used so that should be safe enough for users.



if __name__ == "__main__":
main()


# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
Loading