潘志宝
3 天以前 3903c6f4ef7f5fa7dd931bbffc51d0ce0d6f0af1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.iailab.module.model.influxdb.service.impl;
 
import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
import com.iailab.module.model.influxdb.service.InfluxDBService;
import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
 
/**
 * InfluxDB操作类
 */
@Slf4j
@Service
public class InfluxDBServiceImpl implements InfluxDBService {
    @Resource
    private InfluxDBInstance influxDBInstance;
    private WriteApi writeApi;
    private QueryApi queryApi;
 
    @PostConstruct
    private void init() {
        writeApi = influxDBInstance.getClient().makeWriteApi();
        queryApi = influxDBInstance.getClient().getQueryApi();
    }
 
    @Override
    public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
        if (!CollectionUtils.isEmpty(pointValues)) {
            pointValues.forEach(item -> {
                String bucket = influxDBInstance.getBucket();
                writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
    }
 
    @Override
    public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.toInstant().toString();
 
        if (startTime.getTime() == endTime.getTime()) {
            // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
            endTime.setTime(endTime.getTime() + 1);
        }
        String stop = endTime.toInstant().toString();
 
        List<InfluxModelResultVO> dataList = new ArrayList<>();
        String measurement = MeasurementUtils.getMeasurement(pojo.getType());
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
        sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
        sb.append("|> sort(columns: [\"_time\"]) ");
        sb.append("|> yield(name: \"mean\")");
        log.info("influxdbSql===============" + sb);
        List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
 
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
                dataList.add(vo);
            }
        }
        return dataList;
    }
}