潘志宝
2024-10-29 d41f14d2986b46da9dd7742f6df63d9725cd29f3
提交 | 用户 | 时间
a6de49 1 package com.iailab.module.data.influxdb.service.impl;
H 2
3 import com.iailab.module.data.common.utils.ExcelUtil;
4 import com.iailab.framework.common.util.date.DateUtils;
5 import com.iailab.module.data.api.dto.ApiExportValueDTO;
6 import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
7 import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
8 import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
9 import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
10 import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
11 import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
12 import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
13 import com.iailab.module.data.influxdb.service.InfluxDBService;
14 import com.influxdb.client.InfluxQLQueryApi;
15 import com.influxdb.client.QueryApi;
16 import com.influxdb.client.WriteApi;
17 import com.influxdb.client.WriteApiBlocking;
18 import com.influxdb.client.domain.InfluxQLQuery;
19 import com.influxdb.client.domain.WritePrecision;
20 import com.influxdb.client.write.Point;
21 import com.influxdb.query.FluxRecord;
22 import com.influxdb.query.FluxTable;
23 import com.influxdb.query.InfluxQLQueryResult;
24 import lombok.extern.slf4j.Slf4j;
25 import javax.annotation.Resource;
26 import org.springframework.stereotype.Service;
27 import org.springframework.util.CollectionUtils;
28
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import java.math.BigDecimal;
32 import java.util.*;
33
34 /**
35  * InfluxDB操作类
36  */
37 @Slf4j
38 @Service
39 public class InfluxDBServiceImpl implements InfluxDBService {
40
41     @Resource
42     private InfluxDBInstance influxDBInstance;
43
44     private WriteApi writeApi;
45
46     private WriteApiBlocking writeApiBlocking;
47
48     private QueryApi queryApi;
49
50     private InfluxQLQueryApi influxQLQueryApi;
51
52     private String VALUE = "value";
53
54     private String TIME = "time";
55
56     private int rawOffset = TimeZone.getDefault().getRawOffset();
57
58     private int pas_ms = 1000;
59
60     @Override
61     public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
62         if (writeApiBlocking == null) {
63             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
64         }
65         Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
66         point.addTag("point", pointNo);
67         point.addField("value", Double.parseDouble(dataValue));
68         point.time(time, WritePrecision.MS);
585be5 69         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 70     }
H 71
72     @Override
73     public void syncWriteIntValue(String pointNo, String dataValue, long time) {
74         if (writeApiBlocking == null) {
75             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
76         }
77         Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
78         point.addTag("point", pointNo);
79         point.addField("value", Integer.parseInt(dataValue));
80         point.time(time, WritePrecision.MS);
585be5 81         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 82     }
H 83
84     @Override
85     public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
86         if (writeApiBlocking == null) {
87             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
88         }
89         Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
90         point.addTag("point", pointNo);
91         point.addField("value", Boolean.parseBoolean(dataValue));
92         point.time(time, WritePrecision.MS);
585be5 93         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 94     }
H 95
96     @Override
97     public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
98         if (writeApi == null) {
99             writeApi = influxDBInstance.getClient().makeWriteApi();
100         }
101         if (!CollectionUtils.isEmpty(pointValues)) {
102             pointValues.forEach(item -> {
d41f14 103                 String bucket = influxDBInstance.getBucket();
104                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
a6de49 105             });
H 106         }
107         writeApi.flush();
108     }
109
110     @Override
111     public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
112         if (writeApi == null) {
113             writeApi = influxDBInstance.getClient().makeWriteApi();
114         }
115         if (!CollectionUtils.isEmpty(tagValues)) {
116             tagValues.forEach(item -> {
585be5 117                 writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
a6de49 118             });
H 119         }
120         writeApi.flush();
121     }
122
123     @Override
124     public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
125         if (queryApi == null) {
126             queryApi = influxDBInstance.getClient().getQueryApi();
127         }
128         if (CollectionUtils.isEmpty(influxParams)) {
129             return null;
130         }
131         Map<String, List<Object>> result = new HashMap<>();
132         Calendar calendar = Calendar.getInstance();
133         calendar.set(Calendar.MILLISECOND, 0);
134         // String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
135         String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
136         if (endTime==null){
137             endTime= new Date();
138         }
139         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
140
141         for (int i = 0; i < influxParams.size(); i++) {
142             List<Object> dataList = new ArrayList<>();
143             InfluxTagValuePOJO tag = influxParams.get(i);
144             String measurement = TagValueUtils.getMeasurement(tag.getType());
145             StringBuilder sb = new StringBuilder();
585be5 146             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 147             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 148             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
149             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
150             sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
151             sb.append("|> sort(columns: [\"_time\"]) ");
152             sb.append("|> yield(name: \"mean\")");
153             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
154
155             for (FluxTable table : tables) {
156                 List<FluxRecord> records = table.getRecords();
157                 for (FluxRecord record : records) {
158                     Map<String, Object> dataIem = new HashMap<>(2);
159                     dataIem.put(VALUE, record.getValueByKey("_value"));
160                     dataIem.put(TIME, Date.from(record.getTime()));
161                     dataList.add(dataIem);
162                 }
163             }
164             result.put(tag.getId(), dataList);
165         }
166         return result;
167     }
168
169     private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
170         List<ApiExportValueDTO> dataList = new ArrayList<>();
171         if (queryApi == null) {
172             queryApi = influxDBInstance.getClient().getQueryApi();
173         }
174         Map<String, List<Object>> result = new HashMap<>();
175         Calendar calendar = Calendar.getInstance();
176         calendar.set(Calendar.MILLISECOND, 0);
177         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
178         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
179
180
181         String measurement = TagValueUtils.getMeasurement(tag.getType());
182         StringBuilder sb = new StringBuilder();
585be5 183         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 184         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 185         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
186         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
187         sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
188         sb.append("|> sort(columns: [\"_time\"]) ");
189         sb.append("|> yield(name: \"mean\")");
190         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
191
192         for (FluxTable table : tables) {
193             List<FluxRecord> records = table.getRecords();
194             for (FluxRecord record : records) {
195                 ApiExportValueDTO dataIem = new ApiExportValueDTO();
196                 dataIem.setDataValue(record.getValueByKey("_value").toString());
197                 dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
198                 dataList.add(dataIem);
199             }
200         }
201         return dataList;
202     }
203
204     @Override
205     public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO params) {
206         InfluxTagValuePOJO tag = new InfluxTagValuePOJO();
207         tag.setId(params.getTagId());
208         tag.setType(params.getDataType());
209         List<ApiExportValueDTO> valueList = getExportValue(tag, params.getStart(), params.getEnd());
210         try{
211             String sheetTitle = "采集数据";
212             String[] title = new String[]{"值", "时间"};
213             ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(valueList), response);
214         } catch (Exception ex) {
215             return false;
216         }
217         return true;
218     }
219
220
221     @Override
222     public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
223         Map<String, Object> result = new HashMap<>(influxParams.size());
224         if (influxQLQueryApi == null) {
225             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
226         }
227         for (int i = 0; i < influxParams.size(); i++) {
228             InfluxPointValuePOJO point = influxParams.get(i);
229             String measurement = PointValueUtils.getMeasurement(point.getType());
230             StringBuilder sql = new StringBuilder();
231             sql.append("SELECT LAST(value) FROM ");
232             sql.append(measurement);
233             sql.append(" WHERE point = '");
234             sql.append(point.getPoint());
235             sql.append("'");
585be5 236             InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 237             Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
H 238             result.put(point.getPoint(), value);
239         }
240         return result;
241     }
242
243     @Override
244     public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
245         if (influxQLQueryApi == null) {
246             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
247         }
248         long utcMillis = startTime.getTime() -rawOffset;
249         String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
250         log.info("utsStart=" + utsStart);
251         String measurement = PointValueUtils.getMeasurement(point.getType());
252         StringBuilder sql = new StringBuilder();
253         sql.append("SELECT MAX(value) FROM ");
254         sql.append(measurement);
255         sql.append(" WHERE point = '");
256         sql.append(point.getPoint());
257         sql.append("' AND time >= '" + utsStart +"'");
585be5 258         InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 259         if (data == null) {
H 260             return null;
261         }
262         return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
263     }
264
265     @Override
266     public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
267         List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
268         influxParams.add(pojo);
269         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
270         return data.get(pojo.getPoint());
271     }
272
273     @Override
274     public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
275         if (queryApi == null) {
276             queryApi = influxDBInstance.getClient().getQueryApi();
277         }
278         if (CollectionUtils.isEmpty(influxParams)) {
279             return null;
280         }
281         Map<String, List<Map<String, Object>>> result = new HashMap<>();
282         Calendar calendar = Calendar.getInstance();
283         calendar.set(Calendar.MILLISECOND, 0);
284         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
285         if (endTime==null){
286             endTime= new Date();
287         }
288         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
289
290         for (int i = 0; i < influxParams.size(); i++) {
291             List<Map<String, Object>> dataList = new ArrayList<>();
292             InfluxPointValuePOJO point = influxParams.get(i);
293             String measurement = PointValueUtils.getMeasurement(point.getType());
294             StringBuilder sb = new StringBuilder();
585be5 295             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 296             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 297             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
298             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
299             sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
300             sb.append("|> sort(columns: [\"_time\"]) ");
301             sb.append("|> yield(name: \"mean\")");
302             System.out.println("influxdbSql===============" + sb.toString());
303             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
304
305             for (FluxTable table : tables) {
306                 List<FluxRecord> records = table.getRecords();
307                 for (FluxRecord record : records) {
308                     Map<String, Object> dataIem = new HashMap<>(2);
309                     dataIem.put(VALUE, record.getValueByKey("_value"));
310                     dataIem.put(TIME, Date.from(record.getTime()));
311                     dataList.add(dataIem);
312                 }
313             }
314             result.put(point.getPoint(), dataList);
315         }
316         return result;
317     }
318
319     @Override
320     public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
321         Map<String, Object> result = new HashMap<>();
322         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
323         if (CollectionUtils.isEmpty(data)) {
324             return result;
325         }
326         data.forEach((k, v) -> {
327             if (!CollectionUtils.isEmpty(v)) {
328                 BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
329                 BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
330                 BigDecimal spread = (lastValue.subtract(firstValue));
331                 result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
332             }
333             log.info(k + ",isEmpty");
334         });
335         return result;
336     }
337 }