reduce memory usage in analyze_fulfilled_requests
abelsonlive committed Dec 12, 2024
1 parent b704583 commit b46b325
Showing 6 changed files with 102 additions and 107 deletions.
36 changes: 0 additions & 36 deletions app/bam_app/cron.py

This file was deleted.

1 change: 1 addition & 0 deletions app/bam_app/main.py
@@ -11,6 +11,7 @@

app = FastAPI()


# apikey authentication
def check_api_key(apikey: str):
    if apikey != APIKEY:
1 change: 0 additions & 1 deletion app/requirements.txt
@@ -1,4 +1,3 @@
fastapi
uvicorn
sqlalchemy
schedule
1 change: 0 additions & 1 deletion app/start.sh
@@ -2,5 +2,4 @@

set -x

# python -m bam_app.cron &
uvicorn bam_app.main:app --port 3030 --host 0.0.0.0 --log-level info
168 changes: 99 additions & 69 deletions core/bam_core/functions/analyze_fulfilled_requests.py
@@ -1,13 +1,16 @@
from datetime import datetime
import logging
from collections import defaultdict
import os
import json
import tempfile
import shutil
from typing import Any, Dict, List
from zoneinfo import ZoneInfo
from collections import defaultdict

from .base import Function
from .params import Params, Param
from bam_core.constants import FULFILLED_REQUESTS_SHEET_NAME
from bam_core.utils.serde import json_to_obj
from bam_core.lib.airtable import Airtable

log = logging.getLogger(__name__)
@@ -17,12 +20,18 @@


class AnalyzeFulfilledRequests(Function):
"""
This function analyzes airtable snapshots to identify fulfilled requests and writes them to a Google Sheet.
It writes individual records to files in a temporary directory and groups them by record ID in order to
reduce memory usage.
"""

    params = Params(
        Param(
            name="dry_run",
            type="bool",
            default=True,
            description="If true, data will not be written to the Google Sheet.",
        )
    )

@@ -39,12 +48,22 @@ def get_snapshot_date(self, filepath):
        dt = dt.astimezone(ZoneInfo("UTC"))
        return dt.date().isoformat()

    def get_grouped_records(self):
    def save_record_to_file(
        self, record, record_id, snapshot_date, records_dir
    ):
        """
        Get records from Digital Ocean Space
        Save individual record to a JSON file in a unique directory based on the record ID.
        """
        record_dir = os.path.join(records_dir, record_id)
        os.makedirs(record_dir, exist_ok=True)
        record_file = os.path.join(record_dir, f"{snapshot_date}.json")
        with open(record_file, "w") as f:
            json.dump(record, f)

    def process_snapshots(self, records_dir):
        """
        Process snapshots from Digital Ocean Space and save each record to a file.
        """
        grouped_records = defaultdict(list)
        self.log.info("Fetching snapshots from Digital Ocean Space...")
        for filepath in self.s3.list_keys(
            "airtable-snapshots/assistance-requests-main/"
        ):
@@ -54,82 +73,93 @@ def get_grouped_records(self):
            snapshot_date = self.get_snapshot_date(filepath)
            contents = self.s3.get_contents(filepath).decode("utf-8")
            if contents:
                file_records = json_to_obj(contents)
                file_records = json.loads(contents)
                for record in file_records:
                    record[SNAPSHOT_DATE_FIELD] = snapshot_date
                    grouped_records[record["id"]].append(record)
        return grouped_records
                    self.save_record_to_file(
                        record, record["id"], snapshot_date, records_dir
                    )

    def analyze_grouped_records(
        self, grouped_records: Dict[str, List[Dict[str, Any]]]
    def get_group_records(
        self, records_dir, record_id
    ) -> List[Dict[str, Any]]:
        """
        Analyze grouped records and return a list of fulfilled requests
        Read the records saved to files for a single record ID.
        """
        group_records = []
        record_id_dir = os.path.join(records_dir, record_id)
        if os.path.isdir(record_id_dir):
            for record_file in os.listdir(record_id_dir):
                with open(os.path.join(record_id_dir, record_file), "r") as f:
                    record = json.load(f)
                group_records.append(record)
        return group_records

    def analyze_record(
        self, record_id: str, group_records: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Analyze records for a single record ID and return a list of fulfilled requests.
        """
        fulfilled_requests = []
        # iterate through list of snapshots for each record id
        for request_id, group_records in grouped_records.items():
            # If there is only one snapshot for this record id, skip it
            if len(group_records) <= 1:
                continue
            last_statuses = {}
            # iterate through snapshots for this record id
            for record in sorted(
                group_records, key=lambda r: r[SNAPSHOT_DATE_FIELD]
            ):
                these_statuses = Airtable.analyze_requests(record)
                # iterate through tag types
                for tag_type, these_tag_statuses in these_statuses.items():
                    # get last statuses for this tag type
                    last_tag_statuses = last_statuses.get(tag_type, {})
                    # iterate through previously open items for this tag type
                    for item in last_tag_statuses.get("open", []):
                        # if this item now has a delivered status
                        # mark it as delivered
                        if item in these_tag_statuses.get("delivered", []):
                            fulfilled_requests.append(
                                {
                                    "Request Type": tag_type,
                                    "Delivered Item": item,
                                    "Date Delivered": record[
                                        SNAPSHOT_DATE_FIELD
                                    ],
                                    "Airtable Link": self.airtable.get_assistance_request_link(
                                        request_id
                                    ),
                                }
                            )
                last_statuses = these_statuses
        return list(
            sorted(
                fulfilled_requests,
                key=lambda r: r["Date Delivered"],
                reverse=True,
            )
        )
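        # a single snapshot can't show a status change, so skip this record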
        if len(group_records) <= 1:
            return fulfilled_requests
        last_statuses = {}
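        # walk the snapshots for this record id in date order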
        for record in sorted(
            group_records, key=lambda r: r[SNAPSHOT_DATE_FIELD]
        ):
            these_statuses = Airtable.analyze_requests(record)
            for tag_type, these_tag_statuses in these_statuses.items():
                last_tag_statuses = last_statuses.get(tag_type, {})
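                # an item that was open in the previous snapshot and is
                # delivered in this one counts as a fulfilled request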
                for item in last_tag_statuses.get("open", []):
                    if item in these_tag_statuses.get("delivered", []):
                        fulfilled_requests.append(
                            {
                                "Request Type": tag_type,
                                "Delivered Item": item,
                                "Date Delivered": record[SNAPSHOT_DATE_FIELD],
                                "Airtable Link": self.airtable.get_assistance_request_link(
                                    record_id
                                ),
                            }
                        )
            last_statuses = these_statuses
        return fulfilled_requests

    def run(self, params, context):
        """
        Analyze airtable snapshots to identify fulfilled requests and write to a google sheet.
        """
        # fetch records and group by ID
        grouped_records = self.get_grouped_records()
        fulfilled_requests = self.analyze_grouped_records(grouped_records)
        self.log.info(f"Found {len(fulfilled_requests)} fulfilled requests")
        if not params.get("dry_run", False):
            self.log.info(
                f"Writing fulfilled requests to Google Sheet: '{FULFILLED_REQUESTS_SHEET_NAME}'"
            )
            self.gsheets.upload_to_sheet(
                sheet_name=FULFILLED_REQUESTS_SHEET_NAME,
                sheet_index=0,
                data=fulfilled_requests,
            )
        else:
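        # spill snapshots into a temporary directory; it is always cleaned
        # up in the finally block below, even if analysis fails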
        records_dir = tempfile.mkdtemp()
        try:
            self.log.info("Fetching snapshots from Digital Ocean Space...")
            fulfilled_requests = []
            self.process_snapshots(records_dir)
            self.log.info("Analyzing records...")
            for record_id in os.listdir(records_dir):
                group_records = self.get_group_records(records_dir, record_id)
                fulfilled_requests.extend(
                    self.analyze_record(record_id, group_records)
                )
            self.log.info(
                "Dry run, not writing fulfilled requests to Google Sheet"
                f"Found {len(fulfilled_requests)} fulfilled requests"
            )
            return {"num_fulfilled_requests": len(fulfilled_requests)}
            if not params.get("dry_run", False):
                self.log.info(
                    f"Writing fulfilled requests to Google Sheet: '{FULFILLED_REQUESTS_SHEET_NAME}'"
                )
                self.gsheets.upload_to_sheet(
                    sheet_name=FULFILLED_REQUESTS_SHEET_NAME,
                    sheet_index=0,
                    data=fulfilled_requests,
                )
            else:
                self.log.info(
                    "Dry run, not writing fulfilled requests to Google Sheet"
                )
            return {"num_fulfilled_requests": len(fulfilled_requests)}
        finally:
            shutil.rmtree(records_dir)


if __name__ == "__main__":
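The heart of this change is the disk-backed group-by described in the class docstring: instead of accumulating every snapshot record in one in-memory dict, each record is written to `records_dir/<record_id>/<snapshot_date>.json` and groups are re-read one record ID at a time. Below is a minimal, self-contained sketch of that pattern; the function names (`spill_record`, `iter_groups`) and the sample data are hypothetical, not part of this commit.

```python
import json
import os
import shutil
import tempfile


def spill_record(records_dir, record_id, snapshot_date, record):
    """Write one snapshot record to records_dir/<record_id>/<snapshot_date>.json."""
    record_dir = os.path.join(records_dir, record_id)
    os.makedirs(record_dir, exist_ok=True)
    with open(os.path.join(record_dir, f"{snapshot_date}.json"), "w") as f:
        json.dump(record, f)


def iter_groups(records_dir):
    """Yield (record_id, [records]) one group at a time, keeping memory flat."""
    for record_id in os.listdir(records_dir):
        group_dir = os.path.join(records_dir, record_id)
        records = []
        for name in sorted(os.listdir(group_dir)):
            with open(os.path.join(group_dir, name)) as f:
                records.append(json.load(f))
        yield record_id, records


records_dir = tempfile.mkdtemp()
try:
    # hypothetical sample data standing in for Airtable snapshot records
    spill_record(records_dir, "rec123", "2024-12-01", {"id": "rec123", "status": "open"})
    spill_record(records_dir, "rec123", "2024-12-02", {"id": "rec123", "status": "delivered"})
    for record_id, group in iter_groups(records_dir):
        print(record_id, len(group))  # -> rec123 2
finally:
    shutil.rmtree(records_dir)
```

Only one record ID's worth of snapshots is ever held in memory at once, which is what lets the function scale with the number of snapshots on disk rather than with their total size.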
2 changes: 2 additions & 0 deletions functions/packages/cron/daily/__main__.py
@@ -2,6 +2,7 @@
from bam_core.functions.dedupe_airtable_views import DedupeAirtableViews
from bam_core.functions.update_mailjet_lists import UpdateMailjetLists
from bam_core.functions.snapshot_airtable_views import SnapshotAirtableViews
from bam_core.functions.analyze_fulfilled_requests import AnalyzeFulfilledRequests


def main(event, context):
@@ -11,6 +12,7 @@ def main(event, context):
        DedupeAirtableViews,
        UpdateMailjetLists,
        SnapshotAirtableViews,
        AnalyzeFulfilledRequests,
    )

