attempt to run reduced memory version of analyze_fulfilled_requests in DO
abelsonlive committed Dec 19, 2024
1 parent 7c8143a commit 76fb2d9
Showing 5 changed files with 145 additions and 83 deletions.
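
At a high level, this commit swaps the in-memory defaultdict(list) grouping of snapshot records for a spill-to-disk pattern: each record is written to <temp dir>/<record id>/<snapshot date>.json, and the analysis then walks one record directory at a time, so at most one record's snapshot history is held in memory. A minimal, self-contained sketch of that pattern follows; the helper name and sample data are illustrative, not from the commit:

import json
import os
import shutil
import tempfile

def group_records_on_disk(snapshots):
    # one small JSON file per (record id, snapshot date): memory use stays
    # bounded by a single record's history, not the full set of snapshots
    temp_dir = tempfile.mkdtemp()
    for snapshot_date, records in snapshots:
        for record in records:
            record_dir = os.path.join(temp_dir, record["id"])
            os.makedirs(record_dir, exist_ok=True)
            with open(os.path.join(record_dir, f"{snapshot_date}.json"), "w") as f:
                json.dump(record, f)
    return temp_dir

snapshots = [
    ("2024-12-01", [{"id": "rec1", "Essential Goods": ["Diapers"]}]),
    ("2024-12-02", [{"id": "rec1", "Essential Goods": []}]),
]
temp_dir = group_records_on_disk(snapshots)
try:
    for record_id in os.listdir(temp_dir):
        print(record_id, sorted(os.listdir(os.path.join(temp_dir, record_id))))
finally:
    shutil.rmtree(temp_dir)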
1 change: 1 addition & 0 deletions app/bam_app/main.py
@@ -11,6 +11,7 @@

app = FastAPI()


# apikey authentication
def check_api_key(apikey: str):
if apikey != APIKEY:
211 changes: 130 additions & 81 deletions core/bam_core/functions/analyze_fulfilled_requests.py
@@ -1,16 +1,16 @@
from datetime import datetime
import logging
from collections import defaultdict
from typing import Any, Dict, List
from zoneinfo import ZoneInfo

from .base import Function
from .params import Params, Param
from bam_core.constants import FULFILLED_REQUESTS_SHEET_NAME
from bam_core.utils.serde import json_to_obj
import os
import tempfile
import json
from datetime import datetime, timedelta
import shutil
from typing import Any, Dict, Generator, List, Optional
from memory_profiler import profile

from bam_core.functions.base import Function
from bam_core.functions.params import Params, Param
from bam_core.constants import FULFILLED_REQUESTS_SHEET_NAME, REQUEST_FIELDS
from bam_core.lib.airtable import Airtable

log = logging.getLogger(__name__)

SNAPSHOT_DATE_FORMAT = r"%Y-%m-%d-%H-%M-%S"
SNAPSHOT_DATE_FIELD = "Snapshot Date"
@@ -22,7 +22,7 @@ class AnalyzeFulfilledRequests(Function):
name="dry_run",
type="bool",
default=True,
description="If true, data will not be written to the Google Sheet.",
)
)

@@ -35,71 +35,113 @@ def get_snapshot_date(self, filepath):
filepath.split("/assistance-requests-main-")[-1].split(".")[0],
SNAPSHOT_DATE_FORMAT,
)
dt = dt.replace(tzinfo=ZoneInfo("America/New_York"))
dt = dt.astimezone(ZoneInfo("UTC"))
dt = dt - timedelta(hours=5) # Convert EST to UTC manually
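# NOTE: a fixed five-hour offset assumes EST; during daylight saving time
# (EDT, UTC-4) the derived snapshot date can shift by an hour around midnight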
return dt.date().isoformat()

def get_grouped_records(self):
def get_snapshot_files(self):
"""
Get records from Digital Ocean Space
Fetch snapshot files from Digital Ocean Space
"""
grouped_records = defaultdict(list)
self.log.info("Fetching snapshots from Digital Ocean Space...")
for filepath in self.s3.list_keys(
return self.s3.list_keys(
"airtable-snapshots/assistance-requests-main/"
):
)

def get_snapshot_records(self, filepath):
"""
Fetch snapshot file from Digital Ocean Space
"""
contents = self.s3.get_contents(filepath).decode("utf-8")
if contents:
return json.loads(contents)
return []

def write_snapshot_records(self, temp_dir, snapshot_date, records):
"""
Write snapshot records to temp directory
"""
for record in records:
small_record = {
k: v for k, v in record.items() if k in REQUEST_FIELDS
}
small_record[SNAPSHOT_DATE_FIELD] = snapshot_date
record_id = record["id"]
record_dir = os.path.join(temp_dir, record_id)
os.makedirs(record_dir, exist_ok=True)
record_file = os.path.join(record_dir, f"{snapshot_date}.json")
with open(record_file, "w") as f:
    json.dump(small_record, f)

def write_grouped_records(self):
"""
Get records from Digital Ocean Space and write them to temp directory
"""
temp_dir = tempfile.mkdtemp()
self.log.info(f"Temporary directory created at {temp_dir}")
self.log.info("Fetching snapshots from Digital Ocean Space...")
for filepath in self.get_snapshot_files():
if not filepath.endswith(".json"):
continue
log.debug(f"Fetching records from {filepath}")
snapshot_date = self.get_snapshot_date(filepath)
contents = self.s3.get_contents(filepath).decode("utf-8")
if contents:
file_records = json_to_obj(contents)
for record in file_records:
record[SNAPSHOT_DATE_FIELD] = snapshot_date
grouped_records[record["id"]].append(record)
return grouped_records

def analyze_grouped_records(
self, grouped_records: Dict[str, List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
file_records = self.get_snapshot_records(filepath)
self.write_snapshot_records(temp_dir, snapshot_date, file_records)
del file_records
return temp_dir

def analyze_requests_for_record(
    self, temp_dir, record_id: str
) -> Generator[Dict[str, Any], None, None]:
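    """
    Yield a fulfilled-request row for each item that moves from an open
    status to a delivered status across this record's snapshots
    """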
record_dir = os.path.join(temp_dir, record_id)
if not os.path.isdir(record_dir):
return None
group_records = []
# iterate through files in the record directory
for record_file in os.listdir(record_dir):
record_path = os.path.join(record_dir, record_file)
with open(record_path, "r") as f:
    group_records.append(json.load(f))
# If there is only one snapshot for this record id, skip it
if len(group_records) <= 1:
return None
last_statuses = {}
# iterate through snapshots for this record id
for record in sorted(
group_records, key=lambda r: r[SNAPSHOT_DATE_FIELD]
):
these_statuses = Airtable.analyze_requests(record)
# iterate through tag types
for tag_type, these_tag_statuses in these_statuses.items():
# get last statuses for this tag type
last_tag_statuses = last_statuses.get(tag_type, {})
# iterate through previously open items for this tag type
for item in last_tag_statuses.get("open", []):
# if this item now has a delivered status
# mark it as delivered
if item in these_tag_statuses.get("delivered", []):
yield {
"Request Type": tag_type,
"Delivered Item": item,
"Date Delivered": record[SNAPSHOT_DATE_FIELD],
"Airtable Link": self.airtable.get_assistance_request_link(
record_id
),
}
last_statuses = these_statuses

def analyze_grouped_records(self, temp_dir: str) -> List[Dict[str, Any]]:
"""
Analyze grouped records and return a list of fulfilled requests
Analyze grouped records from temp directory and return a list of fulfilled requests
"""
fulfilled_requests = []
# iterate through list of snapshots for each record id
for request_id, group_records in grouped_records.items():
# If there is only one snapshot for this record id, skip it
if len(group_records) <= 1:
continue
last_statuses = {}
# iterate through snapshots for this record id
for record in sorted(
group_records, key=lambda r: r[SNAPSHOT_DATE_FIELD]
# iterate through directories for each record id
for record_id in os.listdir(temp_dir):
for fulfilled_request in self.analyze_requests_for_record(
temp_dir, record_id
):
these_statuses = Airtable.analyze_requests(record)
# iterate through tag types
for tag_type, these_tag_statuses in these_statuses.items():
# get last statuses for this tag type
last_tag_statuses = last_statuses.get(tag_type, {})
# iterate through previously open items for this tag type
for item in last_tag_statuses.get("open", []):
# if this item now has a delivered status
# mark it as delivered
if item in these_tag_statuses.get("delivered", []):
fulfilled_requests.append(
{
"Request Type": tag_type,
"Delivered Item": item,
"Date Delivered": record[
SNAPSHOT_DATE_FIELD
],
"Airtable Link": self.airtable.get_assistance_request_link(
request_id
),
}
)
last_statuses = these_statuses
if fulfilled_request:
fulfilled_requests.append(fulfilled_request)

return list(
sorted(
fulfilled_requests,
@@ -108,28 +150,35 @@ def analyze_grouped_records(
)
)

def run(self, params, context):
def run(self, params, _):
"""
Analyze Airtable snapshots to identify fulfilled requests and write them to a Google Sheet.
"""
# fetch records and group by ID
grouped_records = self.get_grouped_records()
fulfilled_requests = self.analyze_grouped_records(grouped_records)
self.log.info(f"Found {len(fulfilled_requests)} fulfilled requests")
if not params.get("dry_run", False):
self.log.info(
f"Writing fulfilled requests to Google Sheet: '{FULFILLED_REQUESTS_SHEET_NAME}'"
)
self.gsheets.upload_to_sheet(
sheet_name=FULFILLED_REQUESTS_SHEET_NAME,
sheet_index=0,
data=fulfilled_requests,
)
else:
temp_dir = None
try:
temp_dir = self.write_grouped_records()
fulfilled_requests = self.analyze_grouped_records(temp_dir)
self.log.info(
"Dry run, not writing fulfilled requests to Google Sheet"
f"Found {len(fulfilled_requests)} fulfilled requests"
)
return {"num_fulfilled_requests": len(fulfilled_requests)}
if not params.get("dry_run", False):
self.log.info(
f"Writing fulfilled requests to Google Sheet: '{FULFILLED_REQUESTS_SHEET_NAME}'"
)
self.gsheets.upload_to_sheet(
sheet_name=FULFILLED_REQUESTS_SHEET_NAME,
sheet_index=0,
data=fulfilled_requests,
)
else:
self.log.info(
"Dry run, not writing fulfilled requests to Google Sheet"
)
return {"num_fulfilled_requests": len(fulfilled_requests)}
finally:
if temp_dir:
shutil.rmtree(temp_dir)


if __name__ == "__main__":
10 changes: 8 additions & 2 deletions core/bam_core/lib/s3.py
@@ -161,8 +161,14 @@ def get_contents(self, key: str):
:param key: An S3 key
:return bytes
"""
obj = self.resource.Object(self.bucket_name, self._in_key(key))
return obj.get()["Body"].read()
obj = self.resource.Object(self.bucket_name, self._in_key(key))
body = obj.get()["Body"]
try:
    return body.read()
finally:
    body.close()

def exists(self, key: str) -> bool:
f"""
Expand Down
2 changes: 2 additions & 0 deletions core/bam_core/utils/serde.py
@@ -90,6 +90,8 @@ def str_to_gz_fobj(s: str) -> io.BytesIO:
"""
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
if isinstance(s, str):
s = s.encode("utf-8")
f.write(s)
return out
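
A quick round-trip check of the str/bytes handling above (a usage sketch, not part of the commit; it assumes the bam_core package is importable):

import gzip

from bam_core.utils.serde import str_to_gz_fobj

fobj = str_to_gz_fobj("hello")  # str input is encoded to UTF-8 before gzip
assert gzip.decompress(fobj.getvalue()) == b"hello"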

4 changes: 4 additions & 0 deletions functions/packages/cron/daily/__main__.py
@@ -2,6 +2,9 @@
from bam_core.functions.dedupe_airtable_views import DedupeAirtableViews
from bam_core.functions.update_mailjet_lists import UpdateMailjetLists
from bam_core.functions.snapshot_airtable_views import SnapshotAirtableViews
from bam_core.functions.analyze_fulfilled_requests import (
AnalyzeFulfilledRequests,
)


def main(event, context):
Expand All @@ -11,6 +14,7 @@ def main(event, context):
DedupeAirtableViews,
UpdateMailjetLists,
SnapshotAirtableViews,
AnalyzeFulfilledRequests,
)


