Skip to content

Commit

Permalink
Merge pull request #35 from wri/develop
Browse files Browse the repository at this point in the history
Allow more than 115 tiles
  • Loading branch information
Thomas Maschler authored Apr 12, 2020
2 parents 3409633 + 9e8ca4f commit 6dec7bb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 12 deletions.
2 changes: 1 addition & 1 deletion glad/stages/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_most_recent_day(**kwargs: Any) -> Tuple[str, List[str]]:
)

available_tiles: List[str] = _check_tifs_exist(process_date, tile_ids, years)
if len(available_tiles) == num_tiles:
if len(available_tiles) >= num_tiles:
return process_date, available_tiles

msg: str = (
Expand Down
2 changes: 1 addition & 1 deletion lambda/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ENV WORKDIR /var/task
ENV LAMBDANAME gfw-glad-pipeline

RUN mkdir -p packages/ \
&& pip install retrying boto3 google-cloud-storage -t packages
&& pip install retrying boto3 google-cloud-storage requests -t packages

#Precompile all python packages and remove .py files
#RUN python -m compileall .
Expand Down
119 changes: 109 additions & 10 deletions lambda/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import json

import requests
from google.auth.exceptions import TransportError
from google.cloud import storage
from math import floor
Expand All @@ -7,27 +10,90 @@
import boto3
import botocore


S3 = boto3.resource("s3")
LASTRUN = S3.Object("gfw2-data", "forest_change/umd_landsat_alerts/prod/events/lastrun")
STATUS = S3.Object("gfw2-data", "forest_change/umd_landsat_alerts/prod/events/status")


def lambda_handler(event, context):
kwargs = {
"lastrun": get_lastrun(),
"status": get_status(),
"tile_ids": get_tile_ids_by_bbox(-120, -40, 180, 30),
"years": get_current_years(),
"num_tiles": 115,
}

if "report" in event.keys() and event["report"]:
return report(**kwargs)
else:
return trigger_pipeline(**kwargs)


def report(**kwargs):
try:
lastrun = kwargs["lastrun"]
status = kwargs["status"]

lastrun = get_lastrun()
status = get_status()
available_tiles = get_export_history(**kwargs)
r = requests.get(
"https://production-api.globalforestwatch.org/v1/glad-alerts/latest"
)
latest_alert = r.json()["data"][0]["attributes"]["date"]

msg = f"""
Lastest GEE export processed: {lastrun.strftime("%Y-%m-%d")}\n
Status of last run: {status}\n
Latest detected alert: {latest_alert}\n
\n
Export status of GEE tiles:\n
{available_tiles}\n
\n
Next update will be triggered as soon as all 115 tiles for a newer date are available.
"""

slack_webhook(level="INFO", message=msg)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": {"msg": msg},
}
except Exception as e:

tile_ids = get_tile_ids_by_bbox(-120, -40, 180, 30)
return {
"statusCode": 500,
"headers": {"Content-Type": "application/json"},
"body": {"message": str(e)},
}

num_tiles = 115
years = get_current_years()

kwargs = {"years": years, "num_tiles": num_tiles}
def get_export_history(**kwargs):
tile_ids = kwargs["tile_ids"]
years = kwargs["years"]
num_tiles = kwargs["num_tiles"]
today = datetime.datetime.today()

available_tiles = ""
# check for most recent day of GLAD data
for day_offset in range(0, 11):
process_date = (today - datetime.timedelta(days=day_offset)).strftime(
"%Y/%m_%d"
)
tiles = _check_tifs_exist(process_date, tile_ids, years)
available_tiles += f"GEE process date: {datetime.datetime.strptime(process_date, '%Y/%m_%d').strftime('%Y-%m-%d')} - {len(tiles)} tiles for {years} exported\n"
if len(tiles) >= num_tiles:
return available_tiles

available_tiles += "Checked GCS for last 10 days - none had all {} tiled TIFs."
return available_tiles


def trigger_pipeline(**kwargs):
lastrun = kwargs["lastrun"]
status = kwargs["status"]
try:
try:
tile_date = get_most_recent_day(tile_ids=tile_ids, **kwargs)
tile_date = get_most_recent_day(**kwargs)
except ValueError:
return {
"statusCode": 500,
Expand Down Expand Up @@ -85,6 +151,7 @@ def lambda_handler(event, context):
"Action": "No action taken",
},
}

except Exception as e:
return {
"statusCode": 500,
Expand Down Expand Up @@ -209,7 +276,7 @@ def get_most_recent_day(**kwargs):
)

available_tiles = _check_tifs_exist(process_date, tile_ids, years)
if len(available_tiles) == num_tiles:
if len(available_tiles) >= num_tiles:
return datetime.datetime.strptime(process_date, "%Y/%m_%d")

msg: str = ("Checked GCS for last 10 days - none had all {} tiled TIFs.")
Expand Down Expand Up @@ -328,5 +395,37 @@ def serialize_dates(d):
return d


def get_slack_webhook(channel):
client = boto3.client("secretsmanager")
response = client.get_secret_value(SecretId="slack/gfw-sync")
return json.loads(response["SecretString"])[channel]


def slack_webhook(level, message):
app = "GFW SYNC - DAILY GLAD UPDATE REPORT"

if level.upper() == "WARNING":
color = "#E2AC37"
elif level.upper() == "ERROR" or level.upper() == "CRITICAL":
color = "#FF0000"

else:
color = "#36A64F"

attachement = {
"attachments": [
{
"fallback": "{} - {} - {}".format(app, level.upper(), message),
"color": color,
"title": app,
"fields": [{"title": level.upper(), "value": message, "short": False}],
}
]
}

url = get_slack_webhook("data-updates")
return requests.post(url, json=attachement)


if __name__ == "__main__":
print(lambda_handler(None, None))
print(lambda_handler({"report": True}, None))

0 comments on commit 6dec7bb

Please sign in to comment.