潘志宝
2024-11-15 4be7d863a161b64f8592a789d699e807545e7dc6
iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/influxdb/service/impl/InfluxDBServiceImpl.java
@@ -1,5 +1,6 @@
package com.iailab.module.data.influxdb.service.impl;
import com.iailab.module.data.api.dto.ApiPointValueQueryDTO;
import com.iailab.module.data.common.utils.ExcelUtil;
import com.iailab.framework.common.util.date.DateUtils;
import com.iailab.module.data.api.dto.ApiExportValueDTO;
@@ -11,6 +12,9 @@
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
import com.iailab.module.data.influxdb.service.InfluxDBService;
import com.iailab.module.data.point.dto.DaPointDTO;
import com.iailab.module.data.point.service.DaPointService;
import com.iailab.module.data.point.vo.PointValueExportVO;
import com.influxdb.client.InfluxQLQueryApi;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
@@ -23,13 +27,19 @@
import com.influxdb.query.InfluxQLQueryResult;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import static com.iailab.framework.common.pojo.CommonResult.success;
/**
 * InfluxDB操作类
@@ -43,19 +53,27 @@
    private WriteApi writeApi;
    @Autowired
    private DaPointService daPointService;
    private WriteApiBlocking writeApiBlocking;
    private QueryApi queryApi;
    private InfluxQLQueryApi influxQLQueryApi;
    private String VALUE = "value";
    public static final String VALUE = "value";
    private String TIME = "time";
    public static final String TIME = "time";
    private int rawOffset = TimeZone.getDefault().getRawOffset();
    private int pas_ms = 1000;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Resource
    private InfluxDBService influxDBService;
    @Override
    public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
@@ -66,7 +84,7 @@
        point.addTag("point", pointNo);
        point.addField("value", Double.parseDouble(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }
    @Override
@@ -78,7 +96,7 @@
        point.addTag("point", pointNo);
        point.addField("value", Integer.parseInt(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }
    @Override
@@ -90,7 +108,7 @@
        point.addTag("point", pointNo);
        point.addField("value", Boolean.parseBoolean(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.bucket, influxDBInstance.org, point);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }
    @Override
@@ -100,7 +118,8 @@
        }
        if (!CollectionUtils.isEmpty(pointValues)) {
            pointValues.forEach(item -> {
                writeApi.writeMeasurement(influxDBInstance.bucket, influxDBInstance.org, WritePrecision.MS, item);
                String bucket = influxDBInstance.getBucket();
                writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
@@ -113,7 +132,8 @@
        }
        if (!CollectionUtils.isEmpty(tagValues)) {
            tagValues.forEach(item -> {
                writeApi.writeMeasurement(influxDBInstance.bucket, influxDBInstance.org, WritePrecision.MS, item);
                log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
                writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
@@ -142,7 +162,7 @@
            InfluxTagValuePOJO tag = influxParams.get(i);
            String measurement = TagValueUtils.getMeasurement(tag.getType());
            StringBuilder sb = new StringBuilder();
            sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
            sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
            sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
            sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
            sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
@@ -179,7 +199,7 @@
        String measurement = TagValueUtils.getMeasurement(tag.getType());
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
@@ -201,21 +221,79 @@
    }
    /**
     * Exports the values collected for a tag over a time range by handing them,
     * together with the servlet response, to {@code ExcelUtil.exportPointValue}.
     *
     * <p>NOTE(review): this span shows two declarations of the method and two
     * {@code ExcelUtil.exportPointValue} calls back to back; it looks like the
     * lines from before and after a change were kept together. Confirm which
     * set is the live one.
     *
     * @param response servlet response passed on to {@code ExcelUtil.exportPointValue}
     * @param request  servlet request; not read anywhere in the visible body
     * @param queryDto query carrying the tag ids and the start/end of the range
     * @return {@code true} when no matching point is found or the export call
     *         completes; {@code false} when the export call throws
     */
    @Override
    public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO params) {
        InfluxTagValuePOJO tag = new InfluxTagValuePOJO();
        tag.setId(params.getTagId());
        tag.setType(params.getDataType());
        List<ApiExportValueDTO> valueList = getExportValue(tag, params.getStart(), params.getEnd());
    public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) {
        // Build the lookup parameters
        Map<String, Object> params = new HashMap<>(1);
        params.put("pointNos", queryDto.getTagIds());
        // Look up the point list
        List<DaPointDTO> pointList = daPointService.list(params);
        if (CollectionUtils.isEmpty(pointList)) {
            // Nothing to export; reported to the caller as success
            return true;
        }
        // Copy each point's identifier and data type into an Influx query parameter
        // NOTE(review): exportPointValue uses item.getPointNo() here while this method uses
        // item.getTagNo() — confirm the difference is intended
        List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
            InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
            pojo.setPoint(item.getTagNo());
            pojo.setType(item.getDataType());
            return pojo;
        }).collect(Collectors.toList());
        // Query the values for the requested range
        Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
        // Extract the list for the first requested tag id only; the other ids are queried but not exported
        List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0));
        // Export
        try{
            String sheetTitle = "采集数据";
            String[] title = new String[]{"值", "时间"};
            ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(valueList), response);
            ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response);
        } catch (Exception ex) {
            // NOTE(review): the exception is dropped without logging; callers only see false
            return false;
        }
        return true;
    }
    @Override
    public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) {
        List<PointValueExportVO> pointValueExportList = new ArrayList<>();
        //构建参数
        Map<String, Object> params = new HashMap<>(1);
        params.put("pointNos", queryDto.getPointNos());
        //查询point列表
        List<DaPointDTO> pointList = daPointService.list(params);
        if (CollectionUtils.isEmpty(pointList)) {
            return pointValueExportList;
        }
        //插入pointType
        List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
            InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
            pojo.setPoint(item.getPointNo());
            pojo.setType(item.getDataType());
            return pojo;
        }).collect(Collectors.toList());
        //查询
        Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());
        //提取list
        List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0));
        for(Map<String, Object> map : list){
            PointValueExportVO dto = new PointValueExportVO();
            dto.setDatatime(map.get("time").toString());
            dto.setDatavalue(map.get("value").toString());
            pointValueExportList.add(dto);
        }
        return pointValueExportList;
    }
    @Override
    public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
@@ -232,7 +310,7 @@
            sql.append(" WHERE point = '");
            sql.append(point.getPoint());
            sql.append("'");
            InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.bucket));
            InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
            Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
            result.put(point.getPoint(), value);
        }
@@ -254,7 +332,7 @@
        sql.append(" WHERE point = '");
        sql.append(point.getPoint());
        sql.append("' AND time >= '" + utsStart +"'");
        InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.bucket));
        InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
        if (data == null) {
            return null;
        }
@@ -291,7 +369,7 @@
            InfluxPointValuePOJO point = influxParams.get(i);
            String measurement = PointValueUtils.getMeasurement(point.getType());
            StringBuilder sb = new StringBuilder();
            sb.append("from(bucket:\"" + influxDBInstance.bucket + "\") ");
            sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
            sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
            sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
            sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
@@ -306,7 +384,7 @@
                for (FluxRecord record : records) {
                    Map<String, Object> dataIem = new HashMap<>(2);
                    dataIem.put(VALUE, record.getValueByKey("_value"));
                    dataIem.put(TIME, Date.from(record.getTime()));
                    dataIem.put(TIME, sdf.format(Date.from(record.getTime())));
                    dataList.add(dataIem);
                }
            }