[dask] merge changes from custom-images (#1250)
* [dask] merged from custom-images

* retry flakey ssh command

* import time

* wait for dnf to finish before giving it a ramdisk

* print command that failed ; try harder

* fixed is_ubuntu18 function
cjac authored Oct 27, 2024
1 parent 90b1bd1 commit b184253
Showing 2 changed files with 173 additions and 54 deletions.
213 changes: 160 additions & 53 deletions dask/dask.sh
@@ -23,9 +23,10 @@ set -euxo pipefail

function os_id() { grep '^ID=' /etc/os-release | cut -d= -f2 | xargs ; }
function os_version() { grep '^VERSION_ID=' /etc/os-release | cut -d= -f2 | xargs ; }
function os_codename() { grep '^VERSION_CODENAME=' /etc/os-release | cut -d= -f2 | xargs ; }
function is_ubuntu() { [[ "$(os_id)" == 'ubuntu' ]] ; }
function is_ubuntu18() { is_ubuntu && [[ "$(os_version)" == '18.04'* ]] ; }
function is_debian() { [[ "$(os_id)" == 'debian' ]] ; }
function is_debuntu() { is_debian || is_ubuntu ; }

function print_metadata_value() {
local readonly tmpfile=$(mktemp)
@@ -64,52 +65,34 @@ function get_metadata_value() {
return ${return_code}
}

function get_metadata_attribute() {
function get_metadata_attribute() (
set +x
local -r attribute_name="$1"
local -r default_value="${2:-}"
get_metadata_value "attributes/${attribute_name}" || echo -n "${default_value}"
set -x
}
)
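The switch from a `{ ... }` body to a `( ... )` body runs the function in a subshell, so `set +x` suppresses command tracing only inside `get_metadata_attribute`, and the old trailing `set -x` is no longer needed. A minimal sketch of the pattern, with illustrative names:

    set -x                # xtrace on for the whole script
    quiet() (             # parentheses: the body runs in a subshell
      set +x              # silences tracing only within this subshell
      echo "not traced"
    )
    quiet
    echo "traced again"   # parent shell options are untouched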

readonly DEFAULT_CUDA_VERSION="12.4"
readonly CUDA_VERSION=$(get_metadata_attribute 'cuda-version' ${DEFAULT_CUDA_VERSION})
function is_cuda12() { [[ "${CUDA_VERSION%%.*}" == "12" ]] ; }
function is_cuda11() { [[ "${CUDA_VERSION%%.*}" == "11" ]] ; }

readonly DASK_RUNTIME="$(get_metadata_attribute dask-runtime || echo 'standalone')"

# Dask 'standalone' config
readonly DASK_SERVICE=dask-cluster
readonly DASK_WORKER_SERVICE=dask-worker
readonly DASK_SCHEDULER_SERVICE=dask-scheduler

readonly KNOX_HOME=/usr/lib/knox
readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"

function execute_with_retries() {
local -r cmd=$1
for ((i = 0; i < 10; i++)); do
local -r cmd="$*"
for i in {0..9} ; do
if eval "$cmd"; then
return 0
fi
return 0 ; fi
sleep 5
done
echo "Cmd '${cmd}' failed."
return 1
}
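With `local -r cmd="$*"`, the retry helper now joins all of its arguments into one string before the `eval`, so a multi-word command can be passed either quoted or as separate words; the two illustrative calls below are equivalent:

    execute_with_retries "apt-get install -y -qq screen"
    execute_with_retries apt-get install -y -qq screen

On failure the loop now also reports which command gave up after its ten attempts.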

DASK_CONDA_ENV="/opt/conda/miniconda3/envs/dask"

function configure_dask_yarn() {
readonly DASK_YARN_CONFIG_DIR=/etc/dask/
readonly DASK_YARN_CONFIG_FILE=${DASK_YARN_CONFIG_DIR}/config.yaml
dask_python="$(realpath "${DASK_CONDA_ENV}/bin/python")"
# Minimal custom configuration is required for this
# setup. Please see https://yarn.dask.org/en/latest/quickstart.html#usage
# for information on tuning Dask-Yarn environments.
mkdir -p ${DASK_YARN_CONFIG_DIR}
mkdir -p "${DASK_YARN_CONFIG_DIR}"

cat <<EOF >"${DASK_YARN_CONFIG_FILE}"
# Config file for Dask Yarn.
@@ -118,16 +101,13 @@
# https://yarn.dask.org/en/latest/configuration.html#default-configuration
yarn:
  environment: python://${dask_python}
  environment: python://${DASK_CONDA_ENV}/bin/python
  worker:
    count: 2
EOF
}
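For reference, with the default `DASK_CONDA_ENV` of `/opt/conda/miniconda3/envs/dask`, the generated `/etc/dask/config.yaml` would read approximately:

    yarn:
      environment: python:///opt/conda/miniconda3/envs/dask/bin/python
      worker:
        count: 2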

enable_worker_service="0"
ROLE="$(get_metadata_attribute dataproc-role)"
MASTER="$(get_metadata_attribute dataproc-master)"
function install_systemd_dask_worker() {
echo "Installing systemd Dask Worker service..."
local -r dask_worker_local_dir="/tmp/${DASK_WORKER_SERVICE}"
@@ -164,17 +144,18 @@ EOF
if [[ "${ROLE}" != "Master" ]]; then
enable_worker_service="1"
else
local RUN_WORKER_ON_MASTER="$(get_metadata_attribute dask-worker-on-master || echo 'true')"
local RUN_WORKER_ON_MASTER="$(get_metadata_attribute dask-worker-on-master 'true')"
# Enable service on single-node cluster (no workers)
local worker_count="$(get_metadata_attribute dataproc-worker-count)"
if [[ "${worker_count}" == "0" ]]; then RUN_WORKER_ON_MASTER='true'; fi

if [[ "${RUN_WORKER_ON_MASTER}" == "true" ]]; then
if [[ "${worker_count}" == "0" || "${RUN_WORKER_ON_MASTER}" == "true" ]]; then
enable_worker_service="1"
fi
fi

if [[ "${enable_worker_service}" == "1" ]]; then systemctl enable "${DASK_WORKER_SERVICE}" ; fi
if [[ "${enable_worker_service}" == "1" ]]; then
systemctl enable "${DASK_WORKER_SERVICE}"
systemctl restart "${DASK_WORKER_SERVICE}"
fi
}
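Net effect of the rewritten condition: the worker service is enabled on every worker node, and on the master only when the cluster has no workers or when the `dask-worker-on-master` attribute is left at its default of `true`; the added `systemctl restart` makes sure an already-enabled worker picks up a freshly written launcher script.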

function install_systemd_dask_scheduler() {
@@ -185,7 +166,6 @@ function install_systemd_dask_scheduler() {

mkdir -p "${dask_scheduler_local_dir}"


local DASK_SCHEDULER_LAUNCHER="/usr/local/bin/${DASK_SCHEDULER_SERVICE}-launcher.sh"

cat <<EOF >"${DASK_SCHEDULER_LAUNCHER}"
@@ -234,9 +214,11 @@ function configure_knox_for_dask() {
fi

local DASK_UI_PORT=8787
sed -i \
"/<\/topology>/i <service><role>DASK<\/role><url>http://localhost:${DASK_UI_PORT}<\/url><\/service> <service><role>DASKWS<\/role><url>ws:\/\/${MASTER}:${DASK_UI_PORT}<\/url><\/service>" \
/etc/knox/conf/topologies/default.xml
if [[ -f /etc/knox/conf/topologies/default.xml ]]; then
sed -i \
"/<\/topology>/i <service><role>DASK<\/role><url>http://localhost:${DASK_UI_PORT}<\/url><\/service> <service><role>DASKWS<\/role><url>ws:\/\/${MASTER}:${DASK_UI_PORT}<\/url><\/service>" \
/etc/knox/conf/topologies/default.xml
fi

mkdir -p "${KNOX_DASK_DIR}"

@@ -378,10 +360,12 @@

chown -R knox:knox "${KNOX_DASK_DIR}" "${KNOX_DASKWS_DIR}"

restart_knox
# Do not restart knox during pre-init script run
if [[ -n "${ROLE}" ]]; then
restart_knox
fi
}


function configure_fluentd_for_dask() {
if [[ "$(hostname -s)" == "${MASTER}" ]]; then
cat >/etc/google-fluentd/config.d/dataproc-dask.conf <<EOF
@@ -436,15 +420,16 @@

function install_dask() {
if is_cuda12 ; then
local python_spec="python=3.10"
local cuda_spec="cuda-version>=12,<=12.5"
local dask_spec="dask>=2024.5"
local python_spec="python>=3.11"
local cuda_spec="cuda-version>=12,<13"
local dask_spec="dask>=2024.7"
elif is_cuda11 ; then
local python_spec="python=3.9"
local python_spec="python>=3.9"
local cuda_spec="cuda-version>=11,<12.0a0"
local dask_spec="dask"
fi

CONDA_PACKAGES=()
if [[ "${DASK_RUNTIME}" == 'yarn' ]]; then
# Pin `distributed` and `dask` package versions to old release
# because `dask-yarn` 0.9 uses skein in a way which
@@ -469,34 +454,38 @@ function install_dask() {
)

# Install dask
local is_installed="0"
mamba="/opt/conda/miniconda3/bin/mamba"
conda="/opt/conda/miniconda3/bin/conda"

set +e
( set +e
local is_installed=0
for installer in "${mamba}" "${conda}" ; do
test -d "${DASK_CONDA_ENV}" || \
time "${installer}" "create" -m -n "dask" -y --no-channel-priority \
-c 'conda-forge' -c 'nvidia' \
${CONDA_PACKAGES[*]} \
"${python_spec}"
set -e
if [[ "$?" == "0" ]] ; then
"${python_spec}" \
> "${install_log}" 2>&1 && retval=$? || { retval=$? ; cat "${install_log}" ; }
sync
if [[ "$retval" == "0" ]] ; then
is_installed="1"
break
else
"${conda}" config --set channel_priority flexible
fi
"${conda}" config --set channel_priority flexible
done
if [[ "${is_installed}" == "0" ]]; then
echo "failed to install dask"
return 1
fi
)
}
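The install loop now tries `mamba` first and falls back to `conda`, switching `channel_priority` to `flexible` after each failed attempt; solver output is captured in `${install_log}` and echoed only when an attempt fails, keeping the happy path quiet.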

function main() {
# Install Dask
install_dask

# In "standalone" mode, Dask relies on a systemd unit to launch.
# In "yarn" mode, it relies a config.yaml file.
if [[ "${DASK_RUNTIME}" == "yarn" ]]; then
# Create Dask YARN config file
configure_dask_yarn
@@ -529,7 +518,125 @@ function main() {
echo "Dask for ${DASK_RUNTIME} successfully initialized."
}

function exit_handler() (
set +e
echo "Exit handler invoked"

main
# Free conda cache
/opt/conda/miniconda3/bin/conda clean -a > /dev/null 2>&1

# Clear pip cache
pip cache purge || echo "unable to purge pip cache"

# remove the tmpfs conda pkgs_dirs
if [[ -d /mnt/shm ]] ; then /opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm ; fi

df -h
# Clean up shared memory mounts
for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do
if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then
rm -rf ${shmdir}/*
umount -f ${shmdir}
fi
done

# Clean up OS package cache ; re-hold systemd package
if is_debuntu ; then
apt-get -y -qq clean
apt-get -y -qq autoremove
else
dnf clean all
fi

# print disk usage statistics
if is_debuntu ; then
# Rocky doesn't have sort -h and fails when the argument is passed
du --max-depth 3 -hx / | sort -h | tail -10
fi

# Process disk usage logs from installation period
rm -f "${tmpdir}/keep-running-df"
sleep 6s
# compute maximum size of disk during installation
# Log file contains logs like the following (minus the preceding #):
#Filesystem Size Used Avail Use% Mounted on
#/dev/vda2 6.8G 2.5G 4.0G 39% /
df -h / | tee -a "${tmpdir}/disk-usage.log"
perl -e '$max=( sort
map { (split)[2] =~ /^(\d+)/ }
grep { m:^/: } <STDIN> )[-1];
print( "maximum-disk-used: $max", $/ );' < "${tmpdir}/disk-usage.log"

echo "exit_handler has completed"

# zero free disk space
if [[ -n "$(get_metadata_attribute creating-image)" ]]; then
dd if=/dev/zero of=/zero ; sync ; rm -f /zero
fi

return 0
)
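The perl one-liner in the handler takes the third (`Used`) column of every `df` line whose filesystem begins with `/`, keeps the leading digits, and prints the largest value. A rough awk equivalent, assuming the same `df -h` layout (and noting that awk's numeric coercion keeps the fractional part that perl's `/^(\d+)/` truncates):

    awk '/^\// { if ($3 + 0 > max) max = $3 + 0 }
         END   { print "maximum-disk-used: " max }' "${tmpdir}/disk-usage.log"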

function prepare_to_install() {
readonly DEFAULT_CUDA_VERSION="12.4"
CUDA_VERSION=$(get_metadata_attribute 'cuda-version' ${DEFAULT_CUDA_VERSION})
readonly CUDA_VERSION

readonly ROLE=$(get_metadata_attribute dataproc-role)
readonly MASTER=$(get_metadata_attribute dataproc-master)

# Dask config
DASK_RUNTIME="$(get_metadata_attribute dask-runtime || echo 'standalone')"
readonly DASK_RUNTIME
readonly DASK_SERVICE=dask-cluster
readonly DASK_WORKER_SERVICE=dask-worker
readonly DASK_SCHEDULER_SERVICE=dask-scheduler
readonly DASK_CONDA_ENV="/opt/conda/miniconda3/envs/dask"

# Knox config
readonly KNOX_HOME=/usr/lib/knox
readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"
enable_worker_service="0"

free_mem="$(awk '/^MemFree/ {print $2}' /proc/meminfo)"
# Write to a ramdisk instead of churning the persistent disk
if [[ ${free_mem} -ge 5250000 ]]; then
tmpdir=/mnt/shm
mkdir -p /mnt/shm
mount -t tmpfs tmpfs /mnt/shm

# Download conda packages to tmpfs
/opt/conda/miniconda3/bin/conda config --add pkgs_dirs /mnt/shm
mount -t tmpfs tmpfs /mnt/shm

# Download pip packages to tmpfs
pip config set global.cache-dir /mnt/shm || echo "unable to set global.cache-dir"

# Download OS packages to tmpfs
if is_debuntu ; then
mount -t tmpfs tmpfs /var/cache/apt/archives
else
while [[ -f /var/cache/dnf/metadata_lock.pid ]] ; do sleep 1s ; done
mount -t tmpfs tmpfs /var/cache/dnf
fi
else
tmpdir=/tmp
fi
install_log="${tmpdir}/install.log"
trap exit_handler EXIT

# Monitor disk usage in a screen session
if is_debuntu ; then
apt-get install -y -qq screen
else
dnf -y -q install screen
fi
rm -f "${tmpdir}/disk-usage.log"
touch "${tmpdir}/keep-running-df"
screen -d -m -US keep-running-df \
bash -c "while [[ -f ${tmpdir}/keep-running-df ]] ; do df -h / | tee -a ${tmpdir}/disk-usage.log ; sleep 5s ; done"
}
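`prepare_to_install` diverts package caches to a tmpfs ramdisk when at least 5250000 kB (about 5 GB) of RAM is free, and starts a detached `screen` session that appends a `df -h /` sample to `${tmpdir}/disk-usage.log` every five seconds; the loop exits once `exit_handler` removes the `keep-running-df` sentinel file.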

prepare_to_install

main
14 changes: 13 additions & 1 deletion dask/test_dask.py
@@ -1,4 +1,6 @@
import os
import time

import pkg_resources
from absl.testing import absltest
from absl.testing import parameterized
@@ -33,7 +35,17 @@ def _run_dask_test_script(self, name, script):
    self.upload_test_file(
        os.path.join(os.path.dirname(os.path.abspath(__file__)),
                     script), name)
    self.assert_instance_command(name, verify_cmd)
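    # Retry with exponential backoff: seven attempts, sleeping 2**try_number
    # seconds after each failure, give a flaky SSH command up to
    # 2**0 + ... + 2**6 = 127 seconds to succeed before the test gives up.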
    command_asserted = 0
    for try_number in range(0, 7):
      try:
        self.assert_instance_command(name, verify_cmd)
        command_asserted = 1
        break
      except Exception:
        time.sleep(2**try_number)
    if command_asserted == 0:
      raise Exception(
          "Unable to assert instance command [{}]".format(verify_cmd))

    self.remove_test_script(script, name)


