package com.iailab.module.model.mcs.pre.service.impl; import com.baomidou.mybatisplus.core.metadata.IPage; import com.iailab.framework.common.pojo.PageResult; import com.iailab.framework.common.service.impl.BaseServiceImpl; import com.iailab.module.data.api.point.DataPointApi; import com.iailab.module.data.api.point.dto.ApiPointDTO; import com.iailab.module.data.api.point.dto.ApiPointValueDTO; import com.iailab.module.data.api.point.dto.ApiPointValueQueryDTO; import com.iailab.module.model.common.enums.DataTypeEnum; import com.iailab.module.model.influxdb.pojo.InfluxModelResultLastSimPOJO; import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO; import com.iailab.module.model.influxdb.service.InfluxDBService; import com.iailab.module.model.influxdb.vo.InfluxModelResultVO; import com.iailab.module.model.mcs.pre.dao.MmPredictAutoAdjustConfigDao; import com.iailab.module.model.mcs.pre.entity.MmPredictAutoAdjustConfigEntity; import com.iailab.module.model.mcs.pre.enums.AutoAdjustTriggerRuleEnum; import com.iailab.module.model.mcs.pre.enums.AutoAdjustValueRuleEnum; import com.iailab.module.model.mcs.pre.service.MmPredictAutoAdjustConfigService; import com.iailab.module.model.mcs.pre.service.MmPredictItemService; import com.iailab.module.model.mcs.pre.vo.MmPredictAutoAdjustConfigPageReqVO; import com.iailab.module.model.mcs.pre.vo.MmPredictAutoAdjustConfigVO; import com.iailab.module.model.mdk.vo.ItemVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.math.BigDecimal; import java.util.*; import java.util.function.Predicate; import java.util.stream.Collectors; /** * @author PanZhibao * @Description * @createTime 2024年11月19日 */ @Slf4j @Service public class MmPredictAutoAdjustConfigServiceImpl extends BaseServiceImpl implements MmPredictAutoAdjustConfigService { @Autowired private MmPredictItemService mmPredictItemService; @Autowired private InfluxDBService influxDBService; @Autowired private DataPointApi dataPointApi; @Override public PageResult page(MmPredictAutoAdjustConfigPageReqVO params) { IPage page = baseDao.selectPage(params); return new PageResult<>(page.getRecords(), page.getTotal()); } @Override public MmPredictAutoAdjustConfigEntity getInfo(String id) { return baseDao.selectById(id); } @Override public List getByCode(String code) { return baseDao.selectList(MmPredictAutoAdjustConfigEntity::getConfigCode,code,MmPredictAutoAdjustConfigEntity::getIsEnable,1); } @Override public void create(MmPredictAutoAdjustConfigEntity entity) { entity.setId(UUID.randomUUID().toString()); entity.setCreateTime(new Date()); baseDao.insert(entity); } @Override public void update(MmPredictAutoAdjustConfigEntity entity) { baseDao.updateById(entity); } @Override public void delete(String id) { baseDao.deleteById(id); } @Override public boolean autoAdjustByCode(String configCode,long adjustStartTime) { log.info("开始自动调整:configCode:" + configCode + ",adjustStartTime:" + new Date(adjustStartTime)); // 查询调整配置 List configEntityList = getByCode(configCode); if (CollectionUtils.isEmpty(configEntityList)) { log.info("自动调整失败原因:configEntityList为空"); return false; } // 根据outputId分组 Map> outputIdMap = configEntityList.stream().collect(Collectors.groupingBy(MmPredictAutoAdjustConfigEntity::getOutputId)); for (Map.Entry> entry : outputIdMap.entrySet()) { String outputId = entry.getKey(); // 查询调整用户adjustStartTime 至 adjustStartTime - 预测长度 * 预测粒度 范围的值 Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(adjustStartTime); Date endTime = calendar.getTime(); ItemVO item = mmPredictItemService.getItemByOutPutId(outputId); if (item == null) { log.info("自动调整失败原因:getItemByOutPutId为null,outputId:" + outputId); continue; } calendar.add(Calendar.SECOND,item.getPredictLength() * item.getGranularity() * -1); Date startTime = calendar.getTime(); // 获取预测历史结果 InfluxModelResultPOJO pojo = new InfluxModelResultPOJO(); pojo.setType(DataTypeEnum.FLOAT_LAST_BAK.getCode()); pojo.setOutPutId(outputId); List influxModelResult = influxDBService.queryModelResults(pojo, new Date(adjustStartTime), new Date(adjustStartTime)); if (CollectionUtils.isEmpty(influxModelResult)) { log.info("自动调整失败原因:预测历史结果为空。itemNo:" + item.getItemNo() + ",itemName:" + item.getItemName() + ",outputId:" + outputId + ",time:" + adjustStartTime); continue; } // 计算所有影响用户的最终调整值 Double finalAdjustValue = 0.0; for (MmPredictAutoAdjustConfigEntity configEntity : entry.getValue()) { Double adjustValue = null; // 查询影响用户历史值 ApiPointValueQueryDTO queryDTO = new ApiPointValueQueryDTO(); ApiPointDTO pointInfo = dataPointApi.getInfoById(configEntity.getPointId()); queryDTO.setPointNo(pointInfo.getPointNo()); queryDTO.setEnd(endTime); queryDTO.setStart(startTime); List apiPointValueDTOS = dataPointApi.queryPointHistoryValue(queryDTO); if (CollectionUtils.isEmpty(apiPointValueDTOS)) { log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:测点数据长度为0。queryDTO:" + queryDTO); continue; } // 过滤掉-2 apiPointValueDTOS = apiPointValueDTOS.stream().filter(e -> !Double.valueOf(e.getV()).equals(-2.0)).collect(Collectors.toList()); if (CollectionUtils.isEmpty(apiPointValueDTOS)) { log.info("影响用户调整失败原因:过滤掉-2之后测点数据长度为0。queryDTO:" + queryDTO); continue; } // 触发规则 AutoAdjustTriggerRuleEnum triggerRuleEnum = AutoAdjustTriggerRuleEnum.fromCode(configEntity.getTriggerRule()); // 判断是否符合触发条件 并计算调整值 switch (triggerRuleEnum) { case SLOPE: // 计算每个△t的斜率,任意一个大于触发值则认为该区间有调整 Calendar slopeCalendar = Calendar.getInstance(); slopeCalendar.setTime(startTime); Date slopeStartTime = slopeCalendar.getTime(); slopeCalendar.add(Calendar.MINUTE,configEntity.getT()); Date slopeEndTime = slopeCalendar.getTime(); if (slopeEndTime.after(endTime)) { log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:△t设置过大,大于模型预测长度 * 预测粒度。△t:" + configEntity.getT()); continue; } while (!slopeEndTime.after(endTime)) { //计算斜率 //△t开始时间测点值 Date finalSlopeStartTime = slopeStartTime; Optional startOptional = apiPointValueDTOS.stream().filter(apiPointValueDTO -> apiPointValueDTO.getT().equals(finalSlopeStartTime)).findFirst(); //△t结束时间测点值 Date finalSlopeEndTime = slopeEndTime; Optional endOptional = apiPointValueDTOS.stream().filter(e -> e.getT().equals(finalSlopeEndTime)).findFirst(); if (startOptional.isPresent() && endOptional.isPresent()) { ApiPointValueDTO startPointValue = startOptional.get(); ApiPointValueDTO endPointValue = endOptional.get(); // 计算斜率 double slope = BigDecimal.valueOf(endPointValue.getV() - startPointValue.getV()).divide(BigDecimal.valueOf(configEntity.getT())).doubleValue(); // 斜率大于等于触发值则进行调整 if (Double.valueOf(Math.abs(slope)).compareTo(configEntity.getTriggerValue()) >= 0) { // 计算调整值 并跳出循环 adjustValue = AutoAdjustValueRuleEnum.getAdjustValue(configEntity.getAdjustValueRule(), apiPointValueDTOS); log.info("计算调整值:" + adjustValue + ",斜率:" + slope + ",pointNo:" + pointInfo.getPointNo() + ",pointName:" + pointInfo.getPointName() + ",slopeStartTime:" + slopeStartTime + ",slopeEndTime:" + slopeEndTime); break; } log.info("斜率不满足条件,斜率:" + slope); } // 下一个△t slopeStartTime = slopeCalendar.getTime(); slopeCalendar.add(Calendar.MINUTE,configEntity.getT()); slopeEndTime = slopeCalendar.getTime(); } break; case AVERAGE_GAP: // 计算每两个△t的平均差,任意一个大于触发值则认为该区间有调整 Calendar averageCalendar = Calendar.getInstance(); averageCalendar.setTime(startTime); Date averageStartTime = averageCalendar.getTime(); averageCalendar.add(Calendar.MINUTE,configEntity.getT()); Date averageMiddleTime = averageCalendar.getTime(); averageCalendar.add(Calendar.MINUTE,configEntity.getT()); Date averageEndTime = averageCalendar.getTime(); if (averageEndTime.after(endTime)) { log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:△t设置过大,△t*2大于模型预测长度 * 预测粒度。△t:" + configEntity.getT()); continue; } while (!averageEndTime.after(endTime)) { //计算均值差 //前△t测点平均值 Date finalAverageStartTime = averageStartTime; Date finalAverageMiddleTime = averageMiddleTime; OptionalDouble startAverage = apiPointValueDTOS.stream().filter(e -> e.getT().after(finalAverageStartTime) && !e.getT().after(finalAverageMiddleTime)).mapToDouble(ApiPointValueDTO::getV).average(); //后△t测点平均值 Date finalAverageEndTime = averageEndTime; OptionalDouble endAverage = apiPointValueDTOS.stream().filter(e -> e.getT().after(finalAverageMiddleTime) && !e.getT().after(finalAverageEndTime)).mapToDouble(ApiPointValueDTO::getV).average(); if (startAverage.isPresent() && endAverage.isPresent()) { double averageGapValue = startAverage.getAsDouble() - endAverage.getAsDouble(); // 均值差,大于等于触发值则进行调整 if (Double.valueOf(Math.abs(averageGapValue)).compareTo(configEntity.getTriggerValue()) >= 0) { // 计算调整值 并跳出循环 adjustValue = AutoAdjustValueRuleEnum.getAdjustValue(configEntity.getAdjustValueRule(), apiPointValueDTOS); log.info("计算调整值:" + adjustValue + ",均值差:" + averageGapValue + ",pointNo:" + pointInfo.getPointNo() + ",pointName:" + pointInfo.getPointName() + ",averageStartTime:" + averageStartTime + ",averageMiddleTime:" + averageMiddleTime + ",averageEndTime:" + averageEndTime); break; } log.info("均值差不满足条件,均值差:" + averageGapValue); } // 下一个△t averageStartTime = averageMiddleTime; averageMiddleTime = averageEndTime; averageCalendar.add(Calendar.MINUTE,configEntity.getT()); averageEndTime = averageCalendar.getTime(); } break; default: log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:未知触发规则,triggerRule" + configEntity.getTriggerRule()); continue; } if (adjustValue == null) { log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:未达到触发条件"); continue; } // 调整系数 adjustValue = adjustValue * configEntity.getAdjustCoefficient(); // 调整方向 adjustValue = adjustValue * configEntity.getAdjustDirection(); // 累加到最终调整值 finalAdjustValue += adjustValue; } // 执行调整 if (finalAdjustValue.equals(0.0)) { log.info("自动调整失败原因:finalAdjustValue为0,outputId:" + outputId + ",configCode:" + configCode); continue; } List lastList = new ArrayList<>(); for (InfluxModelResultVO resultVO : influxModelResult) { InfluxModelResultLastSimPOJO adjustPojo = new InfluxModelResultLastSimPOJO(); // 设置新的调整值 adjustPojo.setValue(Double.parseDouble(resultVO.getValue().toString()) + finalAdjustValue); adjustPojo.setTimestamp(resultVO.getTimestamp()); adjustPojo.setOutPutId(outputId); lastList.add(adjustPojo); } // 相同时间直接覆盖旧值 influxDBService.asyncWriteModelResults(lastList); log.info("t+l自动调整。configCode:" + configCode + ",adjustValue:" + finalAdjustValue + ",itemNo:" + item.getItemNo() + ",itemName:" + item.getItemName() + ",outputId:" + outputId + ",adjustTime:" + adjustStartTime); } return true; } }