Skip to content

Commit 4011088

Browse files
authored
Merge pull request #263 from USACE/380-product-aprfc-data-products
380 product aprfc data products
2 parents 7cad0cf + 8f17c6b commit 4011088

File tree

3 files changed

+130
-1
lines changed

3 files changed

+130
-1
lines changed

dags/cumulus/aprfc_qpf_06h.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,4 @@ def notify_cumulus(payload):
105105
notify_cumulus(download_raw_qpf())
106106

107107

108-
aprfc_qpe_dag = cumulus_aprfc_qpf_06h()
108+
aprfc_qpf_dag = cumulus_aprfc_qpf_06h()

dags/cumulus/aprfc_qtf_01h.py

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""
2+
Acquire and Process APRFC qtf 01h
3+
"""
4+
5+
import json
6+
from datetime import datetime, timedelta
7+
import calendar
8+
from bs4 import BeautifulSoup
9+
import re
10+
import requests
11+
12+
from airflow import DAG
13+
from airflow.decorators import dag, task
14+
from airflow.operators.python import get_current_context
15+
from helpers.downloads import trigger_download
16+
17+
import helpers.cumulus as cumulus
18+
19+
# Default arguments
20+
default_args = {
21+
"owner": "airflow",
22+
"depends_on_past": False,
23+
"start_date": (datetime.utcnow() - timedelta(hours=36)).replace(minute=0, second=0),
24+
"catchup_by_default": False,
25+
"email_on_failure": False,
26+
"email_on_retry": False,
27+
"retries": 6,
28+
"retry_delay": timedelta(minutes=30),
29+
}
30+
31+
def get_latest_files(filenames):
    """Return only the newest filename for each unique timestamp key.

    A key is built from the forecast stamp (``YYYYMMDD_HH``) and the AWIPS
    issue stamp embedded in each name; when several names share a key, the
    lexicographically greatest (i.e. most recent) filename wins. Names that
    do not match the pattern are ignored.
    """
    # Captures (1) the YYYYMMDD_HH forecast stamp, (2) the AWIPS stamp.
    stamp_re = re.compile(r"ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)")

    newest = {}
    for name in filenames:
        m = stamp_re.search(name)
        if not m:
            continue
        key = f"{m.group(1)}_{m.group(2)}"
        current = newest.get(key)
        if current is None or name > current:
            newest[key] = name

    return list(newest.values())
47+
48+
# APRFC qtf filename generator
def get_filenames(edate, url):
    """Scrape *url* for APRFC ta01f GRIB files issued on *edate*.

    The hour/minute suffix at the end of each filename cannot be
    predicted, so every anchor on the directory index page is matched
    against a pattern built from the date, and only the latest file per
    unique timestamp is returned.

    Parameters
    ----------
    edate : datetime
        Date whose files should be collected (only year/month/day used).
    url : str
        Directory-listing URL to scrape.

    Returns
    -------
    list[str]
        Latest matching filenames (see ``get_latest_files``).
    """
    d_t1 = edate.strftime("%Y%m%d")

    page = requests.get(url)
    soup = BeautifulSoup(page.content, "html.parser")
    links = [node.get("href") for node in soup.find_all("a")]

    # e.g. ta01f_has_92f_20241219_08_awips_<d_t1>0008.grb(.gz)
    regex = re.compile(
        f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
    )
    # Guard against anchors with no href (node.get returns None for those);
    # the original passed None straight into re.match, which raises TypeError.
    filenames = [link for link in links if link and regex.match(link)]

    return get_latest_files(filenames)
70+
71+
72+
73+
@dag(
    default_args=default_args,
    schedule="40 5,23 * * *",
    tags=["cumulus", "temp", "QTF", "APRFC"],
    max_active_runs=1,
    max_active_tasks=1,
)
def cumulus_aprfc_qtf_01h():
    """Acquire and process APRFC QTF 01h.

    Handles download, processing, and derivative product creation for
    APRFC QTF.

    URL Dir - https://cbt.crohms.org/akgrids
    Files matching ``ta01f_has_92f_20241219_08_awips_202412150008.grb`` - 1 hour
    """
    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
    # No interpolation needed here, so a plain string literal (the original
    # carried a stray f-prefix).
    URL_ROOT = "https://cbt.crohms.org/akgrids"
    PRODUCT_SLUG = "aprfc-qtf-01h"

    @task()
    def download_raw_qtf():
        """Download every available QTF file for this run and stage it in S3.

        Returns a JSON list of {execution, s3_key, filename} dicts for the
        notify task.
        """
        # Logical (scheduled) time of this DAG run drives file selection.
        logical_date = get_current_context()["logical_date"]

        return_list = list()
        filenames = get_filenames(logical_date, URL_ROOT)
        for filename in filenames:
            url = f"{URL_ROOT}/{filename}"
            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
            print(f"Downloading file: {filename}")
            try:
                trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
                return_list.append(
                    {
                        "execution": logical_date.isoformat(),
                        "s3_key": s3_key,
                        "filename": filename,
                    }
                )
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; missing files remain a best-effort skip.
            except Exception:
                print(f"{filename} is not available to download")

        return json.dumps(return_list)

    @task()
    def notify_cumulus(payload):
        """Notify Cumulus of each acquired file so processing can begin."""
        payload = json.loads(payload)
        for item in payload:
            print("Notifying Cumulus: " + item["filename"])
            cumulus.notify_acquirablefile(
                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
                datetime=item["execution"],
                s3_key=item["s3_key"],
            )

    notify_cumulus(download_raw_qtf())


aprfc_qtf_dag = cumulus_aprfc_qtf_01h()

plugins/helpers/cumulus.py

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"aprfc-qpe-06h": "1f67d822-7cbc-11ee-b962-0242ac120002",
1616
"aprfc-qpf-06h": "a64cb16f-01a8-45c0-a069-9afda805d3a7",
1717
"aprfc-qte-01h": "7f8b2d6a-1f3e-11ee-be56-0242ac120002",
18+
"aprfc-qtf-01h": "80f33047-6234-4949-9c2f-eec6bfcf7b0f",
1819
"cnrfc-qpe-06h": "34a89c35-090d-46e8-964a-c621403301b9",
1920
"cnrfc-qpf-06h": "c22785cd-400e-4664-aef8-426734825c2c",
2021
"cnrfc-nbm-qpf-06h": "40cfce36-cfad-4a10-8b2d-eb8862378ca5",

0 commit comments

Comments
 (0)