Version/aeolus #13

Draft: wants to merge 42 commits into base: main

42 commits
0e09415
Improvements to allow nodes to be added from measurements in lcs
caparker Oct 12, 2022
d79becd
Moved to using orjson in the fetch process
caparker Oct 25, 2022
99365eb
Updated check to use orjson, added line check before load
caparker Oct 25, 2022
33228c8
Added sensors_latest update on ingestion
caparker Nov 9, 2022
10076d1
Updated to use new split_ingest_id method to parse ingest_id
caparker Nov 17, 2022
d9e728b
Testing out a new way to handle the timestamps on ingest
caparker Nov 19, 2022
492096b
Fixed timestamp 'bug', Added diagnostic data to ingest
caparker Nov 22, 2022
a7d107c
Added providers and timezones check on insert
caparker Dec 1, 2022
31bdb28
More updates to help with ingesting
caparker Jan 20, 2023
1676950
Updated ingester
caparker Jan 25, 2023
8c768d2
Adding batch methods for check
caparker Mar 21, 2023
d4aecc3
Ingest improvements
caparker Mar 30, 2023
e4ef1f9
Added start/end date lookup to realtime fetcher
caparker Jul 18, 2023
f6f54c4
Cleaning up
caparker Sep 7, 2023
2680b3d
Temporary fix to the airgradient duplication issue
caparker Mar 9, 2024
60c931c
Updated the lcs class to support the newer data format
caparker Apr 25, 2024
6d72520
Cleaned up to work in production setting
caparker Apr 25, 2024
1a5a82d
Switched source of loader in the handler
caparker Apr 25, 2024
feec1b0
Adding git action
caparker Jun 10, 2024
97360fb
Fix action branch name
caparker Jun 10, 2024
a82b13e
Updated deployment
caparker Jun 11, 2024
a121166
cleaning up the settings
caparker Jun 11, 2024
7c3e22b
Updated from 3.9 and added poetry
caparker Jun 11, 2024
36ce602
fixed deployment code
caparker Jun 11, 2024
d451de9
Changed position of the python install in deployment
caparker Jun 11, 2024
68ecd22
Updated pydantic settings
caparker Jun 11, 2024
0b2144a
Fixing computed field issue
caparker Jun 11, 2024
4b12116
Adding export plugin
caparker Jun 11, 2024
509d3a8
Updated cdk version
caparker Jun 11, 2024
b5d0daf
Removed cdk version from deploy
caparker Jun 11, 2024
7fc9e87
Updated the python version to 12
caparker Jun 11, 2024
ec92f49
Redeploy with ingesting turned off
caparker Jun 13, 2024
d59f4f1
Clean up
caparker Aug 2, 2024
b9fab84
Resetting PAUSE_INGESTING to be False
caparker Aug 4, 2024
db082ed
Updates to support the CAC data (#14)
caparker Oct 23, 2024
9a9f3cf
Fixed bug with inserting new hourly data
caparker Oct 23, 2024
fb0ee0c
Updated the insterted_hours method
caparker Oct 24, 2024
0016d3d
Removed query
caparker Oct 24, 2024
37ba617
Redirected the realtime hourly queue update to the new table
caparker Nov 12, 2024
398c0f5
Updated to support uuid
caparker Nov 13, 2024
62f017c
Changed fake fetchlogs id
caparker Nov 13, 2024
d1ca447
Added backup sd value of zero
caparker Nov 13, 2024
76 changes: 76 additions & 0 deletions .github/workflows/deploy.yml
@@ -0,0 +1,76 @@
name: Deploy ingestor

on:
push:
branches:
- version/aeolus

jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Configure aws credentials
uses: aws-actions/configure-aws-credentials@master
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PROD }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_KEY_PROD }}
aws-region: ${{ secrets.AWS_REGION }}

- name: Get environmental values
uses: aws-actions/aws-secretsmanager-get-secrets@v2
with:
secret-ids: |
AEOLUS, openaq-env/aeolus
name-transformation: uppercase
parse-json-secrets: true

- uses: actions/setup-node@v4
with:
node-version: "20"


- name: Install CDK
run: |
npm install -g aws-cdk

- uses: actions/setup-python@v5
with:
python-version: '3.12'

- name: Install Poetry
uses: snok/install-poetry@v1

- name: Deploy stack
env:
ENV: "aeolus"
PROJECT: "openaq"

## deployment variables
# CDK_ACCOUNT: ${{ secrets.CDK_ACCOUNT }}
# CDK_REGION: ${{ secrets.CDK_REGION }}

VPC_ID: ${{ env.AEOLUS_VPC_ID }}

TOPIC_ARN: ${{ env.AEOLUS_FETCH_OBJECT_TOPIC_ARN }}

## application variables
DATABASE_READ_USER: ${{ env.AEOLUS_DATABASE_READ_USER }}
DATABASE_READ_PASSWORD: ${{ env.AEOLUS_DATABASE_READ_PASSWORD }}
DATABASE_WRITE_USER: ${{ env.AEOLUS_DATABASE_WRITE_USER }}
DATABASE_WRITE_PASSWORD: ${{ env.AEOLUS_DATABASE_WRITE_PASSWORD }}
DATABASE_DB: ${{ env.AEOLUS_DATABASE_DB }}
DATABASE_HOST: ${{ env.AEOLUS_DATABASE_HOST }}
DATABASE_PORT: ${{ env.AEOLUS_DATABASE_PORT }}
FETCH_BUCKET: ${{ env.AEOLUS_FETCH_BUCKET }}
ETL_BUCKET: ${{ env.AEOLUS_FETCH_BUCKET }}
PAUSE_INGESTING: False


working-directory: ./cdk
run: |
poetry self add poetry-plugin-export
poetry install
cdk deploy openaq-ingest-aeolus --require-approval never
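One thing to watch in the workflow above: `PAUSE_INGESTING: False` is set through YAML `env:`, so the deploy step receives the *string* `"False"`, which is truthy in a naive Python check. A minimal sketch of parsing such a flag (the `env_flag` helper is an illustration, not part of this PR; pydantic settings, used elsewhere in the repo, performs this coercion automatically for `bool` fields):

```python
import os

def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag.

    YAML `PAUSE_INGESTING: False` arrives in the process as the string
    "False", which a plain truthiness check would treat as True.
    """
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")

os.environ["PAUSE_INGESTING"] = "False"  # mirrors the workflow variable
print(env_flag("PAUSE_INGESTING"))  # -> False
```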
2 changes: 2 additions & 0 deletions README.md
@@ -2,3 +2,5 @@


# Testing a realtime file

# Testing files
95 changes: 95 additions & 0 deletions benchmark.py
@@ -0,0 +1,95 @@
import logging
import os
import sys
import argparse
from time import time
import re

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser(
description="""
Test benchmarks for ingestion
""")

parser.add_argument(
'--name',
type=str,
required=False,
default="4xlarge",
help='Name to use for the test'
)
parser.add_argument(
'--env',
type=str,
default='.env',
required=False,
help='The dot env file to use'
)
parser.add_argument(
'--debug',
action="store_true",
help='Output at DEBUG level'
)
args = parser.parse_args()

if 'DOTENV' not in os.environ.keys() and args.env is not None:
os.environ['DOTENV'] = args.env

if args.debug:
os.environ['LOG_LEVEL'] = 'DEBUG'

from ingest.settings import settings
from fake import config, get_locations, as_realtime
from ingest.fetch import load_realtime

logging.basicConfig(
format='[%(asctime)s] %(levelname)s [%(name)s:%(lineno)s] %(message)s',
level=settings.LOG_LEVEL.upper(),
force=True,
)

f = open(f"benchmark_ingest_output_{args.name}.csv", "w")
f.writelines("name,key,locations,inserted_nodes,updated_nodes,total_meas,inserted_meas,ingest_time,process_time,log_time,copy_time,load_process_time\n")
n = 10
locations = [50, 250, 1000]
keys = []
ii = 1

## make a set of files
for r in locations:
for i in range(n):
config(source=f"benchmark-test-{r}-{i+1}", gz=True)
l = get_locations(n=r)
key = as_realtime(l["locations"], l["latitude"], l["longitude"])
keys.append({ "key": key, "locations": len(l["locations"]) })
ii += 1


## ingest each of the generated keys
for i, k in enumerate(keys):
key = k["key"]
locations = k["locations"]
logger.info(f"Ingesting {i+1} of {len(keys)}: {key} with {locations} locations")

start_time = time()
copy_time, load_process_time, log_time, notice = load_realtime([
(-1, key, None)
])
m = re.findall('([a-z-]+): (.+?),', notice)

process_time = round(float(m[17][1]))
total_meas = int(m[0][1])
inserted_meas = int(m[9][1])
updated_nodes = int(m[8][1])
inserted_nodes = int(m[11][1])
ingest_time = round((time() - start_time)*1000)
f.writelines(f"'{args.name}','{key}',{locations},{inserted_nodes},{updated_nodes},{total_meas},{inserted_meas},{ingest_time},{process_time},{log_time},{copy_time},{load_process_time}\n")

logger.info(
"loaded realtime records, timer: %0.4f, process: %0.4f",
ingest_time, process_time
)


f.close()
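benchmark.py recovers its metrics positionally (`m[17]`, `m[9]`, …) from the NOTICE text returned by `load_realtime`, which breaks silently if the notice gains or loses a field. A sketch of a name-based parse, assuming the notice keeps the `key: value,` shape the regex above expects (the sample notice string here is invented for illustration):

```python
import re

# invented sample in the "key: value," shape the benchmark regex expects
notice = "total-meas: 1200, updated-nodes: 3, inserted-meas: 1180, process-time: 45,"

# pair up keys and values, then look fields up by name instead of position
metrics = dict(re.findall(r"([a-z-]+): (.+?),", notice))
print(metrics["inserted-meas"])  # -> 1180 (as a string)
```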
14 changes: 11 additions & 3 deletions cdk/app.py
@@ -3,6 +3,7 @@
Environment,
Tags,
)
import os

from lambda_ingest_stack import LambdaIngestStack

@@ -19,16 +20,23 @@

app = aws_cdk.App()

env = Environment(
account=os.environ['CDK_DEFAULT_ACCOUNT'],
region=os.environ['CDK_DEFAULT_REGION']
)

ingest = LambdaIngestStack(
app,
f"openaq-ingest-{settings.ENV}",
env_name=settings.ENV,
lambda_env=lambda_env,
fetch_bucket=settings.FETCH_BUCKET,
ingest_lambda_timeout=settings.INGEST_LAMBDA_TIMEOUT,
ingest_lambda_memory_size=settings.INGEST_LAMBDA_MEMORY_SIZE,
ingest_rate_minutes=settings.INGEST_RATE_MINUTES,
vpc_id=settings.VPC_ID,
lambda_timeout=settings.LAMBDA_TIMEOUT,
lambda_memory_size=settings.LAMBDA_MEMORY_SIZE,
rate_minutes=settings.RATE_MINUTES,
topic_arn=settings.TOPIC_ARN,
env=env,
)

Tags.of(ingest).add("project", settings.PROJECT)
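The new `Environment` block in app.py reads `CDK_DEFAULT_ACCOUNT`/`CDK_DEFAULT_REGION` with `os.environ[...]`, which raises a bare `KeyError` when the CDK CLI has no active credentials to derive them from. A hedged alternative (the `require_env` helper is an assumption for illustration, not code from this PR):

```python
import os

def require_env(name: str) -> str:
    """Fail fast with a clear message when an expected variable is missing."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"{name} is not set; the CDK CLI normally derives it from the "
            "active AWS credentials"
        )
    return value

# placeholder value so the sketch runs outside a CDK deploy
os.environ.setdefault("CDK_DEFAULT_ACCOUNT", "123456789012")
print(require_env("CDK_DEFAULT_ACCOUNT"))
```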
2 changes: 1 addition & 1 deletion cdk/cdk.json
@@ -1,5 +1,5 @@
{
"app": "python3.8 app.py",
"app": "poetry run python app.py",
"context": {
"aws-cdk:enableDiffNoFail": "true",
"@aws-cdk/core:stackRelativeExports": "true",
22 changes: 12 additions & 10 deletions cdk/config.py
@@ -1,5 +1,8 @@
from typing import List
from pydantic import BaseSettings
from pydantic_settings import (
BaseSettings,
SettingsConfigDict,
)
from pathlib import Path
from os import environ

@@ -8,18 +11,17 @@ class Settings(BaseSettings):
FETCH_BUCKET: str
ENV: str = "staging"
PROJECT: str = "openaq"
INGEST_LAMBDA_TIMEOUT: int = 900
INGEST_LAMBDA_MEMORY_SIZE: int = 1536
INGEST_RATE_MINUTES: int = 15
LAMBDA_TIMEOUT: int = 900
LAMBDA_MEMORY_SIZE: int = 1536
RATE_MINUTES: int = 15
LOG_LEVEL: str = 'INFO'
TOPIC_ARN: str = None
VPC_ID: str = None

class Config:
parent = Path(__file__).resolve().parent.parent
if 'DOTENV' in environ:
env_file = Path.joinpath(parent, environ['DOTENV'])
else:
env_file = Path.joinpath(parent, ".env")

model_config = SettingsConfigDict(
extra="ignore", env_file=f"../{environ.get('DOTENV', '.env')}", env_file_encoding="utf-8"
)


settings = Settings()
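The move from pydantic v1's inner `Config` class to `SettingsConfigDict` also changes how the dotenv path is resolved: the old code anchored it at the repository root via `__file__`, while `env_file=f"../{...}"` is relative to the working directory, so it only resolves when commands run from `cdk/` (which the workflow's `working-directory: ./cdk` ensures). A stdlib-only sketch of the root-anchored resolution the old class used:

```python
from os import environ
from pathlib import Path

# Resolve the dotenv file relative to the repository root (one level
# above this cdk/ directory), independent of the working directory.
repo_root = Path(__file__).resolve().parent.parent
env_file = repo_root / environ.get("DOTENV", ".env")
print(env_file.name)  # ".env" unless DOTENV overrides it
```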
26 changes: 17 additions & 9 deletions cdk/lambda_ingest_stack.py
@@ -2,8 +2,10 @@
from typing import Dict

from aws_cdk import (
Environment,
aws_lambda,
aws_s3,
aws_ec2,
Stack,
Duration,
aws_events,
@@ -24,18 +26,23 @@ def __init__(
self,
scope: Construct,
id: str,
env: Environment,
env_name: str,
lambda_env: Dict,
fetch_bucket: str,
ingest_lambda_timeout: int,
ingest_lambda_memory_size: int,
ingest_rate_minutes: int = 15,
lambda_timeout: int,
lambda_memory_size: int,
rate_minutes: int = 15,
topic_arn: str = None,
vpc_id: str = None,
**kwargs,
) -> None:
"""Lambda plus cronjob to ingest metadata,
realtime and pipeline data"""
super().__init__(scope, id, *kwargs)
super().__init__(scope, id, env=env, **kwargs)

if vpc_id is not None:
vpc_id = aws_ec2.Vpc.from_lookup(self, f"{id}-vpc", vpc_id=vpc_id)

ingest_function = aws_lambda.Function(
self,
@@ -58,11 +65,12 @@ def __init__(
],
),
handler="ingest.handler.handler",
runtime=aws_lambda.Runtime.PYTHON_3_8,
vpc=vpc_id,
runtime=aws_lambda.Runtime.PYTHON_3_12,
allow_public_subnet=True,
memory_size=ingest_lambda_memory_size,
memory_size=lambda_memory_size,
environment=stringify_settings(lambda_env),
timeout=Duration.seconds(ingest_lambda_timeout),
timeout=Duration.seconds(lambda_timeout),
layers=[
create_dependencies_layer(
self,
@@ -81,12 +89,12 @@

# Set how often the ingester will run
# If 0 the ingester will not run automatically
if ingest_rate_minutes > 0:
if rate_minutes > 0:
aws_events.Rule(
self,
f"{id}-ingest-event-rule",
schedule=aws_events.Schedule.cron(
minute=f"0/{ingest_rate_minutes}"
minute=f"0/{rate_minutes}"
),
targets=[
aws_events_targets.LambdaFunction(ingest_function),
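The stack passes `environment=stringify_settings(lambda_env)` because Lambda environment variables must all be strings. `stringify_settings` is imported from `cdk/utils.py` and not shown in this diff, so this is an assumed minimal equivalent, not the actual implementation:

```python
def stringify_settings(env: dict) -> dict:
    """Coerce every value to str, as the Lambda Environment API requires."""
    return {key: str(value) for key, value in env.items()}

print(stringify_settings({"RATE_MINUTES": 15, "PAUSE_INGESTING": False}))
# -> {'RATE_MINUTES': '15', 'PAUSE_INGESTING': 'False'}
```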
14 changes: 0 additions & 14 deletions cdk/requirements.txt

This file was deleted.

14 changes: 8 additions & 6 deletions cdk/utils.py
@@ -19,20 +19,22 @@ def create_dependencies_layer(
function_name: str,
requirements_path: Path
) -> aws_lambda.LayerVersion:
requirements_file = str(requirements_path.resolve())
#requirements_file = str(requirements_path.resolve())
output_dir = f'../.build/{function_name}'
layer_id = f'openaq-{function_name}-{env_name}-dependencies'

if not environ.get('SKIP_PIP'):
print(f'Building {layer_id} from {requirements_file} into {output_dir}')
if not environ.get('SKIP_BUILD'):
print(f'Building {layer_id} into {output_dir}')
subprocess.run(
f"""python3.8 -m pip install -qq -r {requirements_file} \
f"""
poetry export --without=cdk -o requirements.txt --without-hashes && \
poetry run python -m pip install -qq -r requirements.txt \
-t {output_dir}/python && \
cd {output_dir}/python && \
find . -type f -name '*.pyc' | \
while read f; do n=$(echo $f | \
sed 's/__pycache__\///' | \
sed 's/.cpython-[2-3] [0-9]//'); \
sed 's/.cpython-[2-3][0-9]//'); \
cp $f $n; \
done \
&& find . -type d -a -name '__pycache__' -print0 | xargs -0 rm -rf \
@@ -47,5 +49,5 @@ def create_dependencies_layer(
self,
layer_id,
code=layer_code,
compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_8]
compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_12]
)
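The layer build renames compiled files with `sed 's/.cpython-[2-3][0-9]//'` (this PR fixes a stray space in the old pattern). Note that the two-digit character class still only strips tags like `cpython-38`; now that the runtime is Python 3.12, `mod.cpython-312.pyc` would come out as `mod2.pyc`. A Python sketch of the intended rename, widened to handle two-digit minor versions (an illustration, not code from the PR):

```python
import re

def source_pyc_name(path: str) -> str:
    """Drop the __pycache__ directory and the interpreter tag so that
    pkg/__pycache__/mod.cpython-312.pyc becomes pkg/mod.pyc."""
    path = path.replace("__pycache__/", "")
    return re.sub(r"\.cpython-[23][0-9]{1,2}", "", path)

print(source_pyc_name("pkg/__pycache__/mod.cpython-312.pyc"))  # -> pkg/mod.pyc
```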