Springboot

SpringBoot2.0快速整合InfluxDB,新增、批量新

2020-07-28  本文已影响0人  程就人生

网上有不少SpringBoot整合InfluxDB的案例,但是功能比较单一,案例不够完全。这里整理了一个功能全面的案例,包括单一新增、批量新增,通过反射机制对查询结果进行封装,很简单,一看就懂,拿来即用。

这里使用的是SpringBoot 2.1.4.RELEASE,本地需要安装influxdb 1.8.1,接下来看看如何实现的。

第一步,在pom文件中引入influxdb-java架包,还使用了lombok来简化代码量;

<!-- Spring Boot的核心启动器,包含了自动配置、日志和YAML -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <!-- 测试专用 -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-devtools</artifactId>
          <scope>runtime</scope>
          <optional>true</optional>
      </dependency>
      <!-- lombok架包 -->
      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <optional>true</optional>
      </dependency>     
      <!-- influxdb架包 -->    
      <dependency>
          <groupId>org.influxdb</groupId>
          <artifactId>influxdb-java</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
      </dependency>

properties配置文件:

server.port=8080
# influxDB
spring.influx.url=http://127.0.0.1:6086
spring.influx.user=admin
spring.influx.password=admin
spring.influx.database=my_sensor1

第二步,增加Influxdb的配置文件;

import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * influxdb基础配置,也可以直接使用InfluxDB,这么做是为了配置更多的默认参数
 * @author 程就人生
 * @date 2020年7月24日
 * @Description 
 *
 */
@Configuration
public class InfluxdbConfig {
        
    @Value("${spring.influx.url}")
    private String influxDBUrl; 

    @Value("${spring.influx.user}")
    private String userName;    

    @Value("${spring.influx.password}")
    private String password;    

    @Value("${spring.influx.database}")
    private String database;    

    @Bean
    public InfluxDB influxdb(){     
        InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
        try {
            
            /** 
             * 异步插入:
             * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒    
             * point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒   
             * 满足任何一个条件就会发送一次写的请求。
             */
            influxDB.setDatabase(database).enableBatch(100,1000 * 60, TimeUnit.MILLISECONDS);
            
        } catch (Exception e) { 
            e.printStackTrace();
        } finally { 
            //设置默认策略
            influxDB.setRetentionPolicy("sensor_retention");    
        }
        //设置日志输出级别
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);  
        return influxDB;
    }
}

第三步,测试代码;

package com.example.controller;

import java.util.ArrayList;
import java.util.List;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class IndexController {

    @Autowired
    private InfluxDB influxDB;
    
    //measurement
    private final String measurement = "sensor";
    
    @Value("${spring.influx.database}")
    private String database;
    
    /**
     * 批量插入第一种方式
     */
    @GetMapping("/index")
    public void insert(){
        List<String> lines = new ArrayList<String>();       
        Point point = null;     
        for(int i=0;i<50;i++){          
            point = Point.measurement(measurement)
            .tag("deviceId", "sensor" + i)
            .addField("temp", 3)
            .addField("voltage", 145+i)
            .addField("A1", "4i")
            .addField("A2", "4i").build();
            lines.add(point.lineProtocol());
        }
        //写入
        influxDB.write(lines);
    }
    
    /**
     * 批量插入第二种方式
     */
    @GetMapping("/index1")
    public void batchInsert(){      
        BatchPoints batchPoints = BatchPoints
                .database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
      //遍历sqlserver获取数据
      for(int i=0;i<50;i++){
        //创建单条数据对象——表名
        Point point = Point.measurement(measurement)
          //tag属性——只能存储String类型
                .tag("deviceId", "sensor" + i)
                .addField("temp", 3)
                .addField("voltage", 145+i)
                .addField("A1", "4i")
                .addField("A2", "4i").build();
        //将单条数据存储到集合中
        batchPoints.point(point);
      }
      //批量插入
      influxDB.write(batchPoints); 
    }
    
    /**
     * 获取数据
     */
    @GetMapping("/datas")
    public void datas(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,因此可以设置分页查询条件
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        
        String queryCondition = "";  //查询条件暂且为空
        // 此处查询所有内容,如果
        String queryCmd = "SELECT * FROM "
            // 查询指定设备下的日志信息
            // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
            // + 策略name + "." + measurement
            + measurement
            // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        
        QueryResult queryResult = influxDB.query(new Query(queryCmd, database));        
        log.info("query result => {}", queryResult);
    }
}

第四步,influxdb工具类的编写;

package com.example.config;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.influxdb.InfluxDB;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * influxdb操作工具类
 * @author 程就人生
 * @date 2020年7月27日
 * @Description 
 *
 */
@Component
public class InfluxdbUtils {

    @Autowired
    private InfluxDB influxDB;
    
    @Value("${spring.influx.database}")
    private String database;    
    
    /**
     * 新增单条记录,利用java的反射机制进行新增操作
     */
    public void insertOne(Object obj){
        //获取度量
        Class<?> clasz = obj.getClass();
        Measurement measurement = clasz.getAnnotation(Measurement.class);
        //构建
        Point.Builder builder = Point.measurement(measurement.name());
        // 获取对象属性
        Field[] fieldArray = clasz.getDeclaredFields();
        Column column = null;
        for(Field field : fieldArray){
            try {
                column = field.getAnnotation(Column.class);
                //设置属性可操作
                field.setAccessible(true); 
                if(column.tag()){
                    //tag属性只能存储String类型
                    builder.tag(column.name(), field.get(obj).toString());
                }else{
                    //设置field
                    if(field.get(obj) != null){
                        builder.addField(column.name(), field.get(obj).toString());
                    }
                }
            } catch (IllegalArgumentException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }
        influxDB.write(builder.build());
    }
    
    /**
     * 批量新增,方法一
     */
    public void insertBatchByRecords(List<?> records){
        List<String> lines = new ArrayList<String>();   
        records.forEach(record->{
            Class<?> clasz = record.getClass();
            //获取度量
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            //构建
            Point.Builder builder = Point.measurement(measurement.name());
            Field[] fieldArray = clasz.getDeclaredFields();
            Column column = null;
            for(Field field : fieldArray){
                try {
                    column = field.getAnnotation(Column.class);
                    //设置属性可操作
                    field.setAccessible(true); 
                    if(column.tag()){
                        //tag属性只能存储String类型
                        builder.tag(column.name(), field.get(record).toString());
                    }else{
                        //设置field
                        if(field.get(record) != null){
                            builder.addField(column.name(), field.get(record).toString());
                        }
                    }
                } catch (IllegalArgumentException | IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
            lines.add(builder.build().lineProtocol());
        });
        influxDB.write(lines);
    }
    
    /**
     * 批量新增,方法二
     */
    public void insertBatchByPoints(List<?> records){
        BatchPoints batchPoints = BatchPoints.database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        records.forEach(record->{
            Class<?> clasz = record.getClass();
            //获取度量
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            //构建
            Point.Builder builder = Point.measurement(measurement.name());
            Field[] fieldArray = clasz.getDeclaredFields();
            Column column = null;
            for(Field field : fieldArray){
                try {
                    column = field.getAnnotation(Column.class);
                    //设置属性可操作
                    field.setAccessible(true); 
                    if(column.tag()){
                        //tag属性只能存储String类型
                        builder.tag(column.name(), field.get(record).toString());
                    }else{
                        //设置field
                        if(field.get(record) != null){
                            builder.addField(column.name(), field.get(record).toString());
                        }
                    }
                } catch (IllegalArgumentException | IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
            batchPoints.point(builder.build());
        });
        influxDB.write(batchPoints);
    }
    
    /**
     * 查询,返回Map集合
     * @param query 完整的查询语句
     * @return
     */
    public List<Object> fetchRecords(String query){
        List<Object> results = new ArrayList<Object>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        queryResult.getResults().forEach(result->{
            result.getSeries().forEach(serial->{
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();
                serial.getValues().forEach(value->{     
                    Map<String,Object> obj = new HashMap<String,Object>();
                    for(int i=0;i<fieldSize;i++){   
                        obj.put(columns.get(i), value.get(i));
                    }
                    results.add(obj);
                });
            });
        });
        return results;
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     * @param order
     * @param limit
     * @return
     */
    public List<Object> fetchRecords(String fieldKeys, String measurement){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     * @param order
     * @param limit
     * @return
     */
    public List<Object> fetchRecords(String fieldKeys, String measurement, String order){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);       
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     * @param order
     * @param limit
     * @return
     */
    public List<Object> fetchRecords(String fieldKeys, String measurement, String order, String limit){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回对象的list集合
     * @param query
     * @return
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public <T> List<T> fetchResults(String query, Class<?> clasz){
        List results = new ArrayList<>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        queryResult.getResults().forEach(result->{
            result.getSeries().forEach(serial->{
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();     
                serial.getValues().forEach(value->{ 
                    Object obj = null;
                    try {
                        obj = clasz.newInstance();
                        for(int i=0;i<fieldSize;i++){   
                            String fieldName = columns.get(i);
                            Field field = clasz.getDeclaredField(fieldName);
                            field.setAccessible(true);
                            Class<?> type = field.getType();
                            if(type == float.class){
                                field.set(obj, Float.valueOf(value.get(i).toString()));
                            }else{
                                field.set(obj, value.get(i));
                            }                           
                        }
                    } catch (NoSuchFieldException | SecurityException | InstantiationException | IllegalAccessException e) {
                        e.printStackTrace();
                    }                   
                    results.add(obj);
                });
            });
        });
        return results;
    }
    
    /**
     * 查询,返回对象的list集合
     * @param fieldKeys
     * @param measurement
     * @param clasz
     * @return
     */
    public <T> List<T> fetchResults(String fieldKeys, String measurement, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
        return this.fetchResults(query.toString(), clasz);
    }
    
    /**
     * 查询,返回对象的list集合
     * @param fieldKeys
     * @param measurement
     * @param order
     * @param clasz
     * @return
     */
    public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        return this.fetchResults(query.toString(), clasz);
    }
    
    /**
     * 查询,返回对象的list集合
     * @param fieldKeys
     * @param measurement
     * @param order
     * @param limit
     * @param clasz
     * @return
     */
    public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, String limit, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);        
        return this.fetchResults(query.toString(), clasz);
    }
}

Java的bean文件不可少,和Mybatis、Hibernate的bean还是有区别的,这里使用了Influxdb-java架包里的注解,针对influxdb的注解;

package com.example.bean;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import lombok.Data;

//@Builder
@Data
@Measurement(name = "sensor")
public class Sensor {

    @Column(name="deviceId",tag=true)
    private String deviceId;
    
    @Column(name="temp")
    private float temp;
    
    @Column(name="voltage")
    private float voltage;
    
    @Column(name="A1")
    private float A1;
    
    @Column(name="A2")
    private float A2;
    
    @Column(name="time")
    private String time;    
    
}

第五步,测试代码的编写;

package com.example.controller;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.example.bean.Sensor;
import com.example.config.InfluxdbUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class IndexController1 {

    @Autowired
    private InfluxdbUtils influxdbUtils;
    
    /**
     * 插入单挑记录
     */
    @GetMapping("/index21")
    public void insert(){
//      Sensor sensor = Sensor.builder().deviceId("0002")
//              .A1(10).A2(10).temp(10L).voltage(10).build();
//      influxdbUtils.insertOne(sensor);
    }
    
    /**
     * 批量插入第一种方式
     */
    @GetMapping("/index22")
    public void batchInsert(){  
        List<Sensor> sensorList = new ArrayList<Sensor>();
        Sensor sensor = null;
//      for(int i=0; i<50; i++){
//          sensor = Sensor.builder().deviceId("000"+i)
//                  .A1(10).A2(10).temp(10).voltage(10).build();
//          sensorList.add(sensor);
//      }
        for(int i=0; i<50; i++){
            sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-"+i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByRecords(sensorList);
    }
    
    /**
     * 批量插入第二种方式
     */
    @GetMapping("/index23")
    public void batchInsert1(){ 
        List<Sensor> sensorList = new ArrayList<Sensor>();
        Sensor sensor = null;
//      for(int i=0; i<50; i++){
//          sensor = Sensor.builder().deviceId("000"+i)
//                  .A1(10).A2(10).temp(10).voltage(10).build();
//          sensorList.add(sensor);
//      }
        for(int i=0; i<50; i++){
            sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-"+i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByPoints(sensorList);
    }
        
    /**
     * 获取数据
     */
    @GetMapping("/datas2")
    public void datas(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,因此可以设置分页查询条件
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        
        String queryCondition = "";  //查询条件暂且为空
        // 此处查询所有内容,如果
        String queryCmd = "SELECT * FROM sensor"
            // 查询指定设备下的日志信息
            // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
            // + 策略name + "." + measurement
            // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        
        List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd);
        log.info("query result => {}", sensorList);
    }
    
    /**
     * 获取数据
     */
    @GetMapping("/datas21")
    public void datas1(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,因此可以设置分页查询条件
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        
        String queryCondition = "";  //查询条件暂且为空
        // 此处查询所有内容,如果
        String queryCmd = "SELECT * FROM sensor"
            // 查询指定设备下的日志信息
            // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
            // + 策略name + "." + measurement
            // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class);
        //List<Sensor> sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class);
        sensorList.forEach(sensor->{
            log.info("query sensor => {}", sensor);
        });     
    }
}
上一篇下一篇

猜你喜欢

热点阅读