liriming
2025-01-14 0809aa554fc906e73a383c34f88bb4153bb69b00
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.iailab.module.model.influxdb.service.impl;
 
import com.iailab.module.model.influxdb.common.config.InfluxDBInstance;
import com.iailab.module.model.influxdb.common.utils.MeasurementUtils;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultByOutPutIdsPOJO;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
import com.iailab.module.model.influxdb.service.InfluxDBService;
import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
 
/**
 * InfluxDB操作类
 */
@Slf4j
@Service
public class InfluxDBServiceImpl implements InfluxDBService {
    @Resource
    private InfluxDBInstance influxDBInstance;
    private WriteApi writeApi;
    private QueryApi queryApi;
 
    @PostConstruct
    private void init() {
        writeApi = influxDBInstance.getClient().makeWriteApi();
        queryApi = influxDBInstance.getClient().getQueryApi();
    }
 
    @Override
    public void asyncWriteModelResults(List<InfluxModelResultPOJO> pointValues) {
        if (!CollectionUtils.isEmpty(pointValues)) {
            pointValues.forEach(item -> {
                String bucket = influxDBInstance.getBucket();
                writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
    }
 
    @Override
    public List<InfluxModelResultVO> queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.toInstant().toString();
 
        if (startTime.getTime() == endTime.getTime()) {
            // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
            endTime.setTime(endTime.getTime() + 1);
        }
        String stop = endTime.toInstant().toString();
 
        List<InfluxModelResultVO> dataList = new ArrayList<>();
        String measurement = MeasurementUtils.getMeasurement(pojo.getType());
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
        sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")");
        sb.append("|> sort(columns: [\"_time\"]) ");
        sb.append("|> yield(name: \"mean\")");
        log.info("influxdbSql===============" + sb);
        List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
 
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
                dataList.add(vo);
            }
        }
        return dataList;
    }
 
    @Override
    public Map<String, List<InfluxModelResultVO>> queryModelResultsByOutPutIds(InfluxModelResultByOutPutIdsPOJO pojo, Date startTime, Date endTime) {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.toInstant().toString();
 
        if (startTime.getTime() == endTime.getTime()) {
            // 如果相等,则engTime加1毫秒,否则influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据)
            endTime.setTime(endTime.getTime() + 1);
        }
        String stop = endTime.toInstant().toString();
 
        String measurement = MeasurementUtils.getMeasurement(pojo.getType());
        // 拼接OutPutIds
        String outPutIdsFilter = pojo.getOutPutIds().stream().map(id -> "r[\"outPutId\"] == \"" + id + "\"").collect(Collectors.joining(" or "));
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
        sb.append("|> filter(fn: (r) => " + outPutIdsFilter + ")");
        sb.append("|> sort(columns: [\"_time\"]) ");
        sb.append("|> yield(name: \"mean\")");
        log.info("influxdbSql===============" + sb);
        List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);
 
        Map<String, List<InfluxModelResultVO>> result = new HashMap<>(pojo.getOutPutIds().size());
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                String outPutId = record.getValueByKey("outPutId").toString();
                if (result.containsKey(outPutId)) {
                    result.get(outPutId).add( new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()));
                } else {
                    List<InfluxModelResultVO> dataList = new ArrayList<>();
                    InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime());
                    dataList.add(vo);
                    result.put(outPutId,dataList);
                }
            }
        }
        return result;
    }
}