package com.iailab.module.model.influxdb.service.impl;
|
|
import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
|
import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
|
import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
|
import com.iailab.module.model.influxdb.service.InfluxDBService;
|
import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
|
import com.influxdb.client.QueryApi;
|
import com.influxdb.client.WriteApi;
|
import com.influxdb.client.domain.WritePrecision;
|
import com.influxdb.query.FluxRecord;
|
import com.influxdb.query.FluxTable;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Service;
|
import org.springframework.util.CollectionUtils;
|
|
import javax.annotation.PostConstruct;
|
import javax.annotation.Resource;
|
import java.util.*;
|
|
/**
|
* InfluxDB操作类
|
*/
|
@Slf4j
|
@Service
|
public class InfluxDBServiceImpl implements InfluxDBService {
|
@Resource
|
private InfluxDBInstance influxDBInstance;
|
private WriteApi writeApi;
|
private QueryApi queryApi;
|
|
@PostConstruct
|
private void init() {
|
writeApi = influxDBInstance.getClient().makeWriteApi();
|
queryApi = influxDBInstance.getClient().getQueryApi();
|
}
|
|
@Override
|
public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
|
if (!CollectionUtils.isEmpty(pointValues)) {
|
pointValues.forEach(item -> {
|
String bucket = influxDBInstance.getBucket();
|
writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
|
});
|
}
|
writeApi.flush();
|
}
|
|
@Override
|
public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.MILLISECOND, 0);
|
String start = startTime.toInstant().toString();
|
|
if (startTime.getTime() == endTime.getTime()) {
|
// 如果相等,则engTime加1毫秒,负责influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
|
endTime.setTime(endTime.getTime() + 1);
|
}
|
String stop = endTime.toInstant().toString();
|
|
List<InfluxModelResultVO> dataList = new ArrayList<>();
|
String measurement = MeasurementUtils.getMeasurement(pojo.getType());
|
StringBuilder sb = new StringBuilder();
|
sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
|
sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
|
sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
|
sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
|
sb.append("|> sort(columns: [\"_time\"]) ");
|
sb.append("|> yield(name: \"mean\")");
|
log.info("influxdbSql===============" + sb);
|
List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
|
dataList.add(vo);
|
}
|
}
|
return dataList;
|
}
|
}
|