| | |
| | | package com.iailab.module.data.influxdb.service.impl; |
| | | |
| | | import com.iailab.module.data.api.dto.ApiPointValueQueryDTO; |
| | | import com.iailab.module.data.common.utils.ExcelUtil; |
| | | import com.iailab.framework.common.util.date.DateUtils; |
| | | import com.iailab.module.data.api.dto.ApiExportValueDTO; |
| | |
| | | import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO; |
| | | import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO; |
| | | import com.iailab.module.data.influxdb.service.InfluxDBService; |
| | | import com.iailab.module.data.point.dto.DaPointDTO; |
| | | import com.iailab.module.data.point.service.DaPointService; |
| | | import com.iailab.module.data.point.vo.PointValueExportVO; |
| | | import com.influxdb.client.InfluxQLQueryApi; |
| | | import com.influxdb.client.QueryApi; |
| | | import com.influxdb.client.WriteApi; |
| | |
| | | import com.influxdb.query.InfluxQLQueryResult; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import javax.annotation.Resource; |
| | | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.util.CollectionUtils; |
| | | |
| | | import javax.servlet.http.HttpServletRequest; |
| | | import javax.servlet.http.HttpServletResponse; |
| | | import java.math.BigDecimal; |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | import static com.iailab.framework.common.pojo.CommonResult.success; |
| | | |
| | | /** |
| | | * InfluxDB操作类 |
| | |
| | | |
| | | private WriteApi writeApi; |
| | | |
| | | @Autowired |
| | | private DaPointService daPointService; |
| | | |
| | | private WriteApiBlocking writeApiBlocking; |
| | | |
| | | private QueryApi queryApi; |
| | | |
| | | private InfluxQLQueryApi influxQLQueryApi; |
| | | |
| | | private String VALUE = "value"; |
| | | public static final String VALUE = "value"; |
| | | |
| | | private String TIME = "time"; |
| | | public static final String TIME = "time"; |
| | | |
| | | private int rawOffset = TimeZone.getDefault().getRawOffset(); |
| | | |
| | | private int pas_ms = 1000; |
| | | |
| | | private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| | | |
| | | @Resource |
| | | private InfluxDBService influxDBService; |
| | | |
| | | @Override |
| | | public void syncWriteFloatValue(String pointNo, String dataValue, long time) { |
| | |
| | | } |
| | | if (!CollectionUtils.isEmpty(pointValues)) { |
| | | pointValues.forEach(item -> { |
| | | writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item); |
| | | String bucket = influxDBInstance.getBucket(); |
| | | writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item); |
| | | }); |
| | | } |
| | | writeApi.flush(); |
| | |
| | | } |
| | | if (!CollectionUtils.isEmpty(tagValues)) { |
| | | tagValues.forEach(item -> { |
| | | log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org); |
| | | writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item); |
| | | }); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO params) { |
| | | InfluxTagValuePOJO tag = new InfluxTagValuePOJO(); |
| | | tag.setId(params.getTagId()); |
| | | tag.setType(params.getDataType()); |
| | | List<ApiExportValueDTO> valueList = getExportValue(tag, params.getStart(), params.getEnd()); |
| | | public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) { |
| | | //构建参数 |
| | | Map<String, Object> params = new HashMap<>(1); |
| | | params.put("pointNos", queryDto.getTagIds()); |
| | | |
| | | //查询point列表 |
| | | List<DaPointDTO> pointList = daPointService.list(params); |
| | | if (CollectionUtils.isEmpty(pointList)) { |
| | | return true; |
| | | } |
| | | |
| | | //插入pointType |
| | | List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> { |
| | | InfluxPointValuePOJO pojo = new InfluxPointValuePOJO(); |
| | | pojo.setPoint(item.getTagNo()); |
| | | pojo.setType(item.getDataType()); |
| | | return pojo; |
| | | }).collect(Collectors.toList()); |
| | | |
| | | //查询 |
| | | Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd()); |
| | | |
| | | //提取list |
| | | List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0)); |
| | | |
| | | //导出 |
| | | try{ |
| | | String sheetTitle = "采集数据"; |
| | | String[] title = new String[]{"值", "时间"}; |
| | | ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(valueList), response); |
| | | ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response); |
| | | } catch (Exception ex) { |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) { |
| | | |
| | | List<PointValueExportVO> pointValueExportList = new ArrayList<>(); |
| | | |
| | | //构建参数 |
| | | Map<String, Object> params = new HashMap<>(1); |
| | | params.put("pointNos", queryDto.getPointNos()); |
| | | |
| | | //查询point列表 |
| | | List<DaPointDTO> pointList = daPointService.list(params); |
| | | if (CollectionUtils.isEmpty(pointList)) { |
| | | return pointValueExportList; |
| | | } |
| | | |
| | | //插入pointType |
| | | List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> { |
| | | InfluxPointValuePOJO pojo = new InfluxPointValuePOJO(); |
| | | pojo.setPoint(item.getPointNo()); |
| | | pojo.setType(item.getDataType()); |
| | | return pojo; |
| | | }).collect(Collectors.toList()); |
| | | |
| | | //查询 |
| | | Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd()); |
| | | |
| | | //提取list |
| | | List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0)); |
| | | for(Map<String, Object> map : list){ |
| | | PointValueExportVO dto = new PointValueExportVO(); |
| | | dto.setDatatime(map.get("time").toString()); |
| | | dto.setDatavalue(map.get("value").toString()); |
| | | pointValueExportList.add(dto); |
| | | } |
| | | |
| | | return pointValueExportList; |
| | | } |
| | | |
| | | @Override |
| | | public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) { |
| | |
| | | for (FluxRecord record : records) { |
| | | Map<String, Object> dataIem = new HashMap<>(2); |
| | | dataIem.put(VALUE, record.getValueByKey("_value")); |
| | | dataIem.put(TIME, Date.from(record.getTime())); |
| | | dataIem.put(TIME, sdf.format(Date.from(record.getTime()))); |
| | | dataList.add(dataIem); |
| | | } |
| | | } |