Commit 0a6d1df: Merge branch 'release/1.16.0'

Taylor Nation committed Nov 2, 2016
2 parents 589db94 + aafb20c
Showing 61 changed files with 1,886 additions and 430 deletions.
@@ -15,15 +15,19 @@ server {
     access_log /var/log/nginx/mmw-app.access.log logstash_json;
 
     {% if ['packer'] | is_in(group_names) -%}
-    location /version.txt {
+    location = /version.txt {
         alias /srv/version.txt;
     }
     {% endif %}
 
-    location /favicon.ico {
+    location = /favicon.ico {
         alias {{ app_static_root }}favicon.png;
     }
 
+    location = /micro/ {
+        return 301 https://micro.$host;
+    }
+
     location /static/ {
     {% if ['packer'] | is_in(group_names) -%}
         etag on;
@@ -42,7 +46,7 @@ server {
         alias {{ app_media_root }};
     }
 
-    location /health-check/ {
+    location = /health-check/ {
         proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
         proxy_set_header Host $http_host;
         proxy_redirect off;
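
Note: in nginx, location = /path is an exact match, unlike the prefix match of location /path, so only the literal URI hits these blocks, and the new /micro/ block 301-redirects to the micro subdomain. A minimal smoke test for that redirect, as a Python sketch (hostname is illustrative; assumes the rendered template is deployed):

    import requests

    # allow_redirects=False lets us inspect the 301 issued by the new
    # `location = /micro/` block instead of following it.
    resp = requests.get('https://app.example.org/micro/', allow_redirects=False)
    assert resp.status_code == 301
    assert resp.headers['Location'].startswith('https://micro.')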
@@ -5,7 +5,7 @@ server {
     access_log /var/log/nginx/mmw-tiler.access.log logstash_json;
 
     {% if ['packer'] | is_in(group_names) -%}
-    location /version.txt {
+    location = /version.txt {
         alias /srv/version.txt;
     }
     {% endif %}
25 changes: 20 additions & 5 deletions scripts/aws/setupdb.sh
@@ -11,8 +11,10 @@ where: \n
 -b load/reload boundary data\n
 -f load a named boundary sql.gz\n
 -s load/reload stream data\n
--d load/reload DRB stream data\n
+-d load/reload DRB stream data\n
 -m load/reload mapshed data\n
+-p load/reload DEP data\n
+-q load/reload water quality data\n
 "
 
 # HTTP accessible storage for initial app data
@@ -21,8 +23,9 @@ load_boundary=false
 file_to_load=
 load_stream=false
 load_mapshed=false
+load_water_quality=false
 
-while getopts ":hbsdpmf:" opt; do
+while getopts ":hbsdpmqf:" opt; do
     case $opt in
         h)
            echo -e $usage
@@ -37,6 +40,8 @@ while getopts ":hbsdpmf:" opt; do
            load_dep=true ;;
        m)
            load_mapshed=true ;;
+       q)
+           load_water_quality=true ;;
        f)
            file_to_load=$OPTARG ;;
        \?)
@@ -89,7 +94,7 @@ fi
 if [ "$load_boundary" = "true" ] ; then
     # Fetch boundary layer sql files
     FILES=("boundary_county.sql.gz" "boundary_school_district.sql.gz" "boundary_district.sql.gz" "boundary_huc12.sql.gz" "boundary_huc10.sql.gz" "boundary_huc08.sql.gz")
-    PATHS=("county" "district" "huc8" "huc10" "huc12")
+    PATHS=("county" "district" "huc8" "huc10" "huc12" "school")
 
     download_and_load $FILES
     purge_tile_cache $PATHS
@@ -98,7 +103,7 @@ fi
 if [ "$load_stream" = "true" ] ; then
     # Fetch stream network layer sql files
     FILES=("nhdflowline.sql.gz")
-    PATHS=("stream")
+    PATHS=("nhd_streams_v2")
 
     download_and_load $FILES
     purge_tile_cache $PATHS
@@ -107,7 +112,7 @@ fi
 if [ "$load_drb_streams" = "true" ] ; then
     # Fetch DRB stream network layer sql file
     FILES=("drb_streams_50.sql.gz")
-    PATHS=("drb_streams")
+    PATHS=("drb_streams_v2")
 
     download_and_load $FILES
     purge_tile_cache $PATHS
@@ -120,3 +125,13 @@ if [ "$load_mapshed" = "true" ] ; then
 
     download_and_load $FILES
 fi
+
+if [ "$load_water_quality" = "true" ] ; then
+    # Fetch water quality data
+    FILES=("nhd_water_quality.sql.gz" "drb_catchment_water_quality.sql.gz")
+    PATHS=("drb_catchment_water_quality_tn" "drb_catchment_water_quality_tp"
+           "drb_catchment_water_quality_tss" "nhd_quality_tp" "nhd_quality_tn")
+
+    download_and_load $FILES
+    purge_tile_cache $PATHS
+fi
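
Note: the new -q flag pairs the two water quality SQL dumps with the tile cache paths that must be purged after a reload. A sketch of invoking it from a deploy hook, in Python (path and flag choices are illustrative; assumes database environment variables are already configured):

    import subprocess

    # Reload stream and the new water quality data after a release; each
    # flag corresponds to one of the load/reload blocks above.
    subprocess.check_call(['bash', 'scripts/aws/setupdb.sh', '-s', '-q'])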
46 changes: 46 additions & 0 deletions src/mmw/apps/modeling/calcs.py
@@ -92,3 +92,49 @@ def point_source_pollution(geojson):
         'name': 'pointsource',
         'categories': point_source_results
     }
+
+
+def catchment_water_quality(geojson):
+    """
+    Given a GeoJSON shape, retrieve Catchment Water Quality data
+    from the `drb_catchment_water_quality` table to display
+    in the Analyze tab.
+
+    Returns a dictionary to append to the outgoing JSON for analysis
+    result
+    """
+    geom = GEOSGeometry(geojson, srid=4326)
+    table_name = 'drb_catchment_water_quality'
+    sql = '''
+          SELECT nord, areaha, tn_tot_kgy, tp_tot_kgy, tss_tot_kg,
+                 tn_urban_k, tn_riparia, tn_ag_kgyr, tn_natural, tn_pt_kgyr,
+                 tp_urban_k, tp_riparia, tp_ag_kgyr, tp_natural, tp_pt_kgyr,
+                 tss_urban_, tss_rip_kg, tss_ag_kgy, tss_natura,
+                 tn_yr_avg_, tp_yr_avg_, tss_concmg,
+                 ST_AsGeoJSON(ST_Simplify(geom, 0.0003)) as geom
+          FROM {table_name}
+          WHERE ST_Intersects(geom, ST_SetSRID(ST_GeomFromText(%s), 4326))
+          '''.format(table_name=table_name)
+
+    with connection.cursor() as cursor:
+        cursor.execute(sql, [geom.wkt])
+
+        if cursor.rowcount != 0:
+            columns = [col[0] for col in cursor.description]
+            catchment_water_quality_results = [
+                # The TN, TP, and TSS values return as type Decimal,
+                # but we want floats.
+                dict(zip(columns,
+                         ([int(row[0]) if row[0] else None] +
+                          [float(value) if value else None
+                           for value in row[1:22]] +
+                          [row[22]])))
+                for row in cursor.fetchall()
+            ]
+        else:
+            catchment_water_quality_results = []
+    return {
+        'displayName': 'Water Quality',
+        'name': 'catchment_water_quality',
+        'categories': catchment_water_quality_results
+    }
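
Note: the function interpolates only the fixed table name into the SQL and binds the user-derived geometry through the %s placeholder, keeping GeoJSON input out of string formatting. A sketch of the call and its return shape (coordinates illustrative; assumes Django settings and a drb_catchment_water_quality table loaded via setupdb.sh -q):

    import json

    aoi = json.dumps({
        'type': 'Polygon',
        'coordinates': [[[-75.16, 39.95], [-75.10, 39.95],
                         [-75.10, 40.00], [-75.16, 39.95]]],
    })
    result = catchment_water_quality(aoi)
    # => {'displayName': 'Water Quality',
    #     'name': 'catchment_water_quality',
    #     'categories': [...]}  # one dict per intersecting catchment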
40 changes: 26 additions & 14 deletions src/mmw/apps/modeling/geoprocessing.py
@@ -6,6 +6,7 @@
 from django.conf import settings
 from django_statsd.clients import statsd
 from celery.exceptions import MaxRetriesExceededError
+from requests.exceptions import ConnectionError
 
 import requests
 import json
@@ -60,29 +61,36 @@ def sjs_submit(host, port, args, data, retry=None):
     """
     base_url = 'http://{}:{}'.format(host, port)
     jobs_url = '{}/jobs?{}'.format(base_url, args)
-    response = requests.post(jobs_url, data=json.dumps(data))
+
+    try:
+        response = requests.post(jobs_url, data=json.dumps(data))
+    except ConnectionError as exc:
+        if retry is not None:
+            retry(exc=exc)
+
     if response.ok:
         job = response.json()
     else:
         error = response.json()
-        if error['status'] == 'NO SLOTS AVAILABLE' and retry:
-            try:
-                retry()
-            except MaxRetriesExceededError:
-                raise Exception('No slots available in Spark JobServer.\n'
-                                'Details = {}'.format(response.text))
+
+        if error['status'] == 'NO SLOTS AVAILABLE' and retry is not None:
+            retry(exc=Exception('No slots available in Spark JobServer.\n'
+                                'Details = {}'.format(response.text)))
         elif error['result'] == 'context geoprocessing not found':
             reboot_sjs_url = '{}/contexts?reset=reboot'.format(base_url)
             context_response = requests.put(reboot_sjs_url)
 
             if context_response.ok:
-                if retry:
-                    retry()
+                if retry is not None:
+                    retry(exc=Exception('Geoprocessing context missing in '
+                                        'Spark JobServer\nDetails = {}'.format(
+                                            context_response.text)))
                 else:
-                    raise Exception('Geoprocessing context missing in Spark'
-                                    'JobServer, but no retry was set.\n'
-                                    'Details = {}'.format())
+                    raise Exception('Geoprocessing context missing in '
+                                    'Spark JobServer, but no retry was set.\n'
+                                    'Details = {}'.format(
+                                        context_response.text))
 
             else:
                 raise Exception('Unable to create missing geoprocessing '
                                 'context in Spark JobServer.\n'
@@ -106,7 +114,11 @@ def sjs_retrieve(host, port, job_id, retry=None):
     proceeding.
     """
     url = 'http://{}:{}/jobs/{}'.format(host, port, job_id)
-    response = requests.get(url)
+    try:
+        response = requests.get(url)
+    except ConnectionError as exc:
+        if retry is not None:
+            retry(exc=exc)
 
     if response.ok:
         job = response.json()
@@ -117,7 +129,7 @@
     if job['status'] == 'FINISHED':
         return job['result']
     elif job['status'] == 'RUNNING':
-        if retry:
+        if retry is not None:
             try:
                 retry()
             except MaxRetriesExceededError:
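
Note: passing exc= to retry matters because Celery's Task.retry raises (a Retry, or MaxRetriesExceededError once retries are exhausted), so the given exception is what surfaces when retries run out, and execution never falls through the call. A sketch of the intended caller, a bound Celery task (task name and parameters are assumptions):

    from celery import shared_task

    @shared_task(bind=True, default_retry_delay=1, max_retries=6)
    def start_job(self, host, port, args, data):
        # self.retry raises, so a ConnectionError inside sjs_submit is
        # rescheduled instead of escaping as an unhandled error.
        return sjs_submit(host, port, args, data, retry=self.retry)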
74 changes: 43 additions & 31 deletions src/mmw/apps/modeling/mapshed/tasks.py
@@ -37,6 +37,7 @@
     sed_a_factor
 )
 
+
 NLU = settings.GWLFE_CONFIG['NLU']
 NRur = settings.GWLFE_DEFAULTS['NRur']
 AG_NLCD_CODES = settings.GWLFE_CONFIG['AgriculturalNLCDCodes']
@@ -457,38 +458,49 @@ def nlcd_kfactor(result):
     return output
 
 
-def geop_tasks(geom, errback, exchange, choose_worker):
+def geop_tasks():
+    return geop_task_defs.keys()
+
+
+geop_task_defs = {
+    'nlcd_streams': lambda geom: (
+        'nlcd_streams',
+        {'polygon': [geom.geojson], 'vector': streams(geom)},
+        nlcd_streams),
+    'nlcd_soils': lambda geom: (
+        'nlcd_soils',
+        {'polygon': [geom.geojson]},
+        nlcd_soils),
+    'gwn': lambda geom: ('gwn', {'polygon': [geom.geojson]}, gwn),
+    'avg_awc': lambda geom: (
+        'avg_awc',
+        {'polygon': [geom.geojson]},
+        avg_awc),
+    'nlcd_slope': lambda geom: (
+        'nlcd_slope',
+        {'polygon': [geom.geojson]},
+        nlcd_slope),
+    'slope': lambda geom: (
+        'slope',
+        {'polygon': [geom.geojson]},
+        slope),
+    'nlcd_kfactor': lambda geom: (
+        'nlcd_kfactor',
+        {'polygon': [geom.geojson]},
+        nlcd_kfactor)
+}
+
+
+def geop_task(taskName, geom, exchange, errback, choose_worker):
+    (opname, data, callback) = geop_task_defs[taskName](geom)
     geop_worker = choose_worker()
 
-    # List of tuples of (opname, data, callback) for each geop task
-    definitions = [
-        ('nlcd_streams',
-         {'polygon': [geom.geojson], 'vector': streams(geom)},
-         nlcd_streams),
-        ('nlcd_soils', {'polygon': [geom.geojson]}, nlcd_soils),
-        ('gwn',
-         {'polygon': [geom.geojson]}, gwn),
-        ('avg_awc',
-         {'polygon': [geom.geojson]}, avg_awc),
-        ('nlcd_slope',
-         {'polygon': [geom.geojson]},
-         nlcd_slope),
-        ('slope',
-         {'polygon': [geom.geojson]},
-         slope),
-        ('nlcd_kfactor',
-         {'polygon': [geom.geojson]},
-         nlcd_kfactor)
-    ]
-
-    return [(mapshed_start.s(opname, data).set(exchange=exchange,
-                                               routing_key=geop_worker) |
-             mapshed_finish.s().set(exchange=exchange,
-                                    routing_key=geop_worker) |
-             callback.s().set(link_error=errback,
-                              exchange=exchange,
-                              routing_key=choose_worker()))
-            for (opname, data, callback) in definitions]
+    return (mapshed_start.s(opname, data).set(exchange=exchange,
+                                              routing_key=geop_worker) |
+            mapshed_finish.s().set(exchange=exchange,
+                                   routing_key=geop_worker) |
+            callback.s().set(link_error=errback,
+                             exchange=exchange,
+                             routing_key=choose_worker()))
 
 
 @shared_task
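
Note: the refactor turns the old all-at-once geop_tasks(geom, errback, exchange, choose_worker) into a lookup table plus a per-operation chain builder, so callers can schedule each geoprocessing chain independently. A sketch of how the new pieces compose (the chains variable is an assumption):

    # One Celery chain per operation: mapshed_start -> mapshed_finish ->
    # callback, each pinned to a worker chosen per task.
    chains = [geop_task(name, geom, exchange, errback, choose_worker)
              for name in geop_tasks()]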
29 changes: 20 additions & 9 deletions src/mmw/apps/modeling/tasks.py
@@ -13,10 +13,11 @@
 from celery import shared_task
 
 from apps.modeling.geoprocessing import histogram_start, histogram_finish, \
-    data_to_survey, data_to_censuses
+    data_to_survey, data_to_census, data_to_censuses
 
 from apps.modeling.calcs import (animal_population,
-                                 point_source_pollution)
+                                 point_source_pollution,
+                                 catchment_water_quality)
 
 from tr55.model import simulate_day
 from gwlfe import gwlfe, parser
@@ -108,17 +109,18 @@ def get_histogram_job_results(self, incoming):
 
 
 @shared_task
-def histogram_to_survey(incoming, aoi):
+def histogram_to_survey_census(incoming):
     """
     Converts the histogram results (aka analyze results)
-    to a survey of land use, which are rendered in the UI.
+    to a survey & census of land use, which are rendered in the UI.
     """
     pixel_width = incoming['pixel_width']
     data = incoming['histogram'][0]
-    results = data_to_survey(data)
-    convert_result_areas(pixel_width, results)
+    census = data_to_census(data)
+    survey = data_to_survey(data)
+    convert_result_areas(pixel_width, survey)
 
-    return results
+    return {'survey': survey, 'census': census}
 
 
 @shared_task
@@ -138,15 +140,24 @@ def analyze_animals(area_of_interest):
     """
     Given an area of interest, returns the animal population within it.
     """
-    return [animal_population(area_of_interest)]
+    return {'survey': [animal_population(area_of_interest)]}
 
 
 @shared_task
 def analyze_pointsource(area_of_interest):
     """
     Given an area of interest, returns point sources of pollution within it.
     """
-    return [point_source_pollution(area_of_interest)]
+    return {'survey': [point_source_pollution(area_of_interest)]}
+
+
+@shared_task
+def analyze_catchment_water_quality(area_of_interest):
+    """
+    Given an area of interest in the DRB, returns catchment water quality data
+    within it.
+    """
+    return {'survey': [catchment_water_quality(area_of_interest)]}
 
 
 def parse_single_ring_multipolygon(area_of_interest):
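
Note: with this change every analyze task returns a {'survey': [...]} envelope, and histogram_to_survey_census adds a parallel 'census' key, so downstream code unpacks one shape everywhere. An illustrative consumer (aoi_geojson is a placeholder):

    result = analyze_pointsource(aoi_geojson)  # called directly for brevity
    survey = result['survey']                  # list with one category dict
    assert survey[0]['name'] == 'pointsource'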