Add simple generate summaries and totals functions that group by directory. #103

Draft: wants to merge 13 commits into base main
17 changes: 8 additions & 9 deletions src/s3_log_extraction/_command_line_interface/_cli.py
@@ -5,12 +5,14 @@
 
 import click
 
-from ..config import reset_extraction, set_cache_directory
+from ..config import get_summary_directory, reset_extraction, set_cache_directory
 from ..extractors import DandiS3LogAccessExtractor, RemoteS3LogAccessExtractor, S3LogAccessExtractor, stop_extraction
 from ..ip_utils import index_ips, update_index_to_region_codes, update_region_code_coordinates
 from ..summarize import (
     generate_all_dandiset_summaries,
     generate_all_dandiset_totals,
+    generate_all_dataset_summaries,
+    generate_all_dataset_totals,
     generate_archive_summaries,
     generate_archive_totals,
 )
@@ -203,6 +205,7 @@ def _update_ip_coordinates_cli() -> None:
     "--mode",
     help=(
         "Generate condensed summaries of activity across the extracted data per object key. "
+        "Defaults to grouping summaries by top level prefix. "
         "Mode 'dandi' will map asset hashes to Dandisets and their content filenames. "
         "Mode 'archive' aggregates over all dataset summaries."
     ),
@@ -213,17 +216,14 @@
 def _update_summaries_cli(mode: typing.Literal["dandi", "archive"] | None = None) -> None:
     """
     Generate condensed summaries of activity.
-
-    TODO
     """
     match mode:
         case "dandi":
             generate_all_dandiset_summaries()
         case "archive":
-            generate_archive_summaries()
+            generate_archive_summaries(get_summary_directory())
         case _:
-            message = "The generic mode is not yet implemented - please raise an issue to discuss."
-            click.echo(message=message, err=True)
+            generate_all_dataset_summaries()


# s3logextraction update totals
@@ -244,10 +244,9 @@ def _update_totals_cli(mode: typing.Literal["dandi", "archive"] | None = None) -
         case "dandi":
             generate_all_dandiset_totals()
         case "archive":
-            generate_archive_totals()
+            generate_archive_totals(get_summary_directory())
         case _:
-            message = "The generic mode is not yet implemented - please raise an issue to discuss."
-            click.echo(message=message, err=True)
+            generate_all_dataset_totals()


# s3logextraction testing
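
For orientation, a minimal sketch of the programmatic equivalents of the updated CLI dispatch, assuming the package is importable as s3_log_extraction and that generate_archive_summaries and generate_archive_totals accept the summary directory as a positional argument, as the calls in the diff suggest:

from s3_log_extraction.config import get_summary_directory
from s3_log_extraction.summarize import (
    generate_all_dataset_summaries,
    generate_all_dataset_totals,
    generate_archive_summaries,
    generate_archive_totals,
)

# Default (generic) mode: summarize and total per top-level directory.
generate_all_dataset_summaries()
generate_all_dataset_totals()

# 'archive' mode: aggregate over the per-dataset summaries, passing the summary directory explicitly.
summary_directory = get_summary_directory()
generate_archive_summaries(summary_directory)
generate_archive_totals(summary_directory)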
4 changes: 3 additions & 1 deletion src/s3_log_extraction/extractors/_s3_log_access_extractor.py
@@ -117,7 +117,9 @@ def extract_directory(self, *, directory: str | pathlib.Path, limit: int | None
         max_workers = _handle_max_workers(workers=workers)

         all_log_files = {
-            str(file_path.absolute()) for file_path in natsort.natsorted(seq=directory.rglob(pattern="*-*-*-*-*-*-*"))
+            str(file_path.absolute())
+            for file_path in natsort.natsorted(seq=directory.rglob(pattern="*-*-*-*-*-*-*"))
+            if file_path.is_file()
         }
         unextracted_files = all_log_files - set(self.file_processing_end_record.keys())

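
A small illustration, with hypothetical paths, of why the added is_file() guard matters: rglob with the pattern *-*-*-*-*-*-* can also match directories whose names contain enough hyphens, so only genuine log files should be kept:

import pathlib

directory = pathlib.Path("/tmp/example-logs")  # hypothetical log root
nested = directory / "2024-01-02-03-04-05-ABCDEF"  # a directory whose name also matches the pattern
nested.mkdir(parents=True, exist_ok=True)
(nested / "2024-01-02-03-04-06-012345").touch()  # an actual log file

matches = list(directory.rglob("*-*-*-*-*-*-*"))  # includes both the directory and the file
log_files = {str(path.absolute()) for path in matches if path.is_file()}  # keeps only the file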
13 changes: 8 additions & 5 deletions src/s3_log_extraction/summarize/__init__.py
@@ -1,12 +1,15 @@
-from ._generate_archive_totals import generate_archive_totals
-from ._generate_archive_summaries import generate_archive_summaries
-from ._generate_all_dandiset_totals import generate_all_dandiset_totals
-
 from ._generate_all_dandiset_summaries import generate_all_dandiset_summaries
+from ._generate_all_dandiset_totals import generate_all_dandiset_totals
+from ._generate_all_dataset_summaries import generate_all_dataset_summaries
+from ._generate_all_dataset_totals import generate_all_dataset_totals
+from ._generate_archive_summaries import generate_archive_summaries
+from ._generate_archive_totals import generate_archive_totals
 
 __all__ = [
     "generate_all_dandiset_summaries",
     "generate_all_dandiset_totals",
-    "generate_archive_totals",
+    "generate_all_dataset_summaries",
+    "generate_all_dataset_totals",
     "generate_archive_summaries",
+    "generate_archive_totals",
 ]
166 changes: 166 additions & 0 deletions src/s3_log_extraction/summarize/_generate_all_dataset_summaries.py
@@ -0,0 +1,166 @@
import collections
import datetime
import pathlib

import pandas
import tqdm

from ..config import get_extraction_directory, get_summary_directory
from ..ip_utils import load_ip_cache


def generate_all_dataset_summaries(level: int = 0) -> None:
    extraction_directory = get_extraction_directory()

    datasets = [item for item in extraction_directory.iterdir() if item.is_dir()]

    summary_directory = get_summary_directory()
    index_to_region = load_ip_cache(cache_type="index_to_region")

    for dataset in tqdm.tqdm(
        iterable=datasets,
        total=len(datasets),
        desc="Summarizing Datasets",
        position=0,
        leave=True,
        mininterval=5.0,
        smoothing=0,
        unit="dataset",
    ):
        dataset_id = dataset.name
        asset_directories = [file_path for file_path in dataset.rglob(pattern="*") if file_path.is_dir()]
        _summarize_dataset(
            dataset_id=dataset_id,
            asset_directories=asset_directories,
            summary_directory=summary_directory,
            index_to_region=index_to_region,
        )


def _summarize_dataset(
    *,
    dataset_id: str,
    asset_directories: list[pathlib.Path],
    summary_directory: pathlib.Path,
    index_to_region: dict[int, str],
) -> None:
    _summarize_dataset_by_day(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_day.tsv",
    )
    _summarize_dataset_by_asset(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_asset.tsv",
    )
    _summarize_dataset_by_region(
        asset_directories=asset_directories,
        summary_file_path=summary_directory / dataset_id / "dandiset_summary_by_region.tsv",
        index_to_region=index_to_region,
    )


def _summarize_dataset_by_day(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
    all_dates = []
    all_bytes_sent = []
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # just copy the file over instead of reprocessing.

        timestamps_file_path = asset_directory / "timestamps.txt"

        if not timestamps_file_path.exists():
            continue

        dates = [
            datetime.datetime.strptime(str(timestamp.strip()), "%y%m%d%H%M%S").strftime(format="%Y-%m-%d")
            for timestamp in timestamps_file_path.read_text().splitlines()
        ]
        all_dates.extend(dates)

        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
        all_bytes_sent.extend(bytes_sent)

    summarized_activity_by_day = collections.defaultdict(int)
    for date, bytes_sent in zip(all_dates, all_bytes_sent):
        summarized_activity_by_day[date] += bytes_sent

    if len(summarized_activity_by_day) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "date": list(summarized_activity_by_day.keys()),
            "bytes_sent": list(summarized_activity_by_day.values()),
        }
    )
    summary_table.sort_values(by="date", inplace=True)
    summary_table.index = range(len(summary_table))
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)


def _summarize_dataset_by_asset(*, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path) -> None:
    summarized_activity_by_asset = collections.defaultdict(int)
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # just copy the file over instead of reprocessing.
        bytes_sent_file_path = asset_directory / "bytes_sent.txt"

        if not bytes_sent_file_path.exists():
            continue

        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]

        asset_path = str(asset_directory)
        summarized_activity_by_asset[asset_path] += sum(bytes_sent)

    if len(summarized_activity_by_asset) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "asset_path": list(summarized_activity_by_asset.keys()),
            "bytes_sent": list(summarized_activity_by_asset.values()),
        }
    )
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)


def _summarize_dataset_by_region(
    *, asset_directories: list[pathlib.Path], summary_file_path: pathlib.Path, index_to_region: dict[int, str]
) -> None:
    all_regions = []
    all_bytes_sent = []
    for asset_directory in asset_directories:
        # TODO: Could add a step here to track which object IDs have been processed, and if encountered again
        # just copy the file over instead of reprocessing.
        indexed_ips_file_path = asset_directory / "indexed_ips.txt"

        if not indexed_ips_file_path.exists():
            continue

        indexed_ips = [ip_index.strip() for ip_index in indexed_ips_file_path.read_text().splitlines()]
        regions = [index_to_region.get(ip_index, "unknown") for ip_index in indexed_ips]
        all_regions.extend(regions)

        bytes_sent_file_path = asset_directory / "bytes_sent.txt"
        bytes_sent = [int(value.strip()) for value in bytes_sent_file_path.read_text().splitlines()]
        all_bytes_sent.extend(bytes_sent)

    summarized_activity_by_region = collections.defaultdict(int)
    for region, bytes_sent in zip(all_regions, all_bytes_sent):
        summarized_activity_by_region[region] += bytes_sent

    if len(summarized_activity_by_region) == 0:
        return

    summary_file_path.parent.mkdir(parents=True, exist_ok=True)
    summary_table = pandas.DataFrame(
        data={
            "region": list(summarized_activity_by_region.keys()),
            "bytes_sent": list(summarized_activity_by_region.values()),
        }
    )
    summary_table.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=True)
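
For context, the on-disk layout these summarizers assume, inferred from the reads and writes above (directory names are illustrative):

# Inputs read per asset directory under the extraction directory:
#   <extraction_directory>/<dataset_id>/<...>/timestamps.txt   - one %y%m%d%H%M%S timestamp per request
#   <extraction_directory>/<dataset_id>/<...>/bytes_sent.txt   - one integer (bytes sent) per request
#   <extraction_directory>/<dataset_id>/<...>/indexed_ips.txt  - one IP index per request
#
# Outputs written per dataset under the summary directory:
#   <summary_directory>/<dataset_id>/dandiset_summary_by_day.tsv
#   <summary_directory>/<dataset_id>/dandiset_summary_by_asset.tsv
#   <summary_directory>/<dataset_id>/dandiset_summary_by_region.tsv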
57 changes: 57 additions & 0 deletions src/s3_log_extraction/summarize/_generate_all_dataset_totals.py
@@ -0,0 +1,57 @@
import json
import pathlib

import pandas

from ..config import get_summary_directory


def generate_all_dataset_totals(
    summary_directory: str | pathlib.Path | None = None,
) -> None:
    """
    Generate top-level totals of summarized access activity for all datasets.

    Parameters
    ----------
    summary_directory : str | pathlib.Path, optional
        Path to the folder containing all dataset summaries of the S3 access logs.
        Defaults to the configured summary directory.
    """
    if summary_directory:
        summary_directory = pathlib.Path(summary_directory)
    else:
        summary_directory = get_summary_directory()

    # TODO: record progress over

    all_dandiset_totals = {}
    for dandiset_id_folder_path in summary_directory.iterdir():
        if not dandiset_id_folder_path.is_dir():
            continue  # TODO: use better structure for separating mapped activity from summaries
        dandiset_id = dandiset_id_folder_path.name

        summary_file_path = summary_directory / dandiset_id / "dandiset_summary_by_region.tsv"
        summary = pandas.read_table(filepath_or_buffer=summary_file_path)

        unique_countries = {}
        for region in summary["region"]:
            if region in ["VPN", "GitHub", "unknown"]:
                continue

            country_code, region_name = region.split("/")
            if "AWS" in country_code:
                country_code = region_name.split("-")[0].upper()

            unique_countries[country_code] = True

        number_of_unique_regions = len(summary["region"])
        number_of_unique_countries = len(unique_countries)
        all_dandiset_totals[dandiset_id] = {
            "total_bytes_sent": int(summary["bytes_sent"].sum()),
            "number_of_unique_regions": number_of_unique_regions,
            "number_of_unique_countries": number_of_unique_countries,
        }

    top_level_summary_file_path = summary_directory / "all_dandiset_totals.json"
    with top_level_summary_file_path.open(mode="w") as io:
        json.dump(obj=all_dandiset_totals, fp=io)
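
For orientation, the approximate shape of the all_dandiset_totals.json file written above; the dataset identifier and values here are made up:

# {
#     "000108": {
#         "total_bytes_sent": 123456789,
#         "number_of_unique_regions": 42,
#         "number_of_unique_countries": 17
#     }
# }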