(十二)Geospark源码解析(一)

2019-07-26  本文已影响0人  Scially

Geospark源码解析(一)

本节我们以查询为例,看下GeoSpark如何利用分布式来实现高效查询。首先,对于Spark来说,想要利用Spark,必须要将自己的类型转为RDD,我们就先看下Geospark是如何读取GeoJson,并将Geometry转为RDD的。

public class SpatialRDD<T extends Geometry>
        implements Serializable
{
    /**
     * The raw spatial RDD.
     */
    public JavaRDD<T> rawSpatialRDD;
    
    ...
 }}

Geospark自定义了一个RDD,SpatialRDD,他是一个泛型类,并且泛型要求是Geometry的子类,对于Geometry来说,他的子类有PointLinePolygon等,这个大家可以去看JTS库http://www.tsusiatsoftware.net/jts/main.html。然后我这里列举了SpatialRDD一个重要的成员,对于rawSpatialRDD来说,他里面存储的就是我们的需要分析的Geometry

GeoSpark提供了PointRDDPolygonRDD等,他们都继承自SpatialRDD,我们以PointRDD为例,分析一下GeoSpark是如何将geojson转为RDD的。

public PointRDD(JavaSparkContext sparkContext, String InputLocation, Integer Offset, FileDataSplitter splitter,
            boolean carryInputData, Integer partitions, StorageLevel newLevel, String sourceEpsgCRSCode, String targetEpsgCode)
    {
        JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);
        if (Offset != null) {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));}
        else {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(splitter, carryInputData)));}
        if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}
        if (newLevel != null) { this.analyze(newLevel);}
        if (splitter.equals(FileDataSplitter.GEOJSON)) { this.fieldNames = FormatMapper.readGeoJsonPropertyNames(rawTextRDD.take(1).get(0).toString()); }
    }

这是PointRDD常用的一个构造函数,其中第4行JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);则是利用Spark的原生方法将geojson首先转为一个RDD,他的类型可以理解为是String,第7行if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}则是做了一个坐标转换,关键是第5行this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));

在第5行中,Geospark首先调用了mapPartitions方法来将rawTextRDD中的每一行转为Geometry,其中pointFormatMapper中有一个方法

public Iterator<T> call(Iterator<String> stringIterator)
            throws Exception
    {
        List<T> result = new ArrayList<>();
        while (stringIterator.hasNext()) {
            String line = stringIterator.next();
            addGeometry(readGeometry(line), result);
        }
        return result.iterator();
    }

他是一个重载,函数参数stringIterator是每个分区的所有string,Geospark遍历这个集合,在每一行调用了一个addGeometry方法,将String转为Geometry,这个方法就不细讲,主要是解析GeoJson,感兴趣的可以去看GeoSpark源码。

这样构造完成后,就将GeoJson转为了一个RDD,此时我们还没有构建空间索引,但是对于大数据量的空间数据我们已经可以利用Spark的RDD进行并行计算了。

public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
            throws Exception
    {
        U queryGeometry = originalQueryGeometry;
        if (spatialRDD.getCRStransformation()) {
            queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
        }

        if (useIndex == true) {
            if (spatialRDD.indexedRawRDD == null) {
                throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
            }
            return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
        }
        else {
            return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
        }
    }

这里我们看第16行return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));在第9行if (useIndex == true)判断不用索引时,就会跳到第16行,本质上还是用了RDD来利用自定义函数进行判断,如果是真,就过滤出来,我们看RangeFilter这个类。

public class RangeFilter<U extends Geometry, T extends Geometry>
        extends JudgementBase
        implements Function<T, Boolean>
{
    public RangeFilter(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
    {
        super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
    }
    public Boolean call(T geometry)
            throws Exception
    {
        if (leftCoveredByRight) {
            return match(geometry, queryGeometry);
        }
        else {
            return match(queryGeometry, queryGeometry);
        }
    }
}

注意到call这个方法,里面又调用了match方法,它在父类JudgementBase定义有:

public boolean match(Geometry spatialObject, Geometry queryWindow)
    {
        if (considerBoundaryIntersection) {
            if (queryWindow.intersects(spatialObject)) { return true; }
        }
        else {
            if (queryWindow.covers(spatialObject)) { return true; }
        }
        return false;
    }

这里面,我们可以看到第4行和第7行均是利用了JTS来判断的,到这里,就一目了然了,实际上还是我们提供了match这个方法,利用Spark来计算。

本文中,我们并没有涉及到索引,GeoSpark也将JTS的索引进行了封装,原理和上面讲的是一样的,我们下一篇文章中在进行分析。

上一篇下一篇

猜你喜欢

热点阅读