Merge Distributed Qualification Tools CLI (#1516)
Fixes #1249.

This PR merges the distributed tools branch into dev. We have set up an
internal CI pipeline to test this feature.

Created a follow-up issue (#1515) to support recursive processing of event
logs.

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored Feb 5, 2025
1 parent fcb4e94 commit 2b9473b
Showing 42 changed files with 1,876 additions and 77 deletions.
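
At a glance, the change set wires a new distributed submission path through the Python wrapper: a `--submission_mode` argument, per-platform `create_distributed_submission_job` factories, and a `RapidsDistributedJob` that hands the Tools JAR to a Spark-based executor. For orientation, a hypothetical invocation (only the `--submission_mode` flag name is confirmed by this diff; the CLI syntax and the `distributed` value are assumptions):

    # Hypothetical CLI invocation -- syntax assumed, not shown in this diff:
    #   spark_rapids qualification --platform onprem \
    #       --eventlogs /path/to/eventlogs --submission_mode distributed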
4 changes: 3 additions & 1 deletion user_tools/pyproject.toml
@@ -84,7 +84,9 @@ dependencies = [
     # dependency of shap, python [3.9, 3.12]
     "scikit-learn==1.5.2",
     # used for retrieving available memory on the host
-    "psutil==6.1.1"
+    "psutil==6.1.1",
+    # pyspark for distributed computing
+    "pyspark>=3.4.2,<4.0.0"
 ]
 dynamic=["entry-points", "version"]

user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -88,6 +88,9 @@ def create_saving_estimator(self,
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         return DBAzureLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+        pass
+
     def validate_job_submission_args(self, submission_args: dict) -> dict:
         pass

5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -130,6 +130,9 @@ def create_saving_estimator(self,
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         return DataprocLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+        pass
+
     def validate_job_submission_args(self, submission_args: dict) -> dict:
         pass

user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -92,6 +92,9 @@ def create_saving_estimator(self,
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+        pass
+

 @dataclass
 class DataprocGkeCMDDriver(DataprocCMDDriver):
5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -115,6 +115,9 @@ def create_saving_estimator(self,
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         return EmrLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+        pass
+
     def generate_cluster_configuration(self, render_args: dict):
         image_version = self.configs.get_value_silent('clusterInference', 'defaultImage')
         render_args['IMAGE'] = f'"{image_version}"'
16 changes: 14 additions & 2 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,14 +19,14 @@
 from typing import Any, List, Optional

 from spark_rapids_tools import CspEnv
-from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob
 from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, ClusterNode, \
     CMDDriverBase, ClusterGetAccessor, GpuDevice, \
     GpuHWInfo, NodeHWInfo, SparkNodeType, SysInfo
 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
 from spark_rapids_pytools.common.sys_storage import StorageDriver
 from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider
 from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
+from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob, RapidsDistributedJob


 @dataclass
@@ -49,6 +49,9 @@ def _install_storage_driver(self):
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+    def create_distributed_submission_job(self, job_prop, ctxt) -> RapidsDistributedJob:
+        return OnPremDistributedRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)
+
     def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False,
                                       is_props_file: bool = False):
         return OnPremCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)
@@ -154,6 +157,15 @@ class OnPremLocalRapidsJob(RapidsLocalJob):
     job_label = 'onpremLocal'


+# pylint: disable=abstract-method
+@dataclass
+class OnPremDistributedRapidsJob(RapidsDistributedJob):
+    """
+    Implementation of a RAPIDS job that runs on a distributed cluster
+    """
+    job_label = 'onprem.distributed'
+
+
 @dataclass
 class OnPremNode(ClusterNode):
     """Implementation of Onprem cluster node."""
7 changes: 5 additions & 2 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -22,11 +22,11 @@
 from logging import Logger
 from typing import Type, Any, List, Callable, Union, Optional, final, Dict

-from spark_rapids_tools import EnumeratedType, CspEnv
 from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \
     get_elem_non_safe
 from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil
 from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator
+from spark_rapids_tools import EnumeratedType, CspEnv


 class DeployMode(EnumeratedType):
@@ -884,6 +884,9 @@ def create_saving_estimator(self,
     def create_local_submission_job(self, job_prop, ctxt) -> Any:
         raise NotImplementedError

+    def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+        raise NotImplementedError
+
     def load_platform_configs(self):
         config_file_name = f'{CspEnv.tostring(self.type_id).lower()}-configs.json'
         config_path = Utils.resource_path(config_file_name)
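
The base `PlatformBase` now declares `create_distributed_submission_job` alongside the local factory: OnPrem returns a concrete job, while the CSP platforms stub it out with `pass` for now. A minimal sketch of the dispatch a caller might perform, assuming a `SubmissionMode.DISTRIBUTED` member (the actual call site is not part of this diff):

    # Sketch only: assumes SubmissionMode.DISTRIBUTED exists; the real
    # dispatch site is not shown in this diff.
    def create_submission_job(platform, job_prop, ctxt):
        if ctxt.get_ctxt('submissionMode') == SubmissionMode.DISTRIBUTED:
            return platform.create_distributed_submission_job(job_prop, ctxt)
        return platform.create_local_submission_job(job_prop, ctxt)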
19 changes: 17 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -29,7 +29,7 @@
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
 from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
-from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
+from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode
 from spark_rapids_tools.storagelib import CspFs
 from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
 from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
@@ -153,6 +157,17 @@ def _process_estimation_model_args(self) -> None:
         estimation_model_args = QualEstimationModel.create_default_model_args(selected_model)
         self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args)

+    def _process_submission_mode_arg(self) -> None:
+        """
+        Process the value provided by `--submission_mode` argument.
+        """
+        submission_mode_arg = self.wrapper_options.get('submissionMode')
+        if submission_mode_arg is None or not submission_mode_arg:
+            submission_mode = SubmissionMode.get_default()
+        else:
+            submission_mode = SubmissionMode.fromstring(submission_mode_arg)
+        self.ctxt.set_ctxt('submissionMode', submission_mode)
+
     def _process_custom_args(self) -> None:
         """
         Qualification tool processes extra arguments:
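
`SubmissionMode` is imported from `spark_rapids_tools.enums`, but its definition is not in this diff. Judging from the `get_default()` and `fromstring()` calls above, it plausibly resembles the sketch below (the LOCAL/DISTRIBUTED member names are assumptions):

    # Hypothetical reconstruction; only the get_default()/fromstring()
    # usage above is visible in this diff.
    from enum import Enum

    class SubmissionMode(Enum):
        LOCAL = 'local'
        DISTRIBUTED = 'distributed'

        @classmethod
        def get_default(cls) -> 'SubmissionMode':
            return cls.LOCAL

        @classmethod
        def fromstring(cls, value: str) -> 'SubmissionMode':
            return cls[value.upper()]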
@@ -181,6 +192,7 @@ def _process_custom_args(self) -> None:
         self._process_estimation_model_args()
         self._process_offline_cluster_args()
         self._process_eventlogs_args()
+        self._process_submission_mode_arg()
         # This is noise to dump everything
         # self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])

@@ -375,7 +387,7 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,

         df = self._read_qualification_output_file('summaryReport')
         # 1. Operations related to XGboost modelling
-        if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
+        if not df.empty and self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
             try:
                 df = self.__update_apps_with_prediction_info(df,
                                                              self.ctxt.get_ctxt('estimationModelArgs'))
@@ -609,6 +621,9 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
         # extract the file name of report from the YAML config (e.g., toolOutput -> csv -> summaryReport -> fileName)
         report_file_name = self.ctxt.get_value('toolOutput', file_format_key, report_name_key, 'fileName')
         report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
+        if not FSUtil.resource_exists(report_file_path):
+            self.logger.warning('Unable to read the report file \'%s\'. File does not exist.', report_file_path)
+            return pd.DataFrame()
         return pd.read_csv(report_file_path)

     def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
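
With this guard, a missing report no longer raises from `pd.read_csv`; callers get an empty `DataFrame` and can short-circuit, which is exactly what the `not df.empty` check added above relies on. A self-contained restatement of the pattern (generic sketch, not the tool's code):

    import os
    import pandas as pd

    def read_report(path: str) -> pd.DataFrame:
        # Mirror the new guard: return an empty frame instead of raising
        # when the report file was never produced.
        if not os.path.exists(path):
            return pd.DataFrame()
        return pd.read_csv(path)

    df = read_report('/tmp/summary.csv')
    if df.empty:
        print('report missing; skipping post-processing')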
65 changes: 54 additions & 11 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,12 +17,15 @@
 import os
 from dataclasses import dataclass, field
 from logging import Logger
-from typing import List, Optional
+from typing import List, Optional, Union

 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
 from spark_rapids_pytools.common.utilities import ToolLogging, Utils
 from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
+from spark_rapids_tools import CspPath
 from spark_rapids_tools.storagelib import LocalPath
+from spark_rapids_tools_distributed.distributed_main import DistributedToolsExecutor
+from spark_rapids_tools_distributed.jar_cmd_args import JarCmdArgs


 @dataclass
@@ -38,6 +41,8 @@ def _init_fields(self):
         self.props['sparkConfArgs'] = {}
         if self.get_value_silent('platformArgs') is None:
             self.props['platformArgs'] = {}
+        if self.get_value_silent('distributedToolsConfigs') is None:
+            self.props['distributedToolsConfigs'] = {}

     def get_jar_file(self):
         return self.get_value('rapidsArgs', 'jarFile')
@@ -48,6 +53,9 @@ def get_jar_main_class(self):
     def get_rapids_args(self):
         return self.get_value('rapidsArgs', 'jarArgs')

+    def get_distribution_tools_configs(self):
+        return self.get_value('distributedToolsConfigs')
+

 @dataclass
 class RapidsJob:
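
`distributedToolsConfigs` defaults to an empty dict, and `_submit_job` in `RapidsDistributedJob` below reads `user_configs.submission`, so the stored object is expected to expose a `submission` attribute. A hypothetical container shape (only the key name and the `.submission` access are visible in this diff):

    # Hypothetical shape -- an assumption for illustration only.
    from dataclasses import dataclass

    @dataclass
    class DistributedToolsConfig:
        submission: dict  # e.g. cluster/Spark submission settings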
@@ -90,10 +98,10 @@ def _build_rapids_args(self):
         rapids_arguments.extend(extra_rapids_args)
         return rapids_arguments

-    def _build_submission_cmd(self) -> list:
+    def _build_submission_cmd(self) -> Union[list, JarCmdArgs]:
         raise NotImplementedError

-    def _submit_job(self, cmd_args: list) -> str:
+    def _submit_job(self, cmd_args: Union[list, JarCmdArgs]) -> str:
         raise NotImplementedError

     def _print_job_output(self, job_output: str):
@@ -125,13 +133,6 @@ def run_job(self):
         self._cleanup_temp_log4j_files()
         return job_output

-
-@dataclass
-class RapidsLocalJob(RapidsJob):
-    """
-    Implementation of a RAPIDS job that runs local on a machine.
-    """
-
     def _get_hadoop_classpath(self) -> Optional[str]:
         """
         Gets the Hadoop's configuration directory from the environment variables.
@@ -202,6 +203,13 @@ def _build_jvm_args(self):
                 vm_args.append(val)
         return vm_args

+
+@dataclass
+class RapidsLocalJob(RapidsJob):
+    """
+    Implementation of a RAPIDS job that runs local on a machine.
+    """
+
     def _build_submission_cmd(self) -> list:
         # env vars are added later as a separate dictionary
         classpath_arr = self._build_classpath()
@@ -218,3 +226,38 @@ def _submit_job(self, cmd_args: list) -> str:
         out_std = self.exec_ctxt.platform.cli.run_sys_cmd(cmd=cmd_args,
                                                           env_vars=env_args)
         return out_std
+
+
+@dataclass
+class RapidsDistributedJob(RapidsJob):
+    """
+    Implementation of a RAPIDS job that runs distributed on a cluster.
+    """
+
+    def _build_submission_cmd(self) -> JarCmdArgs:
+        classpath_arr = self._build_classpath()
+        hadoop_cp = self._get_hadoop_classpath()
+        jvm_args_arr = self._build_jvm_args()
+        jar_main_class = self.prop_container.get_jar_main_class()
+        jar_output_dir_args = self._get_persistent_rapids_args()
+        extra_rapids_args = self.prop_container.get_rapids_args()
+        return JarCmdArgs(jvm_args_arr, classpath_arr, hadoop_cp, jar_main_class,
+                          jar_output_dir_args, extra_rapids_args)
+
+    def _build_classpath(self) -> List[str]:
+        """
+        Only the Spark RAPIDS Tools JAR file is needed for the classpath.
+        Assumption: Each worker node should have the Spark Jars pre-installed.
+        TODO: Ship the Spark JARs to the cluster to avoid version mismatch issues.
+        """
+        return ['-cp', self.prop_container.get_jar_file()]
+
+    def _submit_job(self, cmd_args: JarCmdArgs) -> None:
+        """
+        Submit the Tools JAR cmd to the Spark cluster.
+        """
+        user_configs = self.prop_container.get_distribution_tools_configs()
+        executor = DistributedToolsExecutor(user_submission_configs=user_configs.submission,
+                                            cli_output_path=CspPath(self.exec_ctxt.get_output_folder()),
+                                            jar_cmd_args=cmd_args)
+        executor.run_as_spark_app()
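
Taken together, the distributed path mirrors the local one up to submission time: `_build_submission_cmd` packages the same JVM, classpath, and JAR arguments into a `JarCmdArgs` value object instead of a flat command list, and `_submit_job` hands that object to `DistributedToolsExecutor`, which runs the Tools JAR as a Spark application. A condensed flow, using only names from this diff (setup elided):

    # job = platform.create_distributed_submission_job(job_prop, ctxt)
    # job.run_job()
    #   -> _build_submission_cmd() returns JarCmdArgs
    #   -> _submit_job() wraps it in DistributedToolsExecutor and calls
    #      run_as_spark_app()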
