潘志宝
2025-01-23 9bff7e6944f0660f0a39ededfc405b7e1a15802c
提交 | 用户 | 时间
a6de49 1 package com.iailab.module.data.influxdb.service.impl;
H 2
139c6a 3 import com.iailab.module.data.api.dto.ApiPointValueQueryDTO;
a6de49 4 import com.iailab.module.data.common.utils.ExcelUtil;
H 5 import com.iailab.framework.common.util.date.DateUtils;
6 import com.iailab.module.data.api.dto.ApiExportValueDTO;
7 import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
8 import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
9 import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
10 import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
11 import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
12 import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
13 import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
14 import com.iailab.module.data.influxdb.service.InfluxDBService;
139c6a 15 import com.iailab.module.data.point.dto.DaPointDTO;
D 16 import com.iailab.module.data.point.service.DaPointService;
17 import com.iailab.module.data.point.vo.PointValueExportVO;
a6de49 18 import com.influxdb.client.InfluxQLQueryApi;
H 19 import com.influxdb.client.QueryApi;
20 import com.influxdb.client.WriteApi;
21 import com.influxdb.client.WriteApiBlocking;
22 import com.influxdb.client.domain.InfluxQLQuery;
23 import com.influxdb.client.domain.WritePrecision;
24 import com.influxdb.client.write.Point;
25 import com.influxdb.query.FluxRecord;
26 import com.influxdb.query.FluxTable;
27 import com.influxdb.query.InfluxQLQueryResult;
28 import lombok.extern.slf4j.Slf4j;
29 import javax.annotation.Resource;
139c6a 30
D 31 import org.springframework.beans.factory.annotation.Autowired;
a6de49 32 import org.springframework.stereotype.Service;
H 33 import org.springframework.util.CollectionUtils;
34
35 import javax.servlet.http.HttpServletRequest;
36 import javax.servlet.http.HttpServletResponse;
37 import java.math.BigDecimal;
38 import java.util.*;
139c6a 39 import java.util.stream.Collectors;
a6de49 40
H 41 /**
42  * InfluxDB操作类
43  */
44 @Slf4j
45 @Service
46 public class InfluxDBServiceImpl implements InfluxDBService {
47
48     @Resource
49     private InfluxDBInstance influxDBInstance;
50
51     private WriteApi writeApi;
139c6a 52
D 53     @Autowired
54     private DaPointService daPointService;
a6de49 55
H 56     private WriteApiBlocking writeApiBlocking;
57
58     private QueryApi queryApi;
59
60     private InfluxQLQueryApi influxQLQueryApi;
61
a2aa90 62     public static final String VALUE = "value";
a6de49 63
a2aa90 64     public static final String TIME = "time";
a6de49 65
H 66     private int rawOffset = TimeZone.getDefault().getRawOffset();
67
68     private int pas_ms = 1000;
69
70     @Override
71     public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
72         if (writeApiBlocking == null) {
73             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
74         }
75         Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
76         point.addTag("point", pointNo);
77         point.addField("value", Double.parseDouble(dataValue));
78         point.time(time, WritePrecision.MS);
585be5 79         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 80     }
H 81
82     @Override
83     public void syncWriteIntValue(String pointNo, String dataValue, long time) {
84         if (writeApiBlocking == null) {
85             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
86         }
87         Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
88         point.addTag("point", pointNo);
89         point.addField("value", Integer.parseInt(dataValue));
90         point.time(time, WritePrecision.MS);
585be5 91         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 92     }
H 93
94     @Override
95     public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
96         if (writeApiBlocking == null) {
97             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
98         }
99         Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
100         point.addTag("point", pointNo);
101         point.addField("value", Boolean.parseBoolean(dataValue));
102         point.time(time, WritePrecision.MS);
585be5 103         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 104     }
H 105
106     @Override
107     public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
108         if (writeApi == null) {
109             writeApi = influxDBInstance.getClient().makeWriteApi();
110         }
111         if (!CollectionUtils.isEmpty(pointValues)) {
112             pointValues.forEach(item -> {
d41f14 113                 String bucket = influxDBInstance.getBucket();
114                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
a6de49 115             });
H 116         }
117         writeApi.flush();
118     }
119
120     @Override
121     public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
122         if (writeApi == null) {
123             writeApi = influxDBInstance.getClient().makeWriteApi();
124         }
125         if (!CollectionUtils.isEmpty(tagValues)) {
126             tagValues.forEach(item -> {
54626e 127                 log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
585be5 128                 writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
a6de49 129             });
H 130         }
131         writeApi.flush();
132     }
133
134     @Override
135     public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
136         if (queryApi == null) {
137             queryApi = influxDBInstance.getClient().getQueryApi();
138         }
139         if (CollectionUtils.isEmpty(influxParams)) {
140             return null;
141         }
142         Map<String, List<Object>> result = new HashMap<>();
143         Calendar calendar = Calendar.getInstance();
144         calendar.set(Calendar.MILLISECOND, 0);
145         // String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
146         String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
147         if (endTime==null){
148             endTime= new Date();
149         }
150         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
151
152         for (int i = 0; i < influxParams.size(); i++) {
153             List<Object> dataList = new ArrayList<>();
154             InfluxTagValuePOJO tag = influxParams.get(i);
155             String measurement = TagValueUtils.getMeasurement(tag.getType());
156             StringBuilder sb = new StringBuilder();
585be5 157             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 158             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 159             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
160             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
161             sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
162             sb.append("|> sort(columns: [\"_time\"]) ");
163             sb.append("|> yield(name: \"mean\")");
164             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
165
166             for (FluxTable table : tables) {
167                 List<FluxRecord> records = table.getRecords();
168                 for (FluxRecord record : records) {
169                     Map<String, Object> dataIem = new HashMap<>(2);
170                     dataIem.put(VALUE, record.getValueByKey("_value"));
171                     dataIem.put(TIME, Date.from(record.getTime()));
172                     dataList.add(dataIem);
173                 }
174             }
175             result.put(tag.getId(), dataList);
176         }
177         return result;
178     }
179
180     private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
181         List<ApiExportValueDTO> dataList = new ArrayList<>();
182         if (queryApi == null) {
183             queryApi = influxDBInstance.getClient().getQueryApi();
184         }
185         Map<String, List<Object>> result = new HashMap<>();
186         Calendar calendar = Calendar.getInstance();
187         calendar.set(Calendar.MILLISECOND, 0);
188         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
189         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
190
191
192         String measurement = TagValueUtils.getMeasurement(tag.getType());
193         StringBuilder sb = new StringBuilder();
585be5 194         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 195         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 196         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
197         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
198         sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
199         sb.append("|> sort(columns: [\"_time\"]) ");
200         sb.append("|> yield(name: \"mean\")");
201         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
202
203         for (FluxTable table : tables) {
204             List<FluxRecord> records = table.getRecords();
205             for (FluxRecord record : records) {
206                 ApiExportValueDTO dataIem = new ApiExportValueDTO();
207                 dataIem.setDataValue(record.getValueByKey("_value").toString());
208                 dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
209                 dataList.add(dataIem);
210             }
211         }
212         return dataList;
213     }
214
215     @Override
139c6a 216     public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) {
D 217         //构建参数
218         Map<String, Object> params = new HashMap<>(1);
219         params.put("pointNos", queryDto.getTagIds());
220
221         //查询point列表
222         List<DaPointDTO> pointList = daPointService.list(params);
223         if (CollectionUtils.isEmpty(pointList)) {
224             return true;
225         }
226
227         //插入pointType
228         List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
229             InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
230             pojo.setPoint(item.getTagNo());
231             pojo.setType(item.getDataType());
232             return pojo;
233         }).collect(Collectors.toList());
234
235         //查询
236         Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
237
238         //提取list
239         List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0));
240
241         //导出
a6de49 242         try{
H 243             String sheetTitle = "采集数据";
244             String[] title = new String[]{"值", "时间"};
139c6a 245             ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response);
a6de49 246         } catch (Exception ex) {
H 247             return false;
248         }
249         return true;
250     }
251
139c6a 252     @Override
D 253     public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) {
254
255         List<PointValueExportVO> pointValueExportList = new ArrayList<>();
256
257         //构建参数
258         Map<String, Object> params = new HashMap<>(1);
259         params.put("pointNos", queryDto.getPointNos());
260
261         //查询point列表
262         List<DaPointDTO> pointList = daPointService.list(params);
263         if (CollectionUtils.isEmpty(pointList)) {
264             return pointValueExportList;
265         }
266
267         //插入pointType
268         List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
269             InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
270             pojo.setPoint(item.getPointNo());
271             pojo.setType(item.getDataType());
272             return pojo;
273         }).collect(Collectors.toList());
274
275         //查询
276         Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
277
278         //提取list
279         List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0));
280         for(Map<String, Object> map : list){
281             PointValueExportVO dto = new PointValueExportVO();
282             dto.setDatatime(map.get("time").toString());
283             dto.setDatavalue(map.get("value").toString());
284             pointValueExportList.add(dto);
285         }
286
287         return pointValueExportList;
288     }
a6de49 289
H 290     @Override
291     public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
292         Map<String, Object> result = new HashMap<>(influxParams.size());
293         if (influxQLQueryApi == null) {
294             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
295         }
296         for (int i = 0; i < influxParams.size(); i++) {
297             InfluxPointValuePOJO point = influxParams.get(i);
298             String measurement = PointValueUtils.getMeasurement(point.getType());
299             StringBuilder sql = new StringBuilder();
300             sql.append("SELECT LAST(value) FROM ");
301             sql.append(measurement);
302             sql.append(" WHERE point = '");
303             sql.append(point.getPoint());
304             sql.append("'");
585be5 305             InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 306             Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
H 307             result.put(point.getPoint(), value);
308         }
309         return result;
310     }
311
312     @Override
313     public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
314         if (influxQLQueryApi == null) {
315             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
316         }
317         long utcMillis = startTime.getTime() -rawOffset;
318         String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
319         log.info("utsStart=" + utsStart);
320         String measurement = PointValueUtils.getMeasurement(point.getType());
321         StringBuilder sql = new StringBuilder();
322         sql.append("SELECT MAX(value) FROM ");
323         sql.append(measurement);
324         sql.append(" WHERE point = '");
325         sql.append(point.getPoint());
326         sql.append("' AND time >= '" + utsStart +"'");
585be5 327         InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 328         if (data == null) {
H 329             return null;
330         }
331         return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
332     }
333
334     @Override
335     public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
336         List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
337         influxParams.add(pojo);
338         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
339         return data.get(pojo.getPoint());
340     }
341
342     @Override
343     public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
344         if (queryApi == null) {
345             queryApi = influxDBInstance.getClient().getQueryApi();
346         }
347         if (CollectionUtils.isEmpty(influxParams)) {
348             return null;
349         }
350         Map<String, List<Map<String, Object>>> result = new HashMap<>();
351         Calendar calendar = Calendar.getInstance();
352         calendar.set(Calendar.MILLISECOND, 0);
353         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
354         if (endTime==null){
355             endTime= new Date();
356         }
357         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
358
359         for (int i = 0; i < influxParams.size(); i++) {
360             List<Map<String, Object>> dataList = new ArrayList<>();
361             InfluxPointValuePOJO point = influxParams.get(i);
362             String measurement = PointValueUtils.getMeasurement(point.getType());
363             StringBuilder sb = new StringBuilder();
a2aa90 364             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 365             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 366             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
367             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
368             sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
369             sb.append("|> sort(columns: [\"_time\"]) ");
370             sb.append("|> yield(name: \"mean\")");
371             System.out.println("influxdbSql===============" + sb.toString());
372             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
373
374             for (FluxTable table : tables) {
375                 List<FluxRecord> records = table.getRecords();
376                 for (FluxRecord record : records) {
377                     Map<String, Object> dataIem = new HashMap<>(2);
378                     dataIem.put(VALUE, record.getValueByKey("_value"));
9bff7e 379                     dataIem.put(TIME, DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
a6de49 380                     dataList.add(dataIem);
H 381                 }
382             }
383             result.put(point.getPoint(), dataList);
384         }
385         return result;
386     }
387
388     @Override
389     public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
390         Map<String, Object> result = new HashMap<>();
391         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
392         if (CollectionUtils.isEmpty(data)) {
393             return result;
394         }
395         data.forEach((k, v) -> {
396             if (!CollectionUtils.isEmpty(v)) {
397                 BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
398                 BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
399                 BigDecimal spread = (lastValue.subtract(firstValue));
400                 result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
401             }
402             log.info(k + ",isEmpty");
403         });
404         return result;
405     }
406 }