From b18425328b9704fc985ed04e237d69c45f4fdabe Mon Sep 17 00:00:00 2001
From: "C.J. Collier"
Date: Sun, 27 Oct 2024 14:04:59 -0700
Subject: [PATCH] [dask] merge changes from custom-images (#1250)

* [dask] merged from custom-images
* retry flaky ssh command
* import time
* wait for dnf to finish before giving it a ramdisk
* print the command that failed; try harder
* fixed is_ubuntu18 function
---
 dask/dask.sh      | 213 ++++++++++++++++++++++++++++++++++------------
 dask/test_dask.py |  14 ++-
 2 files changed, 173 insertions(+), 54 deletions(-)

diff --git a/dask/dask.sh b/dask/dask.sh
index 946608d9e..d9007c3f1 100644
--- a/dask/dask.sh
+++ b/dask/dask.sh
@@ -23,9 +23,10 @@ set -euxo pipefail
 function os_id()       { grep '^ID=' /etc/os-release | cut -d= -f2 | xargs ; }
 function os_version()  { grep '^VERSION_ID=' /etc/os-release | cut -d= -f2 | xargs ; }
-function os_codename() { grep '^VERSION_CODENAME=' /etc/os-release | cut -d= -f2 | xargs ; }
 function is_ubuntu()   { [[ "$(os_id)" == 'ubuntu' ]] ; }
 function is_ubuntu18() { is_ubuntu && [[ "$(os_version)" == '18.04'* ]] ; }
+function is_debian()   { [[ "$(os_id)" == 'debian' ]] ; }
+function is_debuntu()  { is_debian || is_ubuntu ; }

 function print_metadata_value() {
   local readonly tmpfile=$(mktemp)
@@ -64,52 +65,34 @@ function get_metadata_value() {
   return ${return_code}
 }

-function get_metadata_attribute() {
+function get_metadata_attribute() (
   set +x
   local -r attribute_name="$1"
   local -r default_value="${2:-}"
   get_metadata_value "attributes/${attribute_name}" || echo -n "${default_value}"
-  set -x
-}
+)

-readonly DEFAULT_CUDA_VERSION="12.4"
-readonly CUDA_VERSION=$(get_metadata_attribute 'cuda-version' ${DEFAULT_CUDA_VERSION})
 function is_cuda12() { [[ "${CUDA_VERSION%%.*}" == "12" ]] ; }
 function is_cuda11() { [[ "${CUDA_VERSION%%.*}" == "11" ]] ; }

-readonly DASK_RUNTIME="$(get_metadata_attribute dask-runtime || echo 'standalone')"
-
-# Dask 'standalone' config
-readonly DASK_SERVICE=dask-cluster
-readonly DASK_WORKER_SERVICE=dask-worker
-readonly DASK_SCHEDULER_SERVICE=dask-scheduler
-
-readonly KNOX_HOME=/usr/lib/knox
-readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
-readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"
-
 function execute_with_retries() {
-  local -r cmd=$1
-  for ((i = 0; i < 10; i++)); do
+  local -r cmd="$*"
+  for i in {0..9} ; do
     if eval "$cmd"; then
-      return 0
-    fi
+      return 0 ; fi
     sleep 5
   done
   echo "Cmd '${cmd}' failed."
   return 1
 }

-DASK_CONDA_ENV="/opt/conda/miniconda3/envs/dask"
-
 function configure_dask_yarn() {
   readonly DASK_YARN_CONFIG_DIR=/etc/dask/
   readonly DASK_YARN_CONFIG_FILE=${DASK_YARN_CONFIG_DIR}/config.yaml
-  dask_python="$(realpath "${DASK_CONDA_ENV}/bin/python")"

   # Minimal custom configuration is required for this
   # setup. Please see https://yarn.dask.org/en/latest/quickstart.html#usage
   # for information on tuning Dask-Yarn environments.
-  mkdir -p ${DASK_YARN_CONFIG_DIR}
+  mkdir -p "${DASK_YARN_CONFIG_DIR}"

   cat <<EOF >"${DASK_YARN_CONFIG_FILE}"
 # Config file for Dask Yarn.
@@ -118,16 +101,13 @@ function configure_dask_yarn() {
 # https://yarn.dask.org/en/latest/configuration.html#default-configuration

 yarn:
-  environment: python://${dask_python}
+  environment: python://${DASK_CONDA_ENV}/bin/python

   worker:
     count: 2
 EOF
 }

-enable_worker_service="0"
-ROLE="$(get_metadata_attribute dataproc-role)"
-MASTER="$(get_metadata_attribute dataproc-master)"
 function install_systemd_dask_worker() {
   echo "Installing systemd Dask Worker service..."
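+  # The worker keeps its scratch state in a per-service directory under
+  # /tmp; the launcher script written further down in this function
+  # points the worker at this path.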
   local -r dask_worker_local_dir="/tmp/${DASK_WORKER_SERVICE}"
@@ -164,17 +144,18 @@ EOF
   if [[ "${ROLE}" != "Master" ]]; then
     enable_worker_service="1"
   else
-    local RUN_WORKER_ON_MASTER="$(get_metadata_attribute dask-worker-on-master || echo 'true')"
+    local RUN_WORKER_ON_MASTER="$(get_metadata_attribute dask-worker-on-master 'true')"
     # Enable service on single-node cluster (no workers)
     local worker_count="$(get_metadata_attribute dataproc-worker-count)"
-    if [[ "${worker_count}" == "0" ]]; then RUN_WORKER_ON_MASTER='true'; fi
-
-    if [[ "${RUN_WORKER_ON_MASTER}" == "true" ]]; then
+    if [[ "${worker_count}" == "0" || "${RUN_WORKER_ON_MASTER}" == "true" ]]; then
       enable_worker_service="1"
     fi
   fi

-  if [[ "${enable_worker_service}" == "1" ]]; then systemctl enable "${DASK_WORKER_SERVICE}" ; fi
+  if [[ "${enable_worker_service}" == "1" ]]; then
+    systemctl enable "${DASK_WORKER_SERVICE}"
+    systemctl restart "${DASK_WORKER_SERVICE}"
+  fi
 }

 function install_systemd_dask_scheduler() {
@@ -185,7 +166,6 @@ function install_systemd_dask_scheduler() {

   mkdir -p "${dask_scheduler_local_dir}"

-  local DASK_SCHEDULER_LAUNCHER="/usr/local/bin/${DASK_SCHEDULER_SERVICE}-launcher.sh"

   cat <<EOF >"${DASK_SCHEDULER_LAUNCHER}"
@@ -234,9 +214,11 @@ function configure_knox_for_dask() {
   fi

   local DASK_UI_PORT=8787
-  sed -i \
-    "/<\/topology>/i <service><role>DASK<\/role><url>http://localhost:${DASK_UI_PORT}<\/url><\/service> <service><role>DASKWS<\/role><url>ws:\/\/${MASTER}:${DASK_UI_PORT}<\/url><\/service>" \
-    /etc/knox/conf/topologies/default.xml
+  if [[ -f /etc/knox/conf/topologies/default.xml ]]; then
+    sed -i \
+      "/<\/topology>/i <service><role>DASK<\/role><url>http://localhost:${DASK_UI_PORT}<\/url><\/service> <service><role>DASKWS<\/role><url>ws:\/\/${MASTER}:${DASK_UI_PORT}<\/url><\/service>" \
+      /etc/knox/conf/topologies/default.xml
+  fi

   mkdir -p "${KNOX_DASK_DIR}"

@@ -378,10 +360,12 @@ EOF

   chown -R knox:knox "${KNOX_DASK_DIR}" "${KNOX_DASKWS_DIR}"

-  restart_knox
+  # Do not restart knox during pre-init script run
+  if [[ -n "${ROLE}" ]]; then
+    restart_knox
+  fi
 }

-
 function configure_fluentd_for_dask() {
   if [[ "$(hostname -s)" == "${MASTER}" ]]; then
     cat >/etc/google-fluentd/config.d/dataproc-dask.conf <<EOF
+      > "${install_log}" 2>&1 && retval=$? || { retval=$? ; cat "${install_log}" ; }
+    sync
+    if [[ "$retval" == "0" ]] ; then
       is_installed="1"
       break
-    else
-      "${conda}" config --set channel_priority flexible
     fi
+    "${conda}" config --set channel_priority flexible
   done
   if [[ "${is_installed}" == "0" ]]; then
     echo "failed to install dask"
     return 1
   fi
+  )
 }

 function main() {
+  # Install Dask
   install_dask

+  # In "standalone" mode, Dask relies on a systemd unit to launch.
+  # In "yarn" mode, it relies on a config.yaml file.
   if [[ "${DASK_RUNTIME}" == "yarn" ]]; then
     # Create Dask YARN config file
     configure_dask_yarn
@@ -529,7 +518,125 @@ function main() {
   echo "Dask for ${DASK_RUNTIME} successfully initialized."
 }

+function exit_handler() (
+  set +e
+  echo "Exit handler invoked"
-main
+
+  # Free conda cache
+  /opt/conda/miniconda3/bin/conda clean -a > /dev/null 2>&1
+
+  # Clear pip cache
+  pip cache purge || echo "unable to purge pip cache"
+
+  # remove the tmpfs conda pkgs_dirs
+  if [[ -d /mnt/shm ]] ; then /opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm ; fi
-df -h
+
+  # Clean up shared memory mounts
+  for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do
+    if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then
+      rm -rf ${shmdir}/*
+      umount -f ${shmdir}
+    fi
+  done
+
+  # Clean up OS package cache
+  if is_debuntu ; then
+    apt-get -y -qq clean
+    apt-get -y -qq autoremove
+  else
+    dnf clean all
+  fi
+
+  # print disk usage statistics
+  if is_debuntu ; then
+    # Rocky doesn't have sort -h and fails when the argument is passed
+    du --max-depth 3 -hx / | sort -h | tail -10
+  fi
+
+  # Process disk usage logs from installation period
+  rm -f "${tmpdir}/keep-running-df"
+  sleep 6s
+  # compute maximum size of disk during installation
+  # Log file contains logs like the following (minus the preceding #):
+#Filesystem      Size  Used Avail Use% Mounted on
+#/dev/vda2       6.8G  2.5G  4.0G  39% /
+  df -h / | tee -a "${tmpdir}/disk-usage.log"
+  perl -e '$max=( sort { $a <=> $b }
+                   map { (split)[2] =~ /^(\d+)/ }
+                  grep { m:^/: } <STDIN> )[-1];
+print( "maximum-disk-used: $max", $/ );' < "${tmpdir}/disk-usage.log"
+
+  echo "exit_handler has completed"
+
+  # zero free disk space
+  if [[ -n "$(get_metadata_attribute creating-image)" ]]; then
+    dd if=/dev/zero of=/zero ; sync ; rm -f /zero
+  fi
+
+  return 0
+)
+
+function prepare_to_install() {
+  readonly DEFAULT_CUDA_VERSION="12.4"
+  CUDA_VERSION=$(get_metadata_attribute 'cuda-version' ${DEFAULT_CUDA_VERSION})
+  readonly CUDA_VERSION
+
+  readonly ROLE=$(get_metadata_attribute dataproc-role)
+  readonly MASTER=$(get_metadata_attribute dataproc-master)
+
+  # Dask config
+  DASK_RUNTIME="$(get_metadata_attribute dask-runtime || echo 'standalone')"
+  readonly DASK_RUNTIME
+  readonly DASK_SERVICE=dask-cluster
+  readonly DASK_WORKER_SERVICE=dask-worker
+  readonly DASK_SCHEDULER_SERVICE=dask-scheduler
+  readonly DASK_CONDA_ENV="/opt/conda/miniconda3/envs/dask"
+
+  # Knox config
+  readonly KNOX_HOME=/usr/lib/knox
+  readonly KNOX_DASK_DIR="${KNOX_HOME}/data/services/dask/0.1.0"
+  readonly KNOX_DASKWS_DIR="${KNOX_HOME}/data/services/daskws/0.1.0"
+  enable_worker_service="0"
+
+  free_mem="$(awk '/^MemFree/ {print $2}' /proc/meminfo)"
+  # Write to a ramdisk instead of churning the persistent disk
+  if [[ ${free_mem} -ge 5250000 ]]; then
+    tmpdir=/mnt/shm
+    mkdir -p /mnt/shm
+    mount -t tmpfs tmpfs /mnt/shm
+
+    # Download conda packages to tmpfs
+    /opt/conda/miniconda3/bin/conda config --add pkgs_dirs /mnt/shm
+
+    # Download pip packages to tmpfs
+    pip config set global.cache-dir /mnt/shm || echo "unable to set global.cache-dir"
+
+    # Download OS packages to tmpfs; wait for any running dnf
+    # transaction to release its metadata lock before mounting over
+    # the cache directory
+    if is_debuntu ; then
+      mount -t tmpfs tmpfs /var/cache/apt/archives
+    else
+      while [[ -f /var/cache/dnf/metadata_lock.pid ]] ; do sleep 1s ; done
+      mount -t tmpfs tmpfs /var/cache/dnf
+    fi
+  else
+    tmpdir=/tmp
+  fi
+  install_log="${tmpdir}/install.log"
+  trap exit_handler EXIT
+
+  # Monitor disk usage in a screen session
+  if is_debuntu ; then
+    apt-get install -y -qq screen
+  else
+    dnf -y -q install screen
+  fi
+  rm -f "${tmpdir}/disk-usage.log"
+  touch "${tmpdir}/keep-running-df"
+  screen -d -m -US keep-running-df \
+    bash -c "while [[ -f ${tmpdir}/keep-running-df ]] ;
do df -h / | tee -a ${tmpdir}/disk-usage.log ; sleep 5s ; done"
+}
+
+prepare_to_install
+
+main
diff --git a/dask/test_dask.py b/dask/test_dask.py
index a5d4314e4..440493511 100644
--- a/dask/test_dask.py
+++ b/dask/test_dask.py
@@ -1,4 +1,6 @@
 import os
+import time
+
 import pkg_resources
 from absl.testing import absltest
 from absl.testing import parameterized
@@ -33,7 +35,17 @@ def _run_dask_test_script(self, name, script):
     self.upload_test_file(
         os.path.join(os.path.dirname(os.path.abspath(__file__)), script),
         name)
-    self.assert_instance_command(name, verify_cmd)
+    command_asserted = 0
+    for try_number in range(0, 7):
+      try:
+        self.assert_instance_command(name, verify_cmd)
+        command_asserted = 1
+        break
+      except Exception:
+        time.sleep(2**try_number)
+    if command_asserted == 0:
+      raise Exception("Unable to assert instance command [{}]".format(verify_cmd))
+
     self.remove_test_script(script, name)
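-- 
Deployment is selected through instance metadata at cluster-create time.
A sketch with hypothetical cluster and bucket names:

  gcloud dataproc clusters create example-cluster \
    --initialization-actions gs://example-bucket/dask/dask.sh \
    --metadata dask-runtime=yarn,dask-worker-on-master=false

dask-runtime chooses between 'standalone' (systemd units, the default)
and 'yarn' (dask-yarn); dask-worker-on-master (default 'true') controls
whether the master also runs a worker; cuda-version overrides the
default of 12.4.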