潘志宝
2024-12-25 e2b151e9bed2b6798cfc1ed499e98bcb0665d6ec
提交 | 用户 | 时间
a6de49 1 package com.iailab.module.data.influxdb.service.impl;
H 2
139c6a 3 import com.iailab.module.data.api.dto.ApiPointValueQueryDTO;
a6de49 4 import com.iailab.module.data.common.utils.ExcelUtil;
H 5 import com.iailab.framework.common.util.date.DateUtils;
6 import com.iailab.module.data.api.dto.ApiExportValueDTO;
7 import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
8 import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
9 import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
10 import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
11 import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
12 import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
13 import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
14 import com.iailab.module.data.influxdb.service.InfluxDBService;
139c6a 15 import com.iailab.module.data.point.dto.DaPointDTO;
D 16 import com.iailab.module.data.point.service.DaPointService;
17 import com.iailab.module.data.point.vo.PointValueExportVO;
a6de49 18 import com.influxdb.client.InfluxQLQueryApi;
H 19 import com.influxdb.client.QueryApi;
20 import com.influxdb.client.WriteApi;
21 import com.influxdb.client.WriteApiBlocking;
22 import com.influxdb.client.domain.InfluxQLQuery;
23 import com.influxdb.client.domain.WritePrecision;
24 import com.influxdb.client.write.Point;
25 import com.influxdb.query.FluxRecord;
26 import com.influxdb.query.FluxTable;
27 import com.influxdb.query.InfluxQLQueryResult;
28 import lombok.extern.slf4j.Slf4j;
29 import javax.annotation.Resource;
139c6a 30
D 31 import org.springframework.beans.factory.annotation.Autowired;
a6de49 32 import org.springframework.stereotype.Service;
H 33 import org.springframework.util.CollectionUtils;
34
35 import javax.servlet.http.HttpServletRequest;
36 import javax.servlet.http.HttpServletResponse;
37 import java.math.BigDecimal;
139c6a 38 import java.text.SimpleDateFormat;
a6de49 39 import java.util.*;
139c6a 40 import java.util.stream.Collectors;
D 41
42 import static com.iailab.framework.common.pojo.CommonResult.success;
a6de49 43
H 44 /**
45  * InfluxDB操作类
46  */
47 @Slf4j
48 @Service
49 public class InfluxDBServiceImpl implements InfluxDBService {
50
51     @Resource
52     private InfluxDBInstance influxDBInstance;
53
54     private WriteApi writeApi;
139c6a 55
D 56     @Autowired
57     private DaPointService daPointService;
a6de49 58
H 59     private WriteApiBlocking writeApiBlocking;
60
61     private QueryApi queryApi;
62
63     private InfluxQLQueryApi influxQLQueryApi;
64
a2aa90 65     public static final String VALUE = "value";
a6de49 66
a2aa90 67     public static final String TIME = "time";
a6de49 68
H 69     private int rawOffset = TimeZone.getDefault().getRawOffset();
70
71     private int pas_ms = 1000;
139c6a 72
D 73     private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
74
75     @Resource
76     private InfluxDBService influxDBService;
a6de49 77
H 78     @Override
79     public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
80         if (writeApiBlocking == null) {
81             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
82         }
83         Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
84         point.addTag("point", pointNo);
85         point.addField("value", Double.parseDouble(dataValue));
86         point.time(time, WritePrecision.MS);
585be5 87         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 88     }
H 89
90     @Override
91     public void syncWriteIntValue(String pointNo, String dataValue, long time) {
92         if (writeApiBlocking == null) {
93             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
94         }
95         Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
96         point.addTag("point", pointNo);
97         point.addField("value", Integer.parseInt(dataValue));
98         point.time(time, WritePrecision.MS);
585be5 99         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 100     }
H 101
102     @Override
103     public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
104         if (writeApiBlocking == null) {
105             writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
106         }
107         Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
108         point.addTag("point", pointNo);
109         point.addField("value", Boolean.parseBoolean(dataValue));
110         point.time(time, WritePrecision.MS);
585be5 111         writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
a6de49 112     }
H 113
114     @Override
115     public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
116         if (writeApi == null) {
117             writeApi = influxDBInstance.getClient().makeWriteApi();
118         }
119         if (!CollectionUtils.isEmpty(pointValues)) {
120             pointValues.forEach(item -> {
d41f14 121                 String bucket = influxDBInstance.getBucket();
122                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
a6de49 123             });
H 124         }
125         writeApi.flush();
126     }
127
128     @Override
129     public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
130         if (writeApi == null) {
131             writeApi = influxDBInstance.getClient().makeWriteApi();
132         }
133         if (!CollectionUtils.isEmpty(tagValues)) {
134             tagValues.forEach(item -> {
54626e 135                 log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
585be5 136                 writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
a6de49 137             });
H 138         }
139         writeApi.flush();
140     }
141
142     @Override
143     public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
144         if (queryApi == null) {
145             queryApi = influxDBInstance.getClient().getQueryApi();
146         }
147         if (CollectionUtils.isEmpty(influxParams)) {
148             return null;
149         }
150         Map<String, List<Object>> result = new HashMap<>();
151         Calendar calendar = Calendar.getInstance();
152         calendar.set(Calendar.MILLISECOND, 0);
153         // String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
154         String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
155         if (endTime==null){
156             endTime= new Date();
157         }
158         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
159
160         for (int i = 0; i < influxParams.size(); i++) {
161             List<Object> dataList = new ArrayList<>();
162             InfluxTagValuePOJO tag = influxParams.get(i);
163             String measurement = TagValueUtils.getMeasurement(tag.getType());
164             StringBuilder sb = new StringBuilder();
585be5 165             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 166             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 167             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
168             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
169             sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
170             sb.append("|> sort(columns: [\"_time\"]) ");
171             sb.append("|> yield(name: \"mean\")");
172             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
173
174             for (FluxTable table : tables) {
175                 List<FluxRecord> records = table.getRecords();
176                 for (FluxRecord record : records) {
177                     Map<String, Object> dataIem = new HashMap<>(2);
178                     dataIem.put(VALUE, record.getValueByKey("_value"));
179                     dataIem.put(TIME, Date.from(record.getTime()));
180                     dataList.add(dataIem);
181                 }
182             }
183             result.put(tag.getId(), dataList);
184         }
185         return result;
186     }
187
188     private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
189         List<ApiExportValueDTO> dataList = new ArrayList<>();
190         if (queryApi == null) {
191             queryApi = influxDBInstance.getClient().getQueryApi();
192         }
193         Map<String, List<Object>> result = new HashMap<>();
194         Calendar calendar = Calendar.getInstance();
195         calendar.set(Calendar.MILLISECOND, 0);
196         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
197         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
198
199
200         String measurement = TagValueUtils.getMeasurement(tag.getType());
201         StringBuilder sb = new StringBuilder();
585be5 202         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 203         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 204         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
205         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
206         sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
207         sb.append("|> sort(columns: [\"_time\"]) ");
208         sb.append("|> yield(name: \"mean\")");
209         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
210
211         for (FluxTable table : tables) {
212             List<FluxRecord> records = table.getRecords();
213             for (FluxRecord record : records) {
214                 ApiExportValueDTO dataIem = new ApiExportValueDTO();
215                 dataIem.setDataValue(record.getValueByKey("_value").toString());
216                 dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
217                 dataList.add(dataIem);
218             }
219         }
220         return dataList;
221     }
222
223     @Override
139c6a 224     public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) {
D 225         //构建参数
226         Map<String, Object> params = new HashMap<>(1);
227         params.put("pointNos", queryDto.getTagIds());
228
229         //查询point列表
230         List<DaPointDTO> pointList = daPointService.list(params);
231         if (CollectionUtils.isEmpty(pointList)) {
232             return true;
233         }
234
235         //插入pointType
236         List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
237             InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
238             pojo.setPoint(item.getTagNo());
239             pojo.setType(item.getDataType());
240             return pojo;
241         }).collect(Collectors.toList());
242
243         //查询
244         Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
245
246         //提取list
247         List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0));
248
249         //导出
a6de49 250         try{
H 251             String sheetTitle = "采集数据";
252             String[] title = new String[]{"值", "时间"};
139c6a 253             ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response);
a6de49 254         } catch (Exception ex) {
H 255             return false;
256         }
257         return true;
258     }
259
139c6a 260     @Override
D 261     public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) {
262
263         List<PointValueExportVO> pointValueExportList = new ArrayList<>();
264
265         //构建参数
266         Map<String, Object> params = new HashMap<>(1);
267         params.put("pointNos", queryDto.getPointNos());
268
269         //查询point列表
270         List<DaPointDTO> pointList = daPointService.list(params);
271         if (CollectionUtils.isEmpty(pointList)) {
272             return pointValueExportList;
273         }
274
275         //插入pointType
276         List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
277             InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
278             pojo.setPoint(item.getPointNo());
279             pojo.setType(item.getDataType());
280             return pojo;
281         }).collect(Collectors.toList());
282
283         //查询
284         Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
285
286         //提取list
287         List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0));
288         for(Map<String, Object> map : list){
289             PointValueExportVO dto = new PointValueExportVO();
290             dto.setDatatime(map.get("time").toString());
291             dto.setDatavalue(map.get("value").toString());
292             pointValueExportList.add(dto);
293         }
294
295         return pointValueExportList;
296     }
a6de49 297
H 298     @Override
299     public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
300         Map<String, Object> result = new HashMap<>(influxParams.size());
301         if (influxQLQueryApi == null) {
302             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
303         }
304         for (int i = 0; i < influxParams.size(); i++) {
305             InfluxPointValuePOJO point = influxParams.get(i);
306             String measurement = PointValueUtils.getMeasurement(point.getType());
307             StringBuilder sql = new StringBuilder();
308             sql.append("SELECT LAST(value) FROM ");
309             sql.append(measurement);
310             sql.append(" WHERE point = '");
311             sql.append(point.getPoint());
312             sql.append("'");
585be5 313             InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 314             Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
H 315             result.put(point.getPoint(), value);
316         }
317         return result;
318     }
319
320     @Override
321     public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
322         if (influxQLQueryApi == null) {
323             influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
324         }
325         long utcMillis = startTime.getTime() -rawOffset;
326         String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
327         log.info("utsStart=" + utsStart);
328         String measurement = PointValueUtils.getMeasurement(point.getType());
329         StringBuilder sql = new StringBuilder();
330         sql.append("SELECT MAX(value) FROM ");
331         sql.append(measurement);
332         sql.append(" WHERE point = '");
333         sql.append(point.getPoint());
334         sql.append("' AND time >= '" + utsStart +"'");
585be5 335         InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
a6de49 336         if (data == null) {
H 337             return null;
338         }
339         return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
340     }
341
342     @Override
343     public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
344         List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
345         influxParams.add(pojo);
346         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
347         return data.get(pojo.getPoint());
348     }
349
350     @Override
351     public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
352         if (queryApi == null) {
353             queryApi = influxDBInstance.getClient().getQueryApi();
354         }
355         if (CollectionUtils.isEmpty(influxParams)) {
356             return null;
357         }
358         Map<String, List<Map<String, Object>>> result = new HashMap<>();
359         Calendar calendar = Calendar.getInstance();
360         calendar.set(Calendar.MILLISECOND, 0);
361         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
362         if (endTime==null){
363             endTime= new Date();
364         }
365         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
366
367         for (int i = 0; i < influxParams.size(); i++) {
368             List<Map<String, Object>> dataList = new ArrayList<>();
369             InfluxPointValuePOJO point = influxParams.get(i);
370             String measurement = PointValueUtils.getMeasurement(point.getType());
371             StringBuilder sb = new StringBuilder();
a2aa90 372             sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
a6de49 373             sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
H 374             sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
375             sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
376             sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
377             sb.append("|> sort(columns: [\"_time\"]) ");
378             sb.append("|> yield(name: \"mean\")");
379             System.out.println("influxdbSql===============" + sb.toString());
380             List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
381
382             for (FluxTable table : tables) {
383                 List<FluxRecord> records = table.getRecords();
384                 for (FluxRecord record : records) {
385                     Map<String, Object> dataIem = new HashMap<>(2);
386                     dataIem.put(VALUE, record.getValueByKey("_value"));
139c6a 387                     dataIem.put(TIME, sdf.format(Date.from(record.getTime())));
a6de49 388                     dataList.add(dataIem);
H 389                 }
390             }
391             result.put(point.getPoint(), dataList);
392         }
393         return result;
394     }
395
396     @Override
397     public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
398         Map<String, Object> result = new HashMap<>();
399         Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
400         if (CollectionUtils.isEmpty(data)) {
401             return result;
402         }
403         data.forEach((k, v) -> {
404             if (!CollectionUtils.isEmpty(v)) {
405                 BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
406                 BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
407                 BigDecimal spread = (lastValue.subtract(firstValue));
408                 result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
409             }
410             log.info(k + ",isEmpty");
411         });
412         return result;
413     }
414 }