package com.iailab.module.shasteel.mq.consumer;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.iailab.framework.common.util.date.DateUtils;
|
import com.iailab.module.model.api.mcs.McsApi;
|
import com.iailab.module.model.api.mcs.dto.AlarmConfigRespDTO;
|
import com.iailab.module.model.api.mcs.dto.AlarmMessageRespDTO;
|
import com.iailab.module.model.api.mcs.dto.PreDataJsonReqVO;
|
import com.iailab.module.shasteel.mq.common.constant.CommonConstant;
|
import com.iailab.module.shasteel.mq.common.constant.RoutingConstant;
|
import com.iailab.module.shasteel.mq.config.QueuePredictFinishConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
|
import javax.annotation.Resource;
|
import java.math.BigDecimal;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.stream.Collectors;
|
|
/**
|
* 监听预测完成
|
* 根据预警配置产生预警信息
|
*
|
* @author PanZhibao
|
* @Description
|
* @createTime 2024年12月11日
|
*/
|
@Slf4j
|
@Component
|
public class ModelPredictFinishConsumer {
|
|
@Resource
|
private McsApi mcsApi;
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
private static Map<String, Long> lastRunTime = new ConcurrentHashMap<>();
|
|
public static Map<Long, List<String>> finishModuleMap = new ConcurrentHashMap<>();
|
|
/**
|
* 监听预测完成,产生预警消息
|
*
|
* @param message
|
*/
|
@RabbitListener(queues = QueuePredictFinishConfig.QUEUE_NAME)
|
public void listen(Message message) {
|
try {
|
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
log.info("routingKey:" + routingKey);
|
String messageBody = new String(message.getBody());
|
log.info("messageBody:" + messageBody);
|
JSONObject messageJson = JSONObject.parseObject(messageBody);
|
if (CollectionUtils.isEmpty(messageJson)) {
|
return;
|
}
|
// 预测时间
|
Date predictTime = DateUtils.parse(messageJson.get("predictTime").toString(), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND);
|
// 预测管网
|
String moduleType = messageJson.get("moduleType").toString();
|
if (!finishModuleMap.containsKey(predictTime.getTime())) {
|
List<String> mList = new ArrayList<>();
|
finishModuleMap.put(predictTime.getTime(), mList);
|
}
|
finishModuleMap.get(predictTime.getTime()).add(moduleType);
|
log.info("finishModuleMap:" + JSONObject.toJSONString(finishModuleMap));
|
|
if (moduleType.equals(CommonConstant.NET_BFG) || moduleType.equals(CommonConstant.NET_COG) ||
|
moduleType.equals(CommonConstant.NET_LDG) ||
|
moduleType.equals(CommonConstant.NET_LDG1) ||
|
moduleType.equals(CommonConstant.NET_LDG2) ||
|
moduleType.equals(CommonConstant.NET_LDG3)) {
|
log.info("moduleType:" + moduleType);
|
if (lastRunTime.containsKey(moduleType) && lastRunTime.get(moduleType) == predictTime.getTime()) {
|
log.info("moduleType return");
|
return;
|
}
|
lastRunTime.put(moduleType, predictTime.getTime());
|
log.info("lastRunTime=" + JSONObject.toJSONString(lastRunTime));
|
log.info("开始处理预警");
|
Map<String, Object> params = new HashMap<>();
|
params.put("alarmObj", moduleType);
|
List<AlarmConfigRespDTO> configList = mcsApi.listAlarmConfig(params);
|
if (CollectionUtils.isEmpty(configList)) {
|
log.info("AlarmConfigList is empty");
|
return;
|
}
|
log.info("configList=" + JSONArray.toJSONString(configList));
|
|
// 预警信息列表
|
List<AlarmMessageRespDTO> alarmList = new ArrayList<>();
|
List<String> outputIdList = configList.stream().map(item -> {
|
return item.getOutId();
|
}).collect(Collectors.toList());
|
|
// 查询预测结果
|
PreDataJsonReqVO reqVO = new PreDataJsonReqVO();
|
reqVO.setPredictTime(predictTime);
|
reqVO.setOutputIdList(outputIdList);
|
Map<String, List<Object[]>> preData = mcsApi.getPreDataCur(reqVO);
|
if (CollectionUtils.isEmpty(preData)) {
|
return;
|
}
|
outerLoop:
|
for (AlarmConfigRespDTO configItem : configList) {
|
log.info("AlarmConfigItem: " + configItem);
|
List<Object[]> result = preData.get(configItem.getOutId());
|
if (CollectionUtils.isEmpty(result)) {
|
continue;
|
}
|
log.info("AlarmPreData: " + JSONArray.toJSONString(result));
|
|
// 对比预测值是否超限
|
int toIndex = result.size();
|
int fromIndex = result.size() - configItem.getCompLength();
|
List<Object[]> predictList = result.subList(fromIndex, toIndex);
|
for (Object[] data : predictList) {
|
BigDecimal dataValue = new BigDecimal(Double.parseDouble(data[1].toString())).setScale(2, BigDecimal.ROUND_HALF_UP);
|
if (dataValue.compareTo(configItem.getLowerLimit()) >= 0 && dataValue.compareTo(configItem.getUpperLimit()) <= 0) {
|
continue;
|
}
|
log.info("AlarmOutValue: " + dataValue);
|
// 预警记录
|
AlarmMessageRespDTO alarmMessage = new AlarmMessageRespDTO();
|
alarmMessage.setConfigId(configItem.getId());
|
alarmMessage.setTitle(configItem.getTitle());
|
alarmMessage.setAlarmObj(configItem.getAlarmObj());
|
alarmMessage.setAlarmTime(predictTime);
|
// 设置超出时间
|
alarmMessage.setOutTime(DateUtils.parse(data[0].toString(), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
|
// 预警消息
|
StringBuilder content = new StringBuilder();
|
content.append(configItem.getTitle().replace("预警", ""));
|
content.append(DateUtils.format(alarmMessage.getOutTime(), DateUtils.FORMAT_SIMPLE_TIME));
|
content.append(",");
|
content.append("即将");
|
if (dataValue.compareTo(configItem.getLowerLimit()) < 0) {
|
content.append("低于下限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_LOWER_LIMIT);
|
|
} else if (dataValue.compareTo(configItem.getUpperLimit()) > 0) {
|
content.append("超出上限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_UPPER_LIMIT);
|
}
|
alarmMessage.setContent(content.toString());
|
mcsApi.createAlarmMessage(alarmMessage);
|
alarmList.add(alarmMessage);
|
continue outerLoop;
|
}
|
|
// 对比累计值是否超限
|
if (configItem.getCulUpper() != null && configItem.getCulLower() != null) {
|
Double culValue = Double.valueOf(0.0);
|
for (Object[] data : result) {
|
culValue += Double.parseDouble(data[1].toString());
|
if (culValue.compareTo(configItem.getCulLower().doubleValue()) >= 0 && culValue.compareTo(configItem.getCulUpper().doubleValue()) <= 0) {
|
continue;
|
}
|
|
// 生成预警信息
|
AlarmMessageRespDTO alarmMessage = new AlarmMessageRespDTO();
|
alarmMessage.setConfigId(configItem.getId());
|
alarmMessage.setTitle(configItem.getTitle());
|
alarmMessage.setAlarmObj(configItem.getAlarmObj());
|
alarmMessage.setAlarmTime(predictTime);
|
// 设置超出时间
|
alarmMessage.setOutTime(DateUtils.parse(data[0].toString(), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
|
StringBuilder content = new StringBuilder();
|
content.append(configItem.getTitle().replace("预警", ""));
|
content.append(DateUtils.format(alarmMessage.getOutTime(), DateUtils.FORMAT_SIMPLE_TIME));
|
content.append(",");
|
content.append("即将");
|
|
if (culValue.compareTo(configItem.getCulUpper().doubleValue()) > 0) {
|
content.append("超出累计值上限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_UPPER_LIMIT);
|
}
|
if (culValue.compareTo(configItem.getCulLower().doubleValue()) < 0) {
|
content.append("低于累计值下限");
|
alarmMessage.setAlarmType(CommonConstant.EXCEEDING_LOWER_LIMIT);
|
}
|
alarmMessage.setContent(content.toString());
|
mcsApi.createAlarmMessage(alarmMessage);
|
alarmList.add(alarmMessage);
|
continue outerLoop;
|
}
|
}
|
}
|
|
if (CollectionUtils.isEmpty(alarmList)) {
|
log.info("alarmList is empty");
|
return;
|
}
|
log.info("发送预警消息");
|
Map<String, Object> msg = new HashMap<>(2);
|
msg.put("predictTime", DateUtils.format(predictTime, DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
|
msg.put("alarmList", alarmList);
|
msg.put("moduleType", moduleType);
|
rabbitTemplate.convertAndSend(RoutingConstant.EXCHANGE, RoutingConstant.Iailab_Model_Alarm, msg);
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|