Spark & FlinkFlink专题

Flink 指标(二)

2019-05-06  本文已影响5人  Alex90

报告(Reporter)

通过 conf/flink-conf.yaml 文件配置一个或多个 Reporters 来暴露度量值给外部系统,这些 Reporter 将在作业和任务启动的时候实例化。

所有的 Reporter 配置至少需要配置 class 属性,还有一些允许配置记录间隔。下面是一些 Reporter 的配置实例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

包含 Reporter 的 jar 必须放到 /lib 文件夹,这样 Flink 就可以访问到这些 jar。
可以通过继承 org.apache.flink.metrics.reporter.MetricReporter 接口来实现自己的 Reporter,如果需要定期发送记录,需要继承 Scheduled 接口。

下面是一些支持的 Reporter:

JMX(org.apache.flink.metrics.jmx.JMXReporter)

不需要添加额外的依赖就可以支持 JMX Reporter,默认是不激活的。

参数:

配置示例:

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通过 JMX 公开的度量由域(domain)和键属性列表(key-properties)标识,这些属性一起构成对象名。

域始终以 org.apache.flink 开头,后跟一个通用的度量标识符。与通常的标识符不同,它不受作用域格式的影响,不包含任何变量,并且在跨作业时也是常量。例子:org.apache.flink.job.task.numbytesout

键属性列表包含与给定指标关联的所有变量的值,无论配置的作用域格式如何。例子:host=localhost,job_name=myjob,task_name=mytask

因此,域标识一个度量类,键属性列表标识该度量的一个(或多个)实例。

Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink 度量类型映射到 Prometheus 度量类型,如下所示:

Flink Prometheus Description
Counter Gauge Prometheus 计数器不能减
Gauge Gauge Prometheus 仅支持数字和布尔类型
Histogram Summary 分位数 .5,.75,.95,.98,.99 和 .999
Meter Gauge The gauge exports the meter’s rate

PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

默认值 描述
deleteOnShutdown true 指定是否在关闭时从 PushGateway 中删除指标。
Host (none) PushGateway 服务器主机。
jobName (none) 将推送指标的作业名称。
port -1 PushGateway 服务器端口。
randomJobNameSuffix true 指定是否应将随机后缀附加到作业名称。

配置示例:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter 将指标推送到 Pushgateway,可由 Prometheus 抓取。

StatsD(org.apache.flink.metrics.statsd.StatsDReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
参数:

配置示例:

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
Flink 指标,如任何变量 <host><job_name><tm_id><subtask_index><task_name><operator_name>,将被发送到 Datadog 作为标签。标签看起来像 host:localhostjob_name:myjobname

参数:

配置示例:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)

要使用此 Reporter,必须复制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

配置示例:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

系统指标

Flink 默认会收集当前状态的指标,下文的表格中包括以下5列:

请注意,“infix” 和 “Metrics” 列中所有的点根据 “metrics.delimiter” 设置变化。

因此,为了推断指标的标识符:

  1. 先从“Scope”列获取范围格式。
  2. 如果“Infix”列有值的话,附加到范围格式后面,并根据“metrices.delimiter”设置附加相应的分隔符。
  3. 附加指标的名称。

CPU

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.CPU Load JVM CPU使用情况。 Gauge
- - Time JVM CPU时间。 Gauge

Memory

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Memory Heap.Used 当前使用的堆内存量(bytes)。 Gauge
- - Heap.Committed 保证可供 JVM 使用的堆内存量(bytes)。 Gauge
- - Heap.Max 可用于内存管理的最大堆内存量(bytes)。 Gauge
- - NonHeap.Used 当前使用的非堆内存量(bytes)。 Gauge
- - NonHeap.Committed 保证 JVM 可用的非堆内存量(bytes)。 Gauge
- - NonHeap.Max 可用于内存管理的最大非堆内存量(bytes)。 Gauge
- - Direct.Count 直接缓冲池中的缓冲区数。 Gauge
- - Direct.MemoryUsed JVM 用于直接缓冲池的内存量(bytes)。 Gauge
- - Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量(bytes)。 Gauge
- - Mapped.Count 映射缓冲池中的缓冲区数。 Gauge
- - Mapped.MemoryUsed JVM 用于映射缓冲池的内存量(bytes)。 Gauge
- - Mapped.TotalCapacity 映射缓冲池中的缓冲区数(bytes)。 Gauge

Threads

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Threads Count 活动线程总数。 Gauge

GarbageCollection

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已发生的集合总数。 Gauge
- - <GarbageCollector>.Time 执行垃圾收集所花费的总时间。 Gauge

ClassLoader

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded JVM 启动以来加载的类总数。 Gauge
- - ClassesUnloaded JVM 启动以来卸载的类总数。 Gauge

Network

Scope Infix Metrics Description Type
TaskManager Status.Network AvailableMemorySegments 未使用的内存段数。 Gauge
- - TotalMemorySegments 分配的内存段数。 Gauge
Task buffers inputQueueLength 排队的输入缓冲区数。 Gauge
- - outputQueueLength 排队输出缓冲区的数量。 Gauge
- - inPoolUsage 估计输入缓冲区的使用情况。 Gauge
- - outPoolUsage 估计输出缓冲区的使用情况。 Gauge
- Network.
<Input/Output>.
<gate>
totalQueueLen 所有输入/输出通道中排队缓冲区的总数。 Gauge
- - minQueueLen 所有输入/输出通道中的最小排队缓冲区数。 Gauge
- - maxQueueLen 所有输入/输出通道中的最大排队缓冲区数。 Gauge
- - avgQueueLen 所有输入/输出通道中的平均缓冲区数。 Gauge

Cluster

Scope Metrics Description Type
JobManager numRegisteredTaskManagers 注册 TaskManager 的数量。 Gauge
- numRunningJobs 正在运行的作业数量。 Gauge
- taskSlotsAvailable 可用任务槽的数量。 Gauge
- taskSlotsTotal 任务槽的总数。 Gauge

Availability

Scope Metrics Description Type
Job restartingTime 重新启动作业所花费的时间,或当前重新启动的持续时间(ms)。 Gauge
- uptime 作业运行的时间不间断。对于已完成的作业,返回-1(ms)。 Gauge
- downtime 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(ms)。 Gauge
- fullRestarts 自提交此作业以来完全重新启动的总次数。 Gauge

Checkpointing

Scope Metrics Description Type
Job lastCheckpointDuration 完成最后一个检查点所花费的时间(ms)。 Gauge
- lastCheckpointSize 最后一个检查点的总大小(bytes)。 Gauge
- lastCheckpointExternalPath 存储最后一个外部检查点的路径。 Gauge
- lastCheckpointRestoreTimestamp 在协调器上恢复最后一个检查点时的时间戳(ms)。 Gauge
- lastCheckpointAlignmentBuffered 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(ms)。 Gauge
- numberOfInProgressCheckpoints 进行中检查点的数量。 Gauge
- numberOfCompletedCheckpoints 成功完成检查点的数量。 Gauge
- numberOfFailedCheckpoints 失败检查点的数量。 Gauge
- totalNumberOfCheckpoints 总检查点的数量(正在进行,已完成,失败)。 Gauge
Task checkpointAlignmentTime 最后一次屏障对齐完成所花费的时间(nanoseconds),或当前对齐到目前为止所用的时间(nanoseconds)。 Gauge

IO

Scope Metrics Description Type
Job <SOURCE_ID>.
<source_subtask_index>.
<operator_id>.
<operator_subtask_index>.
latency
从给定源子任务到算子子任务的延迟分布(ms)。 Histogram
Task numBytesInLocal 此任务从本地源读取的总字节数。 Counter
- numBytesInLocalPerSecond 此任务每秒从本地源读取的字节数。 Meter
- numBytesInRemote 此任务从远程源读取的总字节数。 Counter
- numBytesInRemotePerSecond 此任务每秒从远程源读取的字节数。 Meter
- numBuffersInLocal 此任务从本地源读取的网络缓冲区总数。 Counter
- numBuffersInLocalPerSecond 此任务每秒从本地源读取的网络缓冲区数。 Meter
- numBuffersInRemote 此任务从远程源读取的网络缓冲区总数。 Counter
- numBuffersInRemotePerSecond 此任务每秒从远程源读取的网络缓冲区数。 Meter
- numBytesOut 此任务已发出的总字节数。 Counter
- numBytesOutPerSecond 此任务每秒发出的字节数。 Meter
- numBuffersOut 此任务已发出的网络缓冲区总数。 Counter
- numBuffersOutPerSecond 此任务每秒发出的网络缓冲区数。 Meter
Task/Operator numRecordsIn 此算子/任务已收到的记录总数。 Counter
- numRecordsInPerSecond 此算子/任务每秒接收的记录数。 Meter
- numRecordsOut 此算子/任务已发出的记录总数。 Counter
- numRecordsOutPerSecond 此算子/任务每秒发送的记录数。 Meter
- numLateRecordsDropped 此算子/任务因迟到而丢失的记录数。 Counter
- currentInputWatermark 此算子/任务收到的最后一个水印(ms)。注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。 Gauge
Operator currentInput1Watermark 此算子在其第一个输入(ms)中收到的最后一个水印。注意:仅适用于具有2个输入的算子。 Gauge
- currentInput2Watermark 此算子在其第二个输入中接收的最后一个水印(ms)。注意:仅适用于具有2个输入的算子。 Gauge
- currentOutputWatermark 此算子发出的最后一个水印(ms)。 Gauge
- numSplitsProcessed 此数据源已处理的InputSplits总数。 Gauge

Connectors

Kafka 连接器

Scope Metrics User Variables Description Type
Operator commitsSucceeded N / A 如果启用了偏移提交并且启用了检查点,则成功向 Kafka 提交的偏移提交总数。 Counter
- commitsFailed N / A 如果启用了偏移提交并且启用了检查点,则 Kafka 的偏移提交失败总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方法,因此提交失败不会影响 Flink 的检查点分区偏移的完整性。 Counter
- committedOffsets Topic,分区 对于每个分区,最后成功提交到 Kafka 的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 Gauge
- currentOffsets Topic,分区 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 Gauge

Kinesis 连接器

Scope Metrics User Variables Description Type
Operator millisBehindLatest stream,shardId 对于每个 Kinesis 分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 Gauge
- sleepTimeMillis stream,shardId 消费者在从 Kinesis 获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 Gauge
- maxNumberOfRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时请求的最大记录数。 Gauge
- numberOfAggregatedRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时获取的聚合 Kinesis 记录数。 Gauge
- numberOfDeggregatedRecordsPerFetch stream,shardId 消费者在单个 getRecords 调用 Kinesis 时获取的分解 Kinesis 记录的数量。 Gauge
- averageRecordSizeBytes stream,shardId Kinesis 记录的平均大小(bytes),由消费者在单个 getRecords 调用中获取。 Gauge
- runLoopTimeNanos stream,shardId 消费者在运行循环中花费的实际时间(ns)。 Gauge
- loopFrequencyHz stream,shardId 一秒钟内调用 getRecords 的次数。 Gauge
- bytesRequestedPerFetch stream,shardId 在一次调用 getRecords 中请求的字节数。 Gauge

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

上一篇下一篇

猜你喜欢

热点阅读