GeoSpark---Polygon Query
2020-11-26  一个懒散的人
Environment: IDEA 2020.1 + Gradle 4.9 + Scala 2.11
// Gradle configuration
plugins {
    id 'java'
    id 'scala'
}

group 'com.rhb'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-hive_2.11', version: '2.3.3'
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.3'
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.3'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.2'
    compile group: 'org.datasyslab', name: 'geospark', version: '1.1.2'
    compile group: 'org.datasyslab', name: 'geospark-sql_2.1', version: '1.1.2'
    compile group: 'com.vividsolutions', name: 'jts', version: '1.13'
}
The code is as follows:
package GeoSpark

import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.datasyslab.geospark.enums.FileDataSplitter
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geospark.spatialOperator.RangeQuery
import org.datasyslab.geospark.spatialRDD.PolygonRDD

object geo_demo2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("SpatialRangeQueryApp").setMaster("local[*]").
      set("spark.serializer", classOf[KryoSerializer].getName).
      set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
    implicit val sc: SparkContext = new SparkContext(conf)

    // Load the polygons and print the raw RDD (each geometry carries its attribute column)
    val polygonRDD = createPolygonRDD
    polygonRDD.rawSpatialRDD.rdd.collect().foreach(println(_))
    println("===============================")

    // Define the polygon query window: a rectangle from (-80, 0) to (33, 33)
    val geometryFactory = new GeometryFactory()
    val coordinates = new Array[Coordinate](5)
    coordinates(0) = new Coordinate(-80, 0)
    coordinates(1) = new Coordinate(-80, 33)
    coordinates(2) = new Coordinate(33, 33)
    coordinates(3) = new Coordinate(33, 0)
    coordinates(4) = coordinates(0)
    val polygonObject = geometryFactory.createPolygon(coordinates)

    // Range query: considerBoundaryIntersection = false, useIndex = false
    val queryResult = RangeQuery.SpatialRangeQuery(polygonRDD, polygonObject, false, false)
    queryResult.rdd.collect().foreach(println(_))
  }

  def createPolygonRDD(implicit sc: SparkContext): PolygonRDD = {
    val polygonRDDInputLocation = "D:\\idea\\polygon.csv"
    val polygonRDDStartOffset = 0   // coordinate columns start at column 0
    val polygonRDDEndOffset = 9     // ... and end at column 9 (five lon/lat pairs)
    val polygonRDDSplitter = FileDataSplitter.CSV // or use FileDataSplitter.TSV
    val carryOtherAttributes = true // keep the trailing column as the geometry's user data
    new PolygonRDD(sc, polygonRDDInputLocation, polygonRDDStartOffset, polygonRDDEndOffset, polygonRDDSplitter, carryOtherAttributes)
  }
}
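As a variation, the same range query can also be run with an Envelope query window and a spatial index. The snippet below is only a sketch, not part of the original example; it assumes the Envelope overload of RangeQuery.SpatialRangeQuery and SpatialRDD.buildIndex behave as documented for GeoSpark 1.1.x, and it reuses the polygonRDD variable from main above (i.e. it is a drop-in replacement for the query section of main):

import com.vividsolutions.jts.geom.Envelope
import org.datasyslab.geospark.enums.IndexType
import org.datasyslab.geospark.spatialOperator.RangeQuery

// Sketch: rectangular query window as an Envelope(minX, maxX, minY, maxY)
val queryEnvelope = new Envelope(-80, 33, 0, 33)

// Build an R-tree on the raw RDD (false = index the raw RDD, not a spatially partitioned one),
// then pass useIndex = true so the range query probes the index instead of scanning every geometry.
polygonRDD.buildIndex(IndexType.RTREE, false)
val indexedResult = RangeQuery.SpatialRangeQuery(polygonRDD, queryEnvelope, false, true)
indexedResult.rdd.collect().foreach(println(_))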
The polygon.csv dataset (columns 0-9 hold five longitude/latitude pairs forming a closed ring; the last column is a name attribute):
-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,hotel
-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,gas
-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,bar
-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,restaurant
-78.221102,32.35078,-78.221102,32.35078,-78.221102,32.35078,-78.221102,32.35078,-78.221102,32.35078,restaurant2
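To make the offsets in createPolygonRDD concrete, here is a small standalone sketch (plain JTS, my own illustration rather than GeoSpark internals) of how one CSV row maps to a polygon: columns 0-9 are read as five coordinate pairs, and the trailing column is what ends up as the geometry's user data when carryOtherAttributes is true:

import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}

object CsvRowToPolygon {
  def main(args: Array[String]): Unit = {
    val row = "-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142," +
      "-88.331492,32.324142,-88.331492,32.324142,hotel"
    val fields = row.split(",")

    // Columns 0..9: five lon/lat pairs forming a closed ring (here a degenerate, single-point ring)
    val coords = fields.take(10).map(_.toDouble).grouped(2)
      .map(pair => new Coordinate(pair(0), pair(1))).toArray
    val polygon = new GeometryFactory().createPolygon(coords)

    // Trailing column: the attribute that the example carries along with the geometry
    polygon.setUserData(fields.last)

    println(polygon)              // POLYGON ((-88.331492 32.324142, ...))
    println(polygon.getUserData)  // hotel
  }
}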
The output is as follows.
Printed input (raw polygon RDD):
POLYGON ((-88.331492 32.324142, -88.331492 32.324142, -88.331492 32.324142, -88.331492 32.324142, -88.331492 32.324142)) hotel
POLYGON ((-88.175933 32.360763, -88.175933 32.360763, -88.175933 32.360763, -88.175933 32.360763, -88.175933 32.360763)) hotelgas
POLYGON ((-88.388954 32.357073, -88.388954 32.357073, -88.388954 32.357073, -88.388954 32.357073, -88.388954 32.357073)) hotelgasbar
POLYGON ((-88.221102 32.35078, -88.221102 32.35078, -88.221102 32.35078, -88.221102 32.35078, -88.221102 32.35078)) restaurant
POLYGON ((-78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078)) restaurantrestaurant2
===============================
Printed query result (only the last polygon is returned, because its longitude of -78.221102 is the only one inside the query window's longitude range of -80 to 33):
POLYGON ((-78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078, -78.221102 32.35078)) restaurantrestaurant2
Process finished with exit code 0
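As a quick sanity check on that result, the following standalone JTS sketch (independent of Spark and not from the original post) tests one representative vertex of each input polygon against the same query window; only restaurant2 falls inside, matching the single hit above:

import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}

object WindowCheck {
  def main(args: Array[String]): Unit = {
    val factory = new GeometryFactory()

    // The same query window used in the example: a rectangle from (-80, 0) to (33, 33)
    val window = factory.createPolygon(Array(
      new Coordinate(-80, 0), new Coordinate(-80, 33),
      new Coordinate(33, 33), new Coordinate(33, 0),
      new Coordinate(-80, 0)))

    // One representative vertex per input polygon (all vertices of each row are identical)
    val locations = Seq(
      ("hotel", -88.331492, 32.324142),
      ("gas", -88.175933, 32.360763),
      ("bar", -88.388954, 32.357073),
      ("restaurant", -88.221102, 32.35078),
      ("restaurant2", -78.221102, 32.35078))

    locations.foreach { case (name, lon, lat) =>
      val inside = window.contains(factory.createPoint(new Coordinate(lon, lat)))
      println(s"$name -> $inside")   // only restaurant2 prints true
    }
  }
}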