20
20
default_args = {
21
21
"owner" : "airflow" ,
22
22
"depends_on_past" : False ,
23
- "start_date" : (datetime .now (timezone .utc ) - timedelta (hours = 72 )).replace (
23
+ "start_date" : (datetime .now (timezone .utc ) - timedelta (hours = 36 )).replace (
24
24
minute = 0 , second = 0
25
25
),
26
26
"catchup_by_default" : False ,
31
31
}
32
32
33
33
34
def get_latest_files(filenames):
    """
    Reduce a list of QPF filenames to one entry per timestamp key.

    The key is built from the two captured parts of the filename: the
    ``YYYYMMDD_HH`` stamp that precedes ``_awips_`` and the run of digits
    that follows it. When several filenames share a key, the one that
    compares greatest as a plain string is kept. Filenames that do not
    contain the expected pattern are dropped.

    Returns a list of the retained filenames, ordered by the first
    appearance of each key in ``filenames``.
    """
    # Compiled once so the loop below reuses the same pattern object.
    stamp_re = re.compile(r"qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)")

    newest_by_stamp = {}
    for name in filenames:
        found = stamp_re.search(name)
        if found is None:
            # Not a recognised QPF filename; ignore it.
            continue

        stamp = "_".join(found.groups())
        current = newest_by_stamp.get(stamp)
        # Comparison is lexicographic on the whole filename, not numeric.
        if current is None or name > current:
            newest_by_stamp[stamp] = name

    return list(newest_by_stamp.values())
50
+
51
+
34
52
# ALR QPF filename generator
35
53
def get_filenames (edate , url ):
36
54
"""
@@ -39,22 +57,20 @@ def get_filenames(edate, url):
39
57
for the specified date.
40
58
"""
41
59
d_t1 = edate .strftime ("%Y%m%d" )
42
- d_t2 = (edate - timedelta (hours = 24 )).strftime ("%Y%m%d" )
43
60
44
61
page = requests .get (url )
45
62
soup = BeautifulSoup (page .content , "html.parser" )
46
63
links = [node .get ("href" ) for node in soup .find_all ("a" )]
47
64
filenames = []
48
- for d in [d_t2 , d_t1 ]:
49
- regex = f"^qpf06f_has_.*.awips_{ d } \d+.grb.gz$"
50
- filenames = filenames + [link for link in links if re .match (regex , link )]
65
+ regex = f"^qpf06f_has_\\ d+f_\\ d{{8}}_\\ d{{2}}_awips_{ d_t1 } .*\\ .grb(\\ .gz)?$"
66
+ filenames = [link for link in links if re .match (regex , link )]
51
67
52
- return filenames
68
+ return get_latest_files ( filenames )
53
69
54
70
55
71
@dag (
56
72
default_args = default_args ,
57
- schedule = "40 14,5 * * *" ,
73
+ schedule = "20 9,15,19 * * *" ,
58
74
tags = ["cumulus" , "precip" , "QPF" , "APRFC" ],
59
75
max_active_runs = 1 ,
60
76
max_active_tasks = 1 ,
0 commit comments