-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
291 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"zoom": 0, | ||
"rasters": [ | ||
"nlcd-2011-30m-epsg5070-0.10.0" | ||
], | ||
"multiPolygon": "{\"type\":\"MultiPolygon\",\"coordinates\":[[[[-75.1626205444336,39.95580659996906],[-75.25531768798828,39.94514735903112],[-75.22785186767578,39.89446035777916],[-75.1461410522461,39.88761144548104],[-75.09309768676758,39.91078961774283],[-75.09464263916016,39.93817189499188],[-75.12039184570312,39.94435771955196],[-75.1626205444336,39.95580659996906]]]]}" | ||
} |
83 changes: 83 additions & 0 deletions
83
geop/src/main/scala/com/azavea/usace/programanalysis/geop/ClippedLayers.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package com.azavea.usace.programanalysis.geop | ||
|
||
import geotrellis.raster.Tile | ||
import geotrellis.spark.io.{Intersects, _} | ||
import geotrellis.spark.{SpatialKey, TileLayerMetadata, _} | ||
import geotrellis.spark.io.s3.{S3AttributeStore, S3LayerReader} | ||
import geotrellis.vector.{Extent, MultiPolygon} | ||
|
||
import org.apache.spark.SparkContext | ||
|
||
|
||
object ClippedLayers { | ||
/** | ||
* Given a list of layer ids, a multipolygon and a spark context, returns | ||
* a list of [[TileLayerRDD]]s cropped to the multipolygon. | ||
* | ||
* @param rasterLayerIds List of [[LayerId]]s to crop | ||
* @param multiPolygon [[MultiPolygon]] to crop to | ||
* @param sc [[SparkContext]] for the [[S3LayerReader]] | ||
* @return List of [[TileLayerRDD]]s which includes only | ||
* the tiles that either completely or partially | ||
* overlap given multiPolygon. | ||
*/ | ||
def apply( | ||
rasterLayerIds: Seq[LayerId], | ||
multiPolygon: MultiPolygon, | ||
sc: SparkContext | ||
): Seq[TileLayerRDD[SpatialKey]] = { | ||
val extent = multiPolygon.envelope | ||
val rasterLayers = rasterLayerIds.map(rasterLayerId => | ||
queryAndCropLayer(catalog(sc), rasterLayerId, extent) | ||
) | ||
|
||
rasterLayers | ||
} | ||
|
||
/** | ||
* Given a catalog, layer id and extent, returns a [[TileLayerRDD]] of the | ||
* layer from the catalog with the matching id cropped to the extent. | ||
* | ||
* @param catalog The [[S3LayerReader]] to look up layers from | ||
* @param layerId The [[LayerId]] of the layer to look up | ||
* @param extent The [[Extent]] to crop to | ||
* @return A [[TileLayerRDD]] cropped to the extent | ||
*/ | ||
def queryAndCropLayer( | ||
catalog: S3LayerReader, | ||
layerId: LayerId, | ||
extent: Extent | ||
): TileLayerRDD[SpatialKey] = { | ||
catalog.query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId) | ||
.where(Intersects(extent)) | ||
.result | ||
} | ||
|
||
/** | ||
* Given a spark context, returns the correct catalog. This configures the | ||
* next method with defaults. | ||
* | ||
*/ | ||
def catalog(sc: SparkContext): S3LayerReader = | ||
catalog("azavea-datahub", "catalog")(sc) | ||
|
||
/** | ||
* Given an S3 bucket name and root path, returns a catalog of layers | ||
* | ||
* @param bucket The name of the S3 Bucket of the catalog | ||
* @param rootPath Root path of the catalog in the bucket | ||
* @param sc [[SparkContext]] to use when connecting | ||
* @return An [[S3LayerReader]] which represents the | ||
* catalog of available layers from which | ||
* individual layers can be looked up. | ||
*/ | ||
def catalog( | ||
bucket: String, | ||
rootPath: String | ||
)(implicit sc: SparkContext): S3LayerReader = { | ||
val attributeStore = new S3AttributeStore(bucket, rootPath) | ||
val catalog = new S3LayerReader(attributeStore) | ||
|
||
catalog | ||
} | ||
} |
51 changes: 50 additions & 1 deletion
51
geop/src/main/scala/com/azavea/usace/programanalysis/geop/GeopServiceActor.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,63 @@ | ||
package com.azavea.usace.programanalysis.geop | ||
|
||
import akka.actor.Actor | ||
|
||
import geotrellis.proj4.{ConusAlbers, LatLng} | ||
|
||
import org.apache.spark.SparkContext | ||
import spray.routing.HttpService | ||
|
||
import scala.concurrent.future | ||
|
||
import spray.http.AllOrigins | ||
import spray.http.HttpHeaders.{`Access-Control-Allow-Headers`, `Access-Control-Allow-Methods`, `Access-Control-Allow-Origin`} | ||
import spray.http.HttpMethods.{DELETE, GET, OPTIONS, POST} | ||
import spray.json.{JsNumber, JsObject} | ||
import spray.routing.{Directive0, HttpService, RejectionHandler} | ||
|
||
|
||
class GeopServiceActor(sc: SparkContext) extends Actor with HttpService { | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import JsonProtocol._ | ||
|
||
implicit val _sc = sc | ||
|
||
def actorRefFactory = context | ||
def receive = runRoute(root) | ||
|
||
val corsHeaders = List( | ||
`Access-Control-Allow-Origin`(AllOrigins), | ||
`Access-Control-Allow-Methods`(GET, POST, OPTIONS, DELETE), | ||
`Access-Control-Allow-Headers`("Origin, X-Requested-With, Content-Type, Accept, Accept-Encoding, Accept-Language, Host, Referer, User-Agent, Access-Control-Request-Method, Access-Control-Request-Headers") | ||
) | ||
|
||
def cors: Directive0 = { | ||
val rh = implicitly[RejectionHandler] | ||
respondWithHeaders(corsHeaders) & handleRejections(rh) | ||
} | ||
|
||
def root = | ||
pathPrefix("count") { rasterGroupedCount } ~ | ||
path("ping") { complete { "OK" } } | ||
|
||
def rasterGroupedCount = | ||
cors { | ||
import spray.json.DefaultJsonProtocol._ | ||
|
||
entity(as[CountArgs]) { args => | ||
complete { | ||
future { | ||
val multiPolygon = args.multiPolygon.reproject(LatLng, ConusAlbers) | ||
val rasterLayers = ClippedLayers(args.rasters, multiPolygon, sc) | ||
val rasterGroupedCount = RasterGroupedCount(rasterLayers, multiPolygon) | ||
|
||
JsObject( | ||
rasterGroupedCount | ||
.map { case (keys, count) => | ||
keys.mkString(",") -> JsNumber(count) | ||
} | ||
) | ||
} | ||
} | ||
} | ||
} | ||
} |
36 changes: 36 additions & 0 deletions
36
geop/src/main/scala/com/azavea/usace/programanalysis/geop/JsonProtocol.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package com.azavea.usace.programanalysis.geop | ||
|
||
import geotrellis.spark.LayerId | ||
import geotrellis.vector.MultiPolygon | ||
import geotrellis.vector.io._ | ||
import geotrellis.vector.io.json.GeoJsonSupport | ||
|
||
import spray.httpx.SprayJsonSupport | ||
import spray.json._ | ||
import spray.json.DefaultJsonProtocol._ | ||
|
||
|
||
// TODO Nest under "input" | ||
case class CountArgs (rasters: Seq[LayerId], multiPolygon: MultiPolygon) | ||
|
||
object JsonProtocol extends SprayJsonSupport with GeoJsonSupport { | ||
implicit object CountArgsJsonFormat extends RootJsonFormat[CountArgs] { | ||
def write(args: CountArgs) = JsObject( | ||
"zoom" -> JsNumber(args.rasters.head.zoom), | ||
"rasters" -> JsArray(args.rasters.map(r => JsString(r.name)).toVector), | ||
"multiPolygon" -> JsString(args.multiPolygon.toGeoJson()) | ||
) | ||
|
||
def read(value: JsValue) = { | ||
value.asJsObject.getFields("zoom", "rasters", "multiPolygon") match { | ||
case Seq(JsNumber(zoom), JsArray(rasters), JsString(multiPolygon)) => | ||
new CountArgs( | ||
rasters.map { r => LayerId(r.convertTo[String], zoom.toInt) }, | ||
multiPolygon.parseGeoJson[MultiPolygon] | ||
) | ||
case _ => | ||
throw new DeserializationException("Bad Count Arguments") | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 107 additions & 0 deletions
107
geop/src/main/scala/com/azavea/usace/programanalysis/geop/RasterGroupedCount.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package com.azavea.usace.programanalysis.geop | ||
|
||
import geotrellis.raster.rasterize.{Callback, Rasterizer} | ||
import geotrellis.raster.{RasterExtent, Tile} | ||
import geotrellis.spark.{SpatialKey, TileLayerRDD} | ||
import geotrellis.vector.{MultiPolygon, MultiPolygonResult, PolygonResult} | ||
|
||
import org.apache.spark.rdd.RDD | ||
|
||
import scala.collection.mutable.ListBuffer | ||
|
||
|
||
object RasterGroupedCount { | ||
/** | ||
* Given a sequence of 1 - 3 raster layers, and a multipolygon, returns a | ||
* mapping from a tuple of raster values of the layers in order to the | ||
* count of pixels matching that combination. | ||
* | ||
* @param rasterLayers The list of [[TileLayerRDD]]s | ||
* @param multiPolygon The [[MultiPolygon]] to count within | ||
* @return Map from a tuple of integer values to the count | ||
* of pixels matching that combination. | ||
*/ | ||
def apply( | ||
rasterLayers: Seq[TileLayerRDD[SpatialKey]], | ||
multiPolygon: MultiPolygon | ||
): Map[Seq[Int], Int] = { | ||
joinRasters(rasterLayers) | ||
.map { case (key, tiles) => | ||
getDistinctPixels(rasterLayers.head, key, tiles.head, multiPolygon) | ||
.map { case (col, row) => tiles.map { tile => tile.get(col, row) } } | ||
.groupBy(identity).map { case (k, list) => k -> list.length } | ||
.toList | ||
} | ||
.reduce { (left, right) => left ++ right } | ||
.groupBy(_._1).map { case (k, list) => k -> list.map(_._2).sum } | ||
} | ||
|
||
/** | ||
* Given a sequence of 1 - 3 raster layers, returns a join of them all. | ||
* | ||
* @param rasterLayers The list of [[TileLayerRDD]]s | ||
* @return Joined RDD with a list of tiles, corresponding | ||
* to each raster in the list, matching a spatial | ||
* key. | ||
*/ | ||
def joinRasters( | ||
rasterLayers: Seq[TileLayerRDD[SpatialKey]] | ||
): RDD[(SpatialKey, List[Tile])] = { | ||
rasterLayers.length match { | ||
case 1 => | ||
rasterLayers.head | ||
.map { case (k, v) => (k, List(v)) } | ||
case 2 => | ||
rasterLayers.head.join(rasterLayers.tail.head) | ||
.map { case (k, (v1, v2)) => (k, List(v1, v2)) } | ||
case 3 => | ||
rasterLayers.head.join(rasterLayers.tail.head).join(rasterLayers.tail.tail.head) | ||
.map { case (k, ((v1, v2), v3)) => (k, List(v1, v2, v3)) } | ||
|
||
case 0 => throw new Exception("At least 1 raster must be specified") | ||
case _ => throw new Exception("At most 3 rasters can be specified") | ||
} | ||
} | ||
|
||
/** | ||
* Given a layer, a key, a tile, and a multipolygon, returns a list of | ||
* distinct pixels present in the multipolygon clipped to an extent | ||
* corresponding to the key and tile. | ||
* | ||
* @param rasterLayer The [[TileLayerRDD]] to clip | ||
* @param key The [[SpatialKey]] to transform extent to | ||
* @param tile The [[Tile]] to calculate raster extent from | ||
* @param multiPolygon The [[MultiPolygon]] to clip to | ||
* @return List of distinct pixels | ||
*/ | ||
def getDistinctPixels( | ||
rasterLayer: TileLayerRDD[SpatialKey], | ||
key: SpatialKey, | ||
tile: Tile, | ||
multiPolygon: MultiPolygon | ||
): ListBuffer[(Int, Int)] = { | ||
val extent = rasterLayer.metadata.mapTransform(key) | ||
val rasterExtent = RasterExtent(extent, tile.cols, tile.rows) | ||
|
||
val pixels = ListBuffer.empty[(Int, Int)] | ||
val cb = new Callback { | ||
def apply(col: Int, row: Int): Unit = { | ||
val pixel = (col, row) | ||
pixels += pixel | ||
} | ||
} | ||
|
||
multiPolygon & extent match { | ||
case PolygonResult(p) => | ||
Rasterizer.foreachCellByPolygon(p, rasterExtent)(cb) | ||
case MultiPolygonResult(mp) => | ||
mp.polygons.foreach { p => | ||
Rasterizer.foreachCellByPolygon(p, rasterExtent)(cb) | ||
} | ||
|
||
case _ => | ||
} | ||
|
||
pixels.distinct | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters