Skip to content

Commit 3759826

Browse files
committed
sync newer aprfc dags with adjustments
1 parent eef1591 commit 3759826

File tree

5 files changed

+243
-4
lines changed

5 files changed

+243
-4
lines changed

dags/cumulus/abrfc_qpf_06h.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ def qpf_filenames(edate):
5656
schedule="8 */6 * * *",
5757
tags=["cumulus", "precip", "QPF", "ABRFC"],
5858
doc_md=__doc__,
59-
max_active_runs=2,
60-
max_active_tasks=4,
59+
max_active_runs=1,
60+
max_active_tasks=2,
6161
)
6262
def cumulus_abrfc_qpf_06h():
6363
key_prefix = cumulus.S3_ACQUIRABLE_PREFIX

dags/cumulus/aprfc_qpf_06h.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ def get_filenames(edate, url):
5656
default_args=default_args,
5757
schedule="40 14,5 * * *",
5858
tags=["cumulus", "precip", "QPF", "APRFC"],
59-
max_active_runs=2,
60-
max_active_tasks=4,
59+
max_active_runs=1,
60+
max_active_tasks=1,
6161
)
6262
def cumulus_aprfc_qpf_06h():
6363
"""This pipeline handles download, processing, and derivative product creation for \n

dags/cumulus/aprfc_qte_01h.py

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""
2+
Acquire and Process APRFC QTE 01h
3+
4+
Returns
5+
-------
6+
Airflow DAG
7+
Directed Acyclic Graph
8+
"""
9+
10+
from datetime import datetime, timedelta, timezone
11+
import json
12+
from string import Template
13+
from airflow.decorators import dag, task
14+
from airflow.operators.python import get_current_context
15+
from airflow.utils.task_group import TaskGroup
16+
from helpers.downloads import trigger_download
17+
18+
import helpers.cumulus as cumulus
19+
20+
# Shared default arguments applied to every task created by this DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # Start 48 hours back from "now", truncated to the top of the hour so the
    # start_date lines up with the hourly schedule.
    "start_date": (datetime.now(timezone.utc) - timedelta(hours=48)).replace(
        minute=0, second=0
    ),
    # NOTE(review): "catchup_by_default" is not a recognized default_args key;
    # Airflow controls backfill via the DAG-level `catchup` parameter — confirm
    # whether this entry has any effect here.
    "catchup_by_default": False,
    "email_on_failure": False,
    "email_on_retry": False,
    # Up to 6 retries, 30 minutes apart (source is published on a delay).
    "retries": 6,
    "retry_delay": timedelta(minutes=30),
}
32+
33+
34+
@dag(
    default_args=default_args,
    tags=["cumulus", "AIRTEMP", "QTE", "APRFC"],
    schedule="45 * * * *",
    max_active_runs=1,
    max_active_tasks=1,
)
def cumulus_aprfc_qte_01h():
    """
    # APRFC hourly estimated temps

    This pipeline handles download, processing, and derivative product creation for APRFC hourly estimated temps
    Raw data downloaded to S3 and notifies the Cumulus API of new product(s)

    URLs:
    - BASE - https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/akurma.YYYYMMDD/

    Filename/Dir Pattern:

    URL Dir - https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/akurma.YYYYMMDD/
    Files matching akurma.tHHz.2dvaranl_ndfd_3p0.grb2 - 1 hour\n
    """
    s3_bucket = cumulus.S3_BUCKET
    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX

    URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/"
    PRODUCT_SLUG = "aprfc-qte-01h"

    # FIX: removed the trailing space that was embedded in the template
    # ("...grb2 "); it leaked into the request URL and the S3 key, so the
    # remote filename could never match.
    filename_template = Template("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2")

    # Remote directory is keyed by run date: akurma.YYYYMMDD
    url_suffix_template = Template("akurma.${date_}")

    @task()
    def download_raw_qte():
        """Download the AK-URMA grid for this run hour to S3.

        Returns a JSON string (XCom-safe) with the logical execution time,
        the S3 key written, and the stored filename.
        """
        logical_date = get_current_context()["logical_date"]
        date_only = logical_date.strftime("%Y%m%d")

        url_suffix = url_suffix_template.substitute(
            date_=date_only,
        )

        # File within the directory is keyed by the run hour (tHHz).
        filename = filename_template.substitute(
            hr_=logical_date.strftime("%H"),
        )

        file_dir = f"{URL_ROOT}{url_suffix}"

        # Prefix with the date so hourly files from different days don't
        # collide under the product slug.
        s3_filename = f"{date_only}_{filename}"
        s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}"

        print(f"Downloading file: {filename}")

        trigger_download(
            url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key
        )
        return json.dumps(
            {
                "execution": logical_date.isoformat(),
                "s3_key": s3_key,
                "filename": s3_filename,
            }
        )

    @task()
    def notify_cumulus(payload):
        """Notify the Cumulus API that a new acquirable file landed in S3."""
        payload = json.loads(payload)
        print("Notifying Cumulus: " + payload["filename"])
        cumulus.notify_acquirablefile(
            acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
            datetime=payload["execution"],
            s3_key=payload["s3_key"],
        )

    notify_cumulus(download_raw_qte())


aprfc_qte_dag = cumulus_aprfc_qte_01h()

dags/cumulus/aprfc_qtf_01h.py

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""
2+
Acquire and Process APRFC qtf 01h
3+
"""
4+
5+
import json
6+
from datetime import datetime, timedelta, timezone
7+
import calendar
8+
from bs4 import BeautifulSoup
9+
import re
10+
import requests
11+
12+
from airflow import DAG
13+
from airflow.decorators import dag, task
14+
from airflow.operators.python import get_current_context
15+
from helpers.downloads import trigger_download
16+
17+
import helpers.cumulus as cumulus
18+
19+
# Default arguments
20+
# Shared default arguments applied to every task created by this DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # Start 36 hours back from "now", truncated to the top of the hour.
    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
        minute=0, second=0
    ),
    # NOTE(review): "catchup_by_default" is not a recognized default_args key;
    # Airflow controls backfill via the DAG-level `catchup` parameter — confirm
    # whether this entry has any effect here.
    "catchup_by_default": False,
    "email_on_failure": False,
    "email_on_retry": False,
    # Up to 6 retries, 30 minutes apart (source is published on a delay).
    "retries": 6,
    "retry_delay": timedelta(minutes=30),
}
32+
33+
34+
def get_latest_files(filenames):
    """Return one filename per unique (issue-time, awips-stamp) pair.

    Filenames that do not match the expected ta01f pattern are dropped;
    when several names share the same extracted key, the lexicographically
    greatest filename wins.
    """
    # Captures the YYYYMMDD_HH issue time and the trailing awips timestamp.
    timestamp_re = re.compile(r"ta01f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)")

    newest = {}
    for name in filenames:
        hit = timestamp_re.search(name)
        if not hit:
            continue
        key = f"{hit.group(1)}_{hit.group(2)}"
        current = newest.get(key)
        if current is None or name > current:
            newest[key] = name

    return list(newest.values())
50+
51+
52+
# APRFC qtf filename generator
def get_filenames(edate, url):
    """Scrape the index page at *url* for QTF grid filenames dated *edate*.

    The hour/minute suffix on the remote filenames cannot be predicted, so
    every anchor href on the page is tested against a pattern built from the
    supplied date; the deduplicated latest matches are returned.
    """
    date_token = edate.strftime("%Y%m%d")
    name_pattern = re.compile(
        f"^ta01f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{date_token}.*\\.grb(\\.gz)?$"
    )

    page = requests.get(url)
    soup = BeautifulSoup(page.content, "html.parser")

    hrefs = [anchor.get("href") for anchor in soup.find_all("a")]
    matches = [href for href in hrefs if name_pattern.match(href)]

    return get_latest_files(matches)
70+
71+
72+
@dag(
    default_args=default_args,
    schedule="21 9,15,19 * * *",
    tags=["cumulus", "temp", "QTF", "APRFC"],
    max_active_runs=1,
    max_active_tasks=1,
)
def cumulus_aprfc_qtf_01h():
    """This pipeline handles download, processing, and derivative product creation for \n
    APRFC QTF\n
    URL Dir - https://cbt.crohms.org/akgrids
    Files matching ta01f_has_92f_20241219_08_awips_202412150008.grb. - 1 hour\n
    """
    key_prefix = cumulus.S3_ACQUIRABLE_PREFIX
    URL_ROOT = "https://cbt.crohms.org/akgrids"
    PRODUCT_SLUG = "aprfc-qtf-01h"

    @task()
    def download_raw_qtf():
        """Scrape the index page, download each matching grid to S3.

        Returns a JSON list (XCom-safe) of dicts describing each file that
        downloaded successfully; files that fail are logged and skipped
        (best-effort — the source page sometimes lists files not yet present).
        """
        logical_date = get_current_context()["logical_date"]

        return_list = list()
        filenames = get_filenames(logical_date, URL_ROOT)
        for filename in filenames:
            url = f"{URL_ROOT}/{filename}"
            s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
            print(f"Downloading file: {filename}")
            try:
                trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
                return_list.append(
                    {
                        "execution": logical_date.isoformat(),
                        "s3_key": s3_key,
                        "filename": filename,
                    }
                )
            # FIX: was a bare `except:`, which also swallows SystemExit and
            # KeyboardInterrupt; narrowed so operator signals still propagate
            # while keeping the best-effort skip for unavailable files.
            except Exception:
                print(f"{filename} is not available to download")

        return json.dumps(return_list)

    @task()
    def notify_cumulus(payload):
        """Notify the Cumulus API for every file downloaded this run."""
        payload = json.loads(payload)
        for item in payload:
            print("Notifying Cumulus: " + item["filename"])
            cumulus.notify_acquirablefile(
                acquirable_id=cumulus.acquirables[PRODUCT_SLUG],
                datetime=item["execution"],
                s3_key=item["s3_key"],
            )

    notify_cumulus(download_raw_qtf())


aprfc_qtf_dag = cumulus_aprfc_qtf_01h()

plugins/helpers/cumulus.py

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
"abrfc-qpf-06h": "b1a4754c-5971-11ee-8c99-0242ac120002",
2525
"aprfc-qpe-06h": "1f67d822-7cbc-11ee-b962-0242ac120002",
2626
"aprfc-qpf-06h": "a64cb16f-01a8-45c0-a069-9afda805d3a7",
27+
"aprfc-qte-01h": "7f8b2d6a-1f3e-11ee-be56-0242ac120002",
28+
"aprfc-qtf-01h": "80f33047-6234-4949-9c2f-eec6bfcf7b0f",
2729
"cnrfc-qpe-06h": "34a89c35-090d-46e8-964a-c621403301b9",
2830
"cnrfc-qpf-06h": "c22785cd-400e-4664-aef8-426734825c2c",
2931
"cnrfc-nbm-qpf-06h": "40cfce36-cfad-4a10-8b2d-eb8862378ca5",

0 commit comments

Comments
 (0)