RDD介绍

2016-07-28  本文已影响136人  imarch1

全称Resilient Distributed Datasets。Resilient(弹性):数据集的划分(进而决定了并行度)可变

内部接口:

分区

分区表示表示并行计算的一个计算单元。

trait Partition extends Serializable 
{ 
    /** * Get the partition's index within its parent RDD */ 
    def index: Int // A better default implementation of HashCode override 
    
    def hashCode(): Int = index
}

RDD 只是数据集的抽象,分区内部并不存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号加上分区编号可以唯一确定该分区对应的块编号,利用底层数据存储层提供的接口,就能从存储介质中提取出分区对应的数据。

分区的个数:窄依赖子 RDD 由父 RDD 分区个数决定,Shuffle 依赖由子 RDD 分区器决定。

依赖

Spark是RDD的转换操作。子 RDD 与父 RDD 之间的关系称为依赖关系。依赖关系决定了stage的划分。

abstract class Dependency[T] extends Serializable { 
    def rdd: RDD[T]
}

每个 Dependency 子类内部都会存储一个 RDD 对象,对应一个父 RDD。

依赖分为窄依赖(Narrow Dependency)和 Shuffle 依赖(Shuffle Dependency)。
窄依赖中,父 RDD 中的一个分区最多只会被子 RDD 中的一个分区使用,换句话说,父 RDD 中,一个分区内的数据是不能被分割的,必须整个交付给子 RDD 中的一个分区。Shuffle 依赖中,父 RDD 中的分区可能会被多个子 RDD 分区使用。

分区器

哈希分区器:其 getPartition 方法的实现很简单,取键值的 hashCode,除以子 RDD 的分区个数取余即可。

class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
上一篇下一篇

猜你喜欢

热点阅读