
Commit a1c9a71

add aprfc_qpf_06h
1 parent 305b773 commit a1c9a71

File tree

1 file changed: +110 -0 lines changed

dags/cumulus/aprfc_qpf_06h.py

@@ -0,0 +1,110 @@
"""
Acquire and Process APRFC QPF 06h
"""

import json
import re
from datetime import datetime, timedelta, timezone

import requests
from bs4 import BeautifulSoup

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from helpers.downloads import trigger_download

import helpers.cumulus as cumulus

# Default arguments
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": (datetime.now(timezone.utc) - timedelta(hours=72)).replace(
        minute=0, second=0
    ),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 6,
    "retry_delay": timedelta(minutes=30),
}

# APRFC QPF filename scraper
def get_filenames(edate, url):
    """
    The hour and minute at the end of the filename cannot be predicted,
    so scrape the directory listing and collect every filename matching
    the specified date or the preceding day.
    """
    d_t1 = edate.strftime("%Y%m%d")
    d_t2 = (edate - timedelta(hours=24)).strftime("%Y%m%d")

    # timeout keeps the task from hanging on a stalled directory listing
    page = requests.get(url, timeout=60)
    soup = BeautifulSoup(page.content, "html.parser")
    links = [node.get("href") for node in soup.find_all("a")]
    filenames = []
    for d in [d_t2, d_t1]:
        # Raw string so \d is a regex digit class; the literal dots in the
        # .grb.gz extension are escaped to avoid over-matching.
        regex = rf"^qpf06f_has_.*awips_{d}\d+\.grb\.gz$"
        filenames = filenames + [link for link in links if re.match(regex, link)]

    return filenames

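As a quick sanity check, the pattern above can be exercised against the sample filename quoted in the DAG docstring below. This is a minimal sketch; the .gz suffix on the sample is assumed here, since the scraper only matches gzipped files:

import re
from datetime import datetime, timedelta

edate = datetime(2020, 9, 17)
sample = "qpf06f_has_6f_20200917_18_awips_202009170949.grb.gz"  # assumed .gz suffix
for d in [(edate - timedelta(hours=24)).strftime("%Y%m%d"), edate.strftime("%Y%m%d")]:
    if re.match(rf"^qpf06f_has_.*awips_{d}\d+\.grb\.gz$", sample):
        print(f"matched on {d}")  # prints: matched on 20200917
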
@dag(
    default_args=default_args,
    schedule="40 14,5 * * *",  # 05:40 and 14:40 UTC daily
    tags=["cumulus", "precip", "QPF", "APRFC"],
    max_active_runs=2,
    max_active_tasks=4,
    catchup=False,  # catchup is a DAG-level argument, not a default_arg
)
def cumulus_aprfc_qpf_06h():
    """This pipeline handles download, processing, and derivative product
    creation for APRFC QPF.

    URL Dir - https://cbt.crohms.org/akgrids
    Files matching qpf06f_has_6f_20200917_18_awips_202009170949.grb - 6 hour
    """
    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
    URL_ROOT = "https://cbt.crohms.org/akgrids"
    PRODUCT_SLUG = "aprfc-qpf-06h"

    @task()
    def download_raw_qpf():
        logical_date = get_current_context()["logical_date"]

        return_list = []
        filenames = get_filenames(logical_date, URL_ROOT)
        for filename in filenames:
            url = f"{URL_ROOT}/{filename}"
            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
            print(f"Downloading file: {filename}")
            try:
                trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
                return_list.append(
                    {
                        "execution": logical_date.isoformat(),
                        "s3_key": s3_key,
                        "filename": filename,
                    }
                )
            except Exception:  # a bare except would also swallow task kills
                print(f"{filename} is not available to download")

        return json.dumps(return_list)

    @task()
    def notify_cumulus(payload):
        payload = json.loads(payload)
        for item in payload:
            print("Notifying Cumulus: " + item["filename"])
            cumulus.notify_acquirablefile(
                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
                datetime=item["execution"],
                s3_key=item["s3_key"],
            )

    notify_cumulus(download_raw_qpf())


aprfc_qpf_dag = cumulus_aprfc_qpf_06h()
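For context, the JSON string that download_raw_qpf hands to notify_cumulus via XCom would look roughly like this. The values are illustrative, and the key prefix is a placeholder, since cumulus.S3_ACQUIRABLE_PREFIX is defined in the helpers module and not shown in this commit:

[
  {
    "execution": "2020-09-17T14:40:00+00:00",
    "s3_key": "<S3_ACQUIRABLE_PREFIX>/aprfc-qpf-06h/qpf06f_has_6f_20200917_18_awips_202009170949.grb.gz",
    "filename": "qpf06f_has_6f_20200917_18_awips_202009170949.grb.gz"
  }
]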
