3
3
"""
4
4
5
5
import json
6
- from datetime import datetime , timedelta
6
+ from datetime import datetime , timedelta , timezone
7
7
import calendar
8
8
from bs4 import BeautifulSoup
9
9
import re
20
20
# Default arguments applied to every task created by this DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # Anchor the start date 36 hours in the past, truncated to the top of
    # the hour. Uses tz-aware UTC (datetime.utcnow() is deprecated and
    # returns a naive datetime).
    "start_date": (datetime.now(timezone.utc) - timedelta(hours=36)).replace(
        minute=0, second=0
    ),
    # NOTE(review): "catchup_by_default" is not a recognized task argument;
    # catchup is a DAG-level parameter — confirm this key is intentional.
    "catchup_by_default": False,
    "email_on_failure": False,
    "email_on_retry": False,
    # Retry failed tasks up to 6 times, 30 minutes apart.
    "retries": 6,
    "retry_delay": timedelta(minutes=30),
}
30
30
31
def get_latest_files(filenames):
    """Return the newest filename for each unique timestamp group.

    Filenames are grouped by the issuance timestamp (YYYYMMDD_HH) plus the
    trailing awips digits extracted from the name; within each group the
    lexicographically greatest filename wins. Names that do not match the
    expected pattern are ignored.

    NOTE(review): "greatest" here is plain string comparison — verify that
    is the intended ordering for these filenames.
    """
    # Captures the issuance timestamp and the trailing awips digit run.
    stamp_re = re.compile(r'qpf06f_has_\d+f_(\d{8}_\d{2})_awips_(\d+)')

    newest = {}
    for name in filenames:
        m = stamp_re.search(name)
        if m is None:
            continue
        group_key = f"{m.group(1)}_{m.group(2)}"
        current = newest.get(group_key)
        if current is None or name > current:
            newest[group_key] = name

    return list(newest.values())
31
47
32
48
# ALR QPF filename generator
def get_filenames(edate, url):
    """Scrape the directory listing at ``url`` and return QPF grib files
    for the specified date.

    Fetches the page, collects every anchor href, keeps the ones that look
    like ``qpf06f_has_<fff>f_<YYYYMMDD>_<HH>_awips_<edate>...grb[.gz]`` for
    the given date, and deduplicates them down to the newest file per
    timestamp via ``get_latest_files``.

    :param edate: datetime/date whose ``%Y%m%d`` rendering selects the files.
    :param url: directory-listing URL to scrape.
    :return: list of matching filenames, newest per timestamp group.
    """
    d_t1 = edate.strftime("%Y%m%d")

    page = requests.get(url)
    soup = BeautifulSoup(page.content, "html.parser")
    links = [node.get("href") for node in soup.find_all("a")]

    # Keep only links for this date's qpf06f products (.grb or .grb.gz).
    regex = f"^qpf06f_has_\\d+f_\\d{{8}}_\\d{{2}}_awips_{d_t1}.*\\.grb(\\.gz)?$"
    filenames = [link for link in links if re.match(regex, link)]

    return get_latest_files(filenames)
51
66
52
67
53
68
@dag (
54
69
default_args = default_args ,
55
- schedule = "40 14,5 * * *" ,
70
+ schedule = "20 9,15,19 * * *" ,
56
71
tags = ["cumulus" , "precip" , "QPF" , "APRFC" ],
57
- max_active_runs = 2 ,
58
- max_active_tasks = 4 ,
72
+ max_active_runs = 1 ,
73
+ max_active_tasks = 1 ,
59
74
)
60
75
def cumulus_aprfc_qpf_06h ():
61
76
"""This pipeline handles download, processing, and derivative product creation for \n
0 commit comments