Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of taskvine allpairs/map/reduce #4011

Merged

Conversation

RamenMode
Copy link
Contributor

@RamenMode RamenMode commented Dec 18, 2024

Proposed Changes

This PR implements a basic implementation of map, reduce, and allpairs for the TaskVine Futures Executor that also supports batching. It currently works with PythonTasks and for small workloads, with additional errors for other cases mentioned below. There is a basic test below. Please note a couple of errors encountered during development. 1. It seems that Futures does not interface well with Condor, so please test locally. This issue was brought up in #3989. 2. Futures executor seems to fail with consecutive sequential workloads, which seems like it may be a downstream issue as a result of the implementation of futures itself. See #3988. 3. Upon talking to @dthain, there is also well-known issue that @tphung3 is tangentially working on which is that dynamically declared functions which are passed into taskvine functions such as executor.map may fail due to the functions not being serializable. This is problematic for this executor in attempting to implement FunctionCalls due to the need for a function to run our function on a list of iterables, which is necessary in an executor batching format. This prevents a FunctionCall implementation from being possible in the current version. The code that runs when method="FutureFunctionCall" is not operable but in the process of implementation. Regardless, this implements a version that works nicely with the default, FuturePythonTasks.

An overview of the functions
map:
executor.map(fn, iterable, library_name="Some_Library", method=None, chunk_size=1)
fn - a function to be executed
iterable - an iterable where each element contains the parameters of fn
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on

Returns: a future which returns the iterable when calling future.result()

reduce
executor.reduce(fn, iterable, library_name="Some_Library", method=None, chunk_size=1)
fn - a function to be executed
iterable - an iterable where each element contains the parameters of fn
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on

Returns: a future which returns the reduced value when calling future.result()

allpairs:
executor.allpairs(fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1):
fn - a function to be executed
iterable_a - an iterable where each element contains the first parameters
iterable_b - an iterable where each element contains the second parameters
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on

Returns: a list of lists which returns essentially the cartesian product (combination) when fn is applied

Run a basic test with the following code

import ndcctools.taskvine as vine
import time

def add(x, y):
    return x + y
def sleep(n, dummy):
    time.sleep(n)
def identity(x):
    return x

executor_opts = {"cores": 4, "min-workers": 1, "max-workers": 1}
executor = vine.FuturesExecutor(manager_name="future_test_manager", batch_type="local", opts=executor_opts, port=9124)
executor.manager.enable_peer_transfers()

futures = executor.map(add, [(1, 2), (3, 4)]*5, chunk_size=10)
print(futures.result())

futures = executor.map(identity, ["hello", "world", "!"])
print(futures.result())

futures = executor.allpairs(add, [50, 100, 150, 200], [50, 100, 150, 200], chunk_size=3)
print(futures.result())
futures = executor.reduce(add, [1, 2, 3, 4, 5, 6], chunk_size=2)
print(futures.result())

for i in range(5):
    t = executor.future_task(identity, "hello")
    t.set_cores(1)
    f = executor.submit(t)
    a = f.result()
    print(a)

for i in range(3):
    t = executor.future_task(add, 5, 4)
    t.set_cores(1)
    f = executor.submit(t)
    a = f.result()
    print(a)

Merge Checklist

The following items must be completed before PRs can be merged.
Check these off to verify you have completed all steps.

  • make test Run local tests prior to pushing.
  • make format Format source code to comply with lint policies. Note that some lint errors can only be resolved manually (e.g., Python)
  • make lint Run lint on source code prior to pushing.
  • Manual Update: Update the manual to reflect user-visible changes.
  • Type Labels: Select a github label for the type: bugfix, enhancement, etc.
  • Product Labels: Select a github label for the product: TaskVine, Makeflow, etc.
  • PR RTM: Mark your PR as ready to merge.

@RamenMode
Copy link
Contributor Author

Ready for review!

@dthain
Copy link
Member

dthain commented Dec 19, 2024

@BarrySlyDelgado and @tphung3 if you have any concerns that can be addressed by a quick fix, please speak up now.

Also keep in mind that this is a first version that can be evolved...

@tphung3
Copy link
Contributor

tphung3 commented Dec 19, 2024

Just a note that FunctionCalls in the latest master branch should run with dynamically executed functions. See TR_vine_python_serverless.sh and

# define special functions (1 lambda function and 1 dynamically executed function
Otherwise no big issues on my end.

total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result()
assert total == sum(nums)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests for libraries/function calls here as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that library calls don't work with futures right now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, no, the test for funcall future calls works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, the issue is that the auxiliary functions to run chunks will not be in the library constructed by the user. I guess we can always add them to all libraries that are created?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tphung3 libraries cannot be used right now. The issue is that we cannot compose functions in a library and these high order functions would need to construct the event structure to pass the correct arguments for remote execution.
We could make it work for this case, but I think we would be better if we come up with something more general.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with that as long as we have a safeguard of not using functions atm. I see that the code raises error if library is used so that should be safe enough for users.

Copy link
Member

@dthain dthain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will defer to @btovar on when to merge this one. I am ok with the general approach, and it's ok if it doesn't work with functions/libraries, as long as that case is checked for explicitly.

@btovar btovar force-pushed the futures_executor_feature branch from 5151e2a to 190a6fa Compare January 30, 2025 13:45
@btovar btovar merged commit 0509ea8 into cooperative-computing-lab:master Jan 30, 2025
10 checks passed
btovar added a commit that referenced this pull request Jan 30, 2025
* Implementation of taskvine allpairs/map/reduce

* lint

* lint v2

* cleanup code

* cleanup reduce

* add test

* remove debug print

* cleanup map

* format

* allpairs in terms of map

* format

* do not create lib in map

* error on lib name

---------

Co-authored-by: Kevin Xue <[email protected]>
Co-authored-by: Benjamin Tovar <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants