SparkStreaming优雅关闭剖析
简介
在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断执行的,除非出现异常退出。当然SparkStreaming提供了checkpoint和WAL机制能够保证我们的程序再次启动时候不会出现数据丢失的情况。但是需求并不是一成不变的,相信读者们都经历过需求不断迭代的情况,当我们需要迭代逻辑的时候,那么我们如何停止线上正在运行的程序呢?本文将为读者们详细介绍一些关于SparkStreaming优雅关闭的手段。接下来我们将针对以下几个问题进行展开讲解:
- 为什么需要优雅关闭?
- 什么时候触发关闭?
- 采用什么策略关闭?
1.为什么需要优雅关闭
基于前面提到的,当我们的场景需要保证数据准确,不允许数据丢失,那么这个时候我们就得考虑优雅关闭了。说到关闭,那么非优雅关闭就是通过kill -9 processId的方式或者yarn -kill applicationId的方式进行暴力关闭,为什么说这种方式是属于暴力关闭呢?由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会导致这段时间的数据丢失,虽然提供了checkpoin机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉checkpoint,因此这里就会有丢失的风险。
因此我们需要优雅关闭,将剩余未处理的数据或者正在处理的数据能够全部执行完成后,这样才不会出现数据丢失的情况。
2.什么时候触发关闭
既然我们知道了需要优雅关闭,那么就需要知道什么会触发关闭,这样才能有针对性的策略实现优雅关闭。
首先我们先来了解一下整体流程:
- 首先StreamContext在做初始化的时候,会增加Shutdown hook方法 ,放入到一个钩子队列中,并设置优先级为51
- 当程序jvm退出时,会启动一个线程从钩子队列中按照优先级取出执行,然后就会执行Shutdown钩子方法
- 当执行Shutdown钩子方法时,首先会将receiver进行关闭,即不再接收数据
- 然后停止生成BatchRDD
- 等待task全部完成,停止Executor
- 最后释放所有资源,即整个关闭流程结束
接下来看源码的具体实现
StreamingContext.scala:调用start方法会调用ShutdownHookManager注册stopOnShutdown函数
def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) ...... /** * StreamContext启动时会增加Shutdown钩子函数,优先级为51 */ shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown()) .... case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }
ShutdownHookManager.scala:在增加钩子函数的时候底层调用了SparkShutdownHookManager内部类
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { shutdownHooks.add(priority, hook) } private lazy val shutdownHooks = { val manager = new SparkShutdownHookManager() manager.install() manager } private [util] class SparkShutdownHookManager { def install(): Unit = { val hookTask = new Runnable() { override def run(): Unit = runAll() } org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook( hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30) } /** * jvm退出的时候会开启一个线程按照优先级逐个调用钩子函数 */ def runAll(): Unit = { shuttingDown = true var nextHook: SparkShutdownHook = null while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) { Try(Utils.logUncaughtExceptions(nextHook.run())) } } def add(priority: Int, hook: () => Unit): AnyRef = { hooks.synchronized { if (shuttingDown) { throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.") } val hookRef = new SparkShutdownHook(priority, hook) hooks.add(hookRef) hookRef } } } private class SparkShutdownHook(private val priority: Int, hook: () => Unit) extends Comparable[SparkShutdownHook] { //这里真正调用注册的函数 def run(): Unit = hook() }
那么接下来看下真正执行关闭的逻辑,即StreamingContext#stopOnShutdown方法
private def stopOnShutdown(): Unit = { val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false) stop(stopSparkContext = false, stopGracefully = stopGracefully) } def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { synchronized { state match { case ACTIVE => //调度相关的关闭 Utils.tryLogNonFatalError { scheduler.stop(stopGracefully) } //监控 Utils.tryLogNonFatalError { env.metricsSystem.removeSource(streamingSource) } //ui Utils.tryLogNonFatalError { uiTab.foreach(_.detach()) } Utils.tryLogNonFatalError { unregisterProgressListener() } StreamingContext.setActiveContext(null) //设置状态为停止 state = STOPPED } } if (shutdownHookRefToRemove != null) { ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove) } // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). if (stopSparkContext) sc.stop() }
可以看到这里有一个spark.streaming.stopGracefullyOnShutdown参数来传给底层的stop方法,即调用Jobscheduler#stop方法
JobScheduler#stop
def stop(processAllReceivedData: Boolean): Unit = synchronized { //1.首先停止接收数据 if (receiverTracker != null) { receiverTracker.stop(processAllReceivedData) } if (executorAllocationManager != null) { executorAllocationManager.foreach(_.stop()) } //2.停止生成BatchRdd,处理剩余的数据 jobGenerator.stop(processAllReceivedData) //3.停止Exectuor jobExecutor.shutdown() val terminated = if (processAllReceivedData) { jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time } else { jobExecutor.awaitTermination(2, TimeUnit.SECONDS) } if (!terminated) { jobExecutor.shutdownNow() } // Stop everything else listenerBus.stop() eventLoop.stop() eventLoop = null logInfo("Stopped JobScheduler") }
3.采用什么策略关闭?
3.1 配置策略
根据刚才梳理的触发关闭流程中,其实可以通过配置spark.streaming.stopGracefullyOnShutdown=true来实现优雅关闭,但是需要发送 SIGTERM 信号给driver端,这里有两种方案
方案一,具体步骤如下:
通过Spark UI找到driver所在节点。
登录driver节点,执行 ps -ef |grep java |grep ApplicationMaster命令找到对应的pid
执行**kill -SIGTERM ** 发送SIGTERM信号
当spark driver收到该信号时,在日志中会有以下信息
ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook INFO streaming.StreamingContext: StreamingContext stopped successfully INFO spark.SparkContext: Invoking stop() from shutdown hook INFO spark.SparkContext: Successfully stopped SparkContext INFO util.ShutdownHookManager: Shutdown hook called
注意:
这里有一个坑,默认情况下在yarn模式下,spark.yarn.maxAppAttempts参数值和yarn.resourcemanager.am.max-attempts是同一个值,即为2。当通过Kill命令杀掉AM时,Yarn会自动重新启动一个AM,因此需要再发送一次Kill命令。当然也可以通过spark-submit命令提交的时候指定spark.yarn.maxAppAttempts=1这个配置参数;但这里也会有容灾风险,比如出现网络问题的时候,这里就无法自动重启了,程序就会以失败而告终。
方案二:通过yarn application -kill < applicationid >命令来kill掉job(不建议使用)
该命令会发送SIGTERM信号给container,同时也会立即发送 SIGKILL 命令。虽然可以通过yarn.nodemanager.sleep-delay-before-sigkill.ms参数来调整SIGTERM和SIGKILL之间的间隔,但是好像没什么作用。具体日志信息如下:
ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
3.2 标记策略
该种策略通过借助于三方系统来标记状态, 一种方法是将标记HDFS文件,如果标记文件存在,则调用scc.stop(true,true);或者是借助于redis的key是否存在等方式
val checkIntervalMillis = 60000
var isStopped = false
while (! isStopped) {
isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
checkShutdownMarker
if (!isStopped && stopFlag) {
ssc.stop(true, true)
}
}
def checkShutdownMarker = {
if (!stopFlag) {
val fs = FileSystem.get(new Configuration())
stopFlag = fs.exists(new Path(shutdownMarker))
}
3.3 服务策略
即提供一个restful服务,暴露出一个接口提供关闭功能。
def httpServer(port:Int,ssc:StreamingContext)={
val server = new Server(port)
val context = new ContextHandler()
context.setContextPath("/shutdown")
context.setHandler( new CloseStreamHandler(ssc) )
server.setHandler(context)
server.start()
}
class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
ssc.stop(true,true)
response.setContentType("text/html; charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
val out = response.getWriter();
baseRequest.setHandled(true);
}
}