Skip to content

1.5 1 fix #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: 1.5-1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ protobuf==3.20.1
psutil==5.6.7 # sagemaker-containers requires psutil 5.6.7
pynvml==11.4.1
python-dateutil==2.8.1
requests==2.25.1
requests==2.27.0
retrying==1.3.3
sagemaker-containers==2.8.6.post2
sagemaker-inference==1.5.5
Expand All @@ -27,3 +27,4 @@ wheel==0.36.2
jinja2==2.11.3
MarkupSafe==1.1.1
Werkzeug==0.15.6
zstandard==0.21.0
26 changes: 22 additions & 4 deletions src/sagemaker_xgboost_container/algorithm_mode/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ def sagemaker_train(
train_dmatrix, val_dmatrix, train_val_dmatrix = get_validated_dmatrices(
train_path, val_path, file_type, csv_weights, is_pipe, combine_train_val
)

missing_validation_data = validation_channel and not val_dmatrix

train_args = dict(
train_cfg=validated_train_config,
train_dmatrix=train_dmatrix,
Expand All @@ -210,22 +213,37 @@ def sagemaker_train(
# Wait for hosts to find each other
logging.info(f"Distributed node training with {num_hosts} hosts: {sm_hosts}")
distributed.wait_hostname_resolution(sm_hosts)
include_in_training = True
if not train_dmatrix:
logging.warning(
"Host {} does not have data. Will broadcast to cluster and will not be used in distributed"
" training.".format(sm_current_host)
f"Host {sm_current_host} does not have training data. Will broadcast to "
f"cluster and this host {sm_current_host} will not be used in distributed training. "
f"Please divide the training data across instances properly. See https://docs.aws.amazon.com/"
f"sagemaker/latest/dg/xgboost.html#Instance-XGBoost-distributed-training-divide-data. "
)
include_in_training = False
if missing_validation_data:
logging.warning(
f"Host {sm_current_host} does not have validation data "
f"in the validation channel : {validation_channel}. "
f"Will broadcast to cluster and this host {sm_current_host} will not be used "
f"in distributed training. Please divide the validation data across instances properly. "
f"See https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html"
f"#Instance-XGBoost-distributed-training-divide-data. "
)
include_in_training = False

distributed.rabit_run(
exec_fun=train_job,
args=train_args,
include_in_training=(train_dmatrix is not None),
include_in_training=include_in_training,
hosts=sm_hosts,
current_host=sm_current_host,
update_rabit_args=True,
)
elif num_hosts == 1:
if train_dmatrix:
if validation_channel and not val_dmatrix:
if missing_validation_data:
raise exc.UserError(f"No data in validation channel path {val_path}")
logging.info("Single node training.")
train_args.update({"is_master": True})
Expand Down
94 changes: 63 additions & 31 deletions src/sagemaker_xgboost_container/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
PARQUET = "parquet"
RECORDIO_PROTOBUF = "recordio-protobuf"

MAX_FOLDER_DEPTH = 3

VALID_CONTENT_TYPES = [
CSV,
LIBSVM,
Expand Down Expand Up @@ -528,33 +530,74 @@ def _get_pipe_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
return files_path


def _make_symlinks_from_a_folder(dest_path: str, data_path: str, depth: int):
if (depth > MAX_FOLDER_DEPTH):
raise exc.UserError(f"Folder depth exceed the limit: {MAX_FOLDER_DEPTH}.")

if os.path.isfile(data_path):
_make_symlink(data_path, dest_path, os.path.basename(data_path))
return
else:
logging.info(f"Making smlinks from folder {data_path} to folder {dest_path}")
for item in os.scandir(data_path):
if item.is_file():
_make_symlink(item.path, dest_path, item.name)
elif item.is_dir():
_make_symlinks_from_a_folder(dest_path, item.path, depth + 1)


def _make_symlinks_from_a_folder_with_warning(dest_path: str, data_path: str):
"""
:param dest_path: A dir
:param data_path: Either dir or file
:param depth: current folder depth, Integer
"""

# If data_path is a single file A, create smylink A -> dest_path/A
# If data_path is a dir, create symlinks for files located within depth of MAX_FOLDER_DEPTH
# under this dir. Ignore the files in deeper sub dirs and log a warning if they exist.

if (not os.path.exists(dest_path)) or (not os.path.exists(data_path)):
raise exc.AlgorithmError(f"Unable to create symlinks as {data_path} or {dest_path} doesn't exist ")

if (not os.path.isdir(dest_path)):
raise exc.AlgorithmError(f"Unable to create symlinks as dest_path {dest_path} is not a dir")

try:
_make_symlinks_from_a_folder(dest_path, data_path, 1)
except exc.UserError as e:
if e.message == f"Folder depth exceed the limit: {MAX_FOLDER_DEPTH}.":
logging.warning(
f"The depth of folder {data_path} exceed the limit {MAX_FOLDER_DEPTH}."
f" Files in deeper sub dirs won't be loaded."
f" Please adjust the folder structure accordingly."
)


def _get_file_mode_files_path(data_path: Union[List[str], str]) -> List[str]:
"""
:param data_path: Either directory or file
"""
# In file mode, we create a temp directory with symlink to all input files or
# directories to meet XGB's assumption that all files are in the same directory.

logging.info("File path {} of input files".format(data_path))
# Create a directory with symlinks to input files.
files_path = "/tmp/sagemaker_xgboost_input_data"
shutil.rmtree(files_path, ignore_errors=True)
os.mkdir(files_path)
if isinstance(data_path, list):
logging.info("File path {} of input files".format(data_path))
# Create a directory with symlinks to input files.
files_path = "/tmp/sagemaker_xgboost_input_data"
shutil.rmtree(files_path, ignore_errors=True)
os.mkdir(files_path)
for index, path in enumerate(data_path):
if not os.path.exists(path):
return None
if os.path.isfile(path):
_make_symlink(path, files_path, os.path.basename(path), index)
else:
for file in os.scandir(path):
_make_symlink(file, files_path, file.name, index)

for path in data_path:
_make_symlinks_from_a_folder_with_warning(files_path, path)
else:
if not os.path.exists(data_path):
logging.info("File path {} does not exist!".format(data_path))
return None
files_path = get_files_path_from_string(data_path)
elif os.path.isdir(data_path) or os.path.isfile(data_path):
# traverse all sub-dirs to load all training data
_make_symlinks_from_a_folder_with_warning(files_path, data_path)
else:
exc.UserError("Unknown input files path: {}".format(data_path))

return files_path

Expand Down Expand Up @@ -635,22 +678,11 @@ def get_size(data_path, is_pipe=False):
return total_size


def get_files_path_from_string(data_path: Union[List[str], str]) -> List[str]:
if os.path.isfile(data_path):
files_path = data_path
else:
for root, dirs, files in os.walk(data_path):
if dirs == []:
files_path = root
break

return files_path


def _make_symlink(path, source_path, name, index):
base_name = os.path.join(source_path, f"{name}_{str(index)}")
logging.info(f"creating symlink between Path {source_path} and destination {base_name}")
os.symlink(path, base_name)
def _make_symlink(path, source_path, name):
base_name = os.path.join(source_path, name)
file_name = base_name + str(hash(path))
logging.info(f"creating symlink between Path {path} and destination {file_name}")
os.symlink(path, file_name)


def check_data_redundancy(train_path, validate_path):
Expand Down
Loading