Ross fix empty data #42

Merged · 7 commits · Feb 10, 2021
6 changes: 4 additions & 2 deletions aqandu/__init__.py
@@ -7,14 +7,16 @@
 from google.cloud import bigquery
 import logging
 import time
+import sys


 load_dotenv()
 PROJECT_ID = os.getenv("PROJECTID")


-logfile = "serve.log"
-logging.basicConfig(filename=logfile, level=logging.DEBUG, format = '%(levelname)s: %(filename)s: %(message)s')
+# logfile = "serve.log"
+# logging.basicConfig(filename=logfile, level=logging.DEBUG, format = '%(levelname)s: %(filename)s: %(message)s')
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format = '%(levelname)s: %(filename)s: %(message)s')
 logging.info('API server started at %s', time.asctime(time.localtime()))

 app = Flask(__name__)
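For context (not part of the diff): a minimal standalone sketch of the stream-based configuration the change switches to, so log records go to stdout where the hosting environment's log collector can pick them up; the message text here is illustrative.

import logging
import sys

# send all records to stdout instead of a local file
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(levelname)s: %(filename)s: %(message)s')
logging.info("API server started")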
28 changes: 17 additions & 11 deletions aqandu/api_routes.py
@@ -49,7 +49,7 @@ def rawDataFrom():

     # Check that the data is formatted correctly
     if not utils.validateDate(start) or not utils.validateDate(end):
-        msg = "Incorrect date format, should be {utils.DATETIME_FORMAT}, e.g.: 2018-01-03T20:00:00Z"
+        msg = f"Incorrect date format, should be {utils.DATETIME_FORMAT}, e.g.: 2018-01-03T20:00:00Z"
         return msg, 400

     # Define the BigQuery query
@@ -442,7 +442,7 @@ def submit_sensor_query(lat_lo, lat_hi, lon_lo, lon_hi, start_date, end_date):
     query_job = bq_client.query(query, job_config=job_config)

     if query_job.error_result:
-        print(query_job.error_result)
+        app.logger.error(query_job.error_result)
         return "Invalid API call - check documentation.", 400
     # Waits for query to finish
     sensor_data = query_job.result()
@@ -551,9 +551,7 @@ def getEstimatesForLocation():
     app.logger.info(
         "Query parameters: lat= %f lon= %f start_date= %s end_date=%s estimatesrate=%f hours/estimate" %(query_lat, query_lon, query_start_datetime, query_end_datetime, query_rate))

-    yPred, yVar = computeEstimatesForLocations(query_dates, query_locations, query_elevations)
-
-
+    yPred, yVar, status = computeEstimatesForLocations(query_dates, query_locations, query_elevations)


     # convert the arrays to lists of floats
@@ -565,7 +563,7 @@
     estimates = []
     for i in range(num_times):
         estimates.append(
-            {'PM2_5': (yPred[0, i]), 'variance': (yVar[0, i]), 'datetime': query_dates[i].strftime('%Y-%m-%d %H:%M:%S%z'), 'Latitude': query_lat, 'Longitude': query_lon, 'Elevation': query_elevations[0]}
+            {'PM2_5': (yPred[0, i]), 'variance': (yVar[0, i]), 'datetime': query_dates[i].strftime('%Y-%m-%d %H:%M:%S%z'), 'Latitude': query_lat, 'Longitude': query_lon, 'Elevation': query_elevations[0], 'Status': status[i]}
         )

     # estimates = [
@@ -630,7 +628,7 @@ def getEstimatesForLocations():

     for i in range(num_times):
         estimates.append(
-            {'PM2_5': (yPred[:,i]).tolist(), 'variance': (yVar[:,i]).tolist(), 'datetime': query_dates[i].strftime('%Y-%m-%d %H:%M:%S%z'), 'Latitude': query_lats.tolist(), 'Longitude': query_lons.tolist(), 'Elevation': query_elevations.tolist()}
+            {'PM2_5': (yPred[:,i]).tolist(), 'variance': (yVar[:,i]).tolist(), 'datetime': query_dates[i].strftime('%Y-%m-%d %H:%M:%S%z'), 'Latitude': query_lats.tolist(), 'Longitude': query_lons.tolist(), 'Elevation': query_elevations.tolist(), 'Status': status[i]}
         )

     return jsonify(estimates)
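To illustrate the response shape after this change, each element returned by /getEstimatesForLocation now carries a 'Status' entry (the bulk /getEstimatesForLocations variant does the same with list-valued fields). The values below are invented; only the keys come from the diff. One element, as a Python dict before serialization:

{
    'PM2_5': 4.7,
    'variance': 2.1,
    'datetime': '2021-02-10 13:00:00+0000',
    'Latitude': 40.76,
    'Longitude': -111.89,
    'Elevation': 1400.0,
    'Status': '42 measurements'
}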
@@ -750,23 +748,31 @@ def computeEstimatesForLocations(query_dates, query_locations, query_elevations):

     yPred = np.empty((num_locations, 0))
     yVar = np.empty((num_locations, 0))
+    status = []
     for i in range(len(query_sequence)):
         # step 7, Create Model
-        model, time_offset = gaussian_model_utils.createModel(
+        model, time_offset, model_status = gaussian_model_utils.createModel(
             sensor_data, latlon_length_scale, elevation_length_scale, time_length_scale, sensor_sequence[i][0], sensor_sequence[i][1], save_matrices=False)
-        yPred_tmp, yVar_tmp = gaussian_model_utils.estimateUsingModel(
-            model, query_lats, query_lons, query_elevations, query_sequence[i], time_offset, save_matrices=True)
+        # check to see if there is a valid model
+        if (model == None):
+            yPred_tmp = np.full((query_lats.shape[0], len(query_sequence[i])), 0.0)
+            yVar_tmp = np.full((query_lats.shape[0], len(query_sequence[i])), np.nan)
+            status_estimate_tmp = [model_status for i in range(len(query_sequence[i]))]
+        else:
+            yPred_tmp, yVar_tmp, status_estimate_tmp = gaussian_model_utils.estimateUsingModel(
+                model, query_lats, query_lons, query_elevations, query_sequence[i], time_offset, save_matrices=True)
         # put the estimates together into one matrix
         yPred = np.concatenate((yPred, yPred_tmp), axis=1)
         yVar = np.concatenate((yVar, yVar_tmp), axis=1)
+        status = status + status_estimate_tmp

     if np.min(yPred) < MIN_ACCEPTABLE_ESTIMATE:
         app.logger.warn("got estimate below level " + str(MIN_ACCEPTABLE_ESTIMATE))

     # Here we clamp values to ensure that small negative values do not appear
     yPred = np.clip(yPred, a_min = 0., a_max = None)

-    return yPred, yVar
+    return yPred, yVar, status

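As a reading aid (not part of the diff), a minimal sketch of the new fallback path with stand-in values; query_lats and the three time steps are placeholders, not the real request objects:

import numpy as np

model = None                      # createModel() now returns None when the data matrix is empty
model_status = "0 measurements"   # plus a human-readable reason
query_lats = np.array([40.76, 40.78])
query_times = [0, 1, 2]           # three requested time steps

if model is None:
    yPred_tmp = np.full((query_lats.shape[0], len(query_times)), 0.0)    # placeholder estimates
    yVar_tmp = np.full((query_lats.shape[0], len(query_times)), np.nan)  # unknown variance
    status_tmp = [model_status for _ in query_times]                     # one status entry per time step

print(status_tmp)  # ['0 measurements', '0 measurements', '0 measurements']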
8 changes: 6 additions & 2 deletions aqandu/gaussian_model.py
@@ -180,6 +180,8 @@ def __init__(self, space_coordinates, time_coordinates, stData,
         self.log_signal_variance = nn.Parameter(torch.log(torch.tensor(signal_variance)))
         # this says whether or not you can use the FFT for time
         self.time_structured = time_structured
+        # for reporting purposes
+        self.measurements = stData.numel()

         # build and invert kernel matrix
         self.update()
@@ -215,7 +217,7 @@ def update(self):
             self.time_coordinates,
             torch.exp(self.log_time_length_scale)
         ) + torch.eye(self.time_coordinates.size(0)) * JITTER
-        np.savetxt('temp_kernel_unstructured.csv', (temporal_kernel).detach().numpy(), delimiter = ';')
+        # np.savetxt('temp_kernel_unstructured.csv', (temporal_kernel).detach().numpy(), delimiter = ';')
         eigen_value_t, eigen_vector_t = torch.symeig(temporal_kernel, eigenvectors=True)
         eigen_vector_st = kronecker(eigen_vector_t, eigen_vector_s)
         eigen_value_st = kronecker(eigen_value_t.view(-1, 1), eigen_value_s.view(-1, 1)).view(-1)
@@ -311,7 +313,7 @@ def forward(self, test_space_coordinates, test_time_coordinates):
         yPred = yPred.view(test_time_coordinates.size(0), test_space_coordinates.size(0)).transpose(-2, -1)
         yVar = yVar.view(test_time_coordinates.size(0), test_space_coordinates.size(0)).transpose(-2, -1)

-        return yPred, yVar
+        status_string = str(self.measurements) + " measurements"
+        status = [status_string for i in range(test_time_coordinates.size(0))]
+        return yPred, yVar, status

     def negative_log_likelihood(self):
         nll = 0
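For reference (standard PyTorch behavior, not project code): Tensor.numel() is the total element count, so self.measurements records how many readings fed the model, and forward() repeats that count once per requested time step. The tensor below is an invented example with an assumed sensors-by-time-bins layout:

import torch

stData = torch.tensor([[1.0, 2.0, 3.0],
                       [4.0, 5.0, 6.0]])       # assumed layout: sensors x time bins
print(stData.numel())                          # 6
print(str(stData.numel()) + " measurements")   # "6 measurements"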
95 changes: 51 additions & 44 deletions aqandu/gaussian_model_utils.py
@@ -205,7 +205,7 @@ def interpolateBadElements(matrix, bad_value = 0):
             logging.debug("got full row of bad indices?" + str(row))
     if (float(num_interp_fails)/matrix.shape[0]) > FRACTION_SIGNIFICANT_FAILS:
         logging.warn("got %d interp failures out of %d sensors", num_interp_fails, matrix.shape[0])
-        saveMatrixToFile(matrix, 'failed_interp_matrix.txt')
+        # saveMatrixToFile(matrix, 'failed_interp_matrix.txt')


@@ -316,26 +316,28 @@ def setupDataMatrix2(sensor_data, space_coordinates, time_coordinates, device_lo
         # data_matrix[space_index,i] = time_data_array[i,0]
         data_matrix[space_index,:] = time_data_array

-    saveMatrixToFile(data_matrix, '1matrix.txt')
-    numpy.savetxt('1matrix.csv', data_matrix, delimiter=',')
-    interpolateBadElements(data_matrix, -1)
-    saveMatrixToFile(data_matrix, '2interpolated.txt')
-    numpy.savetxt('2interpolated.csv', data_matrix, delimiter=',')
-    data_matrix, space_coordinates = removeBadSensors(data_matrix, space_coordinates, 0.75)
-    saveMatrixToFile(data_matrix, '3matrix_removed_bad.txt')
-    numpy.savetxt('3removed_bad.csv', data_matrix, delimiter=',')
-    # fill in missing readings using the average values for each time slice
-
-    # This is important because bins on either end of the time range might not have enough measurements
-    # This is now taken care of again. Once fully tested, remove this.
-    # data_matrix, time_coordinates = trimBadEdgeElements(data_matrix, time_coordinates)
-    # saveMatrixToFile(data_matrix, '4matrixtrimmed.txt')
-    # numpy.savetxt('4matrixtrimmed.csv', data_matrix, delimiter=',')
-
-    # fill in missing readings using the average values for each time slice
-    data_matrix = fillInMissingReadings(data_matrix, -1)
-    saveMatrixToFile(data_matrix, '4matrix_filled_bad.txt')
-    numpy.savetxt('4filled_bad.csv', data_matrix, delimiter=',')
+    # check to make sure we have data
+    if (data_matrix.size > 0):
+        # saveMatrixToFile(data_matrix, '1matrix.txt')
+        # numpy.savetxt('1matrix.csv', data_matrix, delimiter=',')
+        interpolateBadElements(data_matrix, -1)
+        # saveMatrixToFile(data_matrix, '2interpolated.txt')
+        # numpy.savetxt('2interpolated.csv', data_matrix, delimiter=',')
+        data_matrix, space_coordinates = removeBadSensors(data_matrix, space_coordinates, 0.75)
+        # saveMatrixToFile(data_matrix, '3matrix_removed_bad.txt')
+        # numpy.savetxt('3removed_bad.csv', data_matrix, delimiter=',')
+        # fill in missing readings using the average values for each time slice
+
+        # This is important because bins on either end of the time range might not have enough measurements
+        # This is now taken care of again. Once fully tested, remove this.
+        # data_matrix, time_coordinates = trimBadEdgeElements(data_matrix, time_coordinates)
+        # saveMatrixToFile(data_matrix, '4matrixtrimmed.txt')
+        # numpy.savetxt('4matrixtrimmed.csv', data_matrix, delimiter=',')
+
+        # fill in missing readings using the average values for each time slice
+        data_matrix = fillInMissingReadings(data_matrix, -1)
+        # saveMatrixToFile(data_matrix, '4matrix_filled_bad.txt')
+        # numpy.savetxt('4filled_bad.csv', data_matrix, delimiter=',')


     # for debugging report id of last sensor in matrix - to get raw data
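A quick aside on the new guard (standard NumPy behavior, not project code): a matrix with a zero-length dimension has size 0, which is how an empty query window is detected here. The shapes below are invented:

import numpy as np

data_matrix = np.empty((5, 0))    # 5 sensors but no populated time bins
print(data_matrix.size)           # 0 -> the processing block above is skipped
print(np.empty((0, 3)).size)      # 0 as well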
@@ -399,25 +401,30 @@ def createModel(sensor_data, latlon_length_scale, elevation_length_scale, time_l
     # data_matrix, space_coordinates, time_coordinates = setupDataMatrix(sensor_data, space_coordinates, time_coordinates, device_location_map)
     data_matrix, space_coordinates, time_coordinates = setupDataMatrix2(sensor_data, space_coordinates, time_coordinates, device_location_map)

-    space_coordinates = torch.tensor(space_coordinates) # convert data to pytorch tensor
-    time_coordinates = torch.tensor(time_coordinates) # convert data to pytorch tensor
-    data_matrix = torch.tensor(data_matrix) # convert data to pytorch tensor
-
-    model = gaussian_model.gaussian_model(space_coordinates, time_coordinates, data_matrix,
-                                           latlon_length_scale=float(latlon_length_scale),
-                                           elevation_length_scale=float(elevation_length_scale),
-                                           time_length_scale=float(time_length_scale),
-                                           noise_variance=36.0, signal_variance=400.0, time_structured=False)
-
-    if save_matrices:
-        numpy.savetxt('space_coords.csv', space_coordinates, delimiter=',')
-        numpy.savetxt('time_coords.csv', time_coordinates, delimiter=',')
-        numpy.savetxt('PM_data.csv', data_matrix, delimiter=',')
-        numpy.savetxt('latlon_scale.csv', numpy.full([1], latlon_length_scale), delimiter=',')
-        numpy.savetxt('time_scale.csv', numpy.full([1], time_length_scale), delimiter=',')
-        numpy.savetxt('elevation_scale.csv', numpy.full([1], elevation_length_scale), delimiter=',')
+    if (data_matrix.size > 0):
+        space_coordinates = torch.tensor(space_coordinates) # convert data to pytorch tensor
+        time_coordinates = torch.tensor(time_coordinates) # convert data to pytorch tensor
+        data_matrix = torch.tensor(data_matrix) # convert data to pytorch tensor
+
+        model = gaussian_model.gaussian_model(space_coordinates, time_coordinates, data_matrix,
+                                               latlon_length_scale=float(latlon_length_scale),
+                                               elevation_length_scale=float(elevation_length_scale),
+                                               time_length_scale=float(time_length_scale),
+                                               noise_variance=36.0, signal_variance=400.0, time_structured=False)
+        status = ""
+    else:
+        model = None
+        status = "0 measurements"
+
+    # if save_matrices:
+    #     numpy.savetxt('space_coords.csv', space_coordinates, delimiter=',')
+    #     numpy.savetxt('time_coords.csv', time_coordinates, delimiter=',')
+    #     numpy.savetxt('PM_data.csv', data_matrix, delimiter=',')
+    #     numpy.savetxt('latlon_scale.csv', numpy.full([1], latlon_length_scale), delimiter=',')
+    #     numpy.savetxt('time_scale.csv', numpy.full([1], time_length_scale), delimiter=',')
+    #     numpy.savetxt('elevation_scale.csv', numpy.full([1], elevation_length_scale), delimiter=',')

-    return model, time_offset
+    return model, time_offset, status


 # Ross changed this to do the formatting in the api_routes call instead of here
@@ -456,17 +463,17 @@ def estimateUsingModel(model, lats, lons, elevations, query_dates, time_offset,
     query_dates2 = numpy.transpose(numpy.asarray([time_coordinates]))
     query_time = torch.tensor(query_dates2)

-    if save_matrices:
-        numpy.savetxt('query_space_coords.csv', space_coordinates, delimiter=',')
-        numpy.savetxt('query_time_coords.csv', query_time, delimiter=',')
+    # if save_matrices:
+    #     numpy.savetxt('query_space_coords.csv', space_coordinates, delimiter=',')
+    #     numpy.savetxt('query_time_coords.csv', query_time, delimiter=',')

-    yPred, yVar = model(query_space, query_time)
+    yPred, yVar, status = model(query_space, query_time)
     yPred = numpy.maximum(yPred.numpy(), 0.0)
     yVar = yVar.numpy()
     if numpy.amin(yVar) < 0.0:
         logging.warn("Got negative values in variance, suggesting a numerical problem")

-    return yPred, yVar
+    return yPred, yVar, status

 #
 # this kind of formatting of data is now done in the API (api_routes), because it will get formatted differently for different types of queries.
2 changes: 0 additions & 2 deletions aqandu/static/js/map_reconfigure.js
@@ -1235,8 +1235,6 @@ function createNewMarker(location) {
     // create Dot
     const randomClickMarker = [{ 'ID': markerID, 'SensorSource': 'sensorLayerRandomMarker', 'Latitude': String(location.latlng.lat), 'Longitude': String(location.latlng.lng) }]
     sensorLayerRandomMarker(randomClickMarker)
-
-    console.log(pastDate, todaysDate)
     const predictionsForLocationURL = generateURL('/getEstimatesForLocation', { 'location': { 'lat': String(location.latlng.lat), 'lon': String(location.latlng.lng) }, 'start_date': formatDateTime(pastDate), 'end_date': formatDateTime(todaysDate), 'estimatesrate': 1.0 })

     getDataFromDB(predictionsForLocationURL).then(data => {
12 changes: 6 additions & 6 deletions aqandu/utils.py
@@ -1,4 +1,4 @@
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 import pytz
 import utm
 from matplotlib.path import Path
@@ -22,7 +22,7 @@ def validateDate(dateString):

 def parseDateString(datetime_string):
     """Parse date string into a datetime object"""
-    return datetime.strptime(datetime_string, DATETIME_FORMAT).astimezone(pytz.timezone('US/Mountain'))
+    return datetime.strptime(datetime_string, DATETIME_FORMAT).replace(tzinfo=timezone.utc)

 # this breaks the time part of the estimation/data into pieces to speed up computation
 # sequence_size_mins
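A short illustration of why this change matters (standard-library behavior, not project code): strptime() returns a naive datetime, so the old .astimezone(...) call interpreted the timestamp in the server's local time zone before converting, while .replace(tzinfo=timezone.utc) pins it to UTC regardless of where the server runs. The format string below is assumed; it matches the example timestamps in api_routes.py.

from datetime import datetime, timezone
import pytz

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"   # assumed format
naive = datetime.strptime("2018-01-03T20:00:00Z", DATETIME_FORMAT)

old_style = naive.astimezone(pytz.timezone('US/Mountain'))   # depends on the machine's local time zone
new_style = naive.replace(tzinfo=timezone.utc)               # always 2018-01-03 20:00:00+00:00

print(old_style, new_style)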
@@ -67,9 +67,9 @@ def setupElevationInterpolator(filename):
     elevation_grid = data['elevs']
     gridLongs = data['gridLongs']
     gridLats = data['gridLats']
-    np.savetxt('grid_lats.txt',gridLats)
-    np.savetxt('grid_lons.txt',gridLongs)
-    np.savetxt('elev_grid.txt', elevation_grid)
+    # np.savetxt('grid_lats.txt',gridLats)
+    # np.savetxt('grid_lons.txt',gridLongs)
+    # np.savetxt('elev_grid.txt', elevation_grid)
     return interpolate.interp2d(gridLongs, gridLats, elevation_grid, kind='cubic')

@@ -136,7 +136,7 @@ def isQueryInBoundingBox(bounding_box_vertices, query_lat, query_lon):
 def removeInvalidSensors(sensor_data):
     # sensor is invalid if its average reading for any day exceeds 350 ug/m3
     epoch = datetime(1970, 1, 1)
-    epoch = pytz.timezone('US/Mountain').localize(epoch)
+    epoch = pytz.timezone('UTC').localize(epoch)
     dayCounts = {}
     dayReadings = {}
     for datum in sensor_data: