Commit

setup
CodyCBakerPhD committed Sep 16, 2024
1 parent ad1e2a1 commit 9b78811
Showing 5 changed files with 268 additions and 5 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/deploy-tests.yml
@@ -1,9 +1,10 @@
 name: Deploy tests
 
 on:
-  pull_request:
-    types: [synchronize, opened, reopened, ready_for_review] # defaults + ready_for_review
-  merge_group:
+  # TODO: disabled to save resources
+  # pull_request:
+  #   types: [synchronize, opened, reopened, ready_for_review] # defaults + ready_for_review
+  # merge_group:
   workflow_dispatch:
 
 concurrency: # Cancel previous workflows on the same pull request
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Upcoming

## Features
* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #]()



## v0.6.4

## Bug Fixes
3 changes: 2 additions & 1 deletion src/neuroconv/tools/aws/__init__.py
@@ -1,3 +1,4 @@
 from ._submit_aws_batch_job import submit_aws_batch_job
+from ._rclone_transfer_batch_job import rclone_transfer_batch_job
 
-__all__ = ["submit_aws_batch_job"]
+__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
113 changes: 113 additions & 0 deletions src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
@@ -0,0 +1,113 @@
"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""

import pathlib
import warnings
from typing import Optional

from pydantic import FilePath, validate_call

from ._submit_aws_batch_job import submit_aws_batch_job


@validate_call
def rclone_transfer_batch_job(
    *,
    rclone_command: str,
    job_name: str,
    efs_volume_name: str,
    rclone_config_file_path: Optional[FilePath] = None,
    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
    compute_environment_name: str = "neuroconv_batch_environment",
    job_queue_name: str = "neuroconv_batch_queue",
    job_definition_name: Optional[str] = None,
    minimum_worker_ram_in_gib: int = 4,
    minimum_worker_cpus: int = 4,
    submission_id: Optional[str] = None,
    region: Optional[str] = None,
) -> dict[str, str]:
    """
    Submit a job to AWS Batch for processing.

    Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.

    Parameters
    ----------
    rclone_command : str
        The command to pass directly to Rclone running on the EC2 instance.
        E.g.: "rclone copy my_drive:testing_rclone /mnt/efs"
        Must move data from or to '/mnt/efs'.
    job_name : str
        The name of the job to submit.
    efs_volume_name : str
        The name of an EFS volume to be created and attached to the job.
        The path exposed to the container will always be `/mnt/efs`.
    rclone_config_file_path : FilePath, optional
        The path to the Rclone configuration file to use for the job.
        If unspecified, method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
        The name of the DynamoDB table to use for tracking job status.
    compute_environment_name : str, default: "neuroconv_batch_environment"
        The name of the compute environment to use for the job.
    job_queue_name : str, default: "neuroconv_batch_queue"
        The name of the job queue to use for the job.
    job_definition_name : str, optional
        The name of the job definition to use for the job.
        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
    minimum_worker_ram_in_gib : int, default: 4
        The minimum amount of base worker memory required to run this job.
        Determines the EC2 instance type selected by the automatic 'best fit' selector.
        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
    minimum_worker_cpus : int, default: 4
        The minimum number of CPUs required to run this job.
        A minimum of 4 is required, even if only one will be used in the actual process.
    submission_id : str, optional
        The unique ID to pair with this job submission when tracking the status via DynamoDB.
        Defaults to a random UUID4.
    region : str, optional
        The AWS region to use for the job.
        If not provided, we will attempt to load the region from your local AWS configuration.
        If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.

    Returns
    -------
    info : dict
        A dictionary containing information about this AWS Batch job.
        info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID.
        info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table.
    """
    docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"

    if "/mnt/efs" not in rclone_command:
        message = (
            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs'. "
            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space."
        )
        warnings.warn(message=message, stacklevel=2)

    rclone_config_file_path = rclone_config_file_path or pathlib.Path.home() / ".rclone" / "rclone.conf"
    if not rclone_config_file_path.exists():
        raise FileNotFoundError(
            f"Rclone configuration file not found at: {rclone_config_file_path}! "
            "Please check that `rclone config` successfully created the file."
        )
    with open(file=rclone_config_file_path, mode="r") as io:
        rclone_config_file_stream = io.read()

    region = region or "us-east-2"

    info = submit_aws_batch_job(
        job_name=job_name,
        docker_image=docker_image,
        environment_variables={"RCLONE_CONFIG": rclone_config_file_stream, "RCLONE_COMMAND": rclone_command},
        efs_volume_name=efs_volume_name,
        status_tracker_table_name=status_tracker_table_name,
        compute_environment_name=compute_environment_name,
        job_queue_name=job_queue_name,
        job_definition_name=job_definition_name,
        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
        minimum_worker_cpus=minimum_worker_cpus,
        submission_id=submission_id,
        region=region,
    )

    return info
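
For orientation, a minimal usage sketch of the helper added above (not part of the commit); the remote, dataset, job, and volume names are hypothetical, and the Rclone remote is assumed to already be configured locally via `rclone config`:

from neuroconv.tools.aws import rclone_transfer_batch_job

# Hypothetical names; substitute your own Rclone remote, source folder, and AWS resource names.
info = rclone_transfer_batch_job(
    rclone_command="rclone copy my_drive:my_dataset /mnt/efs",  # must read from or write to /mnt/efs
    job_name="example_rclone_transfer",
    efs_volume_name="example_efs_volume",
)

# The returned dictionary exposes both the Batch submission response and the DynamoDB tracking row.
job_id = info["job_submission_info"]["jobId"]
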
145 changes: 144 additions & 1 deletion tests/test_minimal/test_tools/aws_tools.py
@@ -4,7 +4,7 @@

 import boto3
 
-from neuroconv.tools.aws import submit_aws_batch_job
+from neuroconv.tools.aws import rclone_transfer_batch_job, submit_aws_batch_job
 
 _RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
 
@@ -297,3 +297,146 @@ def test_submit_aws_batch_job_with_efs_mount():
    table.update_item(
        Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
    )


class TestRcloneTransferBatchJob(TestCase):
    """
    To allow this test to work, the developer must create a folder on the outer level of their personal Google Drive
    called 'testing_rclone_spikeglx' with the following structure:

    testing_rclone_spikeglx
    ├── ci_tests
    │   └── Noise4Sam_g0

    Where 'Noise4Sam' is from the 'spikeglx/Noise4Sam_g0' GIN ephys dataset.

    Then the developer must install Rclone and call `rclone config` to generate tokens in their own `rclone.conf` file.
    The developer can easily find the location of the config file on their system using `rclone config file`.
    """

    test_folder = OUTPUT_PATH / "aws_rclone_tests"
    test_config_file_path = test_folder / "rclone.conf"

    def setUp(self):
        self.test_folder.mkdir(exist_ok=True)

        # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command)
        token_dictionary = dict(
            access_token=os.environ["RCLONE_DRIVE_ACCESS_TOKEN"],
            token_type="Bearer",
            refresh_token=os.environ["RCLONE_DRIVE_REFRESH_TOKEN"],
            expiry=os.environ["RCLONE_EXPIRY_TOKEN"],
        )
        token_string = str(token_dictionary).replace("'", '"').replace(" ", "")
        rclone_config_contents = [
            "[test_google_drive_remote]\n",
            "type = drive\n",
            "scope = drive\n",
            f"token = {token_string}\n",
            "team_drive = \n",
            "\n",
        ]
        with open(file=self.test_config_file_path, mode="w") as io:
            io.writelines(rclone_config_contents)

    def test_rclone_transfer_batch_job(self):
        region = "us-east-2"
        aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
        aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

        dynamodb_resource = boto3.resource(
            service_name="dynamodb",
            region_name=region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        batch_client = boto3.client(
            service_name="batch",
            region_name=region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        efs_client = boto3.client(
            service_name="efs",
            region_name=region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

        # Placeholder names for this test run; the job and EFS volume names must be defined here
        # because `rclone_transfer_batch_job` requires them and the assertions below reference them.
        job_name = "test_rclone_transfer_batch_job"
        efs_volume_name = "test_rclone_transfer_batch_efs"
        rclone_command = "rclone copy test_google_drive_remote:testing_rclone_spikeglx /mnt/efs"
        rclone_config_file_path = self.test_config_file_path

        info = rclone_transfer_batch_job(
            rclone_command=rclone_command,
            job_name=job_name,
            efs_volume_name=efs_volume_name,
            rclone_config_file_path=rclone_config_file_path,
        )

        # Wait for AWS to process the job
        time.sleep(60)

        job_id = info["job_submission_info"]["jobId"]
        job = None
        max_retries = 10
        retry = 0
        while retry < max_retries:
            job_description_response = batch_client.describe_jobs(jobs=[job_id])
            assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200

            jobs = job_description_response["jobs"]
            assert len(jobs) == 1

            job = jobs[0]

            if job["status"] in _RETRY_STATES:
                retry += 1
                time.sleep(60)
            else:
                break

        # Check EFS specific details
        efs_volumes = efs_client.describe_file_systems()
        matching_efs_volumes = [
            file_system
            for file_system in efs_volumes["FileSystems"]
            for tag in file_system["Tags"]
            if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
        ]
        assert len(matching_efs_volumes) == 1
        efs_volume = matching_efs_volumes[0]
        efs_id = efs_volume["FileSystemId"]

        # Check normal job completion
        assert job["jobName"] == job_name
        assert "neuroconv_batch_queue" in job["jobQueue"]
        assert "fs-" in job["jobDefinition"]
        assert job["status"] == "SUCCEEDED"

        status_tracker_table_name = "neuroconv_batch_status_tracker"
        table = dynamodb_resource.Table(name=status_tracker_table_name)
        table_submission_id = info["table_submission_info"]["id"]

        table_item_response = table.get_item(Key={"id": table_submission_id})
        assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200

        table_item = table_item_response["Item"]
        assert table_item["job_name"] == job_name
        assert table_item["job_id"] == job_id
        assert table_item["status"] == "Job submitted..."

        table.update_item(
            Key={"id": table_submission_id},
            AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}},
        )

        # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume
        # TODO: cleanup job definitions? (since built daily)
        mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
        for mount_target in mount_targets["MountTargets"]:
            efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])

        time.sleep(60)
        efs_client.delete_file_system(FileSystemId=efs_id)

        table.update_item(
            Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
        )
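
As a rough, hypothetical sketch of what running the new test locally assumes (not part of the commit): the Rclone tokens must be supplied as the environment variables read in `setUp`, while AWS credentials may come either from the two variables the test reads or from `~/.aws/`:

# Sketch only: check the credentials the test reads, then run just the new test class.
import os

import pytest

rclone_token_variables = ["RCLONE_DRIVE_ACCESS_TOKEN", "RCLONE_DRIVE_REFRESH_TOKEN", "RCLONE_EXPIRY_TOKEN"]
missing = [name for name in rclone_token_variables if name not in os.environ]
if missing:
    raise EnvironmentError(f"Missing Rclone token variables required by TestRcloneTransferBatchJob: {missing}")

# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are read with os.environ.get, so `~/.aws/` credentials also work.
pytest.main(["tests/test_minimal/test_tools/aws_tools.py::TestRcloneTransferBatchJob"])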
