prometheus编程实践(二):应用实例
1. 应用需求与设计思路
视频巡检在视频监控的运维系统中是一个非常重要的功能,运维系统会定义定时的视频巡检任务。每一轮的巡检会对运维系统中维护的IPC设备进行码流的调用,来判断IPC设备的运行状态,码流延迟等指标,并记录在ES中供统计分析。目前在运维系统的ES中保存了每个设备每次巡检的得分和执行时间,我们以这两个指标作为放入prometheus时间序列中的指标,时间戳为设备本次巡检的完成时间戳。
因此,在prometheus中定义的指标示例如下:
#每个设备每次巡检的得分(时间戳为巡检任务完成时间):
vas_task_device_score{gbid="32050000001326000701",groupid="2c94e38c6aa06653016aee72d4552831",instance="ioms",job="zhangkai",taskid="default0000000000000000000000000"} 72 @1565057355
#每个设备每次巡检的消耗时间(时间戳为巡检任务完成时间):
vas_task_device_duration{gbid="32050000001326000701",groupid="2c94e38c6aa06653016aee72d4552831",instance="ioms",job="zhangkai",taskid="default0000000000000000000000000"} 7 @1565069700
每次巡检任务的结果由巡检服务写入到ES中,我们需要编写一个exporter,定时将ES中新产生的巡检结果取出来,格式化为指标提供给prometheus,这里我们将exporter对ES的采集周期设置为1分钟,同样也将prometheus的scrape的周期设为1分钟。
2. 为什么没有使用push gateway
1节的应用场景需要自定义时间戳,而且需要保证数据的完整性,根据前面对push gateway的描述,不适合使用push gateway。
3. prometheus对自定义时间戳指标的采集特性
(及时性)若自定义时间戳,prometheus采集时会比较自定义的时间戳与当前时间,如两者之间差别太大(如大于1小时),指标不被采集,报“Error on ingesting samples that are too old or are too far into the future”错误。
(时序性)若自定义时间戳,prometheus采集指标时会将采集的时间戳与待插入时间序列中的时间戳进行比较,若采集的时间戳小于待插入时间序列的当前时间戳(插入老数据), prometheus会报“Error on ingesting out-of-order samples”错误。
4. Prometheus服务端的配置
在Prometheus服务端配置了采集器的地址,并配置了告警规则和告警发送地址,服务器端的配置数据如下所示:
scrape_configs:
- job_name: prometheus
honor_timestamps: true
scrape_interval: 15s
scrape_timeout: 10s
metrics_path: /metrics
scheme: http
static_configs:
- targets:
- localhost:9090
- job_name: zhangkai
honor_labels: true
honor_timestamps: true
scrape_interval: 1m
scrape_timeout: 10s
metrics_path: /metrics
scheme: http
static_configs:
- targets:
- 172.16.64.159:8081
labels:
instance: ioms
- targets:
- 172.16.64.159:8082
labels:
instance: viid
5. 指标采集exporter的编写
整个采集类的代码如下所示:
@Slf4j
@Component
public class VasTaskCollector {
private static final long TIME_OFFSET = 1 * 60 * 60 * 1000; // prometheus采集指标的时间偏移量,
@Autowired
EsUtil esutil;
private long toOffset = 0; // 当前轮次采集的最大偏移量
private long fromOffset = 0; // 当前轮次采集的开始偏移量
private long validFromOffset = 0; // 根据当前时间算出的prometheus有效采集偏移量
@Scheduled(fixedDelay = GlobalConsts.VAS_TASK_COLLECT_INTERVAL)
private void genVasTaskMetrics() {
CloseableIterator<TaskInfo> taskInfoIterator = null;
long currTime = (new Date()).getTime();
validFromOffset = (currTime - TIME_OFFSET) / 1000; // 秒为单位
if (validFromOffset > fromOffset) {
fromOffset = validFromOffset;
}
toOffset = (long) esutil.getMaxByField("task", "taskresult", "scanEndTime"); // 单位为秒
log.info("####find current turn's max scanEndTime in ES is :" + toOffset);
if (toOffset >= fromOffset) {
taskInfoIterator = esutil.queryTaskInfoByOffsetRange(fromOffset, toOffset);
log.info("####search task info ES from " + fromOffset + " to " + toOffset);
genVasTaskMetrics(taskInfoIterator);
fromOffset = toOffset; // 为下次循环准备
}
}
/**
* 产生巡检任务的指标
*
* @param taskInfoList
*/
private void genVasTaskMetrics(CloseableIterator<TaskInfo> taskIt) {
int count = 0;
while (taskIt.hasNext()) {
TaskInfo taskInfo = taskIt.next();
log.debug(taskInfo.toString());
long timeStamp = taskInfo.getScanEndTime() * 1000;
String metric;
/* 每个设备每次巡检的得分 */
metric = String.format("vas_task_device_score{taskid=\"%s\",groupid=\"%s\",gbid=\"%s\"} %d %d",
taskInfo.getTaskID(), taskInfo.getGroupID(), taskInfo.getGbID(),
taskInfo.getScore(), timeStamp);
CollectorConfig.metricQueue.add(metric);
log.debug(metric);
/* 每个设备每次巡检的延时,单位为秒 */
metric = String.format("vas_task_device_duration{taskid=\"%s\",groupid=\"%s\",gbid=\"%s\"} %d %d",
taskInfo.getTaskID(), taskInfo.getGroupID(), taskInfo.getGbID(),
taskInfo.getScanEndTime() - taskInfo.getScanTime(), timeStamp);
CollectorConfig.metricQueue.add(metric);
log.debug(metric);
count++;
}
taskIt.close();
log.info("####add metrics of taskinfo count: " + count);
}
}
提供采集端点的controller代码如下:
@Slf4j
@RestController
public class MetricController {
/**
* 向prometheus提供指标数据
*
* @param response
* @throws IOException
*/
@RequestMapping(value = "/metrics", method = RequestMethod.GET)
private void pullMetrics(HttpServletResponse response) throws IOException {
StringBuffer sb = new StringBuffer();
Queue<String> queue = CollectorConfig.metricQueue;
String metric = null;
int count = 0;
// 每次最多只取QUEUE_MAX_SIZE数目的指标
while ((queue.size() > 0) && (count < GlobalConsts.PULL_MAX_SIZE)) {
if (null != metric) {
sb.append(metric).append("\n");
}
metric = queue.poll();
count++;
}
// 最后一条不加换行符
if (null != metric) {
sb.append(metric);
}
log.debug(sb.toString());
log.info("****prometheus scrape metrics count:"+count);
response.getWriter().print(sb.toString());
}
/**
* 告警接收地址
*
* @param alarms
*/
@RequestMapping(value = "/api/v1/alerts", method = RequestMethod.POST)
private void handleAlarms(@RequestBody String alarms) {
if (null != alarms) {
log.info("received alarms are:" + alarms);
JSONArray alarmArray = JSON.parseArray(alarms);
alarmArray.forEach(alarmObject -> {
handleAlarmObject((JSONObject) alarmObject);
});
}
}
// 处理接收的每条告警
private void handleAlarmObject(JSONObject alarmObj) {
String alertName = alarmObj.getJSONObject("labels").getString("alertname");
int alertValue = alarmObj.getJSONObject("labels").getIntValue("value");
String alertTimeStr = alarmObj.getString("startsAt"); //时间格式"2019-07-19T06:42:08.639138396Z"
alertTimeStr = alertTimeStr.substring(0, alertTimeStr.length()-7);
alertTimeStr+= "+0000";
long alertTime = -1;
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
alertTime = df.parse(alertTimeStr).getTime();
} catch (ParseException pe) {
log.error("convert " + alertTimeStr + " error!");
}
log.info("告警名:" + alertName + "告警值:" + alertValue + "告警时间:" + alertTime);
// todo: 这里添加处理每条告警的代码
}
}
6. 可视化展示
在grafana中创建仪表盘,设置好查询表达式,并设置变量用于选取任务ID,实时监控结果如下:
图10.png
7. 总结
- 对于业务应用需要保证数据完整性的场景下,需要在采集器中设置缓存避免指标值的丢失。采集周期应和指标生成周期保持一致。时间戳要用应用生成的时间戳。
- Prometheus适合于实时监控场景,特别是对实时时间滑窗(实时流)的处理非常方便。
- 可以使用Prometheus的HTTP API做自己的应用系统,需要注意的是“/api/v1/query_range”根据step查询出来的时间序列中时间戳是Prometheus根据查询时间计算出的时间戳,而非时间序列数据库中的原始采集时间戳。
- 由于prometheus的数据模型中并没有定义数据类型,因此标签值都视为文本类型,在数据查询中非常依赖于基于标签值的过滤,原来在关系模式中基于某些字段的比较与运算在这里就用不上了,需要使用正则表达式来变通实现。如某个指标的标签中有“month”这个标签,以前在关系模式中可以使用“month>1 and month<5”这样的查询条件来进行过滤,在prometheus的查询表达式则需要使用“[2-4]”来匹配。