liriming
2025-01-14 0809aa554fc906e73a383c34f88bb4153bb69b00
提交 | 用户 | 时间
a955f1 1 package com.iailab.module.model.influxdb.service.impl;
D 2
3 import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
4 import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
22e321 5 import com.iailab.module.model.influxdb.pojo.InfluxModelResultByOutPutIdsPOJO;
a955f1 6 import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
D 7 import com.iailab.module.model.influxdb.service.InfluxDBService;
8 import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
9 import com.influxdb.client.QueryApi;
10 import com.influxdb.client.WriteApi;
11 import com.influxdb.client.domain.WritePrecision;
12 import com.influxdb.query.FluxRecord;
13 import com.influxdb.query.FluxTable;
14 import lombok.extern.slf4j.Slf4j;
15 import org.springframework.stereotype.Service;
16 import org.springframework.util.CollectionUtils;
17
18 import javax.annotation.PostConstruct;
19 import javax.annotation.Resource;
20 import java.util.*;
22e321 21 import java.util.stream.Collectors;
a955f1 22
D 23 /**
24  * InfluxDB操作类
25  */
26 @Slf4j
27 @Service
28 public class InfluxDBServiceImpl implements InfluxDBService {
29     @Resource
30     private InfluxDBInstance influxDBInstance;
31     private WriteApi writeApi;
32     private QueryApi queryApi;
33
34     @PostConstruct
35     private void init() {
36         writeApi = influxDBInstance.getClient().makeWriteApi();
37         queryApi = influxDBInstance.getClient().getQueryApi();
38     }
39
40     @Override
41     public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
42         if (!CollectionUtils.isEmpty(pointValues)) {
43             pointValues.forEach(item -> {
44                 String bucket = influxDBInstance.getBucket();
45                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
46             });
47         }
48         writeApi.flush();
49     }
50
51     @Override
52     public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
53         Calendar calendar = Calendar.getInstance();
54         calendar.set(Calendar.MILLISECOND, 0);
1d4ac2 55         String start = startTime.toInstant().toString();
D 56
57         if (startTime.getTime() == endTime.getTime()) {
033e69 58             // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
1d4ac2 59             endTime.setTime(endTime.getTime() + 1);
a955f1 60         }
1d4ac2 61         String stop = endTime.toInstant().toString();
a955f1 62
D 63         List<InfluxModelResultVO> dataList = new ArrayList<>();
64         String measurement = MeasurementUtils.getMeasurement(pojo.getType());
65         StringBuilder sb = new StringBuilder();
66         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
67         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
68         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
69         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
70         sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
71         sb.append("|> sort(columns: [\"_time\"]) ");
72         sb.append("|> yield(name: \"mean\")");
73         log.info("influxdbSql===============" + sb);
74         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
75
76         for (FluxTable table : tables) {
77             List<FluxRecord> records = table.getRecords();
78             for (FluxRecord record : records) {
79                 InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
80                 dataList.add(vo);
81             }
82         }
83         return dataList;
84     }
22e321 85
D 86     @Override
87     public Map<String, List<InfluxModelResultVO>> queryModelResultsByOutPutIds(InfluxModelResultByOutPutIdsPOJO pojo, Date startTime, Date endTime) {
88         Calendar calendar = Calendar.getInstance();
89         calendar.set(Calendar.MILLISECOND, 0);
90         String start = startTime.toInstant().toString();
91
92         if (startTime.getTime() == endTime.getTime()) {
93             // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
94             endTime.setTime(endTime.getTime() + 1);
95         }
96         String stop = endTime.toInstant().toString();
97
98         String measurement = MeasurementUtils.getMeasurement(pojo.getType());
99         // 拼接OutPutIds
100         String outPutIdsFilter = pojo.getOutPutIds().stream().map(id -> "r[\"outPutId\"] == \"" + id + "\"").collect(Collectors.joining(" or "));
101         StringBuilder sb = new StringBuilder();
102         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
103         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
104         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
105         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
106         sb.append("|> filter(fn: (r) => " + outPutIdsFilter + ")");
107         sb.append("|> sort(columns: [\"_time\"]) ");
108         sb.append("|> yield(name: \"mean\")");
109         log.info("influxdbSql===============" + sb);
110         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
111
112         Map<String, List<InfluxModelResultVO>> result = new HashMap<>(pojo.getOutPutIds().size());
113         for (FluxTable table : tables) {
114             List<FluxRecord> records = table.getRecords();
115             for (FluxRecord record : records) {
116                 String outPutId = record.getValueByKey("outPutId").toString();
117                 if (result.containsKey(outPutId)) {
118                     result.get(outPutId).add( new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()));
119                 } else {
120                     List<InfluxModelResultVO> dataList = new ArrayList<>();
121                     InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
122                     dataList.add(vo);
123                     result.put(outPutId,dataList);
124                 }
125             }
126         }
127         return result;
128     }
a955f1 129 }