潘志宝
2024-08-21 4d3533b6e75e6afa5af325288b03915715add4b6
提交 | 用户 | 时间
a6de49 1 package com.iailab.module.data.influxdb.service.impl;
H 2
3 import com.iailab.module.data.common.utils.ExcelUtil;
4 import com.iailab.framework.common.util.date.DateUtils;
5 import com.iailab.module.data.api.dto.ApiExportValueDTO;
6 import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
7 import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
8 import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
9 import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
10 import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
11 import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
12 import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
13 import com.iailab.module.data.influxdb.service.InfluxDBService;
14 import com.influxdb.client.InfluxQLQueryApi;
15 import com.influxdb.client.QueryApi;
16 import com.influxdb.client.WriteApi;
17 import com.influxdb.client.WriteApiBlocking;
18 import com.influxdb.client.domain.InfluxQLQuery;
19 import com.influxdb.client.domain.WritePrecision;
20 import com.influxdb.client.write.Point;
21 import com.influxdb.query.FluxRecord;
22 import com.influxdb.query.FluxTable;
23 import com.influxdb.query.InfluxQLQueryResult;
24 import lombok.extern.slf4j.Slf4j;
25 import javax.annotation.Resource;
26 import org.springframework.stereotype.Service;
27 import org.springframework.util.CollectionUtils;
28
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import java.math.BigDecimal;
32 import java.util.*;
33
34 /**
35  * InfluxDB操作类
36  */
37 @Slf4j
38 @Service
39 public class InfluxDBServiceImpl implements InfluxDBService {
40
41     @Resource
42     private InfluxDBInstance influxDBInstance;
43
44     private WriteApi writeApi;
45
46     private WriteApiBlocking writeApiBlocking;
47
48     private QueryApi queryApi;
49
50     private InfluxQLQueryApi influxQLQueryApi;
51
52     private String VALUE = "value";
53
54     private String TIME = "time";
55
56     private int rawOffset = TimeZone.getDefault().getRawOffset();
57
58     private int pas_ms = 1000;
59
60     @Override
61     public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
62         if (writeApiBlocking == null) {
63             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
64         }
65         Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
66         point.addTag("point", pointNo);
67         point.addField("value", Double.parseDouble(dataValue));
68         point.time(time, WritePrecision.MS);
69         writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
70     }
71
72     @Override
73     public void syncWriteIntValue(String pointNo, String dataValue, long time) {
74         if (writeApiBlocking == null) {
75             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
76         }
77         Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
78         point.addTag("point", pointNo);
79         point.addField("value", Integer.parseInt(dataValue));
80         point.time(time, WritePrecision.MS);
81         writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
82     }
83
84     @Override
85     public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
86         if (writeApiBlocking == null) {
87             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
88         }
89         Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
90         point.addTag("point", pointNo);
91         point.addField("value", Boolean.parseBoolean(dataValue));
92         point.time(time, WritePrecision.MS);
93         writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
94     }
95
96     @Override
97     public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
98         if (writeApi == null) {
99             writeApi = influxDBInstance.getClient().makeWriteApi();
100         }
101         if (!CollectionUtils.isEmpty(pointValues)) {
102             pointValues.forEach(item -> {
103                 writeApi.writeMeasurement(influxDBInstance.bucket, influxDBInstance.org, WritePrecision.MS, item);
104             });
105         }
106         writeApi.flush();
107     }
108
109     @Override
110     public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
111         if (writeApi == null) {
112             writeApi = influxDBInstance.getClient().makeWriteApi();
113         }
114         if (!CollectionUtils.isEmpty(tagValues)) {
115             tagValues.forEach(item -> {
116                 writeApi.writeMeasurement(influxDBInstance.bucket, influxDBInstance.org, WritePrecision.MS, item);
117             });
118         }
119         writeApi.flush();
120     }
121
122     @Override
123     public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
124         if (queryApi == null) {
125             queryApi = influxDBInstance.getClient().getQueryApi();
126         }
127         if (CollectionUtils.isEmpty(influxParams)) {
128             return null;
129         }
130         Map<String, List<Object>> result = new HashMap<>();
131         Calendar calendar = Calendar.getInstance();
132         calendar.set(Calendar.MILLISECOND, 0);
133         // String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
134         String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
135         if (endTime==null){
136             endTime= new Date();
137         }
138         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
139
140         for (int i = 0; i < influxParams.size(); i++) {
141             List<Object> dataList = new ArrayList<>();
142             InfluxTagValuePOJO tag = influxParams.get(i);
143             String measurement = TagValueUtils.getMeasurement(tag.getType());
144             StringBuilder sb = new StringBuilder();
145             sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
146             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
147             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
148             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
149             sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
150             sb.append("|> sort(columns: [\"_time\"]) ");
151             sb.append("|> yield(name: \"mean\")");
152             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
153
154             for (FluxTable table : tables) {
155                 List<FluxRecord> records = table.getRecords();
156                 for (FluxRecord record : records) {
157                     Map<String, Object> dataIem = new HashMap<>(2);
158                     dataIem.put(VALUE, record.getValueByKey("_value"));
159                     dataIem.put(TIME, Date.from(record.getTime()));
160                     dataList.add(dataIem);
161                 }
162             }
163             result.put(tag.getId(), dataList);
164         }
165         return result;
166     }
167
168     private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
169         List<ApiExportValueDTO> dataList = new ArrayList<>();
170         if (queryApi == null) {
171             queryApi = influxDBInstance.getClient().getQueryApi();
172         }
173         Map<String, List<Object>> result = new HashMap<>();
174         Calendar calendar = Calendar.getInstance();
175         calendar.set(Calendar.MILLISECOND, 0);
176         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
177         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
178
179
180         String measurement = TagValueUtils.getMeasurement(tag.getType());
181         StringBuilder sb = new StringBuilder();
182         sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
183         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
184         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
185         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
186         sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
187         sb.append("|> sort(columns: [\"_time\"]) ");
188         sb.append("|> yield(name: \"mean\")");
189         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
190
191         for (FluxTable table : tables) {
192             List<FluxRecord> records = table.getRecords();
193             for (FluxRecord record : records) {
194                 ApiExportValueDTO dataIem = new ApiExportValueDTO();
195                 dataIem.setDataValue(record.getValueByKey("_value").toString());
196                 dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
197                 dataList.add(dataIem);
198             }
199         }
200         return dataList;
201     }
202
203     @Override
204     public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO params) {
205         InfluxTagValuePOJO tag = new InfluxTagValuePOJO();
206         tag.setId(params.getTagId());
207         tag.setType(params.getDataType());
208         List<ApiExportValueDTO> valueList = getExportValue(tag, params.getStart(), params.getEnd());
209         try{
210             String sheetTitle = "采集数据";
211             String[] title = new String[]{"值", "时间"};
212             ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(valueList), response);
213         } catch (Exception ex) {
214             return false;
215         }
216         return true;
217     }
218
219
220     @Override
221     public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
222         Map<String, Object> result = new HashMap<>(influxParams.size());
223         if (influxQLQueryApi == null) {
224             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
225         }
226         for (int i = 0; i < influxParams.size(); i++) {
227             InfluxPointValuePOJO point = influxParams.get(i);
228             String measurement = PointValueUtils.getMeasurement(point.getType());
229             StringBuilder sql = new StringBuilder();
230             sql.append("SELECT LAST(value) FROM ");
231             sql.append(measurement);
232             sql.append(" WHERE point = '");
233             sql.append(point.getPoint());
234             sql.append("'");
235             InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.bucket));
236             Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
237             result.put(point.getPoint(), value);
238         }
239         return result;
240     }
241
242     @Override
243     public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
244         if (influxQLQueryApi == null) {
245             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
246         }
247         long utcMillis = startTime.getTime() -rawOffset;
248         String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
249         log.info("utsStart=" + utsStart);
250         String measurement = PointValueUtils.getMeasurement(point.getType());
251         StringBuilder sql = new StringBuilder();
252         sql.append("SELECT MAX(value) FROM ");
253         sql.append(measurement);
254         sql.append(" WHERE point = '");
255         sql.append(point.getPoint());
256         sql.append("' AND time >= '" + utsStart +"'");
257         InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.bucket));
258         if (data == null) {
259             return null;
260         }
261         return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
262     }
263
264     @Override
265     public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
266         List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
267         influxParams.add(pojo);
268         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
269         return data.get(pojo.getPoint());
270     }
271
272     @Override
273     public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
274         if (queryApi == null) {
275             queryApi = influxDBInstance.getClient().getQueryApi();
276         }
277         if (CollectionUtils.isEmpty(influxParams)) {
278             return null;
279         }
280         Map<String, List<Map<String, Object>>> result = new HashMap<>();
281         Calendar calendar = Calendar.getInstance();
282         calendar.set(Calendar.MILLISECOND, 0);
283         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
284         if (endTime==null){
285             endTime= new Date();
286         }
287         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
288
289         for (int i = 0; i < influxParams.size(); i++) {
290             List<Map<String, Object>> dataList = new ArrayList<>();
291             InfluxPointValuePOJO point = influxParams.get(i);
292             String measurement = PointValueUtils.getMeasurement(point.getType());
293             StringBuilder sb = new StringBuilder();
294             sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
295             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
296             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
297             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
298             sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
299             sb.append("|> sort(columns: [\"_time\"]) ");
300             sb.append("|> yield(name: \"mean\")");
301             System.out.println("influxdbSql===============" + sb.toString());
302             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
303
304             for (FluxTable table : tables) {
305                 List<FluxRecord> records = table.getRecords();
306                 for (FluxRecord record : records) {
307                     Map<String, Object> dataIem = new HashMap<>(2);
308                     dataIem.put(VALUE, record.getValueByKey("_value"));
309                     dataIem.put(TIME, Date.from(record.getTime()));
310                     dataList.add(dataIem);
311                 }
312             }
313             result.put(point.getPoint(), dataList);
314         }
315         return result;
316     }
317
318     @Override
319     public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
320         Map<String, Object> result = new HashMap<>();
321         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
322         if (CollectionUtils.isEmpty(data)) {
323             return result;
324         }
325         data.forEach((k, v) -> {
326             if (!CollectionUtils.isEmpty(v)) {
327                 BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
328                 BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
329                 BigDecimal spread = (lastValue.subtract(firstValue));
330                 result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
331             }
332             log.info(k + ",isEmpty");
333         });
334         return result;
335     }
336 }