package com.iailab.module.shasteel.mq.consumer;
|
|
import com.alibaba.fastjson.JSONObject;
import com.iailab.framework.common.util.date.DateUtils;
import com.iailab.module.model.api.mcs.McsApi;
import com.iailab.module.model.api.mcs.dto.AlarmConfigRespDTO;
import com.iailab.module.model.api.mcs.dto.AlarmMessageRespDTO;
import com.iailab.module.model.api.mcs.dto.PreDataJsonReqVO;
import com.iailab.module.shasteel.mq.common.constant.CommonConstant;
import com.iailab.module.shasteel.mq.common.constant.RoutingConstant;
import com.iailab.module.shasteel.mq.config.QueuePredictFinishConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
|
|
/**
|
* 监听预测完成
|
* 根据预警配置产生预警信息
|
*
|
* @author PanZhibao
|
* @Description
|
* @createTime 2024年12月11日
|
*/
|
@Slf4j
|
@Component
|
public class ModelPredictFinishConsumer {
|
|
@Resource
|
private McsApi mcsApi;
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
/**
|
* 监听预测完成,产生预警消息
|
*
|
* @param message
|
*/
|
@RabbitListener(queues = QueuePredictFinishConfig.QUEUE_NAME)
|
public void listen(Message message) {
|
try {
|
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
log.info("routingKey:" + routingKey);
|
String messageBody = new String(message.getBody());
|
log.info("messageBody:" + messageBody);
|
JSONObject messageJson = JSONObject.parseObject(messageBody);
|
if (CollectionUtils.isEmpty(messageJson)) {
|
return;
|
}
|
Date predictTime = DateUtils.parse(messageJson.get("predictTime").toString(), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND);
|
|
// 查询相关预警配置
|
List<AlarmConfigRespDTO> configList = mcsApi.listAlarmConfig(new HashMap<String, Object>());
|
if (CollectionUtils.isEmpty(configList)) {
|
log.info("AlarmConfigList is empty");
|
return;
|
}
|
List<String> outputIdList = configList.stream().map(item -> {
|
return item.getOutId();
|
}).collect(Collectors.toList());
|
|
List<AlarmMessageRespDTO> alarmList = new ArrayList<>();
|
for (AlarmConfigRespDTO configItem : configList) {
|
PreDataJsonReqVO reqVO = new PreDataJsonReqVO();
|
reqVO.setPredictTime(predictTime);
|
reqVO.setOutputIdList(outputIdList);
|
Map<String, List<Object[]>> preData = mcsApi.getPreDataCur(reqVO);
|
if (CollectionUtils.isEmpty(preData)) {
|
return;
|
}
|
Map<String, BigDecimal> culData = new HashMap<>();
|
preData.forEach((key, value) -> {
|
double nv = value.stream().map(v1 -> {
|
return Double.parseDouble(v1[1].toString());
|
}).collect(Collectors.toList()).stream().mapToDouble(Double::doubleValue).sum();
|
culData.put(key, new BigDecimal(nv));
|
});
|
|
|
List<Object[]> result = preData.get(configItem.getOutId());
|
if (CollectionUtils.isEmpty(result)) {
|
continue;
|
}
|
// 累计值
|
BigDecimal culValue = new BigDecimal(result.stream().map(v1 -> {
|
return Double.parseDouble(v1[1].toString());
|
}).collect(Collectors.toList()).stream().mapToDouble(Double::doubleValue).sum());
|
log.info("culValue:" + culValue);
|
|
|
// 生成预警信息
|
AlarmMessageRespDTO alarmMessage = new AlarmMessageRespDTO();
|
alarmMessage.setConfigId(configItem.getId());
|
alarmMessage.setTitle(configItem.getTitle());
|
alarmMessage.setAlarmObj(configItem.getAlarmObj());
|
alarmMessage.setAlarmTime(predictTime);
|
log.info("对比累计值是否超限");
|
StringBuilder content = new StringBuilder();
|
if (configItem.getCulUpper() != null && culValue.compareTo(configItem.getCulUpper()) > 0) {
|
content.append("即将超出累计值上限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_UPPER_LIMIT);
|
alarmMessage.setContent(content.toString());
|
mcsApi.createAlarmMessage(alarmMessage);
|
alarmList.add(alarmMessage);
|
continue;
|
}
|
if (configItem.getCulLower() != null && culValue.compareTo(configItem.getCulLower()) < 0) {
|
content.append("即将低于累计值下限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_LOWER_LIMIT);
|
alarmMessage.setContent(content.toString());
|
mcsApi.createAlarmMessage(alarmMessage);
|
alarmList.add(alarmMessage);
|
continue;
|
}
|
|
|
log.info("对比预测值是否超限");
|
int toIndex = result.size();
|
int fromIndex = result.size() - configItem.getCompLength();
|
List<Object[]> predictList = result.subList(fromIndex, toIndex);
|
for (Object[] data : predictList) {
|
BigDecimal dataValue = new BigDecimal(Double.parseDouble(data[1].toString())).setScale(2, BigDecimal.ROUND_HALF_UP);
|
if (dataValue.compareTo(configItem.getLowerLimit()) >= 0 && dataValue.compareTo(configItem.getUpperLimit()) <= 0) {
|
log.info("预测值不超限");
|
continue;
|
}
|
alarmMessage.setOutTime(DateUtils.parse(data[0].toString(), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
|
content.append(DateUtils.format(alarmMessage.getOutTime(), DateUtils.FORMAT_SIMPLE_TIME));
|
content.append(",");
|
content.append("即将");
|
if (dataValue.compareTo(configItem.getLowerLimit()) < 0) {
|
content.append("低与下限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_LOWER_LIMIT);
|
|
} else if (dataValue.compareTo(configItem.getUpperLimit()) > 0) {
|
content.append("超出上限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_UPPER_LIMIT);
|
}
|
alarmMessage.setContent(content.toString());
|
mcsApi.createAlarmMessage(alarmMessage);
|
alarmList.add(alarmMessage);
|
}
|
}
|
if (!CollectionUtils.isEmpty(alarmList)) {
|
log.info("发送预警消息");
|
Map<String, Object> msg = new HashMap<>(2);
|
msg.put("predictTime", DateUtils.format(predictTime, DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
|
msg.put("alarmList", alarmList);
|
rabbitTemplate.convertAndSend(RoutingConstant.EXCHANGE, RoutingConstant.Iailab_Model_Alarm, msg);
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
}
|
}
|