20
20
default_args = {
21
21
"owner" : "airflow" ,
22
22
"depends_on_past" : False ,
23
- "start_date" : (datetime .utcnow () - timedelta (hours = 48 )).replace (minute = 0 , second = 0 , microsecond = 0 ),
23
+ "start_date" : (datetime .utcnow () - timedelta (hours = 48 )).replace (
24
+ minute = 0 , second = 0 , microsecond = 0
25
+ ),
24
26
"catchup_by_default" : False ,
25
27
"email_on_failure" : False ,
26
28
"email_on_retry" : False ,
27
- "retries" : 5 ,
29
+ "retries" : 6 ,
28
30
"retry_delay" : timedelta (minutes = 30 ),
29
31
}
30
32
33
35
default_args = default_args ,
34
36
tags = ["cumulus" , "AIRTEMP" , "QTE" , "APRFC" ],
35
37
schedule = "45 * * * *" ,
36
- max_active_runs = 2 ,
37
- max_active_tasks = 4 ,
38
+ max_active_runs = 1 ,
39
+ max_active_tasks = 1 ,
38
40
)
39
41
def cumulus_aprfc_qte_01h ():
40
42
"""
@@ -54,47 +56,36 @@ def cumulus_aprfc_qte_01h():
54
56
s3_bucket = cumulus .S3_BUCKET
55
57
key_prefix = cumulus .S3_ACQUIRABLE_PREFIX
56
58
57
-
58
-
59
59
URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/"
60
60
PRODUCT_SLUG = "aprfc-qte-01h"
61
61
62
- filename_template = Template (
63
- "akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2 "
64
- )
65
-
66
- url_suffix_template = Template (
67
- "akurma.${date_}"
68
- )
62
+ filename_template = Template ("akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2 " )
69
63
64
+ url_suffix_template = Template ("akurma.${date_}" )
70
65
71
66
@task ()
72
67
def download_raw_qte ():
73
68
logical_date = get_current_context ()["logical_date" ]
74
69
date_only = logical_date .strftime ("%Y%m%d" )
75
70
76
71
url_suffix = url_suffix_template .substitute (
77
- date_ = date_only ,
78
- )
72
+ date_ = date_only ,
73
+ )
79
74
80
75
filename = filename_template .substitute (
81
- hr_ = logical_date .strftime ("%H" ),
82
- )
76
+ hr_ = logical_date .strftime ("%H" ),
77
+ )
83
78
84
79
file_dir = f"{ URL_ROOT } { url_suffix } "
85
80
86
-
87
- s3_filename = f'{ date_only } _{ filename } '
81
+ s3_filename = f"{ date_only } _{ filename } "
88
82
s3_key = f"{ key_prefix } /{ PRODUCT_SLUG } /{ s3_filename } "
89
83
90
-
91
-
92
-
93
84
print (f"Downloading file: { filename } " )
94
85
95
86
trigger_download (
96
- url = f"{ file_dir } /{ filename } " , s3_bucket = s3_bucket , s3_key = s3_key
97
- )
87
+ url = f"{ file_dir } /{ filename } " , s3_bucket = s3_bucket , s3_key = s3_key
88
+ )
98
89
return json .dumps (
99
90
{
100
91
"execution" : logical_date .isoformat (),
@@ -103,11 +94,6 @@ def download_raw_qte():
103
94
}
104
95
)
105
96
106
-
107
-
108
-
109
-
110
-
111
97
@task ()
112
98
def notify_cumulus (payload ):
113
99
payload = json .loads (payload )
@@ -116,11 +102,9 @@ def notify_cumulus(payload):
116
102
acquirable_id = cumulus .acquirables [PRODUCT_SLUG ],
117
103
datetime = payload ["execution" ],
118
104
s3_key = payload ["s3_key" ],
119
- )
105
+ )
120
106
121
107
notify_cumulus (download_raw_qte ())
122
-
123
-
124
108
125
109
126
110
aprfc_qte_dag = cumulus_aprfc_qte_01h ()
0 commit comments