# tasks.py
import json

import requests
from celery.signals import worker_init
from sqlalchemy import create_engine, text

from my_celery_app import app
from utils import make_time_fields
# Connection-pooling pattern adapted from:
# https://stackoverflow.com/questions/14526249/celery-worker-database-connection-pooling
# NOTE: the engines created in the @worker_init.connect handler below do need to be
# declared global so that the tasks in this module can see them.
@app.task
def get_and_load_weather_data() -> None:
    """
    Requests data from a weather API and stores the raw JSON in my LAN database,
    then parses out only the fields I care about and loads those into both the
    LAN and cloud databases.
    """
    print("=" * 50, "calling get_and_load_weather_data()", "=" * 50)
    # apikey, lat, and lon are globals set by make_sqlalchemy_engines() at worker startup
    resp = requests.get(
        "https://api.openweathermap.org/data/2.5/weather"
        f"?lat={lat}&lon={lon}&appid={apikey}&units=imperial"
    )
    resp.raise_for_status()  # surface a bad HTTP response as a task failure
    json_str = resp.text
    utc_ts, mtn_ts, mtn_date, mtn_time = make_time_fields()
    data_dict = {
        "weather_json_raw": json_str,
        "weather_jsonb": json_str,
        "utc_ts": utc_ts,
        "mtn_date": mtn_date,
        "mtn_time": mtn_time,
    }
    with eng_local.begin() as conn:
        conn.execute(
            text("""INSERT INTO boulder.weather_raw (
                        weather_json_raw, weather_jsonb,
                        utc_ts, mtn_date, mtn_time
                    )
                    VALUES (
                        :weather_json_raw, :weather_jsonb,
                        :utc_ts, :mtn_date, :mtn_time
                    )"""),
            data_dict
        )
    data = json.loads(json_str)
    try:
        data_dict = {
            "id": data["id"],
            "lon": data["coord"]["lon"],
            "lat": data["coord"]["lat"],
            "weather_desc": data["weather"][0]["main"],
            "temp": data["main"]["temp"],
            "temp_feels_like": data["main"]["feels_like"],
            "temp_min": data["main"]["temp_min"],
            "temp_max": data["main"]["temp_max"],
            "pressure": data["main"]["pressure"],
            "humidity": data["main"]["humidity"],
            "visibility": data["visibility"],
            "wind_speed": data["wind"]["speed"],
            "wind_deg": data["wind"]["deg"],
            "clouds_all": data["clouds"]["all"],
            "utc_ts": str(utc_ts),
            "mtn_date": mtn_date,
            "mtn_time": mtn_time,
            "dt": data["dt"],
            "dt_sunrise": data["sys"]["sunrise"],
            "dt_sunset": data["sys"]["sunset"],
            "timezone": data["timezone"],
        }
    except KeyError as e:
        # Bail out here: at this point data_dict still holds the raw-insert params,
        # so running the parsed inserts below would fail with mismatched bind names.
        print(f"Couldn't parse key {e} in the weather data")
        return
    # The same parsed row goes to both the LAN and the cloud database.
    insert_weather_sql = text("""INSERT INTO boulder.weather VALUES (
        :id, :lon, :lat, :weather_desc, :temp, :temp_feels_like,
        :temp_min, :temp_max, :pressure, :humidity, :visibility,
        :wind_speed, :wind_deg, :clouds_all, :utc_ts, :mtn_date,
        :mtn_time, :dt, :dt_sunrise, :dt_sunset, :timezone
    )""")
    for eng in (eng_local, eng_cloud):
        with eng.begin() as conn:
            conn.execute(insert_weather_sql, data_dict)
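# To smoke-test get_and_load_weather_data by hand (assuming a worker is running
# and secrets.json is in place), it can be queued from a shell:
#   >>> from tasks import get_and_load_weather_data
#   >>> get_and_load_weather_data.delay()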
@app.task
def load_sensor(data_dict: dict) -> str:
    """
    Task responsible for loading one sensor reading into the local and cloud
    databases.
    Call this in the Flask app like:
    >>> load_sensor.delay(content)
    where "content" is the dict that represents our JSON data.
    Returns the string '200' if successful.
    """
    print("Loading sensor data")
    insert_sensor_sql = text("""INSERT INTO boulder.sensor (
            displayName, name, value, unit, deviceId,
            utc_LAN_received, utc_cloud_insertion,
            mtn_date, mtn_time)
        VALUES (
            :displayName, :name,
            :value, :unit, :deviceId,
            :utc_LAN_received, :utc_cloud_insertion,
            :mtn_date, :mtn_time
        )""")
    # eng_local and eng_cloud are globals... they exist on all workers
    for eng in (eng_local, eng_cloud):
        with eng.begin() as conn:
            conn.execute(insert_sensor_sql, data_dict)
    return "200"
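# NOTE: conn.execute() also accepts a list of parameter dicts, which SQLAlchemy
# runs as an executemany (that is what executemany_mode="values_only" on the
# engines below is tuning), so a batched variant of load_sensor could accept a
# list of readings and load them in one round trip.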
@app.task
def retry_failed_records():
    # TODO: this isn't necessary yet. Leaving the stub here in case it's needed some day.
    pass
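# A minimal sketch of what retry_failed_records could grow into, assuming a
# hypothetical boulder.failed_records table that parks the raw payloads of
# inserts that errored out:
#
#   with eng_local.begin() as conn:
#       rows = conn.execute(
#           text("SELECT id, payload FROM boulder.failed_records")
#       ).all()
#   for row in rows:
#       load_sensor.delay(json.loads(row.payload))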
@worker_init.connect
def make_sqlalchemy_engines(**kwargs):
    """
    Runs on each worker as soon as that worker starts. This is how I make sure
    my workers are connected to my databases without having to put the
    connection logic inside every task.
    """
    # These do need to be global in order to be visible from the tasks above.
    global eng_cloud  # sqlalchemy engine for the cloud database
    global eng_local  # sqlalchemy engine for the LAN database
    global apikey     # API key for weather data
    global lat        # latitude for weather data collection
    global lon        # longitude for weather data collection
    with open('secrets.json', 'r') as f:
        secrets = json.load(f)
    pw_cloud = secrets["cloud_pw"]
    # pw_cloud = os.environ.get("TAYLOR_DB_PW")
    eng_cloud = create_engine(
        f"postgresql+psycopg2://taylor:{pw_cloud}@taylorvananne.com:5432/taylor?sslmode=require",
        executemany_mode='values_only',
        echo=True,
        future=True
    )
    pw_local = secrets["local_pw"]
    # pw_local = os.environ.get("TAYLOR_DB_PW_LOCAL")
    eng_local = create_engine(
        f"postgresql+psycopg2://taylor:{pw_local}@localhost:5432/taylor?sslmode=require",
        executemany_mode='values_only',
        echo=True,
        future=True
    )
    # The weather API settings live in secrets.json as well.
    apikey = secrets.get("openweathermap")
    lat = secrets.get("lat")
    lon = secrets.get("lon")
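# For long-lived workers it may be worth passing pool_pre_ping=True (and perhaps
# pool_recycle) to the create_engine() calls above, so that stale connections to
# the remote cloud database get detected and replaced instead of failing mid-task.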
# Time-based (periodic) tasks
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Pull fresh weather data every 10 minutes.
    sender.add_periodic_task(10.0 * 60.0, get_and_load_weather_data.s())
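# The same schedule could also be declared on the app config instead; a sketch,
# assuming the task registers under its default name "tasks.get_and_load_weather_data":
#
#   app.conf.beat_schedule = {
#       "get-weather-every-10-min": {
#           "task": "tasks.get_and_load_weather_data",
#           "schedule": 10.0 * 60.0,
#       },
#   }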