Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
btovar committed Feb 10, 2025
1 parent be37932 commit fd749bb
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class DaskVineWarning(UserWarning):

from .dask_executor import DaskVine
from .dask_dag import DaskVineDag
except ImportError:
except (ImportError, ModuleNotFoundError):
warnings.warn("Dask >= 2024.12.0 not available, using DaskVine legacy task graph representation.", DaskVineWarning)

from .compat import DaskVine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

if vd >= vr:
warnings.warn("ndcctools.taskvine.compat only works with dask version < 2024.12.0")
except ImportError:
except (ImportError, ModuleNotFoundError):
pass


Expand Down
52 changes: 52 additions & 0 deletions taskvine/test/TR_vine_task_graph_compat.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/sh
set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH
export PATH=$(pwd)/../src/worker:$(pwd)/../../batch_job/src:$PATH

STATUS_FILE=vine.status
PORT_FILE=vine.port

check_needed()
{
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle; import dask" || return 1

return 0
}

prepare()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE

return 0
}

run()
{
${CCTOOLS_PYTHON_TEST_EXEC} vine_task_graph_compat.py
echo $? > $STATUS_FILE

# retrieve taskvine exit status
status=$(cat $STATUS_FILE)
if [ $status -ne 0 ]
then
exit 1
fi

exit 0
}

clean()
{
rm -rf vine-run-info
exit 0
}

dispatch "$@"
97 changes: 97 additions & 0 deletions taskvine/test/vine_task_graph_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python

# Copyright (C) 2023- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This example shows TaskVine executing a manually constructed dask graph.
# See vine_example_dask_delayed.py for an example where the graph
# is constructed by dask.

import ndcctools.taskvine as vine
from ndcctools.taskvine.compat import DaskVine
import argparse
import getpass
import sys
import traceback


from operator import add # use add function in the example graph
dsk_graph = {
"x": 1,
"y": 2,
"z": (add, "x", "y"),
"w": (sum, ["x", "y", "z"]),
"v": [(sum, ["w", "z"]), 2],
"t": (sum, "v")
}

expected_result = 11

if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="vine_example_dask_graph.py",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="""This example shows TaskVine executing a manually constructed dask graph.
See vine_example_dask_delayed.py for an example where the graph
is constructed by dask.""")
parser.add_argument(
"--name",
nargs="?",
type=str,
help="name to assign to the manager.",
default=f"vine-dask-graph-{getpass.getuser()}",
)
parser.add_argument(
"--port",
nargs="?",
type=int,
help="port for the manager to listen for connections. If 0, pick any available.",
default=9123,
)
parser.add_argument(
"--disable-peer-transfers",
action="store_true",
help="disable transfers among workers.",
default=False,
)

args = parser.parse_args()

m = DaskVine(port=args.port, ssl=True)
m.set_name(args.name)
print(f"Listening for workers at port: {m.port}")

if args.disable_peer_transfers:
m.disable_peer_transfers()

# checkpoint at even levels when nodes have at least one dependency
def checkpoint(dag, key):
if dag.depth_of(key) % 2 == 0 and len(dag.get_dependencies(key)) > 0:
print(f"checkpoint for {key}")
return True
return False

f = vine.Factory(manager=m)
f.cores = 4
f.disk = 2000
f.memory = 2000
f.max_workers = 1
f.min_workers = 1
with f:
desired_keys = ["t", "v"]
desired_keys = list(dsk_graph.keys())
print(f"dask graph example is:\n{dsk_graph}")
print(f"desired keys are {desired_keys}")

try:
results = m.get(dsk_graph, desired_keys, lazy_transfers=True, checkpoint_fn=checkpoint, resources={"cores": 1})
print({k: v for k, v in zip(desired_keys, results)})
except Exception:
traceback.print_exc()

print("Terminating workers...", end="")

print("done!")
sys.exit(0)
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:

0 comments on commit fd749bb

Please sign in to comment.