SparkStreaming优雅关闭剖析

2020-10-20  本文已影响0人  Bloo_m

简介

在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断执行的,除非出现异常退出。当然SparkStreaming提供了checkpoint和WAL机制能够保证我们的程序再次启动时候不会出现数据丢失的情况。但是需求并不是一成不变的,相信读者们都经历过需求不断迭代的情况,当我们需要迭代逻辑的时候,那么我们如何停止线上正在运行的程序呢?本文将为读者们详细介绍一些关于SparkStreaming优雅关闭的手段。接下来我们将针对以下几个问题进行展开讲解:

  1. 为什么需要优雅关闭?
  2. 什么时候触发关闭?
  3. 采用什么策略关闭?

1.为什么需要优雅关闭

基于前面提到的,当我们的场景需要保证数据准确,不允许数据丢失,那么这个时候我们就得考虑优雅关闭了。说到关闭,那么非优雅关闭就是通过kill -9 processId的方式或者yarn -kill applicationId的方式进行暴力关闭,为什么说这种方式是属于暴力关闭呢?由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会导致这段时间的数据丢失,虽然提供了checkpoin机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉checkpoint,因此这里就会有丢失的风险。

因此我们需要优雅关闭,将剩余未处理的数据或者正在处理的数据能够全部执行完成后,这样才不会出现数据丢失的情况。

2.什么时候触发关闭

既然我们知道了需要优雅关闭,那么就需要知道什么会触发关闭,这样才能有针对性的策略实现优雅关闭。

首先我们先来了解一下整体流程:

  1. 首先StreamContext在做初始化的时候,会增加Shutdown hook方法 ,放入到一个钩子队列中,并设置优先级为51
  2. 当程序jvm退出时,会启动一个线程从钩子队列中按照优先级取出执行,然后就会执行Shutdown钩子方法
  3. 当执行Shutdown钩子方法时,首先会将receiver进行关闭,即不再接收数据
  4. 然后停止生成BatchRDD
  5. 等待task全部完成,停止Executor
  6. 最后释放所有资源,即整个关闭流程结束

接下来看源码的具体实现

StreamingContext.scala:调用start方法会调用ShutdownHookManager注册stopOnShutdown函数

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        ......
        /**
         * StreamContext启动时会增加Shutdown钩子函数,优先级为51
         */
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
       ....
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

ShutdownHookManager.scala:在增加钩子函数的时候底层调用了SparkShutdownHookManager内部类

def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
} 
private lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
  }

private [util] class SparkShutdownHookManager {
  def install(): Unit = {
    val hookTask = new Runnable() {
      override def run(): Unit = runAll()
    }
    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }

  /**
   * jvm退出的时候会开启一个线程按照优先级逐个调用钩子函数
   */
  def runAll(): Unit = {
    shuttingDown = true
    var nextHook: SparkShutdownHook = null
    while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
      Try(Utils.logUncaughtExceptions(nextHook.run()))
    }
  }

  def add(priority: Int, hook: () => Unit): AnyRef = {
    hooks.synchronized {
      if (shuttingDown) {
        throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
      }
      val hookRef = new SparkShutdownHook(priority, hook)
      hooks.add(hookRef)
      hookRef
    }
  }
}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
  extends Comparable[SparkShutdownHook] {
  //这里真正调用注册的函数
  def run(): Unit = hook()
}

那么接下来看下真正执行关闭的逻辑,即StreamingContext#stopOnShutdown方法

 private def stopOnShutdown(): Unit = {
    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
    stop(stopSparkContext = false, stopGracefully = stopGracefully)
  }
 def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
    synchronized {
      state match {
        case ACTIVE =>
          //调度相关的关闭
          Utils.tryLogNonFatalError {
            scheduler.stop(stopGracefully)
          }
         
          //监控
          Utils.tryLogNonFatalError {
            env.metricsSystem.removeSource(streamingSource)
          }
          
          //ui
          Utils.tryLogNonFatalError {
            uiTab.foreach(_.detach())
          }
          Utils.tryLogNonFatalError {
            unregisterProgressListener()
          }
          StreamingContext.setActiveContext(null)
          //设置状态为停止
          state = STOPPED
      }
    }
    if (shutdownHookRefToRemove != null) {
      ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
    }
     // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
    if (stopSparkContext) sc.stop()
  }

可以看到这里有一个spark.streaming.stopGracefullyOnShutdown参数来传给底层的stop方法,即调用Jobscheduler#stop方法

JobScheduler#stop

 def stop(processAllReceivedData: Boolean): Unit = synchronized {
    //1.首先停止接收数据
    if (receiverTracker != null) {
      receiverTracker.stop(processAllReceivedData)
    }

    if (executorAllocationManager != null) {
      executorAllocationManager.foreach(_.stop())
    }

    //2.停止生成BatchRdd,处理剩余的数据
    jobGenerator.stop(processAllReceivedData)

    //3.停止Exectuor
    jobExecutor.shutdown()

    val terminated = if (processAllReceivedData) {
      jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
    } else {
      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
    }
    if (!terminated) {
      jobExecutor.shutdownNow()
    }
  
    // Stop everything else
    listenerBus.stop()
    eventLoop.stop()
    eventLoop = null
    logInfo("Stopped JobScheduler")
  }

3.采用什么策略关闭?

3.1 配置策略

根据刚才梳理的触发关闭流程中,其实可以通过配置spark.streaming.stopGracefullyOnShutdown=true来实现优雅关闭,但是需要发送 SIGTERM 信号给driver端,这里有两种方案

方案一,具体步骤如下:

  1. 通过Spark UI找到driver所在节点。

  2. 登录driver节点,执行 ps -ef |grep java |grep ApplicationMaster命令找到对应的pid

  3. 执行**kill -SIGTERM ** 发送SIGTERM信号

  4. 当spark driver收到该信号时,在日志中会有以下信息

    ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
    INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
    INFO streaming.StreamingContext: StreamingContext stopped successfully
    INFO spark.SparkContext: Invoking stop() from shutdown hook
    INFO spark.SparkContext: Successfully stopped SparkContext
    INFO util.ShutdownHookManager: Shutdown hook called
    

    注意:

    这里有一个坑,默认情况下在yarn模式下,spark.yarn.maxAppAttempts参数值和yarn.resourcemanager.am.max-attempts是同一个值,即为2。当通过Kill命令杀掉AM时,Yarn会自动重新启动一个AM,因此需要再发送一次Kill命令。当然也可以通过spark-submit命令提交的时候指定spark.yarn.maxAppAttempts=1这个配置参数;但这里也会有容灾风险,比如出现网络问题的时候,这里就无法自动重启了,程序就会以失败而告终。

方案二:通过yarn application -kill < applicationid >命令来kill掉job(不建议使用)

该命令会发送SIGTERM信号给container,同时也会立即发送 SIGKILL 命令。虽然可以通过yarn.nodemanager.sleep-delay-before-sigkill.ms参数来调整SIGTERM和SIGKILL之间的间隔,但是好像没什么作用。具体日志信息如下:

ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

3.2 标记策略

该种策略通过借助于三方系统来标记状态, 一种方法是将标记HDFS文件,如果标记文件存在,则调用scc.stop(true,true);或者是借助于redis的key是否存在等方式

val checkIntervalMillis = 60000
var isStopped = false

while (! isStopped) {
    isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
    checkShutdownMarker
    if (!isStopped && stopFlag) {
        ssc.stop(true, true)
    }
}

def checkShutdownMarker = {
    if (!stopFlag) {
        val fs = FileSystem.get(new Configuration())
        stopFlag = fs.exists(new Path(shutdownMarker))
    }

3.3 服务策略

即提供一个restful服务,暴露出一个接口提供关闭功能。

def httpServer(port:Int,ssc:StreamingContext)={
    val server = new Server(port)
    val context = new ContextHandler()
    context.setContextPath("/shutdown")
    context.setHandler( new CloseStreamHandler(ssc) )
    server.setHandler(context)
    server.start()
}
class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
      ssc.stop(true,true)
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      baseRequest.setHandled(true);
    }
  }
上一篇下一篇

猜你喜欢

热点阅读