dengzedong
3 天以前 a955f188c0380df9705c84f3a0eacccd2fc2375a
提交 | 用户 | 时间
a955f1 1 package com.iailab.module.model.influxdb.service.impl;
D 2
3 import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
4 import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
5 import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
6 import com.iailab.module.model.influxdb.service.InfluxDBService;
7 import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
8 import com.influxdb.client.QueryApi;
9 import com.influxdb.client.WriteApi;
10 import com.influxdb.client.domain.WritePrecision;
11 import com.influxdb.query.FluxRecord;
12 import com.influxdb.query.FluxTable;
13 import lombok.extern.slf4j.Slf4j;
14 import org.springframework.stereotype.Service;
15 import org.springframework.util.CollectionUtils;
16
17 import javax.annotation.PostConstruct;
18 import javax.annotation.Resource;
19 import java.util.*;
20
21 /**
22  * InfluxDB操作类
23  */
24 @Slf4j
25 @Service
26 public class InfluxDBServiceImpl implements InfluxDBService {
27     @Resource
28     private InfluxDBInstance influxDBInstance;
29     private WriteApi writeApi;
30     private QueryApi queryApi;
31
32     @PostConstruct
33     private void init() {
34         writeApi = influxDBInstance.getClient().makeWriteApi();
35         queryApi = influxDBInstance.getClient().getQueryApi();
36     }
37
38     @Override
39     public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
40         if (!CollectionUtils.isEmpty(pointValues)) {
41             pointValues.forEach(item -> {
42                 String bucket = influxDBInstance.getBucket();
43                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
44             });
45         }
46         writeApi.flush();
47     }
48
49     @Override
50     public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
51         Calendar calendar = Calendar.getInstance();
52         calendar.set(Calendar.MILLISECOND, 0);
53         String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
54         if (endTime==null){
55             endTime= new Date();
56         }
57         String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";
58
59         List<InfluxModelResultVO> dataList = new ArrayList<>();
60         String measurement = MeasurementUtils.getMeasurement(pojo.getType());
61         StringBuilder sb = new StringBuilder();
62         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
63         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
64         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
65         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
66         sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
67         sb.append("|> sort(columns: [\"_time\"]) ");
68         sb.append("|> yield(name: \"mean\")");
69         log.info("influxdbSql===============" + sb);
70         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
71
72         for (FluxTable table : tables) {
73             List<FluxRecord> records = table.getRecords();
74             for (FluxRecord record : records) {
75                 InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
76                 dataList.add(vo);
77             }
78         }
79         return dataList;
80     }
81 }