Getting the Execution State of an Application (Job) in the YARN ResourceManager

2019-04-16  alexlee666

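In practice you often need to find out the execution state of an application on YARN, for example whether a given application is submitted, running, or finished. There are generally three ways to do this.

The first is the most common. The second depends on a single node, so it is exposed to a single point of failure and is less reliable. The third is used less often but is more reliable. This article shows how to use the third approach, through the YarnClient API, to get the execution state of applications in the YARN ResourceManager.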

1. Application execution states

The hadoop-yarn module defines the execution states of a submitted application in the enum org.apache.hadoop.yarn.api.records.YarnApplicationState:

public enum YarnApplicationState {
  /** Application which was just created. */
  NEW,

  /** Application which is being saved. */
  NEW_SAVING,

  /** Application which has been submitted. */
  SUBMITTED,

  /** Application has been accepted by the scheduler */
  ACCEPTED,

  /** Application which is currently running. */
  RUNNING,

  /** Application which finished successfully. */
  FINISHED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
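
When working with these states it is often handy to separate the ones an application can still leave from the ones it ends in. The sketch below is one possible grouping, assuming the eight constants listed above are the complete set; the object name AppStateGroups and its members are hypothetical helpers, not part of Hadoop.

```scala
import java.util.EnumSet

import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.api.records.YarnApplicationState._

// Hypothetical helper, not part of the Hadoop API.
object AppStateGroups {

  // States from which the application can still move to another state.
  val active: EnumSet[YarnApplicationState] =
    EnumSet.of(NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING)

  // Everything else: with the enum above this is FINISHED, FAILED and KILLED.
  val terminal: EnumSet[YarnApplicationState] = EnumSet.complementOf(active)

  def isTerminal(state: YarnApplicationState): Boolean = terminal.contains(state)
}
```

For example, AppStateGroups.isTerminal(YarnApplicationState.KILLED) is true, while the same call with ACCEPTED is false.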

In addition, the same package defines the enum FinalApplicationStatus, which represents the final execution status of an application. Its values are:

public enum FinalApplicationStatus {

  /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
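
The state and the final status are reported as two separate values, so it can be useful to read them together. The following is a minimal sketch of that idea; AppOutcome and describe are hypothetical names, while getYarnApplicationState and getFinalApplicationStatus are the getters on ApplicationReport.

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationReport, FinalApplicationStatus, YarnApplicationState}

// Hypothetical helper, not part of the Hadoop API.
object AppOutcome {

  // Combine the two enums into a single human-readable line.
  def describe(state: YarnApplicationState, finalStatus: FinalApplicationStatus): String =
    state match {
      case YarnApplicationState.FINISHED | YarnApplicationState.FAILED | YarnApplicationState.KILLED =>
        s"ended in state $state with final status $finalStatus"
      case _ =>
        s"still in progress (state $state)"
    }

  // Convenience overload that reads both values from a report.
  def describe(report: ApplicationReport): String =
    describe(report.getYarnApplicationState, report.getFinalApplicationStatus)
}
```

Keeping the two-argument version separate from the ApplicationReport overload makes the logic easy to exercise without a cluster.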

2. Application code (written in Scala)

Dependencies are managed with Maven. Add the following to pom.xml:

......
    <properties>
        <hadoop.version>2.6.0-cdh5.8.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-api</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
......

Scala code


import java.io.IOException
import java.util.EnumSet

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException

object ReadSparkJobStatus {

  def main(args: Array[String]): Unit = {

    // YarnConfiguration extends Configuration and picks up yarn-site.xml from the classpath
    val yarnConf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(yarnConf)
    yarnClient.start()
    try {
      // Fetch only the applications that are currently in the RUNNING state
      val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
      if (applications.isEmpty) {
        println(">>>>>> no target applications found on yarn.")
      } else {
        // ApplicationId, name, queue and user are the same fields shown in the YARN web UI
        for (app <- applications.asScala) {
          println("ApplicationId ============> " + app.getApplicationId)
          println("name ============> " + app.getName)
          println("queue ============> " + app.getQueue)
          println("user ============> " + app.getUser)
        }
      }
    } catch {
      case e: YarnException =>
        e.printStackTrace()
      case e: IOException =>
        e.printStackTrace()
    } finally {
      // Always release the client, even if the query throws
      yarnClient.stop()
    }
  }
}
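
The same client can also follow one specific application instead of listing all running ones. Below is a rough sketch under a few assumptions: the application id is passed as the first command-line argument, the 5-second poll interval is an arbitrary illustrative value, and the object name WaitForApplication is hypothetical. It uses ConverterUtils.toApplicationId to parse the id and YarnClient.getApplicationReport to read the current report.

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils

// Hypothetical example object, not part of the original article's code.
object WaitForApplication {

  private val terminalStates: Set[YarnApplicationState] = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.FAILED,
    YarnApplicationState.KILLED)

  def isTerminal(state: YarnApplicationState): Boolean = terminalStates.contains(state)

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: WaitForApplication <applicationId>")
    // e.g. application_1553758691889_1111
    val appId = ConverterUtils.toApplicationId(args(0))
    val pollIntervalMs = 5000L // assumed value, tune as needed

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    try {
      var report = yarnClient.getApplicationReport(appId)
      // Keep asking the ResourceManager until the application reaches a terminal state
      while (!isTerminal(report.getYarnApplicationState)) {
        println(s"$appId is ${report.getYarnApplicationState}")
        Thread.sleep(pollIntervalMs)
        report = yarnClient.getApplicationReport(appId)
      }
      println(s"$appId ended: state=${report.getYarnApplicationState}, " +
        s"finalStatus=${report.getFinalApplicationStatus}")
    } finally {
      yarnClient.stop()
    }
  }
}
```

Exceptions are left to propagate here so that a failed lookup is visible to the caller; a production job would likely add a timeout as well.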

[Figure: application information in the YARN web UI]

[Figure: example of application execution states]

3. Output
ApplicationId ============> application_1553758691889_1111
name ============> Spark shell
queue ============> root.users.user1
user ============> datalake
ApplicationId ============> application_1553758691889_1101
name ============> Spark shell
queue ============> root.users.user2
user ============> datalake
ApplicationId ============> application_1553758691889_1103
name ============> aps_sparkstreaming
queue ============> root.users.aps
user ============> aps
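
Once the fields are available, they can be aggregated, for instance to see how many running applications each user or queue has. The sketch below works on a hypothetical AppInfo case class filled with the three applications from the output above, so it runs without a cluster; in real code the values would come from the reports returned by getApplications.

```scala
// Hypothetical example object, not part of the original article's code.
object RunningAppSummary {

  // Hypothetical holder for the four fields printed by the program.
  final case class AppInfo(id: String, name: String, queue: String, user: String)

  // The three applications from the sample output.
  val sample: Seq[AppInfo] = Seq(
    AppInfo("application_1553758691889_1111", "Spark shell", "root.users.user1", "datalake"),
    AppInfo("application_1553758691889_1101", "Spark shell", "root.users.user2", "datalake"),
    AppInfo("application_1553758691889_1103", "aps_sparkstreaming", "root.users.aps", "aps"))

  // Count applications per key, e.g. per user or per queue.
  def countBy(apps: Seq[AppInfo])(key: AppInfo => String): Map[String, Int] =
    apps.groupBy(key).map { case (k, group) => k -> group.size }

  def main(args: Array[String]): Unit = {
    countBy(sample)(_.user).toSeq.sortBy(_._1).foreach {
      case (user, n) => println(s"$user: $n")
    }
  }
}
```

With the sample data this prints `aps: 1` followed by `datalake: 2`.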
