| | |
| | | |
| | | import com.iailab.module.model.influxdb.common.config.InfluxDBInstance; |
| | | import com.iailab.module.model.influxdb.common.utils.MeasurementUtils; |
| | | import com.iailab.module.model.influxdb.pojo.InfluxModelResultByOutPutIdsPOJO; |
| | | import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO; |
| | | import com.iailab.module.model.influxdb.service.InfluxDBService; |
| | | import com.iailab.module.model.influxdb.vo.InfluxModelResultVO; |
| | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.annotation.Resource; |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * InfluxDB操作类 |
| | |
| | | } |
| | | return dataList; |
| | | } |
| | | |
| | | @Override |
| | | public Map<String, List<InfluxModelResultVO>> queryModelResultsByOutPutIds(InfluxModelResultByOutPutIdsPOJO pojo, Date startTime, Date endTime) { |
| | | Calendar calendar = Calendar.getInstance(); |
| | | calendar.set(Calendar.MILLISECOND, 0); |
| | | String start = startTime.toInstant().toString(); |
| | | |
| | | if (startTime.getTime() == endTime.getTime()) { |
| | | // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据) |
| | | endTime.setTime(endTime.getTime() + 1); |
| | | } |
| | | String stop = endTime.toInstant().toString(); |
| | | |
| | | String measurement = MeasurementUtils.getMeasurement(pojo.getType()); |
| | | // 拼接OutPutIds |
| | | String outPutIdsFilter = pojo.getOutPutIds().stream().map(id -> "r[\"outPutId\"] == \"" + id + "\"").collect(Collectors.joining(" or ")); |
| | | StringBuilder sb = new StringBuilder(); |
| | | sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); |
| | | sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); |
| | | sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); |
| | | sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); |
| | | sb.append("|> filter(fn: (r) => " + outPutIdsFilter + ")"); |
| | | sb.append("|> sort(columns: [\"_time\"]) "); |
| | | sb.append("|> yield(name: \"mean\")"); |
| | | log.info("influxdbSql===============" + sb); |
| | | List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org); |
| | | |
| | | Map<String, List<InfluxModelResultVO>> result = new HashMap<>(pojo.getOutPutIds().size()); |
| | | for (FluxTable table : tables) { |
| | | List<FluxRecord> records = table.getRecords(); |
| | | for (FluxRecord record : records) { |
| | | String outPutId = record.getValueByKey("outPutId").toString(); |
| | | if (result.containsKey(outPutId)) { |
| | | result.get(outPutId).add( new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime())); |
| | | } else { |
| | | List<InfluxModelResultVO> dataList = new ArrayList<>(); |
| | | InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()); |
| | | dataList.add(vo); |
| | | result.put(outPutId,dataList); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | } |