Implementation of taskvine allpairs/map/reduce (#4011)
* Implementation of taskvine allpairs/map/reduce

* lint

* lint v2

* cleanup code

* cleanup reduce

* add test

* remove debug print

* cleanup map

* format

* allpairs in terms of map

* format

* do not create lib in map

* error on lib name

---------

Co-authored-by: Kevin Xue <[email protected]>
Co-authored-by: Benjamin Tovar <[email protected]>
3 people committed Jan 30, 2025
1 parent 8113f40 commit c1b9c8c
Showing 3 changed files with 279 additions and 22 deletions.
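
In short, this commit adds three higher-order helpers (map, reduce, and allpairs) to ndcctools.taskvine.futures.FuturesExecutor. A minimal usage sketch adapted from the test added in this commit; the port range and manager name are placeholders, and with factory=False a worker must be started separately:

import ndcctools.taskvine as vine

# Placeholder port range and manager name; factory=False means workers are started separately.
executor = vine.FuturesExecutor(port=[9123, 9129], manager_name="vine_hof_example", factory=False)

nums = list(range(101))

# map: apply fn element-wise, dispatching chunk_size elements per task.
doubles = executor.map(lambda x: 2 * x, nums, chunk_size=10).result()

# reduce: fold the sequence with an arity-fn_arity reduction tree.
maximum = executor.reduce(max, nums, fn_arity=25).result()

# allpairs: apply fn over the cross product of two sequences,
# yielding a len(rows) x len(cols) nested list.
mult_table = executor.allpairs(lambda x, y: x * y, range(3), nums, chunk_size=11).result()
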
157 changes: 135 additions & 22 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
@@ -1,30 +1,33 @@

from . import cvine
import hashlib
from collections import deque
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures._base import PENDING
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from collections import deque, namedtuple
from concurrent.futures import (
Executor,
Future,
FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
)
from concurrent.futures._base import PENDING, CANCELLED, FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple

from .task import (
PythonTask,
FunctionCall,
FunctionCallNoResult,
)

from .manager import (
Factory,
Manager,
)

import math
import os
import time
import textwrap
from functools import partial
from collections.abc import Sequence

RESULT_PENDING = 'result_pending'

@@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
f.module_manager.submit(f._task)

start = time.perf_counter()
result_timeout = min(timeout, 5) if timeout is not None else 5
result_timeout = max(1, min(timeout, 5)) if timeout else 5

def _iterator():
# iterate over the queue of futures, yielding completed futures and
@@ -133,22 +136,39 @@ def _iterator():
assert result != RESULT_PENDING
yield f

if (
fs and timeout is not None
and time.perf_counter() - start > timeout
):
if fs and timeout and time.perf_counter() - start > timeout:
raise TimeoutError()

return _iterator()


def run_iterable(fn, *args):
return list(map(fn, args))


def reduction_tree(fn, *args, n=2):
# n is the arity of the reduction function fn
# if less than 2, we have an infinite loop
assert n > 1
entries = [f.result() if isinstance(f, VineFuture) else f for f in args]
if len(entries) < 2:
return entries[0]

len_multiple = int(math.ceil(len(entries) / n) * n)
new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)])

return reduction_tree(fn, *new_args, n=n)

##
# \class FuturesExecutor
#
# TaskVine FuturesExecutor object
#
# This class acts as an interface for the creation of Futures


class FuturesExecutor(Executor):

def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
self.manager = Manager(port=port)
self.port = self.manager.port
@@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
else:
self.factory = None

def map(self, fn, iterable, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable, Sequence)

def wait_for_map_resolution(*futures_batch):
result = []
for f in futures_batch:
result.extend(f.result() if isinstance(f, VineFuture) else f)
return result

tasks = []
fn_wrapped = partial(run_iterable, fn)
while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]

if library_name:
raise NotImplementedError("Using a library not currently supported.")
future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads))
else:
future_batch_task = self.submit(self.future_task(fn_wrapped, *heads))

tasks.append(future_batch_task)

return self.submit(self.future_task(wait_for_map_resolution, *tasks))

# Reduce performs a reduction tree on the iterable and currently returns a single value.
#
# parameters:
# - fn: a function that receives fn_arity arguments
# - iterable: a sequence of arguments that the function will take
# - library_name: an optional library name for a library function call
# - chunk_size: how many elements of the sequence are grouped into a single task
# - fn_arity: arity of the function; elements of a chunk are reduced arity-wise
def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2):
assert chunk_size > 1
assert fn_arity > 1
assert isinstance(iterable, Sequence)
chunk_size = max(fn_arity, chunk_size)

new_iterable = []
while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
heads = [f.result() if isinstance(f, VineFuture) else f for f in heads]
if library_name:
raise NotImplementedError("Using a library not currently supported.")
future_batch_task = self.submit(
self.future_funcall(
library_name, reduction_tree, fn, *heads, n=fn_arity
)
)
else:
future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity))

new_iterable.append(future_batch_task)

if len(new_iterable) > 1:
return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity)
else:
return new_iterable[0]

def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable_rows, Sequence)
assert isinstance(iterable_cols, Sequence)

def wait_for_allpairs_resolution(row_size, col_size, mapped):
result = []
for _ in range(row_size):
result.append([0] * col_size)

mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped
for p in mapped:
(i, j, r) = p.result() if isinstance(p, VineFuture) else p
result[i][j] = r

return result

def wrap_idx(args):
i, j, a, b = args
return (i, j, fn(a, b))

iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)]
mapped = self.map(wrap_idx, iterable, library_name, chunk_size)

return self.submit(
self.future_task(
wait_for_allpairs_resolution,
len(iterable_rows),
len(iterable_cols),
mapped,
)
)

def submit(self, fn, *args, **kwargs):
if isinstance(fn, (FuturePythonTask, FutureFunctionCall)):
self.manager.submit(fn)
@@ -240,15 +354,15 @@ def cancelled(self):
return False

def running(self):
state = self._task.state
if state == "RUNNING":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_RUNNING:
return True
else:
return False

def done(self):
state = self._task.state
if state == "DONE" or state == "RETRIEVED":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_DONE:
return True
else:
return False
@@ -301,7 +415,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
self.manager = manager
self.library_name = library_name
self._envs = []

self._future = VineFuture(self)
self._has_retrieved = False

@@ -326,7 +439,6 @@ def output(self, timeout="wait_forever"):
self._saved_output = output['Result']
else:
self._saved_output = FunctionCallNoResult(output['Reason'])

except Exception as e:
self._saved_output = e
else:
@@ -400,6 +512,7 @@ def output(self, timeout="wait_forever"):
# task or the exception object of a failed task.
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
print(self._output_file.contents())
# handle output file fetch/deserialization failures
self._output = e
self._output_loaded = True
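
For reference, the new reduction_tree helper runs on the worker side of reduce: it groups its arguments into slices of at most n elements, applies fn to each slice, and recurses on the partial results until one value remains. A standalone sketch of the same grouping logic, with the VineFuture unwrapping omitted and fn assumed to accept an iterable (as max, min, and sum do in the new test):

import math

def reduction_tree_local(fn, *args, n=2):
    # arity below 2 would never shrink the input, hence the assert in the real helper
    assert n > 1
    entries = list(args)
    if len(entries) < 2:
        return entries[0]
    # round the length up to a multiple of n so the slices cover every element
    len_multiple = int(math.ceil(len(entries) / n) * n)
    partials = [fn(entries[i:i + n]) for i in range(0, len_multiple, n)]
    return reduction_tree_local(fn, *partials, n=n)

# 101 values with n=25 become 5 partial maxima, then a single maximum,
# matching executor.reduce(max, nums, fn_arity=25) in the new test.
assert reduction_tree_local(max, *range(101), n=25) == 100
assert reduction_tree_local(sum, *range(101), n=11) == sum(range(101))
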
84 changes: 84 additions & 0 deletions taskvine/test/TR_vine_python_future_hof.sh
@@ -0,0 +1,84 @@
#!/bin/sh

set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

STATUS_FILE=vine.status
PORT_FILE=vine.port

check_needed()
{
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1

# Poncho currently requires ast.unparse to serialize the function,
# which only became available in Python 3.9. Some older platforms
# (e.g. almalinux8) will not have this natively.
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1

# In some limited build circumstances (e.g. macos build on github),
# poncho doesn't work due to lack of conda-pack or cloudpickle
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1

return 0
}

prepare()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
return 0
}

run()
{
( ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_future_hof.py $PORT_FILE; echo $? > $STATUS_FILE ) &

# wait at most 15 seconds for vine to find a port.
wait_for_file_creation $PORT_FILE 15

run_taskvine_worker $PORT_FILE worker.log --cores 2 --memory 2000 --disk 2000

# wait for vine to exit.
wait_for_file_creation $STATUS_FILE 15

# retrieve exit status
status=$(cat $STATUS_FILE)
if [ $status -ne 0 ]
then
# display log files in case of failure.
logfile=$(latest_vine_debug_log)
if [ -f ${logfile} ]
then
echo "master log:"
cat ${logfile}
fi

if [ -f worker.log ]
then
echo "worker log:"
cat worker.log
fi

exit 1
fi

exit 0
}

clean()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
rm -rf vine-run-info
exit 0
}


dispatch "$@"
60 changes: 60 additions & 0 deletions taskvine/test/vine_python_future_hof.py
@@ -0,0 +1,60 @@
#! /usr/bin/env python

import sys
import ndcctools.taskvine as vine

port_file = None
try:
port_file = sys.argv[1]
except IndexError:
sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
raise

def main():
executor = vine.FuturesExecutor(
port=[9123, 9129], manager_name="vine_hof_test", factory=False
)

print("listening on port {}".format(executor.manager.port))
with open(port_file, "w") as f:
f.write(str(executor.manager.port))

nums = list(range(101))

rows = 3
mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result()
assert sum(mult_table[1]) == sum(nums)
assert sum(sum(r) for r in mult_table) == sum(sum(nums) * n for n in range(rows))

doubles = executor.map(lambda x: 2*x, nums, chunk_size=10).result()
assert sum(doubles) == sum(nums)*2

doubles = executor.map(lambda x: 2*x, nums, chunk_size=13).result()
assert sum(doubles) == sum(nums)*2

maximum = executor.reduce(max, nums, fn_arity=2).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=25).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=1000).result()
assert maximum == 100

maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result()
assert maximum == 100

minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result()
assert minimum == 0

total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result()
assert total == sum(nums)




if __name__ == "__main__":
main()


# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
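
For context on the nested-list shape asserted above: allpairs enumerates the cross product of the two sequences together with their indices, maps an index-carrying wrapper over it, and reassembles a len(rows) x len(cols) matrix. A local sketch of that composition, with no tasks dispatched and illustrative names:

def allpairs_local(fn, rows, cols):
    # enumerate the cross product with indices, mirroring allpairs() in futures.py
    indexed = [(i, j, a, b) for (i, a) in enumerate(rows) for (j, b) in enumerate(cols)]
    # apply fn to each pair, keeping the indices alongside the result
    mapped = [(i, j, fn(a, b)) for (i, j, a, b) in indexed]
    # scatter the results into a rows x cols nested list
    result = [[0] * len(cols) for _ in range(len(rows))]
    for (i, j, r) in mapped:
        result[i][j] = r
    return result

nums = list(range(101))
mult_table = allpairs_local(lambda x, y: x * y, range(3), nums)
assert sum(mult_table[1]) == sum(nums)
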
