package com.iailab.module.data.influxdb.service.impl;
|
|
import com.iailab.module.data.api.dto.ApiPointValueQueryDTO;
|
import com.iailab.module.data.common.utils.ExcelUtil;
|
import com.iailab.framework.common.util.date.DateUtils;
|
import com.iailab.module.data.api.dto.ApiExportValueDTO;
|
import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
|
import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
|
import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
|
import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
|
import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
|
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
|
import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
|
import com.iailab.module.data.influxdb.service.InfluxDBService;
|
import com.iailab.module.data.point.dto.DaPointDTO;
|
import com.iailab.module.data.point.service.DaPointService;
|
import com.iailab.module.data.point.vo.PointValueExportVO;
|
import com.influxdb.client.InfluxQLQueryApi;
|
import com.influxdb.client.QueryApi;
|
import com.influxdb.client.WriteApi;
|
import com.influxdb.client.WriteApiBlocking;
|
import com.influxdb.client.domain.InfluxQLQuery;
|
import com.influxdb.client.domain.WritePrecision;
|
import com.influxdb.client.write.Point;
|
import com.influxdb.query.FluxRecord;
|
import com.influxdb.query.FluxTable;
|
import com.influxdb.query.InfluxQLQueryResult;
|
import lombok.extern.slf4j.Slf4j;
|
import javax.annotation.Resource;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import org.springframework.util.CollectionUtils;
|
|
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletResponse;
|
import java.math.BigDecimal;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
import static com.iailab.framework.common.pojo.CommonResult.success;
|
|
/**
|
* InfluxDB操作类
|
*/
|
@Slf4j
|
@Service
|
public class InfluxDBServiceImpl implements InfluxDBService {
|
|
// Connection holder: this class reads the client, the bucket name and the `org` field from it.
@Resource
|
private InfluxDBInstance influxDBInstance;
|
|
// Asynchronous write API; stays null until a write method of this class creates it on first use.
private WriteApi writeApi;
|
|
// Point metadata service; queried with a "pointNos" parameter map by the export methods.
@Autowired
|
private DaPointService daPointService;
|
|
// Blocking write API; stays null until a synchronous write method creates it on first use.
private WriteApiBlocking writeApiBlocking;
|
|
// Flux query API; stays null until a query method creates it on first use.
private QueryApi queryApi;
|
|
// InfluxQL query API; stays null until a query method creates it on first use.
private InfluxQLQueryApi influxQLQueryApi;
|
|
// Map key under which a record's value is stored in query result rows.
public static final String VALUE = "value";
|
|
// Map key under which a record's timestamp is stored in query result rows.
public static final String TIME = "time";
|
|
// Raw (DST-free) offset of the JVM default time zone from UTC, in milliseconds, read once per instance.
private int rawOffset = TimeZone.getDefault().getRawOffset();
|
|
// Extra look-back of 1000 ms that a query may subtract from its range start.
private int pas_ms = 1000;
|
|
// Formatter for "yyyy-MM-dd HH:mm:ss" timestamps.
// NOTE(review): SimpleDateFormat is not synchronized; sharing one instance between concurrent calls is unsafe.
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
// Injected reference typed as the interface this class implements; no method in this class uses it.
// NOTE(review): looks like a self-injection — confirm whether it is still needed.
@Resource
|
private InfluxDBService influxDBService;
|
|
@Override
|
public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Double.parseDouble(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void syncWriteIntValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Integer.parseInt(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
|
if (writeApiBlocking == null) {
|
writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
|
}
|
Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
|
point.addTag("point", pointNo);
|
point.addField("value", Boolean.parseBoolean(dataValue));
|
point.time(time, WritePrecision.MS);
|
writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
|
}
|
|
@Override
|
public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
|
if (writeApi == null) {
|
writeApi = influxDBInstance.getClient().makeWriteApi();
|
}
|
if (!CollectionUtils.isEmpty(pointValues)) {
|
pointValues.forEach(item -> {
|
String bucket = influxDBInstance.getBucket();
|
writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
|
});
|
}
|
writeApi.flush();
|
}
|
|
@Override
|
public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
|
if (writeApi == null) {
|
writeApi = influxDBInstance.getClient().makeWriteApi();
|
}
|
if (!CollectionUtils.isEmpty(tagValues)) {
|
tagValues.forEach(item -> {
|
log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
|
writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
|
});
|
}
|
writeApi.flush();
|
}
|
|
@Override
|
public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
if (CollectionUtils.isEmpty(influxParams)) {
|
return null;
|
}
|
Map<String, List<Object>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
// String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
|
if (endTime==null){
|
endTime= new Date();
|
}
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
for (int i = 0; i < influxParams.size(); i++) {
|
List<Object> dataList = new ArrayList<>();
|
InfluxTagValuePOJO tag = influxParams.get(i);
|
String measurement = TagValueUtils.getMeasurement(tag.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
Map<String, Object> dataIem = new HashMap<>(2);
|
dataIem.put(VALUE, record.getValueByKey("_value"));
|
dataIem.put(TIME, Date.from(record.getTime()));
|
dataList.add(dataIem);
|
}
|
}
|
result.put(tag.getId(), dataList);
|
}
|
return result;
|
}
|
|
private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
|
List<ApiExportValueDTO> dataList = new ArrayList<>();
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
Map<String, List<Object>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
|
String measurement = TagValueUtils.getMeasurement(tag.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
ApiExportValueDTO dataIem = new ApiExportValueDTO();
|
dataIem.setDataValue(record.getValueByKey("_value").toString());
|
dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
|
dataList.add(dataIem);
|
}
|
}
|
return dataList;
|
}
|
|
@Override
|
public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) {
|
//构建参数
|
Map<String, Object> params = new HashMap<>(1);
|
params.put("pointNos", queryDto.getTagIds());
|
|
//查询point列表
|
List<DaPointDTO> pointList = daPointService.list(params);
|
if (CollectionUtils.isEmpty(pointList)) {
|
return true;
|
}
|
|
//插入pointType
|
List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
|
InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
|
pojo.setPoint(item.getTagNo());
|
pojo.setType(item.getDataType());
|
return pojo;
|
}).collect(Collectors.toList());
|
|
//查询
|
Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
|
|
//提取list
|
List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0));
|
|
//导出
|
try{
|
String sheetTitle = "采集数据";
|
String[] title = new String[]{"值", "时间"};
|
ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response);
|
} catch (Exception ex) {
|
return false;
|
}
|
return true;
|
}
|
|
@Override
|
public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) {
|
|
List<PointValueExportVO> pointValueExportList = new ArrayList<>();
|
|
//构建参数
|
Map<String, Object> params = new HashMap<>(1);
|
params.put("pointNos", queryDto.getPointNos());
|
|
//查询point列表
|
List<DaPointDTO> pointList = daPointService.list(params);
|
if (CollectionUtils.isEmpty(pointList)) {
|
return pointValueExportList;
|
}
|
|
//插入pointType
|
List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
|
InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
|
pojo.setPoint(item.getPointNo());
|
pojo.setType(item.getDataType());
|
return pojo;
|
}).collect(Collectors.toList());
|
|
//查询
|
Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
|
|
//提取list
|
List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0));
|
for(Map<String, Object> map : list){
|
PointValueExportVO dto = new PointValueExportVO();
|
dto.setDatatime(map.get("time").toString());
|
dto.setDatavalue(map.get("value").toString());
|
pointValueExportList.add(dto);
|
}
|
|
return pointValueExportList;
|
}
|
|
@Override
|
public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
|
Map<String, Object> result = new HashMap<>(influxParams.size());
|
if (influxQLQueryApi == null) {
|
influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
|
}
|
for (int i = 0; i < influxParams.size(); i++) {
|
InfluxPointValuePOJO point = influxParams.get(i);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sql = new StringBuilder();
|
sql.append("SELECT LAST(value) FROM ");
|
sql.append(measurement);
|
sql.append(" WHERE point = '");
|
sql.append(point.getPoint());
|
sql.append("'");
|
InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
|
Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
|
result.put(point.getPoint(), value);
|
}
|
return result;
|
}
|
|
@Override
|
public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
|
if (influxQLQueryApi == null) {
|
influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
|
}
|
long utcMillis = startTime.getTime() -rawOffset;
|
String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
|
log.info("utsStart=" + utsStart);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sql = new StringBuilder();
|
sql.append("SELECT MAX(value) FROM ");
|
sql.append(measurement);
|
sql.append(" WHERE point = '");
|
sql.append(point.getPoint());
|
sql.append("' AND time >= '" + utsStart +"'");
|
InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
|
if (data == null) {
|
return null;
|
}
|
return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
|
}
|
|
@Override
|
public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
|
List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
|
influxParams.add(pojo);
|
Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
|
return data.get(pojo.getPoint());
|
}
|
|
@Override
|
public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
|
if (queryApi == null) {
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
if (CollectionUtils.isEmpty(influxParams)) {
|
return null;
|
}
|
Map<String, List<Map<String, Object>>> result = new HashMap<>();
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
|
if (endTime==null){
|
endTime= new Date();
|
}
|
String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
|
|
for (int i = 0; i < influxParams.size(); i++) {
|
List<Map<String, Object>> dataList = new ArrayList<>();
|
InfluxPointValuePOJO point = influxParams.get(i);
|
String measurement = PointValueUtils.getMeasurement(point.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
System.out.println("influxdbSql===============" + sb.toString());
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
Map<String, Object> dataIem = new HashMap<>(2);
|
dataIem.put(VALUE, record.getValueByKey("_value"));
|
dataIem.put(TIME, sdf.format(Date.from(record.getTime())));
|
dataList.add(dataIem);
|
}
|
}
|
result.put(point.getPoint(), dataList);
|
}
|
return result;
|
}
|
|
@Override
|
public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
|
Map<String, Object> result = new HashMap<>();
|
Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
|
if (CollectionUtils.isEmpty(data)) {
|
return result;
|
}
|
data.forEach((k, v) -> {
|
if (!CollectionUtils.isEmpty(v)) {
|
BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
|
BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
|
BigDecimal spread = (lastValue.subtract(firstValue));
|
result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
|
}
|
log.info(k + ",isEmpty");
|
});
|
return result;
|
}
|
}
|