Spring Boot 2.0 Quick Integration with InfluxDB: Single Insert and Batch Insert
2020-07-28
程就人生
There are quite a few examples of integrating InfluxDB with Spring Boot online, but most of them cover only a single feature and are far from complete. This article puts together a fuller example: single inserts, batch inserts, and query results wrapped back into objects via reflection. It is straightforward, easy to follow, and ready to use as-is.
The versions used here are Spring Boot 2.1.4.RELEASE and a locally installed InfluxDB 1.8.1. Let's see how it is done.
Step 1: add the influxdb-java dependency to the pom file; Lombok is also used to cut down on boilerplate.
<!-- Spring Boot web starter, which brings in auto-configuration, logging and YAML support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- devtools, for development time only -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- influxdb-java client -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
The properties configuration file:
server.port=8080
# influxDB (8086 is the default HTTP port of a local InfluxDB 1.8.x install)
spring.influx.url=http://127.0.0.1:8086
spring.influx.user=admin
spring.influx.password=admin
spring.influx.database=my_sensor1
Step 2: add the InfluxDB configuration class.
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Basic InfluxDB configuration. The auto-configured InfluxDB bean could be used directly;
* defining it here makes it possible to set more defaults.
* @author 程就人生
* @date 2020-07-24
* @Description
*
*/
@Configuration
public class InfluxdbConfig {
@Value("${spring.influx.url}")
private String influxDBUrl;
@Value("${spring.influx.user}")
private String userName;
@Value("${spring.influx.password}")
private String password;
@Value("${spring.influx.database}")
private String database;
@Bean
public InfluxDB influxdb(){
InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
try {
/**
* Asynchronous (batched) writes:
* the first argument to enableBatch is the number of points, the second the flush interval in milliseconds.
* The two conditions are combined: once 100 points have accumulated, or 60 * 1000 ms have elapsed,
* whichever comes first, a single write request is sent.
*/
influxDB.setDatabase(database).enableBatch(100, 1000 * 60, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
//set the default retention policy (it must already exist in the database)
influxDB.setRetentionPolicy("sensor_retention");
}
//set the log level
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
return influxDB;
}
}
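The configuration above assumes that the database my_sensor1 and the retention policy sensor_retention already exist on the InfluxDB instance. If they might not, here is a minimal sketch of what could be run once inside influxdb() before returning the client (the 30-day duration is only an example value, not something the article specifies; it also needs the org.influxdb.dto.Query import):
//create the database and the default retention policy used above (run once)
influxDB.query(new Query("CREATE DATABASE " + database, database));
influxDB.query(new Query("CREATE RETENTION POLICY \"sensor_retention\" ON \"" + database + "\" DURATION 30d REPLICATION 1 DEFAULT", database));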
Step 3: test code that uses the InfluxDB client directly.
package com.example.controller;
import java.util.ArrayList;
import java.util.List;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RestController
public class IndexController {
@Autowired
private InfluxDB influxDB;
//measurement
private final String measurement = "sensor";
@Value("${spring.influx.database}")
private String database;
/**
* Batch insert, first approach: line protocol strings
*/
@GetMapping("/index")
public void insert(){
List<String> lines = new ArrayList<String>();
Point point = null;
for(int i=0;i<50;i++){
point = Point.measurement(measurement)
.tag("deviceId", "sensor" + i)
.addField("temp", 3)
.addField("voltage", 145+i)
.addField("A1", "4i")
.addField("A2", "4i").build();
lines.add(point.lineProtocol());
}
//write the batch
influxDB.write(lines);
}
/**
* Batch insert, second approach: BatchPoints
*/
@GetMapping("/index1")
public void batchInsert(){
BatchPoints batchPoints = BatchPoints
.database(database)
.consistency(InfluxDB.ConsistencyLevel.ALL)
.build();
//iterate over the source data (e.g. fetched from SQL Server)
for(int i=0;i<50;i++){
//build a single point for the measurement
Point point = Point.measurement(measurement)
//tag values can only be stored as Strings
.tag("deviceId", "sensor" + i)
.addField("temp", 3)
.addField("voltage", 145+i)
.addField("A1", "4i")
.addField("A2", "4i").build();
//add the point to the batch
batchPoints.point(point);
}
//batch write
influxDB.write(batchPoints);
}
/**
* Query data
*/
@GetMapping("/datas")
public void datas(@RequestParam Integer page){
int pageSize = 10;
// InfluxDB supports paging, so a LIMIT/OFFSET clause can be appended
String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
String queryCondition = ""; //no filter condition for now
// query everything here
String queryCmd = "SELECT * FROM "
// to read from a specific retention policy, prefix the measurement with RetentionPolicyName.
// the default policy can be omitted:
// + retentionPolicyName + "." + measurement
+ measurement
// append the filter condition (note: filter on tag values; filtering on field values slows queries down badly)
+ queryCondition
// order the results by time
+ " ORDER BY time DESC"
// append the paging clause
+ pageQuery;
QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
log.info("query result => {}", queryResult);
}
}
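A practical note when trying these endpoints: because the client was configured with enableBatch(100, 60 * 1000 ms), the 50 points written by /index or /index1 may stay in the client-side buffer for up to a minute before /datas can see them. An optional sketch (not part of the original code) that forces the buffer out at the end of the insert methods, using the flush() method of the influxdb-java client:
//optional: push the batched points out immediately instead of waiting for the flush interval
influxDB.flush();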
Step 4: write the InfluxDB utility class.
package com.example.config;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.influxdb.InfluxDB;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Utility class for InfluxDB operations
* @author 程就人生
* @date 2020-07-27
* @Description
*
*/
@Component
public class InfluxdbUtils {
@Autowired
private InfluxDB influxDB;
@Value("${spring.influx.database}")
private String database;
/**
* Insert a single record, using Java reflection to read the annotated fields
*/
public void insertOne(Object obj){
//read the measurement annotation
Class<?> clasz = obj.getClass();
Measurement measurement = clasz.getAnnotation(Measurement.class);
//build the point
Point.Builder builder = Point.measurement(measurement.name());
// get the object's declared fields
Field[] fieldArray = clasz.getDeclaredFields();
Column column = null;
for(Field field : fieldArray){
try {
column = field.getAnnotation(Column.class);
//make the field accessible
field.setAccessible(true);
if(column.tag()){
//tag values can only be stored as Strings
builder.tag(column.name(), field.get(obj).toString());
}else{
//set a field value
if(field.get(obj) != null){
builder.addField(column.name(), field.get(obj).toString());
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
}
influxDB.write(builder.build());
}
/**
* Batch insert, approach one: line protocol strings
*/
public void insertBatchByRecords(List<?> records){
List<String> lines = new ArrayList<String>();
records.forEach(record->{
Class<?> clasz = record.getClass();
//read the measurement annotation
Measurement measurement = clasz.getAnnotation(Measurement.class);
//build the point
Point.Builder builder = Point.measurement(measurement.name());
Field[] fieldArray = clasz.getDeclaredFields();
Column column = null;
for(Field field : fieldArray){
try {
column = field.getAnnotation(Column.class);
//make the field accessible
field.setAccessible(true);
if(column.tag()){
//tag values can only be stored as Strings
builder.tag(column.name(), field.get(record).toString());
}else{
//set a field value
if(field.get(record) != null){
builder.addField(column.name(), field.get(record).toString());
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
}
lines.add(builder.build().lineProtocol());
});
influxDB.write(lines);
}
/**
* Batch insert, approach two: BatchPoints
*/
public void insertBatchByPoints(List<?> records){
BatchPoints batchPoints = BatchPoints.database(database)
.consistency(InfluxDB.ConsistencyLevel.ALL)
.build();
records.forEach(record->{
Class<?> clasz = record.getClass();
//read the measurement annotation
Measurement measurement = clasz.getAnnotation(Measurement.class);
//build the point
Point.Builder builder = Point.measurement(measurement.name());
Field[] fieldArray = clasz.getDeclaredFields();
Column column = null;
for(Field field : fieldArray){
try {
column = field.getAnnotation(Column.class);
//make the field accessible
field.setAccessible(true);
if(column.tag()){
//tag values can only be stored as Strings
builder.tag(column.name(), field.get(record).toString());
}else{
//set a field value
if(field.get(record) != null){
builder.addField(column.name(), field.get(record).toString());
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
}
batchPoints.point(builder.build());
});
influxDB.write(batchPoints);
}
/**
* Query, returning a list of Maps
* @param query the complete query statement
* @return
*/
public List<Object> fetchRecords(String query){
List<Object> results = new ArrayList<Object>();
QueryResult queryResult = influxDB.query(new Query(query, database));
queryResult.getResults().forEach(result->{
result.getSeries().forEach(serial->{
List<String> columns = serial.getColumns();
int fieldSize = columns.size();
serial.getValues().forEach(value->{
Map<String,Object> obj = new HashMap<String,Object>();
for(int i=0;i<fieldSize;i++){
obj.put(columns.get(i), value.get(i));
}
results.add(obj);
});
});
});
return results;
}
/**
* Query, returning a list of Maps
* @param fieldKeys the fields to select; must not be empty and must not be a tag on its own
* @param measurement the measurement; must not be empty
* @return
*/
public List<Object> fetchRecords(String fieldKeys, String measurement){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
return this.fetchRecords(query.toString());
}
/**
* Query, returning a list of Maps
* @param fieldKeys the fields to select; must not be empty and must not be a tag on its own
* @param measurement the measurement; must not be empty
* @param order the ORDER BY clause content, e.g. "time desc"
* @return
*/
public List<Object> fetchRecords(String fieldKeys, String measurement, String order){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
return this.fetchRecords(query.toString());
}
/**
* Query, returning a list of Maps
* @param fieldKeys the fields to select; must not be empty and must not be a tag on its own
* @param measurement the measurement; must not be empty
* @param order the ORDER BY clause content, e.g. "time desc"
* @param limit the LIMIT/OFFSET clause, appended as-is (include a leading space)
* @return
*/
public List<Object> fetchRecords(String fieldKeys, String measurement, String order, String limit){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
query.append(limit);
return this.fetchRecords(query.toString());
}
/**
* Query, returning a list of objects
* @param query the complete query statement
* @param clasz the target class annotated with @Measurement/@Column
* @return
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> List<T> fetchResults(String query, Class<?> clasz){
List results = new ArrayList<>();
QueryResult queryResult = influxDB.query(new Query(query, database));
queryResult.getResults().forEach(result->{
result.getSeries().forEach(serial->{
List<String> columns = serial.getColumns();
int fieldSize = columns.size();
serial.getValues().forEach(value->{
Object obj = null;
try {
obj = clasz.newInstance();
for(int i=0;i<fieldSize;i++){
String fieldName = columns.get(i);
Field field = clasz.getDeclaredField(fieldName);
field.setAccessible(true);
Class<?> type = field.getType();
if(type == float.class){
field.set(obj, Float.valueOf(value.get(i).toString()));
}else{
field.set(obj, value.get(i));
}
}
} catch (NoSuchFieldException | SecurityException | InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
results.add(obj);
});
});
});
return results;
}
/**
* Query, returning a list of objects
* @param fieldKeys
* @param measurement
* @param clasz
* @return
*/
public <T> List<T> fetchResults(String fieldKeys, String measurement, Class<?> clasz){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
return this.fetchResults(query.toString(), clasz);
}
/**
* Query, returning a list of objects
* @param fieldKeys
* @param measurement
* @param order
* @param clasz
* @return
*/
public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, Class<?> clasz){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
return this.fetchResults(query.toString(), clasz);
}
/**
* Query, returning a list of objects
* @param fieldKeys
* @param measurement
* @param order
* @param limit
* @param clasz
* @return
*/
public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, String limit, Class<?> clasz){
StringBuilder query = new StringBuilder();
query.append("select ").append(fieldKeys).append(" from ").append(measurement);
query.append(" order by ").append(order);
query.append(limit);
return this.fetchResults(query.toString(), clasz);
}
}
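As an aside, influxdb-java itself ships org.influxdb.impl.InfluxDBResultMapper, which maps a QueryResult onto @Measurement/@Column annotated classes much like fetchResults does. A minimal sketch follows; note that the mapper is stricter about field types than the reflection code above (it generally expects numeric fields declared as Double and the time column as java.time.Instant), so the Sensor bean shown next would need those type adjustments first:
//sketch only: map a QueryResult straight onto an annotated POJO with the built-in mapper
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
QueryResult queryResult = influxDB.query(new Query("SELECT * FROM sensor", database));
List<Sensor> sensors = resultMapper.toPOJO(queryResult, Sensor.class);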
The Java bean is essential, and it differs from a MyBatis or Hibernate bean: the annotations used here come from the influxdb-java package and are specific to InfluxDB.
package com.example.bean;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import lombok.Data;
//@Builder
@Data
@Measurement(name = "sensor")
public class Sensor {
@Column(name="deviceId",tag=true)
private String deviceId;
@Column(name="temp")
private float temp;
@Column(name="voltage")
private float voltage;
@Column(name="A1")
private float A1;
@Column(name="A2")
private float A2;
@Column(name="time")
private String time;
}
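The @Builder annotation is left commented out. If it were restored, the builder-style construction that is commented out in the next step would compile, roughly like the sketch below; note that combining Lombok's @Data and @Builder typically leaves the class without a no-arg constructor unless @NoArgsConstructor (and @AllArgsConstructor) are added as well, and both new Sensor() and the clasz.newInstance() call in fetchResults rely on that constructor.
//only compiles with @Builder restored on Sensor
Sensor sensor = Sensor.builder()
.deviceId("0002")
.temp(10).voltage(10).A1(10).A2(10)
.build();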
Step 5: test code for the utility class.
package com.example.controller;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.example.bean.Sensor;
import com.example.config.InfluxdbUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RestController
public class IndexController1 {
@Autowired
private InfluxdbUtils influxdbUtils;
/**
* Insert a single record
*/
@GetMapping("/index21")
public void insert(){
// Sensor sensor = Sensor.builder().deviceId("0002")
// .A1(10).A2(10).temp(10L).voltage(10).build();
// influxdbUtils.insertOne(sensor);
}
/**
* Batch insert, first approach
*/
@GetMapping("/index22")
public void batchInsert(){
List<Sensor> sensorList = new ArrayList<Sensor>();
Sensor sensor = null;
// for(int i=0; i<50; i++){
// sensor = Sensor.builder().deviceId("000"+i)
// .A1(10).A2(10).temp(10).voltage(10).build();
// sensorList.add(sensor);
// }
for(int i=0; i<50; i++){
sensor = new Sensor();
sensor.setA1(2);
sensor.setA2(12);
sensor.setTemp(9);
sensor.setVoltage(12);
sensor.setDeviceId("sensor4545-"+i);
sensorList.add(sensor);
}
influxdbUtils.insertBatchByRecords(sensorList);
}
/**
* Batch insert, second approach
*/
@GetMapping("/index23")
public void batchInsert1(){
List<Sensor> sensorList = new ArrayList<Sensor>();
Sensor sensor = null;
// for(int i=0; i<50; i++){
// sensor = Sensor.builder().deviceId("000"+i)
// .A1(10).A2(10).temp(10).voltage(10).build();
// sensorList.add(sensor);
// }
for(int i=0; i<50; i++){
sensor = new Sensor();
sensor.setA1(2);
sensor.setA2(12);
sensor.setTemp(9);
sensor.setVoltage(12);
sensor.setDeviceId("sensor4545-"+i);
sensorList.add(sensor);
}
influxdbUtils.insertBatchByPoints(sensorList);
}
/**
* Query data, returned as Maps
*/
@GetMapping("/datas2")
public void datas(@RequestParam Integer page){
int pageSize = 10;
// InfluxDB supports paging, so a LIMIT/OFFSET clause can be appended
String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
String queryCondition = ""; //no filter condition for now
// query everything here
String queryCmd = "SELECT * FROM sensor"
// to read from a specific retention policy, prefix the measurement with RetentionPolicyName.
// the default policy can be omitted:
// + retentionPolicyName + "." + measurement
// append the filter condition (note: filter on tag values; filtering on field values slows queries down badly)
+ queryCondition
// order the results by time
+ " ORDER BY time DESC"
// append the paging clause
+ pageQuery;
List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd);
log.info("query result => {}", sensorList);
}
/**
* Query data, mapped to Sensor objects
*/
@GetMapping("/datas21")
public void datas1(@RequestParam Integer page){
int pageSize = 10;
// InfluxDB supports paging, so a LIMIT/OFFSET clause can be appended
String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
String queryCondition = ""; //no filter condition for now
// query everything here
String queryCmd = "SELECT * FROM sensor"
// to read from a specific retention policy, prefix the measurement with RetentionPolicyName.
// the default policy can be omitted:
// + retentionPolicyName + "." + measurement
// append the filter condition (note: filter on tag values; filtering on field values slows queries down badly)
+ queryCondition
// order the results by time
+ " ORDER BY time DESC"
// append the paging clause
+ pageQuery;
List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class);
//List<Sensor> sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class);
sensorList.forEach(sensor->{
log.info("query sensor => {}", sensor);
});
}
}
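As the comments stress, filter conditions should target tag values rather than field values. A hypothetical non-empty queryCondition for the two query endpoints above (the device id is only an example; InfluxQL wraps tag values in single quotes):
//hypothetical filter on the deviceId tag; note the leading space, since queryCmd is concatenated directly
String queryCondition = " WHERE deviceId = 'sensor4545-1'";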