时序数据库大数据

InfluxDB 入门

2019-06-26  本文已影响0人  楚_kw

1. 简述

1.1 influxdb概念

  1. influxdb是一个开源分布式时序、时间和指标数据库,使用 Go 语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展,是 InfluxData 的核心产品。
  2. 应用:性能监控,应用程序指标,物联网传感器数据和实时分析等的后端存储。
  3. influxdb 完整的上下游产业还包括:Chronograf、Telegraf、Kapacitor,其具体作用及关系如下:

有些人也会选择 Telegraf (Heapster)+ influxdb + grafana组合。

1.2 与传统数据库相关区别

influxdb 中的概念 传统数据库中的概念
database 数据库
measurement 数据库中的表
point 表中的一行数据
point 属性 含义
time 数据记录的时间,是主索引(自动生成)
tags 各种有索引的属性
fields 各种value值(没有索引的属性)

1.3 保留策略(retention policy)

  1. 每个数据库刚开始会自动创建一个默认的存储策略 autogen,数据保留时间为永久,在集群中的副本个数为1,之后用户可以自己设置(查看、新建、修改、删除),例如保留最近2小时的数据。插入和查询数据时如果不指定存储策略,则使用默认存储策略,且默认存储策略可以修改。InfluxDB 会定期清除过期的数据。
  2. 每个数据库可以有多个过期策略:
    show retention policies on "db_name"
  3. Shard 在 influxdb中是一个比较重要的概念,它和 retention policy 相关联。每一个存储策略下会存在许多 shard,每一个 shard 存储一个指定时间段内的数据,并且不重复,例如 7点-8点 的数据落入 shard0 中,8点-9点的数据则落入 shard1 中。每一个 shard 都对应一个底层的 tsm 存储引擎,有独立的 cache、wal、tsm file。
    这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。
  4. 建议在数据库建立的时候设置存储策略,不建议设置过多且随意切换
    create database testdb2 with duration 30d

1.4 存储引擎

1.4.1 存储引擎(Timestamp-Structure Merge Tree)

TSM是在LSM的基础上优化改善的,引入了serieskey的概念,对数据实现了很好的分类组织。
TSM主要由四个部分组成: cache、wal、tsm file、compactor:

1.4.2 存储目录

influxdb的数据存储有三个目录,分别是meta、wal、data:

1.5 相关特点

1) 基于时间序列,支持与时间有关的相关函数(如最大,最小,求和等);
2) 可度量性:你可以实时对大量数据进行计算;
3) 基于事件:它支持任意的事件数据;
4) 无结构(无模式):可以是任意数量的列;
5)支持min, max, sum, count, mean, median 等一系列函数;
6)内置http支持,使用http读写;
7)强大的类SQL语法;
8)自带管理界面,方便使用(新版本需要通过Chronograf)

1.6 influxdb和其他时序数据库比较

特性 InfluxDB OpentsDB
单机部署 部署简单、无依赖 需要搭建 Hbase,创建 TSD 数据表,配置 JAVA 等
集群 开源版本没有集群功能 集群方案成熟
资源占用 cpu 消耗更小,磁盘占用更小 资源消耗相比更多
存储模型 TSM 基于HBase存储时序数据(LSM)
存储特点 同一数据源的tags不再冗余存储 ;列式存储,独立压缩 存在很多无用的字段;无法有效的压缩;聚合能力弱
性能 查询更快,数据汇聚分析较快 数据写入和存储较快,但查询和分析能力略有不足
开发 版本升级快,但架构简单,类SQL语言(InfluxQL)易开发 API较为丰富,版本较为稳定

1.7 相关资料文档

2. influxdb 访问

2.1 访问方式

influxdb访问本质上都是 HTTP 方式,具体有如下:

2.2 连续查询

CREATE CONTINUOUS QUERY wj_30m ON shhnwangjian 
BEGIN 
    SELECT mean(connected_clients), MEDIAN(connected_clients),       
    MAX(connected_clients), MIN(connected_clients) 
    INTO redis_clients_30m 
    FROM redis_clients 
    GROUP BY ip,port,time(30m) 
END
 /*在shhnwangjian库中新建了一个名为 wj_30m 的连续查询,
每三十分钟取一个connected_clients字段的平均值、中位值、最大值、最小值 redis_clients_30m 表中,
使用的数据保留策略都是 default。*/

2.4 操作优化

  1. 控制 series 的数量;
  2. 使用批量写;
  3. 使用恰当的时间粒度;
  4. 存储的时候尽量对 Tag 进行排序;
  5. 根据数据情况,调整 shard 的 duration;
  6. 无关的数据写不同的database;
  7. 控制 Tag Key, 与 Tag Value 值的大小;
  8. 存储分离 ,将 wal 目录与 data 目录分别映射到不同的磁盘上,以减少读写操作的相互影响。

3. 安装

3.1 资源下载配置

InfluxDB、Telegraf、Chronograf、Kapacitor可到下方官网下载各平台版本

这里以Windows为例,说明一下环境搭建,使用Docker或者Linux下安装配置都基本一样

  1. 选择对应平台的influxdb:https://dl.influxdata.com/influxdb/releases/influxdb-1.7.6_windows_amd64.zip
  2. 下载后解压,得到 influxd.exe、influx.exe、influxdb.conf 等文件,data、meta、wal 是自己建立的文件夹:
  1. influx.exe 表示客户端,influxd.exe 表示服务端,influx_inspect.exe 表示查看工具,influx_stress.exe 表示压力测试工具,influx_tsm 表示数据库转换工具(将数据库从 b1 或 bz1 格式转换为 tsm1 格式),influxdb.conf 是配置文件,我们需要修改该文件,主要是三个路径修改:
[meta]  
# Where the metadata/raft database is stored
 dir = "C:/Install/influxdb-1.7.6-1/meta"
[data]
# The directory where the TSM storage engine stores TSM files.
#/var/lib/influxdb/data
dir = "C:/Install/influxdb-1.7.6-1/data"
# The directory where the TSM storage engine stores WAL files.
#/var/lib/influxdb/wal
wal-dir = "C:/Install/influxdb-1.7.6-1/wal"

3.2 启动

3.3 命令行操作

3.4 Web管理

  1. 下载Telegraf,启动命令行切换到对应目录,然后输入以下命令生成conf配置文件:
    C:\Users\meng.chai>cd C:\Install\telegraf
    C:\Install\telegraf>telegraf -sample-config -input-filter cpu:mem -output-filter influxdb > telegraf1.conf
    这里生成的配置文件名是 telegraf1.conf,是为了和原有的 telegraf.conf 区分开。

  2. 输入启动命令,打开 telegraf.exe,开始采集信息并输入到 influxdb

  1. 这时候就可以看到 web 中的数据了:


3.5 go 语言开发

import (
 "github.com/influxdata/influxdb/client/v2"
 ...
)
//连接influxdb
func ConnectInflux()(client.Client, error){
   conn, err := client.NewHTTPClient(client.HTTPConfig{
      Addr:"http://localhost:8086",
      Username:username,
      Password:password,
   })
if nil != err{
      fmt.Println(err)
return nil, err
   }
return conn, nil
}
//写入point
func WritePoints(con client.Client)error{
   batchpoint ,err := client.NewBatchPoints(client.BatchPointsConfig{
      Precision:"s",
      Database:MyDB,
   })
if nil != err{
      fmt.Println(err)
return err
   }
   record := Record{AssertId:"assert_aaaaa", ModelId:"model0", PoinntId:"point1",
      ModelPath:"model0/model1/point1", Attr:"", ModelTime:"123456789"}
   tags := map[string]string{Tag1:record.AssertId, Tag2:record.ModelId}
   fields := map[string]interface{}{Field1:record.PoinntId, Field2:record.ModelPath,
Field3:record.Attr, Field4:record.ModelTime}
   point, err := client.NewPoint(Measurement, tags, fields, time.Now())
if nil != err{
      fmt.Println(err)
return err
   }
   batchpoint.AddPoint(point)
if err := con.Write(batchpoint); err != nil{
      fmt.Println(err)
return err
   }
}
//查询时要注意时区,东八区设置为:tz('Asia/Shanghai'),命令行需要:precision rfc3339
query := fmt.Sprintf("select * from %s limit %d tz('Asia/Shanghai')", Measurement, 5)
res, err := querydb(conn, query)
上一篇 下一篇

猜你喜欢

热点阅读