[Cloud Deployment IVc] Deploy NeuroConv in AWS with EFS #1086
Changes from all commits
@@ -0,0 +1,46 @@
name: NeuroConv Deployment AWS Tests
on:
  schedule:
    - cron: "0 16 * * 3"  # Weekly at noon on Wednesday
  workflow_dispatch:

concurrency:  # Cancel previous workflows on the same pull request
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

env:
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }}
  RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }}
  RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }}
  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}

jobs:
  run:
    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.12"]
        os: [ubuntu-latest]
    steps:
      - uses: actions/checkout@v4
      - run: git fetch --prune --unshallow --tags
      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Global Setup
        run: |
          python -m pip install -U pip  # Official recommended way
          git config --global user.email "[email protected]"
          git config --global user.name "CI Almighty"

      - name: Install AWS requirements
        run: pip install .[aws,test]

      - name: Run NeuroConv Deployment on AWS tests
        run: pytest -rsx -n auto tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
@@ -1,4 +1,9 @@
 from ._submit_aws_batch_job import submit_aws_batch_job
 from ._rclone_transfer_batch_job import rclone_transfer_batch_job
+from ._deploy_neuroconv_batch_job import deploy_neuroconv_batch_job

-__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
+__all__ = [
+    "submit_aws_batch_job",
+    "rclone_transfer_batch_job",
+    "deploy_neuroconv_batch_job",
+]
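With the export above in place, the new helper sits alongside the existing AWS batch tools in the public namespace. A quick sanity check, assuming the AWS extra is installed as in the workflow above (pip install .[aws,test]):

# Confirm the new helper is exposed from the public AWS tools namespace.
import neuroconv.tools.aws as aws_tools

assert "deploy_neuroconv_batch_job" in aws_tools.__all__
print(sorted(aws_tools.__all__))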
@@ -0,0 +1,241 @@
"""Collection of helper functions for deploying NeuroConv in EC2 Batch jobs on AWS."""

import os
import time
import uuid
import warnings
from typing import Optional

import boto3
from pydantic import FilePath, validate_call

from ._rclone_transfer_batch_job import rclone_transfer_batch_job
from ._submit_aws_batch_job import submit_aws_batch_job

_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]


@validate_call
def deploy_neuroconv_batch_job(
    *,
    rclone_command: str,
    yaml_specification_file_path: FilePath,
    job_name: str,
    efs_volume_name: str,
    rclone_config_file_path: Optional[FilePath] = None,
    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
    compute_environment_name: str = "neuroconv_batch_environment",
    job_queue_name: str = "neuroconv_batch_queue",
    job_definition_name: Optional[str] = None,
    minimum_worker_ram_in_gib: int = 16,  # Higher than previous recommendations for safer buffering room
    minimum_worker_cpus: int = 4,
    region: Optional[str] = None,
) -> dict[str, str]:
""" | ||
Submit a job to AWS Batch for processing. | ||
|
||
Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables. | ||
|
||
Parameters | ||
---------- | ||
rclone_command : str | ||
The command to pass directly to Rclone running on the EC2 instance. | ||
E.g.: "rclone copy my_drive:testing_rclone /mnt/efs/source" | ||
Must move data from or to '/mnt/efs/source'. | ||
yaml_specification_file_path : FilePath | ||
The path to the YAML file containing the NeuroConv specification. | ||
job_name : str | ||
The name of the job to submit. | ||
efs_volume_name : str | ||
The name of an EFS volume to be created and attached to the job. | ||
The path exposed to the container will always be `/mnt/efs`. | ||
rclone_config_file_path : FilePath, optional | ||
The path to the Rclone configuration file to use for the job. | ||
If unspecified, method will attempt to find the file in `~/.rclone` and will raise an error if it cannot. | ||
status_tracker_table_name : str, default: "neuroconv_batch_status_tracker" | ||
The name of the DynamoDB table to use for tracking job status. | ||
compute_environment_name : str, default: "neuroconv_batch_environment" | ||
The name of the compute environment to use for the job. | ||
job_queue_name : str, default: "neuroconv_batch_queue" | ||
The name of the job queue to use for the job. | ||
job_definition_name : str, optional | ||
The name of the job definition to use for the job. | ||
If unspecified, a name starting with 'neuroconv_batch_' will be generated. | ||
minimum_worker_ram_in_gib : int, default: 4 | ||
The minimum amount of base worker memory required to run this job. | ||
Determines the EC2 instance type selected by the automatic 'best fit' selector. | ||
Recommended to be several GiB to allow comfortable buffer space for data chunk iterators. | ||
minimum_worker_cpus : int, default: 4 | ||
The minimum number of CPUs required to run this job. | ||
A minimum of 4 is required, even if only one will be used in the actual process. | ||
region : str, optional | ||
The AWS region to use for the job. | ||
If not provided, we will attempt to load the region from your local AWS configuration. | ||
If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive. | ||
|
||
Returns | ||
------- | ||
info : dict | ||
A dictionary containing information about this AWS Batch job. | ||
|
||
info["rclone_job_submission_info"] is the return value of `neuroconv.tools.aws.rclone_transfer_batch_job`. | ||
info["neuroconv_job_submission_info"] is the return value of `neuroconv.tools.aws.submit_job`. | ||
""" | ||
    efs_volume_name = efs_volume_name or f"neuroconv_batch_efs_volume_{uuid.uuid4().hex[:4]}"
    region = region or "us-east-2"

    if "/mnt/efs/source" not in rclone_command:
        message = (
            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs/source'. "
            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space. "
            "The subfolder 'source' is also required to eliminate ambiguity in the transfer process."
        )
        raise ValueError(message)

    rclone_job_name = f"{job_name}_rclone_transfer"
    rclone_job_submission_info = rclone_transfer_batch_job(
        rclone_command=rclone_command,
        job_name=rclone_job_name,
        efs_volume_name=efs_volume_name,
        rclone_config_file_path=rclone_config_file_path,
        region=region,
    )
    rclone_job_id = rclone_job_submission_info["job_submission_info"]["jobId"]

    # Give the EFS and other aspects time to spin up before submitting next dependent job
    # (Otherwise, good chance that duplicate EFS will be created)
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

    batch_client = boto3.client(
        service_name="batch",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    efs_client = boto3.client(
        service_name="efs",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    available_efs_volumes = efs_client.describe_file_systems()
    matching_efs_volumes = [
        file_system
        for file_system in available_efs_volumes["FileSystems"]
        for tag in file_system["Tags"]
        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
    ]
    max_iterations = 10
    iteration = 0
    while len(matching_efs_volumes) == 0 and iteration < max_iterations:
        iteration += 1
        time.sleep(30)

        # Refresh the file system listing before re-filtering; otherwise the loop would keep checking a stale response
        available_efs_volumes = efs_client.describe_file_systems()
        matching_efs_volumes = [
            file_system
            for file_system in available_efs_volumes["FileSystems"]
            for tag in file_system["Tags"]
            if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
        ]

    if len(matching_efs_volumes) == 0:
        message = f"Unable to create EFS volume '{efs_volume_name}' after {max_iterations} attempts!"
        raise ValueError(message)

    docker_image = "ghcr.io/catalystneuro/neuroconv_yaml_variable:latest"

    with open(file=yaml_specification_file_path, mode="r") as io:
        yaml_specification_file_stream = io.read()

    neuroconv_job_name = f"{job_name}_neuroconv_deployment"
    job_dependencies = [{"jobId": rclone_job_id, "type": "SEQUENTIAL"}]
    neuroconv_job_submission_info = submit_aws_batch_job(
        job_name=neuroconv_job_name,
        docker_image=docker_image,
        environment_variables={
            "NEUROCONV_YAML": yaml_specification_file_stream,
            "NEUROCONV_DATA_PATH": "/mnt/efs/source",
            # TODO: would prefer this to use subfolders for source and output, but need logic for YAML
            # related code to create them if missing (hard to send EFS this command directly)
            # (the code was included in this PR, but a release cycle needs to complete for the docker images before
            # it can be used here)
            # "NEUROCONV_OUTPUT_PATH": "/mnt/efs/output",
            "NEUROCONV_OUTPUT_PATH": "/mnt/efs",
        },
        efs_volume_name=efs_volume_name,
        job_dependencies=job_dependencies,
        status_tracker_table_name=status_tracker_table_name,
        compute_environment_name=compute_environment_name,
        job_queue_name=job_queue_name,
        job_definition_name=job_definition_name,
        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
        minimum_worker_cpus=minimum_worker_cpus,
        region=region,
    )

    info = {
        "rclone_job_submission_info": rclone_job_submission_info,
        "neuroconv_job_submission_info": neuroconv_job_submission_info,
    }

    # TODO: would be better to spin up third dependent job to clean up EFS volume after neuroconv job completes
    neuroconv_job_id = neuroconv_job_submission_info["job_submission_info"]["jobId"]
    job = None
    max_retries = 60 * 12  # roughly 12 hours max runtime (aside from internet loss) for checking cleanup
    sleep_time = 60  # 1 minute
    retry = 0.0
    time.sleep(sleep_time)
    while retry < max_retries:
        job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id])
        if job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            # sleep but only increment retry by a small amount
            # (really should only apply if internet connection is temporarily lost)
            retry += 0.1
            time.sleep(sleep_time)

        job = job_description_response["jobs"][0]
        if job["status"] in _RETRY_STATES:
            retry += 1.0
            time.sleep(sleep_time)
        elif job["status"] == "SUCCEEDED":
            break

    if retry >= max_retries:
        message = (
            "Maximum retries reached for checking job completion for automatic EFS cleanup! "
            "Please delete the EFS volume manually."
        )
        warnings.warn(message=message, stacklevel=2)

        return info

    # Cleanup EFS after job is complete - must clear mount targets first, then wait before deleting the volume
    efs_volumes = efs_client.describe_file_systems()
    matching_efs_volumes = [
        file_system
        for file_system in efs_volumes["FileSystems"]
        for tag in file_system["Tags"]
        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
    ]
    if len(matching_efs_volumes) != 1:
        message = (
            f"Expected to find exactly one EFS volume with name '{efs_volume_name}', "
            f"but found {len(matching_efs_volumes)}\n\n{matching_efs_volumes=}\n\n!"
            "You will have to delete these manually."
        )
        warnings.warn(message=message, stacklevel=2)

        return info

    efs_volume = matching_efs_volumes[0]
    efs_id = efs_volume["FileSystemId"]
    mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
    for mount_target in mount_targets["MountTargets"]:
        efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])

    time.sleep(sleep_time)
    efs_client.delete_file_system(FileSystemId=efs_id)

    return info
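For orientation, here is a minimal usage sketch of the new helper. The remote name, YAML path, and job/volume names below are placeholders for illustration only; the keyword arguments and the '/mnt/efs/source' requirement come from the signature and docstring above, and AWS plus Rclone credentials are assumed to be configured as the docstring describes.

from neuroconv.tools.aws import deploy_neuroconv_batch_job

# Placeholder inputs; substitute a real Rclone remote, specification file, and names.
rclone_command = "rclone copy my_drive:my_dataset /mnt/efs/source"  # must reference /mnt/efs/source
yaml_specification_file_path = "/path/to/conversion_specification.yml"

info = deploy_neuroconv_batch_job(
    rclone_command=rclone_command,
    yaml_specification_file_path=yaml_specification_file_path,
    job_name="example_neuroconv_deployment",
    efs_volume_name="example_efs_volume",
)

# Both submission records are returned for downstream tracking.
rclone_job_id = info["rclone_job_submission_info"]["job_submission_info"]["jobId"]
neuroconv_job_id = info["neuroconv_job_submission_info"]["job_submission_info"]["jobId"]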
Review comment on lines +214 to +218 (the EFS volume lookup in the cleanup step): @h-mayorquin Also, in a follow-up, you may wish to add logic for detecting whether the EFS volume already existed before this method was called, in which case we might not want to clean it up automatically (it might be shared across jobs or something). And/or always control the behavior with an extra argument.

Review comment on the EFS cleanup logic: This middle-man EFS management is rather annoying, but there were bugs with duplicates otherwise. A better way in the future might be direct EFS ID passing that overrides the 'search'-based creation from the name pattern.
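To make the reviewers' suggestion concrete, here is a hedged sketch of one possible follow-up; it is not part of this PR, and the argument name cleanup_efs_volume and the helper _efs_volume_exists are hypothetical.

# Hypothetical follow-up sketch (not in this PR): guard automatic EFS deletion.
import boto3


def _efs_volume_exists(*, efs_volume_name: str, region: str) -> bool:
    """Return True if an EFS volume tagged with this name already exists (hypothetical helper)."""
    efs_client = boto3.client(service_name="efs", region_name=region)
    file_systems = efs_client.describe_file_systems()["FileSystems"]
    return any(
        tag["Key"] == "Name" and tag["Value"] == efs_volume_name
        for file_system in file_systems
        for tag in file_system.get("Tags", [])
    )


# Inside deploy_neuroconv_batch_job(..., cleanup_efs_volume: bool = True), the function could
# record whether the volume pre-existed and skip the deletion branch for shared volumes:
#
#     volume_preexisted = _efs_volume_exists(efs_volume_name=efs_volume_name, region=region)
#     ...
#     if cleanup_efs_volume and not volume_preexisted:
#         # clear mount targets and delete the file system, as in the cleanup block above
#         ...

Passing an explicit EFS ID instead of a name pattern, as suggested in the second comment, would make this pre-existence check unnecessary for pre-provisioned volumes.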