Spark Streaming source code analysis: JobScheduler
So what does JobGenerator actually do?
First, it holds a timer instance:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
This timer posts a GenerateJobs message once every batchDuration; in other words, one set of jobs is generated per batch.
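To make the timer's role concrete, here is a minimal sketch of the same idea, built on Java's ScheduledExecutorService. The BatchTimer class and its callback parameter are hypothetical names for illustration; Spark's actual RecurringTimer runs its own thread against a Clock, so this is an approximation, not the real implementation:

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical BatchTimer: illustrates the RecurringTimer idea, not Spark's code.
// It fires a callback once per batch interval, aligned to batch boundaries.
class BatchTimer(batchDurationMs: Long, callback: Long => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val now = System.currentTimeMillis()
    // The first tick lands on the next multiple of batchDurationMs,
    // so every batch starts on a clean boundary.
    val firstTick = (now / batchDurationMs + 1) * batchDurationMs
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = callback(System.currentTimeMillis()) },
      firstTick - now, batchDurationMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}

// Usage: every 2 seconds, do what JobGenerator's timer does -- post a "generate jobs" event.
// new BatchTimer(2000, t => println(s"GenerateJobs($t)")).start()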
On the receiving side, a handler method picks up the GenerateJobs message and dispatches it:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time) // the focus of this article!!!
    // The cases below are key points too -- important! They are covered later.
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
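The eventLoop connecting the timer to processEvent follows a classic single-consumer pattern: producers post messages onto a queue, and one dedicated thread drains the queue and invokes the handler for each message. The sketch below (a hypothetical SimpleEventLoop, not Spark's actual org.apache.spark.util.EventLoop) shows the shape of that pattern:

import java.util.concurrent.LinkedBlockingQueue

// Hypothetical SimpleEventLoop: same post/handle pattern as JobGenerator's eventLoop.
class SimpleEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (true) handler(queue.take()) // blocks until an event arrives
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event) // called from the timer thread
  def stop(): Unit = thread.interrupt()
}

// Usage, mirroring JobGenerator:
// val loop = new SimpleEventLoop[String]("JobGenerator")(e => println(s"Got event $e"))
// loop.start(); loop.post("GenerateJobs(...)")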
/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  //....
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // receiverTracker shows up! Here it comes! It assigns the data blocks received by the receivers to a specific batch, as discussed above!
    graph.generateJobs(time) // DStreamGraph generates the jobs through the output streams it holds
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
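On the Success path, submitJobSet hands the whole batch over to JobScheduler, which runs each job on a fixed-size thread pool; the pool size is controlled by spark.streaming.concurrentJobs (default 1), which is why batches normally execute one at a time. Here is a rough sketch of that hand-off, using simplified stand-ins for Spark's internal Job and JobSet classes (MiniJobScheduler is a made-up name for illustration):

import java.util.concurrent.Executors

// Simplified stand-ins for Spark's internal Job and JobSet classes.
case class Job(body: () => Unit)
case class JobSet(time: Long, jobs: Seq[Job])

// Sketch of the submitJobSet hand-off: each job goes to a fixed thread pool.
// In Spark the pool size comes from spark.streaming.concurrentJobs (default 1).
class MiniJobScheduler(concurrentJobs: Int = 1) {
  private val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

  def submitJobSet(jobSet: JobSet): Unit = {
    if (jobSet.jobs.isEmpty) {
      println(s"No jobs added for time ${jobSet.time}")
    } else {
      jobSet.jobs.foreach { job =>
        jobExecutor.execute(new Runnable { def run(): Unit = job.body() })
      }
      println(s"Added jobs for time ${jobSet.time}")
    }
  }
}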
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
graph.generateJobs(time)
These two lines are analyzed in detail in "Spark Streaming source code analysis: how do jobs, RDDs, and blocks correspond to each other?"