dengzedong
2025-01-14 22e32104183a26e0414e44ac5f0c9ba805a9e524
iailab-module-model/iailab-module-model-biz/src/main/java/com/iailab/module/model/influxdb/service/impl/InfluxDBServiceImpl.java
@@ -2,6 +2,7 @@
import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultByOutPutIdsPOJO;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
import com.iailab.module.model.influxdb.service.InfluxDBService;
import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
@@ -17,6 +18,7 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
 * InfluxDB操作类
@@ -80,4 +82,48 @@
        }
        return dataList;
    }
    @Override
    public Map<String, List<InfluxModelResultVO>> queryModelResultsByOutPutIds(InfluxModelResultByOutPutIdsPOJO pojo, Date startTime, Date endTime) {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.toInstant().toString();
        if (startTime.getTime() == endTime.getTime()) {
            // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
            endTime.setTime(endTime.getTime() + 1);
        }
        String stop = endTime.toInstant().toString();
        String measurement = MeasurementUtils.getMeasurement(pojo.getType());
        // 拼接OutPutIds
        String outPutIdsFilter = pojo.getOutPutIds().stream().map(id -> "r[\"outPutId\"] == \"" + id + "\"").collect(Collectors.joining(" or "));
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
        sb.append("|> filter(fn: (r) => " + outPutIdsFilter + ")");
        sb.append("|> sort(columns: [\"_time\"]) ");
        sb.append("|> yield(name: \"mean\")");
        log.info("influxdbSql===============" + sb);
        List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
        Map<String, List<InfluxModelResultVO>> result = new HashMap<>(pojo.getOutPutIds().size());
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                String outPutId = record.getValueByKey("outPutId").toString();
                if (result.containsKey(outPutId)) {
                    result.get(outPutId).add( new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()));
                } else {
                    List<InfluxModelResultVO> dataList = new ArrayList<>();
                    InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
                    dataList.add(vo);
                    result.put(outPutId,dataList);
                }
            }
        }
        return result;
    }
}