package com.iailab.module.model.influxdb.service.impl; import com.iailab.module.model.influxdb.common.config.InfluxDBInstance; import com.iailab.module.model.influxdb.common.utils.MeasurementUtils; import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO; import com.iailab.module.model.influxdb.service.InfluxDBService; import com.iailab.module.model.influxdb.vo.InfluxModelResultVO; import com.influxdb.client.QueryApi; import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; /** * InfluxDB操作类 */ @Slf4j @Service public class InfluxDBServiceImpl implements InfluxDBService { @Resource private InfluxDBInstance influxDBInstance; private WriteApi writeApi; private QueryApi queryApi; @PostConstruct private void init() { writeApi = influxDBInstance.getClient().makeWriteApi(); queryApi = influxDBInstance.getClient().getQueryApi(); } @Override public void asyncWriteModelResults(List pointValues) { if (!CollectionUtils.isEmpty(pointValues)) { pointValues.forEach(item -> { String bucket = influxDBInstance.getBucket(); writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item); }); } writeApi.flush(); } @Override public List queryModelResults(InfluxModelResultPOJO pojo, Date startTime, Date endTime) { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.MILLISECOND, 0); String start = startTime.toInstant().toString(); if (startTime.getTime() == endTime.getTime()) { // 如果相等,则engTime加1毫秒,负责influxdb报错(因为influxdb的range函数是左闭右开区间,所以将engTime加一毫秒,才可以查到startTime时间点的数据) endTime.setTime(endTime.getTime() + 1); } String stop = endTime.toInstant().toString(); List dataList = new ArrayList<>(); String measurement = MeasurementUtils.getMeasurement(pojo.getType()); StringBuilder sb = new StringBuilder(); sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") "); sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") "); sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")"); sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); sb.append("|> filter(fn: (r) => r[\"outPutId\"] == \"" + pojo.getOutPutId() + "\")"); sb.append("|> sort(columns: [\"_time\"]) "); sb.append("|> yield(name: \"mean\")"); log.info("influxdbSql===============" + sb); List tables = queryApi.query(sb.toString(), influxDBInstance.org); for (FluxTable table : tables) { List records = table.getRecords(); for (FluxRecord record : records) { InfluxModelResultVO vo = new InfluxModelResultVO(record.getValueByKey("_value"),record.getTime()); dataList.add(vo); } } return dataList; } }