Analysis of the Spark Streaming Receiver Startup Process
—————☼—————☼—————☼—————☼—————☼—————
Spark Streaming Overview
Spark Streaming Initialization Process
Analysis of the Spark Streaming Receiver Startup Process
Analysis of the Spark Streaming Data Preparation Phase (Receiver Mode)
Analysis of the Spark Streaming Data Computation Phase
Spark Streaming Backpressure Analysis
Analysis of the Spark Streaming Executor DynamicAllocation Mechanism
—————☼—————☼—————☼—————☼—————☼—————
The Receiver is a core component of the data preparation phase: it is responsible for ingesting external data, and its lifecycle is managed by the ReceiverTracker.
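To make the Receiver's role concrete, here is a minimal custom Receiver sketch against the standard Receiver API (the constant data source is purely illustrative):
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A toy Receiver that "ingests" a constant record once per second.
class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  override def onStart(): Unit = {
    // Receive on a daemon thread so that onStart() returns immediately.
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store("hello")      // hand the record over to the ReceiverSupervisor
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = { /* no resources to release */ }
}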
Receiver Startup
1. Receiver Extraction and Executor Preparation
As described in "Spark Streaming Initialization Process", the JobScheduler creates and starts the ReceiverTracker when it starts.
When the ReceiverTracker is constructed, it extracts the ReceiverInputDStreams from the DStreamGraph, so that at launch time it can pull the Receiver out of each one and start them one by one.
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
When the ReceiverTracker starts, it mainly does the following two things:
- create the ReceiverTrackerEndpoint, which receives status information from the Receivers
- start the Receivers.
The ReceiverTracker's start() method is shown below:
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
The launchReceivers() method is responsible for starting the Receivers; its code is as follows:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // Send the start command to the tracker endpoint
  endpoint.send(StartAllReceivers(receivers))
}
The main operations of this method are:
- Extract the Receiver from each ReceiverInputDStream and assign the streamId as the Receiver's id.
- Run runDummySparkJob(), a trivial Spark job whose purpose is to ensure that the application's minimum share of requested Executors has been granted. The minimum share is determined jointly by "spark.cores.max" and "spark.scheduler.minRegisteredResourcesRatio", and defaults to all requested Executors. While the number of registered Executors is below this share, the job blocks and waits for Executors to register until the minimum required for it to run is met.
The code of runDummySparkJob() is as follows:
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
The logic is deliberately simple: at little resource cost, it guarantees that by the time Receivers are scheduled a large number of Executors have already registered, so the Receivers can be spread as evenly as possible across different Executors; the relevant settings can be tuned as sketched below.
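The registration thresholds mentioned above are ordinary Spark configuration keys; a minimal sketch with purely illustrative values:
import org.apache.spark.SparkConf

// Wait until 80% of the 8 requested cores have registered, or at most 30s,
// before task scheduling begins (values are examples only).
val conf = new SparkConf()
  .setAppName("ReceiverStartupDemo")
  .set("spark.cores.max", "8")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")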
- Send the StartAllReceivers message to the ReceiverTrackerEndpoint to launch all Receivers.
When the ReceiverTrackerEndpoint receives the StartAllReceivers message, it will:
- schedule the Receivers, i.e. assign each Receiver the location where it should run
- start the Receivers
The implementation logic is as follows:
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
  }
2. Receiver Scheduling
Receiver scheduling is performed by the ReceiverSchedulingPolicy and falls into two phases:
- Global scheduling phase
This phase occurs when the Receivers are scheduled for the first time; it ensures that the receivers are spread as evenly as possible across the Executors. During scheduling, every Receiver is assigned its launch location.
- Local scheduling phase
This phase occurs when a Receiver is restarted; only the failed Receiver needs to be relaunched.
The global scheduling phase always occurs, so it serves as the example for a detailed walk-through. Scheduling proceeds as follows:
- Get the host addresses of all Executors and group the Executors by host.
- Create numReceiversOnExecutor to record how many Receivers have been assigned to each Executor.
- Create scheduledLocations to record the candidate locations assigned to each Receiver.
- Schedule the Receivers that specify a preferredLocation: iterate over the Receivers and, among the Executors on the user-specified preferred host, pick the one currently running the fewest Receivers as the launch location, then update scheduledLocations and numReceiversOnExecutor.
- Schedule the Receivers that do not specify a preferredLocation: pick the Executor with the fewest assigned Receivers and assign it to the Receiver.
- If any Executors remain idle, add each of them to the candidate list of the Receiver that has the fewest candidates so far.
At this point, the association between Receivers and Executors is established; a simplified sketch of the balancing idea follows.
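Before reading the real implementation, this minimal, self-contained sketch illustrates the balancing idea (it is not Spark's API; types and names are simplified stand-ins):
import scala.collection.mutable

object SchedulingSketch {
  // Toy model: a receiver is (streamId, optional preferred host); an executor is "host_execId".
  def schedule(
      receivers: Seq[(Int, Option[String])],
      executors: Seq[String]): Map[Int, String] = {
    val byHost = executors.groupBy(_.split("_")(0))
    val load = mutable.Map(executors.map(_ -> 0): _*)
    receivers.map { case (id, pref) =>
      // Respect the preferred host if executors exist there; otherwise consider all executors.
      val candidates = pref.flatMap(byHost.get).getOrElse(executors)
      val target = candidates.minBy(load)   // the executor with the fewest receivers so far
      load(target) += 1
      id -> target
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    println(schedule(
      Seq(0 -> Some("host1"), 1 -> None, 2 -> None),
      Seq("host1_1", "host1_2", "host2_1")))
    // prints: Map(0 -> host1_1, 1 -> host1_2, 2 -> host2_1)
  }
}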
The actual implementation is shown below:
/**
 * Try our best to schedule receivers with evenly distributed. However, if the
 * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
 * because we have to respect them.
 *
 * Here is the approach to schedule executors:
 * <ol>
 *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
 *       executors running on those host.</li>
 *   <li>Then, schedule all other receivers evenly among all the executors such that overall
 *       distribution over all the receivers is even.</li>
 * </ol>
 *
 * This method is called when we start to launch receivers at the first time.
 *
 * @return a map for receivers and their scheduled locations
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
  if (receivers.isEmpty) {
    return Map.empty
  }

  if (executors.isEmpty) {
    return receivers.map(_.streamId -> Seq.empty).toMap
  }

  val hostToExecutors = executors.groupBy(_.host)
  val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
  val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
  // Set the initial value to 0
  executors.foreach(e => numReceiversOnExecutor(e) = 0)

  // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
  // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
  for (i <- 0 until receivers.length) {
    // Note: preferredLocation is host but executors are host_executorId
    receivers(i).preferredLocation.foreach { host =>
      hostToExecutors.get(host) match {
        case Some(executorsOnHost) =>
          // preferredLocation is a known host. Select an executor that has the least receivers in
          // this host
          val leastScheduledExecutor =
            executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
          scheduledLocations(i) += leastScheduledExecutor
          numReceiversOnExecutor(leastScheduledExecutor) =
            numReceiversOnExecutor(leastScheduledExecutor) + 1
        case None =>
          // preferredLocation is an unknown host.
          // Note: There are two cases:
          // 1. This executor is not up. But it may be up later.
          // 2. This executor is dead, or it's not a host in the cluster.
          // Currently, simply add host to the scheduled executors.
          // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
          // this case
          scheduledLocations(i) += TaskLocation(host)
      }
    }
  }

  // For those receivers that don't have preferredLocation, make sure we assign at least one
  // executor to them.
  for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
    // Select the executor that has the least receivers
    val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
    scheduledLocationsForOneReceiver += leastScheduledExecutor
    numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
  }

  // Assign idle executors to receivers that have less executors
  val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
  for (executor <- idleExecutors) {
    // Assign an idle executor to the receiver that has least candidate executors.
    val leastScheduledExecutors = scheduledLocations.minBy(_.size)
    leastScheduledExecutors += executor
  }

  receivers.map(_.streamId).zip(scheduledLocations).toMap
}
This implementation has a caveat: even when a Receiver sets a preferredLocation and the application has Executors on that host, the Receiver is not guaranteed to end up on one of them, because the scheduled locations merely become the task's preferred locations and Spark's locality-aware task scheduling is best-effort.
3. Receiver Launch
After each Receiver has been assigned its launch location, startReceiver() is called to launch it. The launch proceeds as follows:
- Wrap the Receiver into an RDD, attaching the scheduled locations as the RDD's preferred locations
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }
receiverRDD.setName(s"Receiver $receiverId")
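The element-with-locations form of makeRDD used here is a public SparkContext API that attaches per-element location preferences; a toy illustration (assuming an existing SparkContext sc, with placeholder host names):
// Each element is paired with the hosts where its task should preferably run.
val rdd = sc.makeRDD(Seq(
  "blockA" -> Seq("host1", "host2"),
  "blockB" -> Seq("host2")))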
- Submit the work as a Spark job; the Receiver runs inside a Task, on its own long-lived thread
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(ThreadUtils.sameThread)
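For context, SparkContext.submitJob runs a function over selected partitions of an RDD and returns a future immediately; a minimal illustration (assuming an existing SparkContext sc):
val data = sc.parallelize(1 to 100, 4)
// Sum only partition 0; the result handler runs on the driver when that task finishes.
val sumFuture = sc.submitJob[Int, Int, Unit](
  data,
  (it: Iterator[Int]) => it.sum,
  Seq(0),
  (index, sum) => println(s"partition $index sum = $sum"),
  ())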
- Task execution: the Task runs startReceiverFunc, which creates and starts a ReceiverSupervisorImpl. Note the attemptNumber check below: if the Task is retried by the TaskScheduler, the function exits immediately so that the ReceiverTracker can reschedule the Receiver itself via RestartReceiver. (The Job and Task scheduling process is the same as for batch jobs and is not detailed here.)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
ReceiverSupervisorImpl provides all the methods needed to handle the data a Receiver receives; it also creates the BlockGenerator, which slices the Receiver's incoming data stream into blocks.
The start() method of ReceiverSupervisorImpl is implemented as follows:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
onStart() creates and starts the BlockGenerator; the slicing interval it uses is configurable, as sketched below.
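The BlockGenerator cuts the stream into blocks at a fixed interval controlled by the standard "spark.streaming.blockInterval" setting, which defaults to 200ms; a tuning sketch with an illustrative value:
import org.apache.spark.SparkConf

// Smaller block intervals produce more blocks (and hence more tasks) per batch.
val conf = new SparkConf().set("spark.streaming.blockInterval", "100ms")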
startReceiver() first registers the Receiver with the ReceiverTracker and checks whether the Receiver is allowed to start. If the driver accepts it, the Receiver's onStart() method is called to begin receiving data. The implementation logic is as follows:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart()
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId,
        None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
Take the SocketReceiver of the SocketInputDStream from the WordCount example as an illustration. Its onStart() method is implemented as follows:
def onStart() {
  logInfo(s"Connecting to $host:$port")
  try {
    socket = new Socket(host, port)
  } catch {
    case e: ConnectException =>
      restart(s"Error connecting to $host:$port", e)
      return
  }
  logInfo(s"Connected to $host:$port")

  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  try {
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next())
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    onStop()
  }
}
As the implementation shows, the data is received over a socket connection.
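For completeness, here is a minimal driver that creates such a SocketReceiver through the standard socketTextStream API (host and port are placeholders):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Internally builds a SocketInputDStream whose Receiver is the SocketReceiver above.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}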
This concludes the Receiver startup flow. Once started, the Receiver continuously ingests the incoming data stream, slicing it into blocks and distributing replicas in preparation for the computation phase. The next article analyzes the data preparation phase.