package com.iailab.module.model.influxdb.service.impl; import com.iailab.module.model.influxdb.common.config.InfluxDBInstance; import com.iailab.module.model.influxdb.common.utils.MeasurementUtils; import com.iailab.module.model.influxdb.pojo.InfluxModelResultByOutPutIdsPOJO; import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO; import com.iailab.module.model.influxdb.service.InfluxDBService; import com.iailab.module.model.influxdb.vo.InfluxModelResultVO; import com.influxdb.client.QueryApi; import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; /** * InfluxDB操作类 */ @Slf4j @Service public class InfluxDBServiceImpl implements InfluxDBService { @Resource private InfluxDBInstance influxDBInstance; private WriteApi writeApi; private QueryApi queryApi; @PostConstruct private void init() { writeApi = influxDBInstance.getClient().makeWriteApi(); queryApi = influxDBInstance.getClient().getQueryApi(); } @Override public void asyncWriteModelResults(List pointValues) { if (!CollectionUtils.isEmpty(pointValues)) { pointValues.forEach(item -> { String bucket = influxDBInstance.getBucket(); writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item); }); } writeApi.flush(); } @Override public List queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); String start = startTime.toInstant().toString(); if (startTime.getTime() == endTime.getTime()) { // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据) endTime.setTime(endTime.getTime() + 1); } String stop = endTime.toInstant().toString(); List dataList = new ArrayList<>(); String measurement = MeasurementUtils.getMeasurement(pojo.getType()); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); log.info("influxdbSql===============" + sb); List tables = queryApi.query(sb.toString(), influxDBInstance.org); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()); dataList.add(vo); } } return dataList; } @Override public Map> queryModelResultsByOutPutIds(InfluxModelResultByOutPutIdsPOJO pojo, Date startTime, Date endTime) { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); String start = startTime.toInstant().toString(); if (startTime.getTime() == endTime.getTime()) { // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据) endTime.setTime(endTime.getTime() + 1); } String stop = endTime.toInstant().toString(); String measurement = MeasurementUtils.getMeasurement(pojo.getType()); // 拼接OutPutIds String outPutIdsFilter = pojo.getOutPutIds().stream().map(id -> "r[\"outPutId\"] == \"" + id + "\"").collect(Collectors.joining(" or ")); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => " + outPutIdsFilter + ")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); log.info("influxdbSql===============" + sb); List tables = queryApi.query(sb.toString(), influxDBInstance.org); Map> result = new HashMap<>(pojo.getOutPutIds().size()); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { String outPutId = record.getValueByKey("outPutId").toString(); if (result.containsKey(outPutId)) { result.get(outPutId).add( new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime())); } else { List dataList = new ArrayList<>(); InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()); dataList.add(vo); result.put(outPutId,dataList); } } } return result; } }