Spark Streaming source code analysis: JobScheduler

2019-11-17  cclucc

So what exactly does JobGenerator do?

It holds a timer instance:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

This timer posts a GenerateJobs message once every batchDuration; in other words, a new set of jobs is generated for each batch interval.
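To make the mechanics concrete, here is a minimal, self-contained sketch of such a periodic trigger. This is not Spark's RecurringTimer; SimpleRecurringTimer and the TimerDemo driver are simplified stand-ins invented for this example:

// A minimal recurring timer: fires `callback` once per `periodMs` on a
// fixed time grid, similar in spirit to Spark's RecurringTimer.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private val thread = new Thread(() => {
    // align the first tick to the next multiple of the period
    var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
    while (!stopped) {
      val sleepMs = nextTime - System.currentTimeMillis()
      if (sleepMs > 0) Thread.sleep(sleepMs)
      callback(nextTime)   // one invocation per batch interval
      nextTime += periodMs
    }
  })
  def start(): Unit = thread.start()
  def stop(): Unit = stopped = true
}

object TimerDemo extends App {
  val batchDurationMs = 1000L
  // Spark's callback posts GenerateJobs(new Time(t)) to the eventLoop;
  // here we just print, to keep the sketch self-contained.
  val timer = new SimpleRecurringTimer(batchDurationMs, t => println(s"tick at $t"))
  timer.start()
  Thread.sleep(3500)   // observe three or four ticks
  timer.stop()
}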

A handler method receives the GenerateJobs message and acts on it:
/** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time) // the current focus!!!
      // the cases below are key points too, covered later. Important!!!
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
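Where do these events come from? The timer above post()s them to an internal event loop, which drains them on a dedicated thread and hands each one to processEvent. Below is a bare-bones, self-contained sketch of that dispatch pattern; SimpleEventLoop and the trimmed event ADT are stand-ins for this example, not Spark's actual EventLoop:

import java.util.concurrent.LinkedBlockingQueue

// Trimmed-down event ADT mirroring JobGeneratorEvent.
sealed trait JobGeneratorEvent
case class GenerateJobs(time: Long) extends JobGeneratorEvent
case class ClearMetadata(time: Long) extends JobGeneratorEvent

// Bare-bones event loop: post() enqueues; a daemon thread drains the queue
// and dispatches every event to onReceive, as Spark's EventLoop does.
class SimpleEventLoop(onReceive: JobGeneratorEvent => Unit) {
  private val queue = new LinkedBlockingQueue[JobGeneratorEvent]()
  private val thread = new Thread(() => while (true) onReceive(queue.take()))
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def post(event: JobGeneratorEvent): Unit = queue.put(event)
}

object EventLoopDemo extends App {
  // processEvent stand-in: pattern match exactly as JobGenerator does.
  val loop = new SimpleEventLoop({
    case GenerateJobs(t)  => println(s"generateJobs($t)")
    case ClearMetadata(t) => println(s"clearMetadata($t)")
  })
  loop.start()
  loop.post(GenerateJobs(1000L))
  loop.post(ClearMetadata(1000L))
  Thread.sleep(200)   // let the daemon thread drain the queue
}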
/** Generate jobs and perform checkpointing for the given `time`.  */
  private def generateJobs(time: Time) {
    //....
    Try {
      // receiverTracker shows up! It assigns the blocks the receivers have
      // collected to this concrete batch, as discussed earlier!
      jobScheduler.receiverTracker.allocateBlocksToBatch(time)
      // DStreamGraph generates the jobs through the output streams it holds
      graph.generateJobs(time)
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        // on failure, the error is reported to the JobScheduler
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
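As a quick preview of graph.generateJobs(time) before the follow-up article: DStreamGraph asks each registered output stream for an optional job at this batch time and keeps the ones that materialize. A simplified sketch of that shape; Job and OutputStreamLike here are local stand-ins, not Spark's types:

object GraphSketch {
  // Local stand-ins for Spark's Job and output DStream.
  case class Job(time: Long, run: () => Unit)
  trait OutputStreamLike { def generateJob(time: Long): Option[Job] }

  // The essential shape of DStreamGraph.generateJobs: each output stream may
  // or may not yield a job for this batch; flatMap keeps those that do.
  def generateJobs(outputStreams: Seq[OutputStreamLike], time: Long): Seq[Job] =
    outputStreams.flatMap(_.generateJob(time))
}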

The two calls above, jobScheduler.receiverTracker.allocateBlocksToBatch(time) and graph.generateJobs(time), are dissected in detail in the follow-up post, "Spark Streaming source code analysis: how do jobs, RDDs, and blocks map to one another?".
