@@ -55,6 +55,7 @@ def qpf_filenames(edate):
     default_args=default_args,
     schedule="8 */6 * * *",
     tags=["cumulus", "precip", "QPF", "ABRFC"],
+    doc_md=__doc__,
     max_active_runs=2,
     max_active_tasks=4,
 )
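Note: the added doc_md=__doc__ kwarg surfaces the module docstring on the DAG's Docs panel in the Airflow UI. A minimal, self-contained sketch of that pattern, assuming Airflow 2.4+ (the DAG name and start_date below are illustrative, not from this repo):

"""This text shows up under the Docs button for the DAG in the Airflow UI."""
from datetime import datetime

from airflow.decorators import dag


@dag(schedule=None, start_date=datetime(2024, 1, 1), doc_md=__doc__)
def example_docs_dag():
    pass


example_docs_dag()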
@@ -67,25 +68,25 @@ def cumulus_abrfc_qpf_06h():
 
     """
     Because this is a forecast product, we don't want to wait to get the product based
-    on the last time period, but rather based on the current. This is why the execution
+    on the last time period, but rather based on the current. This is why the logical
     date is being shifted forward by 6 hours.
     """
 
     @task()
     def generate_filenames():
-        context = get_current_context()
-        ti = context["ti"]
-        execution_date = ti.execution_date + timedelta(hours=6)
+        # Overwrite the logical date to be 6 hours in the future
+        logical_date = get_current_context()["logical_date"] + timedelta(hours=6)
+
         # This task generates the list of filenames
-        return list(qpf_filenames(execution_date))
+        return list(qpf_filenames(logical_date))
 
     ###########################################################################
     @task()
     def check_first_file():
         context = get_current_context()
-        ti = context["ti"]
-        execution_date = ti.execution_date + timedelta(hours=6)
-        filename = next(qpf_filenames(execution_date))
+        logical_date = context["logical_date"] + timedelta(hours=6)
+        ti = context["ti"]  # task instance
+        filename = next(qpf_filenames(logical_date))
         url = f"{URL_ROOT}/{filename}"
 
         try:
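For context, logical_date is the Airflow 2.2+ name for the deprecated execution_date and is available directly from the task context, which is why the task-instance indirection is dropped for the date itself. A minimal sketch of the shift pattern used in this hunk, assuming Airflow 2.2 or later (the task name is illustrative):

from datetime import timedelta

from airflow.decorators import task
from airflow.operators.python import get_current_context


@task()
def example_shifted_date():
    # logical_date marks the start of the data interval; shifting it forward
    # 6 hours targets the current forecast period rather than the previous one.
    logical_date = get_current_context()["logical_date"] + timedelta(hours=6)
    print(f"Fetching forecast products valid for {logical_date.isoformat()}")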
@@ -97,7 +98,7 @@ def check_first_file():
         except Exception as e:
             # If we don't always get a product for this time period
             # AND we've reached the try limit, skip the task instead of failing for better metrics analysis
-            if execution_date.hour not in [0, 12, 18] and ti.try_number >= ti.max_tries:
+            if logical_date.hour not in [0, 12, 18] and ti.try_number >= ti.max_tries:
                 raise AirflowSkipException(
                     f"Skipping task due to no files available and max_tries ({ti.max_tries}) reached: {e}"
                 )
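The skip-on-final-try pattern above keeps runs for hours when no product is published from counting as failures. A hedged sketch of the same idea in isolation, using the standard AirflowSkipException and TaskInstance try counters (the task name and the fetch stand-in are hypothetical):

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import get_current_context


@task(retries=3)
def example_fetch_or_skip():
    ti = get_current_context()["ti"]
    try:
        # Hypothetical stand-in for the real download attempt.
        raise IOError("product not published yet")
    except Exception as e:
        # Same guard as the DAG uses: once try_number reaches the task's
        # max_tries, mark the run "skipped" instead of "failed" so that
        # failure metrics stay meaningful.
        if ti.try_number >= ti.max_tries:
            raise AirflowSkipException(f"No file after {ti.max_tries} tries: {e}")
        raise  # earlier attempts re-raise so Airflow retries normally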
@@ -109,8 +110,7 @@ def download_file(filename):
     def download_file(filename):
         print(f"Downloading {filename}")
         context = get_current_context()
-        ti = context["ti"]
-        execution_date = ti.execution_date + timedelta(hours=6)
+        logical_date = context["logical_date"] + timedelta(hours=6)
 
         # Name the dynamic task instead of leaving the index number
         context["task_id"] = filename
@@ -119,7 +119,7 @@ def download_file(filename):
         s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{filename}"
         result = trigger_download(url=url, s3_bucket=cumulus.S3_BUCKET, s3_key=s3_key)
         return {
-            "execution": execution_date.isoformat(),
+            "execution": logical_date.isoformat(),
             "url": url,
             "s3_key": s3_key,
             "s3_bucket": cumulus.S3_BUCKET,