Spark中,为何在Driver中监听RabbitMQ队列,这个
2018-11-01 本文已影响22人
AlstonWilliams
在Driver中,我们有这么一段代码:
val rabbitMQAccessor = realtimeLabelCompute.createRabbitMQAccessor()
while (true) {
val message = rabbitMQAccessor.poll()
realtimeLabelCompute.processMessage(message)
rabbitMQAccessor.ack()
}
rabbitMQAccessor.close()
很诡异的是,我们发现,当我们用yarn -kill
命令kill掉它的时候,它依然不会停止,而是会继续监听这个队列。只不过SparkContext
确实被关掉了。
我们本来是这样子猜测的,SparkContext
启动时,会启动一个组件,在一个单独的线程中,当它接收到ApplicationMaster的kill消息时,就kill掉Driver线程。然而,由于Driver线程,在rabbitMQAccessor.poll()
这里,会有wait()
操作,所以没有被kill掉。
这种想法实在是无厘头。一个是,子线程怎么会干掉主线程。另一个是,如果能干掉,跟主线程是否wait()
有什么关系。
好在,在Mastering Apache Spark中找到了答案:
就是说,在YARN这种模式中,当ApplicationMaster运行时,它会把用户代码放到一个单独的线程来运行。然后用join方法,等待这个线程的结束。
而我们的用户代码里面,包含了一个while
循环,而且还有wait()
,所以基本上不可能结束。这就导致Driver也不会结束。
那为什么即使yarn -kill,它都停止不了呢?个人猜测是,即使接收到kill
命令,它也不会用System.exit()
这种强制退出的方式。所以,用户代码线程就高枕无忧,依然在运行,导致Driver停不了。