潘志宝
3 天以前 3903c6f4ef7f5fa7dd931bbffc51d0ce0d6f0af1
提交 | 用户 | 时间
a955f1 1 package com.iailab.module.model.influxdb.service.impl;
D 2
3 import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
4 import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
5 import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
6 import com.iailab.module.model.influxdb.service.InfluxDBService;
7 import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
8 import com.influxdb.client.QueryApi;
9 import com.influxdb.client.WriteApi;
10 import com.influxdb.client.domain.WritePrecision;
11 import com.influxdb.query.FluxRecord;
12 import com.influxdb.query.FluxTable;
13 import lombok.extern.slf4j.Slf4j;
14 import org.springframework.stereotype.Service;
15 import org.springframework.util.CollectionUtils;
16
17 import javax.annotation.PostConstruct;
18 import javax.annotation.Resource;
19 import java.util.*;
20
21 /**
22  * InfluxDB操作类
23  */
24 @Slf4j
25 @Service
26 public class InfluxDBServiceImpl implements InfluxDBService {
27     @Resource
28     private InfluxDBInstance influxDBInstance;
29     private WriteApi writeApi;
30     private QueryApi queryApi;
31
32     @PostConstruct
33     private void init() {
34         writeApi = influxDBInstance.getClient().makeWriteApi();
35         queryApi = influxDBInstance.getClient().getQueryApi();
36     }
37
38     @Override
39     public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
40         if (!CollectionUtils.isEmpty(pointValues)) {
41             pointValues.forEach(item -> {
42                 String bucket = influxDBInstance.getBucket();
43                 writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
44             });
45         }
46         writeApi.flush();
47     }
48
49     @Override
50     public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
51         Calendar calendar = Calendar.getInstance();
52         calendar.set(Calendar.MILLISECOND, 0);
1d4ac2 53         String start = startTime.toInstant().toString();
D 54
55         if (startTime.getTime() == endTime.getTime()) {
033e69 56             // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
1d4ac2 57             endTime.setTime(endTime.getTime() + 1);
a955f1 58         }
1d4ac2 59         String stop = endTime.toInstant().toString();
a955f1 60
D 61         List<InfluxModelResultVO> dataList = new ArrayList<>();
62         String measurement = MeasurementUtils.getMeasurement(pojo.getType());
63         StringBuilder sb = new StringBuilder();
64         sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
65         sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
66         sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
67         sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
68         sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
69         sb.append("|> sort(columns: [\"_time\"]) ");
70         sb.append("|> yield(name: \"mean\")");
71         log.info("influxdbSql===============" + sb);
72         List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
73
74         for (FluxTable table : tables) {
75             List<FluxRecord> records = table.getRecords();
76             for (FluxRecord record : records) {
77                 InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
78                 dataList.add(vo);
79             }
80         }
81         return dataList;
82     }
83 }