Flink 指标(一)
Flink 自带一个度量系统,允许收集和公开指标到外部系统。
注册指标
可以通过继承 RichFunction
,在继承类里面调用 getRuntimeContext().getMetricGroup()
来访问 Flink 的指标系统,这个方法返回一个 MetricGroup
对象,可以通过这个对象创建和注册新的度量指标。
度量类型
支持 Counters
、Gauges
、Histograms
和 Meters
这四个类型的度量值。
Counter(计数器)
Counter
用于计数。可以使用 inc()
/inc(long n)
或 dec()
/dec(long n)
更新(增加或减少)计数器。可以通过调用 MetricGroup
的 counter(String name)
方法来创建和注册 Counter
类型的度量值。
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
// 使用默认 Counter 实现
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
也可以使用自己的 Counter
实现:
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
// 使用自定义 Counter 实现
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter())
}
override def map(value: String): String = {
counter.inc()
value
}
}
Gauges(测量)
Gauge
根据需要可提供任何类型的值。首先需要创建一个实现了 org.apache.flink.metrics.Gauge
接口的类,这个类对返回值的类型没有限制。然后,通过调用 MetricGroup
的 gauge(String name, Gauge gauge)
方法创建和注册 Gauge
类型的度量指标。
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
override def map(value: String): String = {
valueToExpose += 1
value
}
}
报告会把导出的数据转换成 String
类型,所以返回的统计类型需要实现 toString()
方法。
Histograms(直方图)
Histogram
用来测量长期变化值的分布。可以用过调用 MetricGroup
的 histogram(String name, Histogram histogram)
方法创建和注册 Histogram
类型的度量指标。
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var histogram: Histogram = _
override def open(parameters: Configuration): Unit = {
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram())
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
Flink 没有提供默认 Histogram
实现 ,但提供了一个允许使用 Codahale / DropWizard 直方图的包装类(Wrapper),添加以下依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.6.1</version>
</dependency>
代码如下:
class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram = _
override def open(config: Configuration): Unit = {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
Meters(仪表)
Meter
用来衡量平均吞吐量。可以通过 markEvent()
方法用来注册事件的发生。可以通过 markEvent(long n)
方法注册多个事件同时发生。可以通过调用 MetricGroup
的 meter(String name, Meter meter)
方法用来注册 Meter
类型的指标。
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
Flink 提供了一个允许使用 Codahale / DropWizard 表的 Wrapper,添加以下依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.6.1</version>
</dependency>
代码如下:
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
作用域(Scope)
为每个被报告的度量值分配一个标识符和一组键值对。标识符基于3个部分:
- 注册度量标准时的用户定义名称
- 可选的用户定义范围
- 系统提供的范围。
例如,如果A.B是系统作用域的,C.D是用户作用域的,E是度量值的名称。那么 A.B.C.D.E 就是这个度量值的标识符。
可以通过设置 conf/flink-conf.yaml
中的 metrics.scope.delimiterKeys
来配置要用于标识符的分隔符(默认值: .
) 。
用户作用域(User Scope)
用户范围可以通过调用 MetricGroup#addGroup(String name)
、MetricGroup#addGroup(int name)
或 Metric#addGroup(String key, String value)
来定义。这些方法会影响 MetricGroup#getMetricIdentifier
和 MetricGroup#getScopeComponents
的返回。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter")
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
系统作用域(System Scope)
系统范围包含有关度量的上下文信息,例如:在哪个 Task 中注册或该 Task 属于哪个 Job。
可以通过设置 conf/flink-conf.yaml
中的以下键,来配置需要包含哪些上下文信息。这些键值的格式由常量(比如“taskmanager”)和变量(比如“<task_id>”)组成,其中变量会在运行时被替换掉:
-
metrics.scope.jm
默认值:<host>.jobmanager
应用于 Scope 为 JobManager 的所有指标。 -
metrics.scope.jm.job
默认值:<host>.jobmanager.<job_name>
应用于 Scope 为 JobManager 和作业的所有指标。 -
metrics.scope.tm
默认值:<host>.taskmanager.<tm_id>
应用于 Scope 为 TaskManager 的所有指标。 -
metrics.scope.tm.job
默认值:<host>.taskmanager.<tm_id>.<job_name>
应用于 Scope 为 TaskManager 和作业的所有指标。 -
metrics.scope.task
默认值:<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
应用于 Scope 为 Task 的所有指标。 -
metrics.scope.operator
默认值:<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
应用于 Scope 为 Operator 的所有指标。
- 变量的数量或顺序没有限制。
- 变量区分大小写。
- 算子指标的默认作用域将产生类似于
localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
的标识符 - 如果还想包含任务名称但省略 TaskManager 信息,则可以指定以下格式:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
将产生类似于 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
的标识符
- 对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用
<job_id>
或通过为作业和算子分配唯一名称来提供一定程度的唯一性的格式字符串。
所有变量列表
- JobManager:
<host>
- TaskManager:
<host>
,<tm_id>
- 作业:
<job_id>
,<作业名称>
- 任务:
<task_id>
,<task_name>
,<task_attempt_id>
,<task_attempt_num>
,<subtask_index>
- 算子:
<operator_id>
,<operator_name>
,<subtask_index>
对于 Batch API,<operator_id>
始终等于<task_id>
用户变量
用户变量可以通过调用 MetricGroup#addGroup(String key, String value)
来定义。会影响 MetricGroup#getMetricIdentifier
,MetricGroup#getScopeComponents
和 MetricGroup#getAllVariables()
返回。用户变量不能用于 Scope 定义中。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html