1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package com.iailab.module.model.mcs.pre.service.impl;
 
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.iailab.framework.common.pojo.PageResult;
import com.iailab.framework.common.service.impl.BaseServiceImpl;
import com.iailab.module.data.api.point.DataPointApi;
import com.iailab.module.data.api.point.dto.ApiPointDTO;
import com.iailab.module.data.api.point.dto.ApiPointValueDTO;
import com.iailab.module.data.api.point.dto.ApiPointValueQueryDTO;
import com.iailab.module.model.common.enums.DataTypeEnum;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultLastSimPOJO;
import com.iailab.module.model.influxdb.pojo.InfluxModelResultPOJO;
import com.iailab.module.model.influxdb.service.InfluxDBService;
import com.iailab.module.model.influxdb.vo.InfluxModelResultVO;
import com.iailab.module.model.mcs.pre.dao.MmPredictAutoAdjustConfigDao;
import com.iailab.module.model.mcs.pre.entity.MmPredictAutoAdjustConfigEntity;
import com.iailab.module.model.mcs.pre.enums.AutoAdjustTriggerRuleEnum;
import com.iailab.module.model.mcs.pre.enums.AutoAdjustValueRuleEnum;
import com.iailab.module.model.mcs.pre.service.MmPredictAutoAdjustConfigService;
import com.iailab.module.model.mcs.pre.service.MmPredictItemService;
import com.iailab.module.model.mcs.pre.vo.MmPredictAutoAdjustConfigPageReqVO;
import com.iailab.module.model.mcs.pre.vo.MmPredictAutoAdjustConfigVO;
import com.iailab.module.model.mdk.vo.ItemVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
 
import java.math.BigDecimal;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
 
/**
 * @author PanZhibao
 * @Description
 * @createTime 2024年11月19日
 */
@Slf4j
@Service
public class MmPredictAutoAdjustConfigServiceImpl extends BaseServiceImpl<MmPredictAutoAdjustConfigDao, MmPredictAutoAdjustConfigEntity>
        implements MmPredictAutoAdjustConfigService {
 
    @Autowired
    private MmPredictItemService mmPredictItemService;
    @Autowired
    private InfluxDBService influxDBService;
    @Autowired
    private DataPointApi dataPointApi;
 
    @Override
    public PageResult<MmPredictAutoAdjustConfigVO> page(MmPredictAutoAdjustConfigPageReqVO params) {
        IPage<MmPredictAutoAdjustConfigVO> page = baseDao.selectPage(params);
        return new PageResult<>(page.getRecords(), page.getTotal());
    }
 
    @Override
    public MmPredictAutoAdjustConfigEntity getInfo(String id) {
        return baseDao.selectById(id);
    }
 
    @Override
    public List<MmPredictAutoAdjustConfigEntity> getByCode(String code) {
        return baseDao.selectList(MmPredictAutoAdjustConfigEntity::getConfigCode,code,MmPredictAutoAdjustConfigEntity::getIsEnable,1);
    }
 
    @Override
    public void create(MmPredictAutoAdjustConfigEntity entity) {
        entity.setId(UUID.randomUUID().toString());
        entity.setCreateTime(new Date());
        baseDao.insert(entity);
    }
 
    @Override
    public void update(MmPredictAutoAdjustConfigEntity entity) {
        baseDao.updateById(entity);
    }
 
    @Override
    public void delete(String id) {
        baseDao.deleteById(id);
    }
 
    @Override
    public boolean autoAdjustByCode(String configCode,long adjustStartTime) {
        log.info("开始自动调整:configCode:" + configCode + ",adjustStartTime:" + new Date(adjustStartTime));
        // 查询调整配置
        List<MmPredictAutoAdjustConfigEntity> configEntityList = getByCode(configCode);
        if (CollectionUtils.isEmpty(configEntityList)) {
            log.info("自动调整失败原因:configEntityList为空");
            return false;
        }
 
        // 根据outputId分组
        Map<String, List<MmPredictAutoAdjustConfigEntity>> outputIdMap = configEntityList.stream().collect(Collectors.groupingBy(MmPredictAutoAdjustConfigEntity::getOutputId));
        for (Map.Entry<String, List<MmPredictAutoAdjustConfigEntity>> entry : outputIdMap.entrySet()) {
            String outputId = entry.getKey();
            // 查询调整用户adjustStartTime 至 adjustStartTime - 预测长度 * 预测粒度 范围的值
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(adjustStartTime);
            Date endTime = calendar.getTime();
            ItemVO item = mmPredictItemService.getItemByOutPutId(outputId);
            if (item == null) {
                log.info("自动调整失败原因:getItemByOutPutId为null,outputId:" + outputId);
                continue;
            }
            calendar.add(Calendar.SECOND,item.getPredictLength() * item.getGranularity() * -1);
            Date startTime = calendar.getTime();
 
            // 获取预测历史结果
            InfluxModelResultPOJO pojo = new InfluxModelResultPOJO();
            pojo.setType(DataTypeEnum.FLOAT_LAST_BAK.getCode());
            pojo.setOutPutId(outputId);
            List<InfluxModelResultVO> influxModelResult = influxDBService.queryModelResults(pojo, new Date(adjustStartTime), new Date(adjustStartTime));
            if (CollectionUtils.isEmpty(influxModelResult)) {
                log.info("自动调整失败原因:预测历史结果为空。itemNo:" + item.getItemNo() + ",itemName:" + item.getItemName() + ",outputId:" + outputId + ",time:" + adjustStartTime);
                continue;
            }
 
            // 计算所有影响用户的最终调整值
            Double finalAdjustValue = 0.0;
            for (MmPredictAutoAdjustConfigEntity configEntity : entry.getValue()) {
                Double adjustValue = null;
                // 查询影响用户历史值
                ApiPointValueQueryDTO queryDTO = new ApiPointValueQueryDTO();
                ApiPointDTO pointInfo = dataPointApi.getInfoById(configEntity.getPointId());
                queryDTO.setPointNo(pointInfo.getPointNo());
                queryDTO.setEnd(endTime);
                queryDTO.setStart(startTime);
                List<ApiPointValueDTO> apiPointValueDTOS = dataPointApi.queryPointHistoryValue(queryDTO);
                if (CollectionUtils.isEmpty(apiPointValueDTOS)) {
                    log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:测点数据长度为0。queryDTO:" + queryDTO);
                    continue;
                }
                // 过滤掉-2
                apiPointValueDTOS = apiPointValueDTOS.stream().filter(e -> !Double.valueOf(e.getV()).equals(-2.0)).collect(Collectors.toList());
                if (CollectionUtils.isEmpty(apiPointValueDTOS)) {
                    log.info("影响用户调整失败原因:过滤掉-2之后测点数据长度为0。queryDTO:" + queryDTO);
                    continue;
                }
 
                // 触发规则
                AutoAdjustTriggerRuleEnum triggerRuleEnum = AutoAdjustTriggerRuleEnum.fromCode(configEntity.getTriggerRule());
                // 判断是否符合触发条件 并计算调整值
                switch (triggerRuleEnum) {
                    case SLOPE:
                        // 计算每个△t的斜率,任意一个大于触发值则认为该区间有调整
                        Calendar slopeCalendar = Calendar.getInstance();
                        slopeCalendar.setTime(startTime);
                        Date slopeStartTime = slopeCalendar.getTime();
                        slopeCalendar.add(Calendar.MINUTE,configEntity.getT());
                        Date slopeEndTime = slopeCalendar.getTime();
                        if (slopeEndTime.after(endTime)) {
                            log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:△t设置过大,大于模型预测长度 * 预测粒度。△t:" + configEntity.getT());
                            continue;
                        }
                        while (!slopeEndTime.after(endTime)) {
                            //计算斜率
                            //△t开始时间测点值
                            Date finalSlopeStartTime = slopeStartTime;
                            Optional<ApiPointValueDTO> startOptional = apiPointValueDTOS.stream().filter(apiPointValueDTO -> apiPointValueDTO.getT().equals(finalSlopeStartTime)).findFirst();
                            //△t结束时间测点值
                            Date finalSlopeEndTime = slopeEndTime;
                            Optional<ApiPointValueDTO> endOptional = apiPointValueDTOS.stream().filter(e -> e.getT().equals(finalSlopeEndTime)).findFirst();
                            if (startOptional.isPresent() && endOptional.isPresent()) {
                                ApiPointValueDTO startPointValue = startOptional.get();
                                ApiPointValueDTO endPointValue = endOptional.get();
                                // 计算斜率
                                double slope = BigDecimal.valueOf(endPointValue.getV() - startPointValue.getV()).divide(BigDecimal.valueOf(configEntity.getT())).doubleValue();
                                // 斜率大于等于触发值则进行调整
                                if (Double.valueOf(Math.abs(slope)).compareTo(configEntity.getTriggerValue()) >= 0) {
                                    // 计算调整值 并跳出循环
                                    adjustValue = AutoAdjustValueRuleEnum.getAdjustValue(configEntity.getAdjustValueRule(), apiPointValueDTOS);
                                    log.info("计算调整值:" + adjustValue + ",斜率:" + slope + ",pointNo:" + pointInfo.getPointNo() + ",pointName:" + pointInfo.getPointName() + ",slopeStartTime:" + slopeStartTime + ",slopeEndTime:" + slopeEndTime);
                                    break;
                                }
                                log.info("斜率不满足条件,斜率:" + slope);
                            }
                            // 下一个△t
                            slopeStartTime = slopeCalendar.getTime();
                            slopeCalendar.add(Calendar.MINUTE,configEntity.getT());
                            slopeEndTime = slopeCalendar.getTime();
                        }
                        break;
                    case AVERAGE_GAP:
                        // 计算每两个△t的平均差,任意一个大于触发值则认为该区间有调整
                        Calendar averageCalendar = Calendar.getInstance();
                        averageCalendar.setTime(startTime);
                        Date averageStartTime = averageCalendar.getTime();
                        averageCalendar.add(Calendar.MINUTE,configEntity.getT());
                        Date averageMiddleTime = averageCalendar.getTime();
                        averageCalendar.add(Calendar.MINUTE,configEntity.getT());
                        Date averageEndTime = averageCalendar.getTime();
 
                        if (averageEndTime.after(endTime)) {
                            log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:△t设置过大,△t*2大于模型预测长度 * 预测粒度。△t:" + configEntity.getT());
                            continue;
                        }
                        while (!averageEndTime.after(endTime)) {
                            //计算均值差
                            //前△t测点平均值
                            Date finalAverageStartTime = averageStartTime;
                            Date finalAverageMiddleTime = averageMiddleTime;
                            OptionalDouble startAverage = apiPointValueDTOS.stream().filter(e -> e.getT().after(finalAverageStartTime) && !e.getT().after(finalAverageMiddleTime)).mapToDouble(ApiPointValueDTO::getV).average();
                            //后△t测点平均值
                            Date finalAverageEndTime = averageEndTime;
                            OptionalDouble endAverage = apiPointValueDTOS.stream().filter(e -> e.getT().after(finalAverageMiddleTime) && !e.getT().after(finalAverageEndTime)).mapToDouble(ApiPointValueDTO::getV).average();
                            if (startAverage.isPresent() && endAverage.isPresent()) {
                                double averageGapValue = startAverage.getAsDouble() - endAverage.getAsDouble();
                                // 均值差,大于等于触发值则进行调整
                                if (Double.valueOf(Math.abs(averageGapValue)).compareTo(configEntity.getTriggerValue()) >= 0) {
                                    // 计算调整值 并跳出循环
                                    adjustValue = AutoAdjustValueRuleEnum.getAdjustValue(configEntity.getAdjustValueRule(), apiPointValueDTOS);
                                    log.info("计算调整值:" + adjustValue + ",均值差:" + averageGapValue + ",pointNo:" + pointInfo.getPointNo() + ",pointName:" + pointInfo.getPointName() + ",averageStartTime:" + averageStartTime + ",averageMiddleTime:" + averageMiddleTime + ",averageEndTime:" + averageEndTime);
                                    break;
                                }
                                log.info("均值差不满足条件,均值差:" + averageGapValue);
                            }
 
                            // 下一个△t
                            averageStartTime = averageMiddleTime;
                            averageMiddleTime = averageEndTime;
                            averageCalendar.add(Calendar.MINUTE,configEntity.getT());
                            averageEndTime = averageCalendar.getTime();
                        }
                        break;
                    default:
                        log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:未知触发规则,triggerRule" + configEntity.getTriggerRule());
                        continue;
                }
                if (adjustValue == null) {
                    log.info("影响用户[" + pointInfo.getPointName() + "]调整失败原因:未达到触发条件");
                    continue;
                }
 
                // 调整系数
                adjustValue = adjustValue * configEntity.getAdjustCoefficient();
                // 调整方向
                adjustValue = adjustValue * configEntity.getAdjustDirection();
                // 累加到最终调整值
                finalAdjustValue += adjustValue;
            }
            // 执行调整
            if (finalAdjustValue.equals(0.0)) {
                log.info("自动调整失败原因:finalAdjustValue为0,outputId:" + outputId + ",configCode:" + configCode);
                continue;
            }
            List<InfluxModelResultPOJO> lastList = new ArrayList<>();
            for (InfluxModelResultVO resultVO : influxModelResult) {
                InfluxModelResultLastSimPOJO adjustPojo = new InfluxModelResultLastSimPOJO();
                // 设置新的调整值
                adjustPojo.setValue(Double.parseDouble(resultVO.getValue().toString()) + finalAdjustValue);
                adjustPojo.setTimestamp(resultVO.getTimestamp());
                adjustPojo.setOutPutId(outputId);
                lastList.add(adjustPojo);
            }
            // 相同时间直接覆盖旧值
            influxDBService.asyncWriteModelResults(lastList);
            log.info("t+l自动调整。configCode:" + configCode + ",adjustValue:" + finalAdjustValue + ",itemNo:" + item.getItemNo() + ",itemName:" + item.getItemName() + ",outputId:" + outputId + ",adjustTime:" + adjustStartTime);
        }
 
        return true;
    }
}