diff --git a/src/s3_log_extraction/_command_line_interface/_cli.py b/src/s3_log_extraction/_command_line_interface/_cli.py
index f9fb1c6..82de83d 100644
--- a/src/s3_log_extraction/_command_line_interface/_cli.py
+++ b/src/s3_log_extraction/_command_line_interface/_cli.py
@@ -5,12 +5,14 @@
 import click
 
-from ..config import reset_extraction, set_cache_directory
+from ..config import get_summary_directory, reset_extraction, set_cache_directory
 from ..extractors import DandiS3LogAccessExtractor, RemoteS3LogAccessExtractor, S3LogAccessExtractor, stop_extraction
 from ..ip_utils import index_ips, update_index_to_region_codes, update_region_code_coordinates
 from ..summarize import (
     generate_all_dandiset_summaries,
     generate_all_dandiset_totals,
+    generate_all_dataset_summaries,
+    generate_all_dataset_totals,
     generate_archive_summaries,
     generate_archive_totals,
 )
@@ -203,6 +205,7 @@ def _update_ip_coordinates_cli() -> None:
     "--mode",
     help=(
         "Generate condensed summaries of activity across the extracted data per object key. "
+        "Defaults to grouping summaries by top-level prefix. "
         "Mode 'dandi' will map asset hashes to Dandisets and their content filenames. "
         "Mode 'archive' aggregates over all dataset summaries."
     ),
@@ -213,17 +216,14 @@ def _update_ip_coordinates_cli() -> None:
 def _update_summaries_cli(mode: typing.Literal["dandi", "archive"] | None = None) -> None:
     """
     Generate condensed summaries of activity.
-
-    TODO
     """
     match mode:
         case "dandi":
             generate_all_dandiset_summaries()
         case "archive":
-            generate_archive_summaries()
+            generate_archive_summaries(get_summary_directory())
         case _:
-            message = "The generic mode is not yet implemented - please raise an issue to discuss."
-            click.echo(message=message, err=True)
+            generate_all_dataset_summaries()
 
 
 # s3logextraction update totals
@@ -244,10 +244,9 @@ def _update_totals_cli(mode: typing.Literal["dandi", "archive"] | None = None) -> None:
         case "dandi":
             generate_all_dandiset_totals()
         case "archive":
-            generate_archive_totals()
+            generate_archive_totals(get_summary_directory())
         case _:
-            message = "The generic mode is not yet implemented - please raise an issue to discuss."
-            click.echo(message=message, err=True)
+            generate_all_dataset_totals()
 
 
 # s3logextraction testing
diff --git a/src/s3_log_extraction/extractors/_s3_log_access_extractor.py b/src/s3_log_extraction/extractors/_s3_log_access_extractor.py
index b8f9311..a45130d 100644
--- a/src/s3_log_extraction/extractors/_s3_log_access_extractor.py
+++ b/src/s3_log_extraction/extractors/_s3_log_access_extractor.py
@@ -117,7 +117,9 @@ def extract_directory(self, *, directory: str | pathlib.Path, limit: int | None
         max_workers = _handle_max_workers(workers=workers)
 
         all_log_files = {
-            str(file_path.absolute()) for file_path in natsort.natsorted(seq=directory.rglob(pattern="*-*-*-*-*-*-*"))
+            str(file_path.absolute())
+            for file_path in natsort.natsorted(seq=directory.rglob(pattern="*-*-*-*-*-*-*"))
+            if file_path.is_file()
         }
         unextracted_files = all_log_files - set(self.file_processing_end_record.keys())
diff --git a/src/s3_log_extraction/summarize/__init__.py b/src/s3_log_extraction/summarize/__init__.py
index 205b6b9..dd77d62 100644
--- a/src/s3_log_extraction/summarize/__init__.py
+++ b/src/s3_log_extraction/summarize/__init__.py
@@ -1,12 +1,15 @@
-from ._generate_archive_totals import generate_archive_totals
-from ._generate_archive_summaries import generate_archive_summaries
-from ._generate_all_dandiset_totals import generate_all_dandiset_totals
-
 from ._generate_all_dandiset_summaries import generate_all_dandiset_summaries
+from ._generate_all_dandiset_totals import generate_all_dandiset_totals
+from ._generate_all_dataset_summaries import generate_all_dataset_summaries
+from ._generate_all_dataset_totals import generate_all_dataset_totals
+from ._generate_archive_summaries import generate_archive_summaries
+from ._generate_archive_totals import generate_archive_totals
 
 __all__ = [
     "generate_all_dandiset_summaries",
     "generate_all_dandiset_totals",
-    "generate_archive_totals",
+    "generate_all_dataset_summaries",
+    "generate_all_dataset_totals",
     "generate_archive_summaries",
+    "generate_archive_totals",
 ]
diff --git a/src/s3_log_extraction/summarize/_generate_all_dataset_summaries.py b/src/s3_log_extraction/summarize/_generate_all_dataset_summaries.py
new file mode 100644
index 0000000..63ffd58
--- /dev/null
+++ b/src/s3_log_extraction/summarize/_generate_all_dataset_summaries.py
@@ -0,0 +1,166 @@
+import collections
+import datetime
+import pathlib
+
+import pandas
+import tqdm
+
+from ..config import get_extraction_directory, get_summary_directory
+from ..ip_utils import load_ip_cache
+
+
+def generate_all_dataset_summaries() -> None:
+    """Generate per-day, per-asset, and per-region activity summaries for every dataset in the extraction directory."""
+    extraction_directory = get_extraction_directory()
+
+    datasets = [item for item in extraction_directory.iterdir() if item.is_dir()]
+
+    summary_directory = get_summary_directory()
+    index_to_region = load_ip_cache(cache_type="index_to_region")
+
+    for dataset in tqdm.tqdm(
+        iterable=datasets,
+        total=len(datasets),
+        desc="Summarizing Datasets",
+        position=0,
+        leave=True,
+        mininterval=5.0,
+        smoothing=0,
+        unit="dataset",
+    ):
+        dataset_id = dataset.name
+        asset_directories = [file_path for file_path in dataset.rglob(pattern="*") if file_path.is_dir()]
+        _summarize_dataset(
+            dataset_id=dataset_id,
+            asset_directories=asset_directories,
+            summary_directory=summary_directory,
+            index_to_region=index_to_region,
+        )
+
+
+def _summarize_dataset(
+    *,
+    dataset_id: str,
+    asset_directories: list[pathlib.Path],
+    summary_directory: pathlib.Path,
+    index_to_region: dict[int, str],
+) -> None:
+    _summarize_dataset_by_day(
+        asset_directories=asset_directories,
+        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_day.tsv",
+    )
+    _summarize_dataset_by_asset(
+        asset_directories=asset_directories,
+        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_asset.tsv",
+    )
+    _summarize_dataset_by_region(
+        asset_directories=asset_directories,
+        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_region.tsv",
+        index_to_region=index_to_region,
+    )
+
+
+def _summarize_dataset_by_day(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
+    all_dates = []
+    all_bytes_sent = []
+    for asset_directory in asset_directories:
+        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
+        # just copy the file over instead of reprocessing
+
+        timestamps_file_path = asset_directory / "timestamps.txt"
+
+        if not timestamps_file_path.exists():
+            continue
+
+        dates = [
+            datetime.datetime.strptime(str(timestamp.strip()), "%y%m%d%H%M%S").strftime(format="%Y-%m-%d")
+            for timestamp in timestamps_file_path.read_text().splitlines()
+        ]
+        all_dates.extend(dates)
+
+        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
+        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
+        all_bytes_sent.extend(bytes_sent)
+
+    summarized_activity_by_day = collections.defaultdict(int)
+    for date, bytes_sent in zip(all_dates, all_bytes_sent):
+        summarized_activity_by_day[date] += bytes_sent
+
+    if len(summarized_activity_by_day) == 0:
+        return
+
+    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
+    summary_table = pandas.DataFrame(
+        data={
+            "date": list(summarized_activity_by_day.keys()),
+            "bytes_sent": list(summarized_activity_by_day.values()),
+        }
+    )
+    summary_table.sort_values(by="date", inplace=True)
+    summary_table.index = range(len(summary_table))
+    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)
+
+
+def _summarize_dataset_by_asset(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
+    summarized_activity_by_asset = collections.defaultdict(int)
+    for asset_directory in asset_directories:
+        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
+        # just copy the file over instead of reprocessing
+        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
+
+        if not bytes_sent_file_path.exists():
+            continue
+
+        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
+
+        asset_path = str(asset_directory)
+        summarized_activity_by_asset[asset_path] += sum(bytes_sent)
+
+    if len(summarized_activity_by_asset) == 0:
+        return
+
+    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
+    summary_table = pandas.DataFrame(
+        data={
+            "asset_path": list(summarized_activity_by_asset.keys()),
+            "bytes_sent": list(summarized_activity_by_asset.values()),
+        }
+    )
+    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)
+
+
+def _summarize_dataset_by_region(
+    *, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path, index_to_region: dict[int, str]
+) -> None:
+    all_regions = []
+    all_bytes_sent = []
+    for asset_directory in asset_directories:
+        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
+        # just copy the file over instead of reprocessing
+        indexed_ips_file_path = asset_directory / "indexed_ips.txt"
+
+        if not indexed_ips_file_path.exists():
+            continue
+
+        indexed_ips = [ip_index.strip() for ip_index in indexed_ips_file_path.read_text().splitlines()]
+        regions = [index_to_region.get(ip_index.strip(), "unknown") for ip_index in indexed_ips]
+        all_regions.extend(regions)
+
+        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
+        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
+        all_bytes_sent.extend(bytes_sent)
+
+    summarized_activity_by_region = collections.defaultdict(int)
+    for region, bytes_sent in zip(all_regions, all_bytes_sent):
+        summarized_activity_by_region[region] += bytes_sent
+
+    if len(summarized_activity_by_region) == 0:
+        return
+
+    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
+    summary_table = pandas.DataFrame(
+        data={
+            "region": list(summarized_activity_by_region.keys()),
+            "bytes_sent": list(summarized_activity_by_region.values()),
+        }
+    )
+    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)
diff --git a/src/s3_log_extraction/summarize/_generate_all_dataset_totals.py b/src/s3_log_extraction/summarize/_generate_all_dataset_totals.py
new file mode 100644
index 0000000..8f72701
--- /dev/null
+++ b/src/s3_log_extraction/summarize/_generate_all_dataset_totals.py
@@ -0,0 +1,57 @@
+import json
+import pathlib
+
+import pandas
+
+from ..config import get_summary_directory
+
+
+def generate_all_dataset_totals(
+    summary_directory: str | pathlib.Path | None = None,
+) -> None:
+    """
+    Generate top-level totals of summarized access activity for all datasets.
+
+    Parameters
+    ----------
+    summary_directory : str or pathlib.Path, optional
+        Path to the folder containing all dataset summaries of the S3 access logs.
+        Defaults to the configured summary directory.
+    """
+    if summary_directory:
+        summary_directory = pathlib.Path(summary_directory)
+    else:
+        summary_directory = get_summary_directory()
+
+    # TODO: record progress over
+
+    all_dandiset_totals = {}
+    for dandiset_id_folder_path in summary_directory.iterdir():
+        if not dandiset_id_folder_path.is_dir():
+            continue  # TODO: use better structure for separating mapped activity from summaries
+        dandiset_id = dandiset_id_folder_path.name
+
+        summary_file_path = summary_directory / dandiset_id / "dandiset_summary_by_region.tsv"
+        summary = pandas.read_table(filepath_or_buffer=summary_file_path)
+
+        unique_countries = {}
+        for region in summary["region"]:
+            if region in ["VPN", "GitHub", "unknown"]:
+                continue
+
+            country_code, region_name = region.split("/")
+            if "AWS" in country_code:
+                country_code = region_name.split("-")[0].upper()
+
+            unique_countries[country_code] = True
+
+        number_of_unique_regions = len(summary["region"])
+        number_of_unique_countries = len(unique_countries)
+        all_dandiset_totals[dandiset_id] = {
+            "total_bytes_sent": int(summary["bytes_sent"].sum()),
+            "number_of_unique_regions": number_of_unique_regions,
+            "number_of_unique_countries": number_of_unique_countries,
+        }
+
+    top_level_summary_file_path = summary_directory / "all_dandiset_totals.json"
+    with top_level_summary_file_path.open(mode="w") as io:
+        json.dump(obj=all_dandiset_totals, fp=io)
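
Below is a minimal usage sketch of the new generic mode, assuming extraction has already been run and the cache, extraction, and summary directories are configured. The import path and function names come from this diff; the equivalent CLI invocations mentioned in the comments are inferred from the `# s3logextraction update ...` markers above rather than from confirmed documentation.

    from s3_log_extraction.summarize import (
        generate_all_dataset_summaries,
        generate_all_dataset_totals,
    )

    # Roughly what `s3logextraction update summaries` does when no --mode is given:
    # write per-day, per-asset, and per-region TSVs for every top-level prefix
    # found in the extraction directory.
    generate_all_dataset_summaries()

    # Roughly what `s3logextraction update totals` does when no --mode is given:
    # roll the per-region TSVs up into a single totals JSON in the summary directory.
    generate_all_dataset_totals()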