package com.iailab.module.data.influxdb.service.impl;

import java.math.BigDecimal;
import java.util.*;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.influxdb.client.InfluxQLQueryApi;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.InfluxQLQuery;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.influxdb.query.InfluxQLQueryResult;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import com.iailab.framework.common.util.date.DateUtils;
import com.iailab.module.data.api.dto.ApiExportValueDTO;
import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
import com.iailab.module.data.common.utils.ExcelUtil;
import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
import com.iailab.module.data.influxdb.service.InfluxDBService;
|
/**
|
* InfluxDB操作类
|
*/
|
@Slf4j
|
@Service
|
public class InfluxDBServiceImpl implements InfluxDBService {
|
|
@Resource
|
private InfluxDBInstance influxDBInstance;
|
|
private WriteApi writeApi;
|
|
private WriteApiBlocking writeApiBlocking;
|
|
private QueryApi queryApi;
|
|
private InfluxQLQueryApi influxQLQueryApi;
|
|
private String VALUE = "value";
|
|
private String TIME = "time";
|
|
private int rawOffset = TimeZone.getDefault().getRawOffset();
|
|
private int pas_ms = 1000;
|
|
@Override
|
public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Double.parseDouble(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void syncWriteIntValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Integer.parseInt(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Boolean.parseBoolean(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
|
if (writeApi == null) {
|
writeApi = influxDBInstance.getClient().makeWriteApi();
|
}
|
if (!CollectionUtils.isEmpty(pointValues)) {
|
pointValues.forEach(item -> {
|
writeApi.writeMeasurement(influxDBInstance.bucket, influxDBInstance.org, WritePrecision.MS, item);
|
});
|
}
|
writeApi.flush();
|
}
|
|
@Override
|
public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
|
if (writeApi == null) {
|
writeApi = influxDBInstance.getClient().makeWriteApi();
|
}
|
if (!CollectionUtils.isEmpty(tagValues)) {
|
tagValues.forEach(item -> {
|
log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
|
writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
|
});
|
}
|
writeApi.flush();
|
}
|
|
@Override
|
public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
if (CollectionUtils.isEmpty(influxParams)) {
|
return null;
|
}
|
Map<String, List<Object>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
// String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
|
if (endTime==null){
|
endTime= new Date();
|
}
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
for (int i = 0; i < influxParams.size(); i++) {
|
List<Object> dataList = new ArrayList<>();
|
InfluxTagValuePOJO tag = influxParams.get(i);
|
String measurement = TagValueUtils.getMeasurement(tag.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
Map<String, Object> dataIem = new HashMap<>(2);
|
dataIem.put(VALUE, record.getValueByKey("_value"));
|
dataIem.put(TIME, Date.from(record.getTime()));
|
dataList.add(dataIem);
|
}
|
}
|
result.put(tag.getId(), dataList);
|
}
|
return result;
|
}
|
|
private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
|
List<ApiExportValueDTO> dataList = new ArrayList<>();
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
Map<String, List<Object>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
|
String measurement = TagValueUtils.getMeasurement(tag.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
ApiExportValueDTO dataIem = new ApiExportValueDTO();
|
dataIem.setDataValue(record.getValueByKey("_value").toString());
|
dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
|
dataList.add(dataIem);
|
}
|
}
|
return dataList;
|
}
|
|
@Override
|
public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO params) {
|
InfluxTagValuePOJO tag = new InfluxTagValuePOJO();
|
tag.setId(params.getTagId());
|
tag.setType(params.getDataType());
|
List<ApiExportValueDTO> valueList = getExportValue(tag, params.getStart(), params.getEnd());
|
try{
|
String sheetTitle = "采集数据";
|
String[] title = new String[]{"值", "时间"};
|
ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(valueList), response);
|
} catch (Exception ex) {
|
return false;
|
}
|
return true;
|
}
|
|
|
@Override
|
public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
|
Map<String, Object> result = new HashMap<>(influxParams.size());
|
if (influxQLQueryApi == null) {
|
influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
|
}
|
for (int i = 0; i < influxParams.size(); i++) {
|
InfluxPointValuePOJO point = influxParams.get(i);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sql = new StringBuilder();
|
sql.append("SELECT LAST(value) FROM ");
|
sql.append(measurement);
|
sql.append(" WHERE point = '");
|
sql.append(point.getPoint());
|
sql.append("'");
|
InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
|
Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
|
result.put(point.getPoint(), value);
|
}
|
return result;
|
}
|
|
@Override
|
public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
|
if (influxQLQueryApi == null) {
|
influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
|
}
|
long utcMillis = startTime.getTime() -rawOffset;
|
String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
|
log.info("utsStart=" + utsStart);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sql = new StringBuilder();
|
sql.append("SELECT MAX(value) FROM ");
|
sql.append(measurement);
|
sql.append(" WHERE point = '");
|
sql.append(point.getPoint());
|
sql.append("' AND time >= '" + utsStart +"'");
|
InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
|
if (data == null) {
|
return null;
|
}
|
return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
|
}
|
|
@Override
|
public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
|
List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
|
influxParams.add(pojo);
|
Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
|
return data.get(pojo.getPoint());
|
}
|
|
@Override
|
public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
if (CollectionUtils.isEmpty(influxParams)) {
|
return null;
|
}
|
Map<String, List<Map<String, Object>>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
if (endTime==null){
|
endTime= new Date();
|
}
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
for (int i = 0; i < influxParams.size(); i++) {
|
List<Map<String, Object>> dataList = new ArrayList<>();
|
InfluxPointValuePOJO point = influxParams.get(i);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
System.out.println("influxdbSql===============" + sb.toString());
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
Map<String, Object> dataIem = new HashMap<>(2);
|
dataIem.put(VALUE, record.getValueByKey("_value"));
|
dataIem.put(TIME, Date.from(record.getTime()));
|
dataList.add(dataIem);
|
}
|
}
|
result.put(point.getPoint(), dataList);
|
}
|
return result;
|
}
|
|
@Override
|
public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
|
Map<String, Object> result = new HashMap<>();
|
Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
|
if (CollectionUtils.isEmpty(data)) {
|
return result;
|
}
|
data.forEach((k, v) -> {
|
if (!CollectionUtils.isEmpty(v)) {
|
BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
|
BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
|
BigDecimal spread = (lastValue.subtract(firstValue));
|
result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
|
}
|
log.info(k + ",isEmpty");
|
});
|
return result;
|
}
|
}
|