package com.iailab.module.data.influxdb.service.impl; import com.iailab.module.data.api.dto.ApiPointValueQueryDTO; import com.iailab.module.data.common.utils.ExcelUtil; import com.iailab.framework.common.util.date.DateUtils; import com.iailab.module.data.api.dto.ApiExportValueDTO; import com.iailab.module.data.api.dto.ApiTagValueQueryDTO; import com.iailab.module.data.influxdb.common.config.InfluxDBInstance; import com.iailab.module.data.influxdb.common.enums.DataMeasurement; import com.iailab.module.data.influxdb.common.utils.PointValueUtils; import com.iailab.module.data.influxdb.common.utils.TagValueUtils; import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO; import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO; import com.iailab.module.data.influxdb.service.InfluxDBService; import com.iailab.module.data.point.dto.DaPointDTO; import com.iailab.module.data.point.service.DaPointService; import com.iailab.module.data.point.vo.PointValueExportVO; import com.influxdb.client.InfluxQLQueryApi; import com.influxdb.client.QueryApi; import com.influxdb.client.WriteApi; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.InfluxQLQuery; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import com.influxdb.query.InfluxQLQueryResult; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; import static com.iailab.framework.common.pojo.CommonResult.success; /** * InfluxDB操作类 */ @Slf4j @Service public class InfluxDBServiceImpl implements InfluxDBService { @Resource private InfluxDBInstance influxDBInstance; private WriteApi writeApi; @Autowired private DaPointService daPointService; private WriteApiBlocking writeApiBlocking; private QueryApi queryApi; private InfluxQLQueryApi influxQLQueryApi; public static final String VALUE = "value"; public static final String TIME = "time"; private int rawOffset = TimeZone.getDefault().getRawOffset(); private int pas_ms = 1000; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Resource private InfluxDBService influxDBService; @Override public void syncWriteFloatValue(String pointNo, String dataValue, long time) { if (writeApiBlocking == null) { writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking(); } Point point = Point.measurement(DataMeasurement.t_da_sim_value.name()); point.addTag("point", pointNo); point.addField("value", Double.parseDouble(dataValue)); point.time(time, WritePrecision.MS); writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point); } @Override public void syncWriteIntValue(String pointNo, String dataValue, long time) { if (writeApiBlocking == null) { writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking(); } Point point = Point.measurement(DataMeasurement.t_da_dig_value.name()); point.addTag("point", pointNo); point.addField("value", Integer.parseInt(dataValue)); point.time(time, WritePrecision.MS); writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point); } @Override public void syncWriteBooleanValue(String pointNo, String dataValue, long time) { if (writeApiBlocking == null) { writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking(); } Point point = Point.measurement(DataMeasurement.t_da_bool_value.name()); point.addTag("point", pointNo); point.addField("value", Boolean.parseBoolean(dataValue)); point.time(time, WritePrecision.MS); writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point); } @Override public void asyncWritePointValues(List pointValues) { if (writeApi == null) { writeApi = influxDBInstance.getClient().makeWriteApi(); } if (!CollectionUtils.isEmpty(pointValues)) { pointValues.forEach(item -> { String bucket = influxDBInstance.getBucket(); writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item); }); } writeApi.flush(); } @Override public void asyncWriteTagValues(List tagValues) { if (writeApi == null) { writeApi = influxDBInstance.getClient().makeWriteApi(); } if (!CollectionUtils.isEmpty(tagValues)) { tagValues.forEach(item -> { log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org); writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item); }); } writeApi.flush(); } @Override public Map> queryTagsValues(List influxParams, Date startTime, Date endTime) { if (queryApi == null) { queryApi = influxDBInstance.getClient().getQueryApi(); } if (CollectionUtils.isEmpty(influxParams)) { return null; } Map> result = new HashMap<>(); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); // String start = startTime.getTime() - calendar.getTime().getTime() + "ms"; String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms"; if (endTime==null){ endTime= new Date(); } String stop = endTime.getTime() - calendar.getTime().getTime() + "ms"; for (int i = 0; i < influxParams.size(); i++) { List dataList = new ArrayList<>(); InfluxTagValuePOJO tag = influxParams.get(i); String measurement = TagValueUtils.getMeasurement(tag.getType()); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); List tables = queryApi.query(sb.toString(), influxDBInstance.org); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { Map dataIem = new HashMap<>(2); dataIem.put(VALUE, record.getValueByKey("_value")); dataIem.put(TIME, Date.from(record.getTime())); dataList.add(dataIem); } } result.put(tag.getId(), dataList); } return result; } private List getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) { List dataList = new ArrayList<>(); if (queryApi == null) { queryApi = influxDBInstance.getClient().getQueryApi(); } Map> result = new HashMap<>(); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); String start = startTime.getTime() - calendar.getTime().getTime() + "ms"; String stop = endTime.getTime() - calendar.getTime().getTime() + "ms"; String measurement = TagValueUtils.getMeasurement(tag.getType()); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); List tables = queryApi.query(sb.toString(), influxDBInstance.org); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { ApiExportValueDTO dataIem = new ApiExportValueDTO(); dataIem.setDataValue(record.getValueByKey("_value").toString()); dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY)); dataList.add(dataIem); } } return dataList; } @Override public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) { //构建参数 Map params = new HashMap<>(1); params.put("pointNos", queryDto.getTagIds()); //查询point列表 List pointList = daPointService.list(params); if (CollectionUtils.isEmpty(pointList)) { return true; } //插入pointType List influxParams = pointList.stream().map(item -> { InfluxPointValuePOJO pojo = new InfluxPointValuePOJO(); pojo.setPoint(item.getTagNo()); pojo.setType(item.getDataType()); return pojo; }).collect(Collectors.toList()); //查询 Map>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd()); //提取list List> list = data.get(queryDto.getTagIds().get(0)); //导出 try{ String sheetTitle = "采集数据"; String[] title = new String[]{"值", "时间"}; ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response); } catch (Exception ex) { return false; } return true; } @Override public List exportPointValue(ApiPointValueQueryDTO queryDto) { List pointValueExportList = new ArrayList<>(); //构建参数 Map params = new HashMap<>(1); params.put("pointNos", queryDto.getPointNos()); //查询point列表 List pointList = daPointService.list(params); if (CollectionUtils.isEmpty(pointList)) { return pointValueExportList; } //插入pointType List influxParams = pointList.stream().map(item -> { InfluxPointValuePOJO pojo = new InfluxPointValuePOJO(); pojo.setPoint(item.getPointNo()); pojo.setType(item.getDataType()); return pojo; }).collect(Collectors.toList()); //查询 Map>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd()); //提取list List> list = data.get(queryDto.getPointNos().get(0)); for(Map map : list){ PointValueExportVO dto = new PointValueExportVO(); dto.setDatatime(map.get("time").toString()); dto.setDatavalue(map.get("value").toString()); pointValueExportList.add(dto); } return pointValueExportList; } @Override public Map queryPointsLastValue(List influxParams) { Map result = new HashMap<>(influxParams.size()); if (influxQLQueryApi == null) { influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi(); } for (int i = 0; i < influxParams.size(); i++) { InfluxPointValuePOJO point = influxParams.get(i); String measurement = PointValueUtils.getMeasurement(point.getType()); StringBuilder sql = new StringBuilder(); sql.append("SELECT LAST(value) FROM "); sql.append(measurement); sql.append(" WHERE point = '"); sql.append(point.getPoint()); sql.append("'"); InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket())); Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last"); result.put(point.getPoint(), value); } return result; } @Override public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) { if (influxQLQueryApi == null) { influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi(); } long utcMillis = startTime.getTime() -rawOffset; String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY); log.info("utsStart=" + utsStart); String measurement = PointValueUtils.getMeasurement(point.getType()); StringBuilder sql = new StringBuilder(); sql.append("SELECT MAX(value) FROM "); sql.append(measurement); sql.append(" WHERE point = '"); sql.append(point.getPoint()); sql.append("' AND time >= '" + utsStart +"'"); InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket())); if (data == null) { return null; } return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1]; } @Override public List> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) { List influxParams = new ArrayList<>(); influxParams.add(pojo); Map>> data = this.queryPointsValues(influxParams, startTime, endTime); return data.get(pojo.getPoint()); } @Override public Map>> queryPointsValues(List influxParams, Date startTime, Date endTime) { if (queryApi == null) { queryApi = influxDBInstance.getClient().getQueryApi(); } if (CollectionUtils.isEmpty(influxParams)) { return null; } Map>> result = new HashMap<>(); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); String start = startTime.getTime() - calendar.getTime().getTime() + "ms"; if (endTime==null){ endTime= new Date(); } String stop = endTime.getTime() - calendar.getTime().getTime() + "ms"; for (int i = 0; i < influxParams.size(); i++) { List> dataList = new ArrayList<>(); InfluxPointValuePOJO point = influxParams.get(i); String measurement = PointValueUtils.getMeasurement(point.getType()); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); System.out.println("influxdbSql===============" + sb.toString()); List tables = queryApi.query(sb.toString(), influxDBInstance.org); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { Map dataIem = new HashMap<>(2); dataIem.put(VALUE, record.getValueByKey("_value")); dataIem.put(TIME, sdf.format(Date.from(record.getTime()))); dataList.add(dataIem); } } result.put(point.getPoint(), dataList); } return result; } @Override public Map queryPointsSpread(List influxParams, Date startTime, Date endTime) { Map result = new HashMap<>(); Map>> data = this.queryPointsValues(influxParams, startTime, endTime); if (CollectionUtils.isEmpty(data)) { return result; } data.forEach((k, v) -> { if (!CollectionUtils.isEmpty(v)) { BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString()); BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString()); BigDecimal spread = (lastValue.subtract(firstValue)); result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread); } log.info(k + ",isEmpty"); }); return result; } }