Spark-streaming监控批次处理信息

2020-07-24  本文已影响0人  海轩_fan

实时任务监控原因

在实时任务执行的过程中,由于数据突然激增或网络阻塞等情况,使得任务数据堆积或失败等

解决办法

通过实现SparkListener和StreamingListener重写其中的相关方法,便能监控任务的执行情况,对症下药

class NeiStreamingListener(ssc:StreamingContext,delay:Long) extends SparkListener with StreamingListener{
  override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted): Unit = synchronized{
    //  获取任务名称
    val appName = ssc.sparkContext.appName
    if (batchStarted != null){
      val batchInfo = batchStarted.batchInfo
      // 接收批次数据量
      val numRecords = batchInfo.numRecords
      //   调度延迟时间
      val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L)
      //   执行时间
      val processingTime = batchInfo.processingDelay.getOrElse(0L)
      //  总延迟时间
      val totalDelay = batchInfo.totalDelay.getOrElse(0L)
      if(delay != -1 && schedulingTime > delay ){
        val yarnManager = new YarnAppManager()
        yarnManager.getJobState()
        val sparMap = yarnManager.sparkMap()
        if (sparMap.containsKey(appName) ) {
          val appid: String = sparMap.get(appName).getApplicationId.toString
          //  可以在此处添加对应逻辑,告警邮件或短信等
        }
      }
    }
  }
}

以上代码为重写onBatchCompleted方法,获取实时任务中批次数据处理延迟时间,并通过yarn获取到任务的applicationId;

上一篇下一篇

猜你喜欢

热点阅读