The Metrics System

2018-03-29  炮灰向前冲啦

Spark's metrics system is made up of the parts described below; the class comment on MetricsSystem covers the same ground.

Source

Spark abstracts the origin of metric data as the Source interface, with implementations such as ApplicationSource, MasterSource, WorkerSource, DAGSchedulerSource, StreamingSource, and JvmSource:

private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}

Let's look more closely at three of these sources: MasterSource, WorkerSource, and JvmSource.

private[spark] class MasterSource(val master: Master) extends Source {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "master"

  // Gauge for worker numbers in cluster
  metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
    override def getValue: Int = master.workers.size
  })

  // Gauge for alive worker numbers in cluster
  metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
    override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
  })

  // Gauge for application numbers in cluster
  metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
    override def getValue: Int = master.apps.size
  })

  // Gauge for waiting application numbers in cluster
  metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
    override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
  })
}

private[worker] class WorkerSource(val worker: Worker) extends Source {
  override val sourceName = "worker"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
    override def getValue: Int = worker.executors.size
  })

  // Gauge for cores used of this worker
  metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
    override def getValue: Int = worker.coresUsed
  })

  // Gauge for memory used of this worker
  metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
    override def getValue: Int = worker.memoryUsed
  })

  // Gauge for cores free of this worker
  metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
    override def getValue: Int = worker.coresFree
  })

  // Gauge for memory free of this worker
  metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
    override def getValue: Int = worker.memoryFree
  })
}

The Gauge values registered in the MetricRegistry are read from fields of the Master and Worker objects.
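One point worth seeing in isolation is that a gauge stores no number; it reads the owning object's field each time it is queried. The sketch below illustrates this with stand-in types only: `Gauge` and `FakeWorker` here are hypothetical miniatures, not the Dropwizard or Spark classes.

```scala
object GaugeSketch {
  // Hypothetical stand-in for com.codahale.metrics.Gauge: a value read on demand.
  trait Gauge[T] { def getValue: T }

  // Hypothetical stand-in for the Worker fields that WorkerSource reads.
  final class FakeWorker(val cores: Int) {
    var coresUsed: Int = 0
    def coresFree: Int = cores - coresUsed
  }

  // Register gauges by name, similar in spirit to how a Source fills its registry.
  def gaugesFor(worker: FakeWorker): Map[String, Gauge[Int]] = Map(
    "coresUsed" -> new Gauge[Int] { def getValue: Int = worker.coresUsed },
    "coresFree" -> new Gauge[Int] { def getValue: Int = worker.coresFree }
  )

  def main(args: Array[String]): Unit = {
    val worker = new FakeWorker(cores = 8)
    val gauges = gaugesFor(worker)
    worker.coresUsed = 3 // the state changes after registration
    // Each gauge reports the current field value, not the value at registration time.
    gauges.foreach { case (name, g) => println(s"$name = ${g.getValue}") }
  }
}
```

Because the value is pulled at read time, a sink that polls every few seconds always sees the current state of the Master or Worker without the source pushing updates.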

JvmSource's MetricSets come from the metrics-jvm package:

private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(
    new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}
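To get a feel for the kind of data these metric sets expose, the JVM's own management beans can be queried directly. This is only a rough sketch using the JDK's `java.lang.management` API; it is not how metrics-jvm is implemented, and the object and method names are made up for illustration.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}

object JvmStatsSketch {
  // Heap usage in bytes, read from the platform MemoryMXBean.
  def heapUsedBytes: Long =
    ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed()

  // Collection count per garbage collector (-1 means the collector does not report it).
  def gcCounts: Map[String, Long] = {
    var out = Map.empty[String, Long]
    val beans = ManagementFactory.getGarbageCollectorMXBeans().iterator()
    while (beans.hasNext) {
      val gc = beans.next()
      out += (gc.getName() -> gc.getCollectionCount())
    }
    out
  }

  // Estimated bytes used per NIO buffer pool (-1 means unknown).
  def bufferPoolUsedBytes: Map[String, Long] = {
    var out = Map.empty[String, Long]
    val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).iterator()
    while (pools.hasNext) {
      val pool = pools.next()
      out += (pool.getName() -> pool.getMemoryUsed())
    }
    out
  }

  def main(args: Array[String]): Unit = {
    println(s"heap used: $heapUsedBytes bytes")
    gcCounts.foreach { case (name, n) => println(s"gc $name: $n collections") }
    bufferPoolUsedBytes.foreach { case (name, b) => println(s"buffer pool $name: $b bytes") }
  }
}
```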

A Source creates a MetricRegistry object and registers the metrics to be collected in it; the metric values come from the properties of the instance object.

Sink

Spark abstracts the output destination of metric data as the Sink interface, with implementations such as ConsoleSink, CsvSink, MetricsServlet, GraphiteSink, JmxSink, and Slf4jSink:

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

Let's look at the Slf4jSink implementation in detail:

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"

  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }
  // Check that the scheduleAtFixedRate period is at least 1 second
  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}
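The period and unit handling in the class above can be tried on its own. The following is a minimal sketch under the assumption that the minimum-period check works as the comment in the listing describes (nothing shorter than one second); `PollConfigSketch` and `pollConfig` are names invented for this example.

```scala
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

object PollConfigSketch {
  val DefaultPeriod = 10
  val DefaultUnit: TimeUnit = TimeUnit.SECONDS

  // Read "period" and "unit", falling back to the defaults when a key is absent.
  def pollConfig(props: Properties): (Int, TimeUnit) = {
    val period = Option(props.getProperty("period")).map(_.toInt).getOrElse(DefaultPeriod)
    val unit = Option(props.getProperty("unit"))
      .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
      .getOrElse(DefaultUnit)
    // Assumed equivalent of the minimum-period check: refuse anything under 1 second.
    require(TimeUnit.SECONDS.convert(period.toLong, unit) >= 1,
      s"Polling period $period $unit is below the 1 second minimum")
    (period, unit)
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    println(pollConfig(props)) // defaults apply when nothing is set
    props.setProperty("period", "1")
    props.setProperty("unit", "minutes") // lower case is accepted
    println(pollConfig(props))
  }
}
```

Upper-casing with `Locale.ROOT` keeps the unit lookup independent of the machine's default locale, which is presumably why the original does the same.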

The key method is start(). It needs a reporter object plus the reporting interval, given by pollPeriod and pollUnit. start() in turn calls ScheduledReporter.start():

public void start(long period, TimeUnit unit) {
    // executor is created by Executors.newSingleThreadScheduledExecutor
    executor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                report();
            } catch (Exception ex) {
                LOG.error("Exception thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
            }
        }
    }, period, period, unit);
}

In short, a Sink creates a reporter together with pollPeriod and pollUnit, then periodically pulls data from the Sources and reports it.
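The scheduling pattern itself needs nothing beyond the JDK. Here is a small sketch of it, with `ScheduledReportSketch` and its `start` helper being hypothetical names. It also shows why the try/catch matters: `scheduleAtFixedRate` suppresses all later runs once a run throws, so the exception has to be swallowed inside the task.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ScheduledReportSketch {
  // Runs `report` every `period` units on one background thread.
  // Returns a function that stops the schedule.
  def start(period: Long, unit: TimeUnit)(report: () => Unit): () => Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit = {
        try {
          report()
        } catch {
          // Log and swallow, otherwise the executor cancels all future runs.
          case ex: Exception => System.err.println(s"report failed, suppressed: $ex")
        }
      }
    }
    // The initial delay equals the period, as in the listing above.
    executor.scheduleAtFixedRate(task, period, period, unit)
    () => { executor.shutdownNow(); () }
  }

  def main(args: Array[String]): Unit = {
    val calls = new AtomicInteger(0)
    val done = new CountDownLatch(3)
    val stop = start(50, TimeUnit.MILLISECONDS) { () =>
      val n = calls.incrementAndGet()
      done.countDown()
      if (n == 1) throw new IllegalStateException("first report fails")
      println(s"report #$n")
    }
    done.await(5, TimeUnit.SECONDS) // later reports still run after the failure
    stop()
  }
}
```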

MetricsConfig

MetricsConfig reads the configuration related to metrics:

private[spark] class MetricsConfig(conf: SparkConf) extends Logging {

  private val DEFAULT_PREFIX = "*"
  private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
  private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"

  private[metrics] val properties = new Properties()
  private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
  
  // Set the default properties
  private def setDefaultProperties(prop: Properties) {
    prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
    prop.setProperty("*.sink.servlet.path", "/metrics/json")
    prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
    prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
  }

  /**
   * Load properties from various places, based on precedence
   * If the same property is set again latter on in the method, it overwrites the previous value
   */
  // Entry point: loads the configuration
  def initialize() {
    // Add default properties in case there's no properties file
    setDefaultProperties(properties)

    loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

    // Also look for the properties in provided Spark configuration
    val prefix = "spark.metrics.conf."
    conf.getAll.foreach {
      case (k, v) if k.startsWith(prefix) =>
        properties.setProperty(k.substring(prefix.length()), v)
      case _ =>
    }

    // Now, let's populate a list of sub-properties per instance, instance being the prefix that
    // appears before the first dot in the property name.
    // Add to the sub-properties per instance, the default properties (those with prefix "*"), if
    // they don't have that exact same sub-property already defined.
    //
    // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path",
    // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
    // ("driver"->Map("path"->"driver_path", "class"->"default_class")
    // Note how class got added to based on the default property, but path remained the same
    // since "driver.path" already existed and took precedence over "*.path"
    perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
    if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
      val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
      for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
           (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
        prop.put(k, v)
      }
    }
  }

  /**
   * Take a simple set of properties and a regex that the instance names (part before the first dot)
   * have to conform to. And, return a map of the first order prefix (before the first dot) to the
   * sub-properties under that prefix.
   *
   * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
   * "*.sink.servlet.path"->"path1"), the returned map would be
   * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
   * Note in the subProperties (value of the returned Map), only the suffixes are used as property
   * keys.
   * If, in the passed properties, there is only one property with a given prefix, it is still
   * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"
   * the returned Map would contain one key-value pair
   * Map("*" -> Properties("sink.servlet.class" -> "class1"))
   * Any passed in properties, not complying with the regex are ignored.
   *
   * @param prop the flat list of properties to "unflatten" based on prefixes
   * @param regex the regex that the prefix has to comply with
   * @return an unflatted map, mapping prefix with sub-properties under that prefix
   */
   // See the example figure (subprop.png) after this class
  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val subProperties = new mutable.HashMap[String, Properties]
    prop.asScala.foreach { kv =>
      if (regex.findPrefixOf(kv._1.toString).isDefined) {
        val regex(prefix, suffix) = kv._1.toString
        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
      }
    }
    subProperties
  }

  // If the instance has no entry of its own, fall back to the properties under "*"
  def getInstance(inst: String): Properties = {
    perInstanceSubProperties.get(inst) match {
      case Some(s) => s
      case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
    }
  }

  /**
   * Loads configuration from a config file. If no config file is provided, try to get file
   * in class path.
   */
  private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
    var is: InputStream = null
    try {
      is = path match {
        // Standard idiom: if a path is given, read it with FileInputStream; otherwise load
        // metrics.properties from the classpath via Utils.getSparkClassLoader.getResourceAsStream
        case Some(f) => new FileInputStream(f)
        case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
      }

      if (is != null) {
        // Load the stream into properties
        properties.load(is)
      }
    } catch {
      case e: Exception =>
        val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
        logError(s"Error loading configuration file $file", e)
    } finally {
      if (is != null) {
        // Always remember to close the stream
        is.close()
      }
    }
  }
}
[Figure: subprop.png, an example of how subProperties groups keys]
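The grouping and the default merge can be reproduced with plain `java.util.Properties`, using the example from the source comment in `initialize()`. This is a simplified sketch rather than the Spark code itself; `SubPropertiesSketch` and `withDefaults` are names chosen for this example.

```scala
import java.util.Properties
import scala.collection.mutable
import scala.util.matching.Regex

object SubPropertiesSketch {
  val InstanceRegex: Regex = "^(\\*|[a-zA-Z]+)\\.(.+)".r

  // Group flat keys by the part before the first dot; non-matching keys are dropped.
  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val out = new mutable.HashMap[String, Properties]
    val names = prop.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      key match {
        case regex(prefix, suffix) =>
          out.getOrElseUpdate(prefix, new Properties).setProperty(suffix, prop.getProperty(key))
        case _ => // ignored, as in the original
      }
    }
    out
  }

  // Copy "*" defaults into every other instance unless it already defines the key.
  def withDefaults(perInstance: mutable.HashMap[String, Properties]): Unit =
    perInstance.get("*").foreach { defaults =>
      for ((instance, props) <- perInstance if instance != "*") {
        val keys = defaults.stringPropertyNames().iterator()
        while (keys.hasNext) {
          val k = keys.next()
          if (props.getProperty(k) == null) props.setProperty(k, defaults.getProperty(k))
        }
      }
    }

  def main(args: Array[String]): Unit = {
    val p = new Properties()
    p.setProperty("*.class", "default_class")
    p.setProperty("*.path", "default_path")
    p.setProperty("driver.path", "driver_path")
    val grouped = subProperties(p, InstanceRegex)
    withDefaults(grouped)
    // driver keeps its own path and inherits class from "*"
    println(grouped("driver"))
  }
}
```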

MetricsSystem

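MetricsSystem registers the Sources and Sinks and starts the sinks. It is not a single system-wide control center: each instance has its own MetricsSystem object, which controls metrics at instance granularity.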

The MetricsSystem class revolves around three calls: registerSources(), registerSinks(), and sinks.foreach(_.start).

private[spark] class MetricsSystem private (
    val instance: String,
    conf: SparkConf,
    securityMgr: SecurityManager)
  extends Logging {
  // Build the MetricsConfig object used to read the configuration
  private[this] val metricsConfig = new MetricsConfig(conf)

  private val sinks = new mutable.ArrayBuffer[Sink]
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

  private var running: Boolean = false

  // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
  private var metricsServlet: Option[MetricsServlet] = None

  /**
   * Get any UI handlers used by this metrics system; can only be called after start().
   */
  def getServletHandlers: Array[ServletContextHandler] = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
  }
  // Initialize the MetricsConfig object
  metricsConfig.initialize()

  def start() {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    // Register the StaticSources, i.e. CodegenMetrics and HiveCatalogMetrics
    StaticSources.allSources.foreach(registerSource)
    // Register the sources
    registerSources()
    // Instantiate the sinks
    registerSinks()
    // Start the sinks
    sinks.foreach(_.start)
  }

  def stop() {
    if (running) {
      // Call stop on every sink
      sinks.foreach(_.stop)
    } else {
      logWarning("Stopping a MetricsSystem that is not running")
    }
    running = false
  }

  def report() {
    // Call report on every sink
    sinks.foreach(_.report())
  }

  /**
   * Build a name that uniquely identifies each metric source.
   * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
   * If either ID is not available, this defaults to just using <source name>.
   *
   * @param source Metric source to be named by this method.
   * @return An unique metric name for each combination of
   *         application, executor/driver and metric source.
   */
  // Build the registry name
  private[spark] def buildRegistryName(source: Source): String = {
    val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))

    val executorId = conf.getOption("spark.executor.id")
    val defaultName = MetricRegistry.name(source.sourceName)

    if (instance == "driver" || instance == "executor") {
      if (metricsNamespace.isDefined && executorId.isDefined) {
        // When the instance is a driver or an executor, the name is composed of
        // <metrics namespace, else spark.app.id>.<spark.executor.id>.<source.sourceName>
        MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
      } else {
        // Only Driver and Executor set spark.app.id and spark.executor.id.
        // Other instance types, e.g. Master and Worker, are not related to a specific application.
        if (metricsNamespace.isEmpty) {
          logWarning(s"Using default name $defaultName for source because neither " +
            s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
        }
        if (executorId.isEmpty) {
          logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
            s"not set.")
        }
        defaultName
      }
    } else { defaultName }
  }

  def getSourcesByName(sourceName: String): Seq[Source] =
    sources.filter(_.sourceName == sourceName)

  // Register a single source
  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
  
  // Remove a source
  def removeSource(source: Source) {
    sources -= source
    val regName = buildRegistryName(source)
    registry.removeMatching(new MetricFilter {
      def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
    })
  }

  // Register every source configured under keys that start with "source"
  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    // MetricsSystem.SOURCE_REGEX: "^source\\.(.+)\\.(.+)".r
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        // Instantiate via reflection. Only sources with a no-arg constructor, such as JvmSource, can be created this way
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

  // Instantiate every sink configured under keys that start with "sink"
  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    // Properties that start with "sink": "^sink\\.(.+)\\.(.+)".r
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          // Create the sink by passing constructor arguments: kv._2, registry, securityMgr
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            // The sink keyed "servlet" is kept separately as a MetricsServlet
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            // Every other sink is added to the sinks list
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }
}
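The naming rule in buildRegistryName and the prefix match in removeSource are easy to check in isolation. The sketch below restates them as pure functions; it assumes dot-joined names (which is what MetricRegistry.name produces for non-empty parts), and `registryName` and `remaining` are hypothetical helpers, not Spark methods.

```scala
object RegistryNameSketch {
  // Driver and executor get <namespace>.<executorId>.<sourceName> when both IDs are known;
  // every other case falls back to the bare source name.
  def registryName(
      instance: String,
      namespace: Option[String],
      executorId: Option[String],
      sourceName: String): String =
    (instance, namespace, executorId) match {
      case ("driver" | "executor", Some(ns), Some(id)) => s"$ns.$id.$sourceName"
      case _ => sourceName
    }

  // removeSource drops every metric whose full name starts with the registry name.
  def remaining(metricNames: Seq[String], regName: String): Seq[String] =
    metricNames.filterNot(_.startsWith(regName))

  def main(args: Array[String]): Unit = {
    // "app-1" and the metric names below are made-up sample values.
    val jvm = registryName("driver", Some("app-1"), Some("driver"), "jvm")
    println(jvm)
    println(registryName("master", None, None, "master"))
    println(remaining(Seq("app-1.driver.jvm.heap.used", "app-1.driver.other.count"), jvm))
  }
}
```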

Summary

First, take a look at the metrics.properties.template template:

*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.prefix=spark

*.sink.console.period=10
*.sink.console.unit=seconds

master.sink.console.period=15
master.sink.console.unit=seconds

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

*.sink.csv.period=1
*.sink.csv.unit=minutes

*.sink.csv.directory=/tmp/

worker.sink.csv.period=10
worker.sink.csv.unit=minutes

*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

*.sink.slf4j.period=1
*.sink.slf4j.unit=minutes

master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource

executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
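To see how keys like these resolve for one instance, the following sketch overlays an instance's own entries on the `*` defaults and then groups the result by sink name. It is a simplified model of the lookup, not the Spark code: `InstanceConfigSketch` and `sinksFor` are invented names, and the input is a shortened excerpt of the template.

```scala
import java.io.StringReader
import java.util.Properties

object InstanceConfigSketch {
  private val Sink = "^sink\\.(.+)\\.(.+)".r

  // Effective "sink name -> (key -> value)" for one instance:
  // "*" entries are applied first, then the instance's own entries override them.
  def sinksFor(instance: String, text: String): Map[String, Map[String, String]] = {
    val props = new Properties()
    props.load(new StringReader(text))
    var flat = Map.empty[String, String]
    for (prefix <- Seq("*", instance)) {
      val names = props.stringPropertyNames().iterator()
      while (names.hasNext) {
        val key = names.next()
        if (key.startsWith(prefix + "."))
          flat += (key.substring(prefix.length + 1) -> props.getProperty(key))
      }
    }
    var sinks = Map.empty[String, Map[String, String]]
    flat.foreach {
      case (Sink(name, k), v) =>
        sinks += (name -> (sinks.getOrElse(name, Map.empty[String, String]) + (k -> v)))
      case _ => // source.* and other keys are not sinks
    }
    sinks
  }

  def main(args: Array[String]): Unit = {
    val text =
      """*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
        |*.sink.console.period=10
        |*.sink.console.unit=seconds
        |master.sink.console.period=15
        |master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
        |""".stripMargin
    println(sinksFor("master", text)) // master overrides the console period
    println(sinksFor("worker", text)) // worker sees only the defaults
  }
}
```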
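
  1. First, the properties configuration is read.
  2. Using the instance name, the properties prefixed with ${name} are selected, with the * defaults merged in for any key the instance does not set itself; if the instance has no entries at all, the properties prefixed with * are used. The result is the instConfig object.
  3. From instConfig, the entries starting with source or sink are extracted into sourceConfigs and sinkConfigs respectively.
  4. The source class and sink class names are read from these configs and instantiated via reflection. A source is created with its default no-arg constructor, so only classes such as JvmSource can be configured this way; MasterSource has to be created with new inside the Master class. A sink is created with constructor arguments, and its properties argument also comes from the loaded configuration.
  5. The sources are registered and the sinks are started.
  6. MetricsSystem is responsible for starting and stopping sources and sinks, and each instance starts and stops its own metrics independently.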
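The difference between the two reflection styles in step 4 can be demonstrated with JDK classes as stand-ins (no Spark classes are involved here, and `ReflectionSketch` is a made-up name). A class that lacks a no-arg constructor simply cannot be created from a class name alone, which is the situation MasterSource is in.

```scala
import scala.util.Try

object ReflectionSketch {
  // Source style: only a class name is known, so a public no-arg constructor is required.
  def newNoArg(className: String): Try[Any] =
    Try(Class.forName(className).getConstructor().newInstance())

  // Sink style: the caller fixes the constructor signature and supplies the arguments.
  def newWithArgs(className: String, types: Seq[Class[_]], args: Seq[AnyRef]): Try[Any] =
    Try(Class.forName(className).getConstructor(types: _*).newInstance(args: _*))

  def main(args: Array[String]): Unit = {
    // java.util.Properties has a no-arg constructor, so this succeeds.
    println(newNoArg("java.util.Properties").isSuccess)
    // java.lang.Integer has none, so lookup fails with NoSuchMethodException.
    println(newNoArg("java.lang.Integer").isSuccess)
    // With an agreed signature, arguments can be passed through.
    println(newWithArgs("java.lang.StringBuilder", Seq(classOf[String]), Seq("abc")))
  }
}
```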