Spark Streaming source code analysis: ReceiverTracker

2019-11-17  cclucc

The questions we care about: how is the data received, and how is it stored?

The data is received by a receiver thread running on an executor; once received, it is handed to the ReceiverSupervisorImpl running on the same executor for further handling.

Enter one of JobScheduler's key members: ReceiverTracker!!!
A quick introduction to ReceiverTracker

ReceiverTracker's purpose is to provide the input data for each batch's RDD. It does this in three steps (a minimal driver-side setup that exercises this whole path is sketched after the list):

  1. Distribute the receivers to executors and start the receiving threads.
  2. Distribute ReceiverSupervisorImpl to executors, start the threads that handle the data, and keep track of information about that data.
  3. When a job is submitted, provide the data its ETL will run on (how exactly is the question the rest of this post answers).
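
Before diving into the source, here is a minimal driver-side sketch (mine, not from the original code) that exercises this whole path: socketTextStream creates a receiver-based input DStream, and ssc.start() starts the JobScheduler, which in turn starts the ReceiverTracker described below. Host, port, and batch interval are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverTrackerDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver, one for processing
    val conf = new SparkConf().setAppName("receiver-tracker-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // a receiver-based input DStream; its receiver is what ReceiverTracker
    // will distribute to an executor
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.count().print()

    ssc.start()            // JobScheduler.start() -> ReceiverTracker.start()
    ssc.awaitTermination()
  }
}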

++First, let's look at how the receivers are distributed to the executors++

def start(): Unit = synchronized {
    //....

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) // endpoint that receives and handles messages sent by ReceiverTracker and the receivers
      if (!skipReceiverLaunch) launchReceivers() // important! this distributes the receivers to the executors
      //.....
    }
  }
//Now, let's look at launchReceivers itself
private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map {...} // DStreamGraph holds all the input DStreams; collect each input DStream's receiver

    endpoint.send(StartAllReceivers(receivers)) // with the receivers in hand, this message kicks off the actual distribution
}

override def receive: PartialFunction[Any, Unit] = {
      // decide which executors each receiver should be scheduled onto
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
      //.....  
}
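
The schedulingPolicy used above is ReceiverSchedulingPolicy: it tries to spread the receivers evenly over the live executors while honoring each receiver's preferredLocation. The real policy does weighted balancing; the following is only a simplified round-robin sketch of the idea (SimpleReceiverScheduler and its types are mine, not Spark's).

// Simplified sketch, NOT Spark's ReceiverSchedulingPolicy: receivers with a
// preferredLocation are pinned to it, the rest are spread round-robin.
object SimpleReceiverScheduler {
  def scheduleReceivers(
      receivers: Seq[(Int, Option[String])],  // (streamId, preferredLocation)
      executors: Seq[String]): Map[Int, Seq[String]] = {
    require(executors.nonEmpty, "need at least one executor")
    var next = 0
    receivers.map { case (streamId, preferred) =>
      val host = preferred.getOrElse {
        val h = executors(next % executors.size)
        next += 1
        h
      }
      streamId -> Seq(host)
    }.toMap
  }
}

With the locations decided, startReceiver (called above for each receiver) does the actual shipping: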

private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {

      // Function to start the receiver on the worker node
      //key point!! this function is submitted together with the RDD; it news up a ReceiverSupervisorImpl that does the actual handling of the received data, covered in detail later!!
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) // this is what actually handles the received data
            supervisor.start() // start the supervisor's threads
            supervisor.awaitTermination() // important! block the task here so data keeps flowing in from the receiver
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      //key point!! the receiver and its candidate locations are packaged into an RDD, so the receiver can be scheduled onto any of those executors!!!
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      
      //.....

     //submit! ⚠️ at this point the receiver has been shipped to a concrete executor
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      
      //....
    }
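
The trick worth remembering: the receiver is wrapped in a one-element RDD whose single partition carries the scheduled hosts as preferred locations, and submitJob ships the start function to one of exactly those executors. The same pattern can be reproduced outside Spark Streaming; here is a minimal sketch (the host name and the work function are hypothetical).

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object PinnedJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pinned-job").setMaster("local[2]"))

    // one-element RDD; Seq("host1") is the preferred-location hint, just like receiverRDD
    val rdd = sc.makeRDD(Seq("some-task" -> Seq("host1")))

    // the function shipped with the job, playing the role of startReceiverFunc
    val work: Iterator[String] => Unit =
      it => it.foreach(task => println(s"running $task on this executor"))

    val future = sc.submitJob[String, Unit, Unit](rdd, work, Seq(0), (_, _) => (), ())
    // for a receiver this future only completes when the receiver stops or fails
    Await.ready(future, Duration.Inf)
    sc.stop()
  }
}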
  

++Now let's look at how the data is actually handled on the executor++

Part 1: how is the data received?

After a receiver lands on an executor, how does the data get handled? The receiver calls the supervisor's push methods (store() below ends up in supervisor.pushSingle)!!! In other words, a receiver only cares about where to receive data from and how to parse it; it does not care how the data is stored or who uses it!!!

//First, how does a receiver hand data to ReceiverSupervisorImpl? Take KafkaReceiver as an example
class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {

  def onStart() {

  
    //where to receive the data from
    // Kafka connection properties
    // Create the connection to the cluster

    //how to decode the received data
    val keyDecoder = ...
    val valueDecoder = ...


    //a thread pool to receive the data
    val executorPool = ...
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    }
  }

  // handle each received record with store()!!! which internally calls supervisor.pushSingle!!!!
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
    }
  }
}
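
The same division of labor holds for any receiver, not just Kafka: onStart() opens the source and spins up worker threads, and every record is handed over with store(), which ends up in the supervisor (pushSingle for single records). A minimal custom receiver sketch, modeled on the standard socket-receiver pattern (host/port are placeholders):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// onStart() only starts a thread and returns; every received line is handed
// to store(), i.e. to the supervisor on the same executor.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("line-receiver") {
      override def run(): Unit = receiveLoop()
    }.start()
  }

  def onStop(): Unit = {}  // the loop checks isStopped() and exits on its own

  private def receiveLoop(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)                     // -> supervisor.pushSingle(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    }
  }
}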

Part 2: the data has arrived, so how is it stored? This is the job of ReceiverSupervisorImpl, mainly through the three pieces below:

//the "put" side: hands records one by one to the BlockGenerator, which batches them into blocks
def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
}
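
The BlockGenerator that pushSingle feeds buffers individual records and, every spark.streaming.blockInterval (200ms by default), turns the buffer into one block; its listener then hands the finished block back to the supervisor, which ends up in pushAndReportBlock below. A stripped-down sketch of that batching idea (SimpleBlockGenerator is mine, not Spark's class):

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Records are appended to a buffer; a timer swaps the buffer out every
// blockIntervalMs and pushes the old one downstream as a single "block".
class SimpleBlockGenerator(pushBlock: Seq[Any] => Unit, blockIntervalMs: Long = 200) {
  private var buffer = new ArrayBuffer[Any]
  private val timer = Executors.newSingleThreadScheduledExecutor()

  def addData(data: Any): Unit = synchronized { buffer += data }

  def start(): Unit = {
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val block = SimpleBlockGenerator.this.synchronized {
          val full = buffer
          buffer = new ArrayBuffer[Any]
          full
        }
        if (block.nonEmpty) pushBlock(block.toSeq)
      }
    }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = timer.shutdown()
}

The second piece, pushAndReportBlock, stores a finished block and reports it to the driver: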


def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    
    //the concrete logic that stores the block
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    
    //once the block is stored, report the new block info to ReceiverTracker
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    
}

//stores each block to memory/disk via the BlockManager, the same machinery ordinary RDD blocks go through
private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
     //WAL: write-ahead log, the key to not losing data
      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }
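
Which handler gets picked depends on spark.streaming.receiver.writeAheadLog.enable together with a checkpoint directory. A typical setup that turns the WAL on (app name, master, and paths are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledSetup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-demo")
      .setMaster("local[2]")
      // with this flag on, WriteAheadLogBasedBlockHandler is chosen above
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))
    // this is where checkpointDirOption comes from
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

    // ... define receiver-based input streams and start the context as usual
  }
}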

Part 3: how does the data get used? Storage is reported to ReceiverTracker, but how is the data consumed?

//ReceiverTracker itself does not manage blocks; its member receivedBlockTracker does that work. ReceiverTracker is the boss, not the worker!!!
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
}


//note ⚠️ each time the JobGenerator timer submits a job for a batch, it calls ReceiverTracker.allocateBlocksToBatch to bind blocks to that batch; as you can see, the actual mapping from blocks to a batch is handled by receivedBlockTracker!!
def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }
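
Inside ReceivedBlockTracker the idea is a queue of unallocated block infos per stream: allocateBlocksToBatch drains those queues and records the result under the batch time, so that getBlocksOfBatch(batchTime) can later hand the block ids to that batch's RDDs. A simplified sketch of the bookkeeping (SimpleBlockTracker is mine; the real class also writes every one of these events to the WAL):

import scala.collection.mutable

// Per-stream queues of not-yet-allocated blocks, plus a map from batch time
// to the blocks that were allocated to that batch.
class SimpleBlockTracker[BlockInfo] {
  private val unallocated = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]
  private val allocated = mutable.Map.empty[Long, Map[Int, Seq[BlockInfo]]]

  def addBlock(streamId: Int, block: BlockInfo): Unit = synchronized {
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo]) += block
  }

  // called by the JobGenerator at every batch time: drain each stream's queue
  // and pin those blocks to this batch
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    val blocks = unallocated.map { case (streamId, queue) =>
      val drained = queue.toList
      queue.clear()
      streamId -> (drained: Seq[BlockInfo])
    }.toMap
    allocated(batchTime) = blocks
  }

  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = synchronized {
    allocated.getOrElse(batchTime, Map.empty)
  }
}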

How the stored data then gets wired up to RDDs is covered in the follow-up post spark streaming源码分析之job、rdd、blocks之间是如何对应的? (how jobs, RDDs, and blocks correspond to each other).
