Spark源码[5]-BlockManager

2020-05-24  本文已影响0人  蠟筆小噺没有烦恼

在Spark中,只要涉及到非Partition级别的数据,都会有一个Block的概念,而这里的Block并不是HDFS的Block,而是Spark内部为了数据存储而设立的一个概念,每个Block都会有BlockId,BlockInfo等信息。所以这里先介绍一下BlockId和BlockInfo。

1 BlockId

该部分代码在spark-core模块的org.apache.spark.storage包中,可以看到是将其作为存储部分进行区分的。BlockId是一个不可修改的抽象类,有一个name,及几个判断方法;

 sealed abstract class BlockId {
  /** A globally unique identifier for this Block. Can be used for ser/de. */
  def name: String

  // convenience methods
  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
  override def toString: String = name
}

BlockId有多个继承case class,包括RDDBlockId、ShuffleBlockId、ShuffleIndexBlockId等,只根据传入的rddId/reduceId/mapId等来计算的到Blockid,是一个字符串。

还定义了一个BlockId的对象,用于解构具体的BlockId,根据名称的到具体的BlockId,使用了scala的模式匹配以及正则表达式:

object BlockId {
  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
  val TEST = "test_(.*)".r

  def apply(name: String): BlockId = name match {
    case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_DATA(shuffleId, mapId, reduceId) => ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>  ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case BROADCAST(broadcastId, field) => BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
    case TASKRESULT(taskId) => TaskResultBlockId(taskId.toLong)
    case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong)
    case TEMP_LOCAL(uuid) => TempLocalBlockId(UUID.fromString(uuid))
    case TEMP_SHUFFLE(uuid) => TempShuffleBlockId(UUID.fromString(uuid))
    case TEST(value) => TestBlockId(value)
    case _ => throw new UnrecognizedBlockId(name)
  }
}

通过正则表达式,我们也可以得知不同的Block在命名上有什么区别。

2 BlockInfo

用户描述Block得元数据信息,包括得属性又:
level:Block得存储级别,即StorageLevel
classTag:Block得类型
tellMaster:Block是否需要告知Master
_size:Block得大小
_readerCount:当Block被加锁时候得读取次数
_writerTask:由于Task在写Block时候需要获得锁,这里存放每次获取锁得TaskId,初始值为-1;

3 BlockManager

每个Executor会创建一个BlockManager,在其中运行的一个或多个Task都会共用BlockManager。我们所常用的Broadcast的数据被driver分发后,就存储在BlockManager中。

BlockManager有所有关于存储、使用、移除Block的方法。

上一篇下一篇

猜你喜欢

热点阅读