iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java
@@ -266,11 +266,6 @@ StringBuilder tagSb = new StringBuilder(); tagSb.append("["); for (int i = 0; i < params.size(); i++) { if (StringUtils.isBlank(params.get(i)[1].toString()) || StringUtils.isBlank(params.get(i)[2].toString()) || StringUtils.isBlank(params.get(i)[3].toString())) { continue; } Map<String, Object> queryParams = new HashMap<>(3); queryParams.put(N, params.get(i)[1]); queryParams.put(D, params.get(i)[2]); iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/common/enums/DataQualityEnum.java
@@ -27,8 +27,12 @@ if (new BigDecimal(((Number) value).doubleValue()).compareTo(CommonConstant.BAD_VALUE) == 0) { return BAD; } } else if (value instanceof BigDecimal) { if (new BigDecimal(value.toString()).compareTo(CommonConstant.BAD_VALUE) == 0) { return BAD; } } else if (value instanceof String) { if (value.toString().equals(CommonConstant.BAD_VALUE.toString())) { if (new BigDecimal(value.toString()).compareTo(CommonConstant.BAD_VALUE) == 0) { return BAD; } } iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java
@@ -99,26 +99,10 @@ influxDBService.asyncWritePointValues(pointValues); log.info("更新采集状态"); updateCollectStatus(pointValues, collectTime); daPointCollectStatusService.recordStatusList(pointValues, collectTime); log.info("采集完成"); } catch (Exception ex) { log.info("采集异常!"); ex.printStackTrace(); } } private void updateCollectStatus(List<InfluxPointValuePOJO> pointValues, Date collectTime) { try { for (InfluxPointValuePOJO pointValue : pointValues) { if (pointValue instanceof InfluxPointValueSimPOJO) { InfluxPointValueSimPOJO pvo = (InfluxPointValueSimPOJO) pointValue; daPointCollectStatusService.recordStatus(pvo.getPoint(), pvo.getValue().toString(), collectTime); } else if (pointValue instanceof InfluxPointValueDigPOJO) { InfluxPointValueDigPOJO pvo = (InfluxPointValueDigPOJO) pointValue; daPointCollectStatusService.recordStatus(pvo.getPoint(), pvo.getValue().toString(), collectTime); } } } catch (Exception ex) { ex.printStackTrace(); } } iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/handler/MeasureHandle.java
@@ -83,7 +83,9 @@ kioTagIds.add(new String[]{item.getSourceId(), item.getTagNo()}); } else if (DataSourceType.HTTP.getCode().equals(item.getSourceType())) { if (SourceApiEnum.iHyperDB.getCode().equals(item.getSourceName())) { httpTagIhd.add(new Object[]{item.getSourceId(), item.getTagNo(), item.getDimension(), item.getValueType()}); if (item.getTagNo() != null && item.getDimension() != null && item.getValueType() != null) { httpTagIhd.add(new Object[]{item.getSourceId(), item.getTagNo(), item.getDimension(), item.getValueType()}); } } } }); @@ -138,9 +140,12 @@ } if (DataTypeEnum.FLOAT.getCode().equals(dto.getDataType()) || DataTypeEnum.INT.getCode().equals(dto.getDataType())) { BigDecimal rawValue = new BigDecimal(value.toString()); if(CommonConstant.BAD_VALUE.compareTo(rawValue) == 0) { return CommonConstant.BAD_VALUE; } // 异常值处理 if (rawValue.compareTo(maxValue) > 0 || rawValue.compareTo(minValue) < 0) { rawValue = CommonConstant.BAD_VALUE; return CommonConstant.BAD_VALUE; } BigDecimal coefficient = dto.getUnittransfactor() == null ? BigDecimal.ONE : dto.getUnittransfactor(); BigDecimal calValue = rawValue.multiply(coefficient); iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/service/DaPointCollectStatusService.java
@@ -1,10 +1,12 @@ package com.iailab.module.data.point.service; import com.iailab.framework.common.service.BaseService; import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO; import com.iailab.module.data.point.entity.DaPointCollectStatusEntity; import org.springframework.scheduling.annotation.Async; import java.util.Date; import java.util.List; /** * @author PanZhibao @@ -15,4 +17,8 @@ @Async void recordStatus(String pointNo, String collectValue, Date collectTime); @Async void recordStatusList(List<InfluxPointValuePOJO> pointValues, Date collectTime); } iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/service/impl/DaPointCollectStatusServiceImpl.java
@@ -3,12 +3,18 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.iailab.framework.common.service.impl.BaseServiceImpl; import com.iailab.module.data.common.enums.DataQualityEnum; import com.iailab.module.data.influxdb.pojo.InfluxPointValueDigPOJO; import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO; import com.iailab.module.data.influxdb.pojo.InfluxPointValueSimPOJO; import com.iailab.module.data.point.dao.DaPointCollectStatusDao; import com.iailab.module.data.point.entity.DaPointCollectStatusEntity; import com.iailab.module.data.point.service.DaPointCollectStatusService; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.UUID; /** @@ -40,4 +46,47 @@ } } public void recordStatusList(List<InfluxPointValuePOJO> pointValues, Date collectTime) { List<String> listGood = new ArrayList<>(); List<String> listBad = new ArrayList<>(); Object collectValue = null; for (InfluxPointValuePOJO pointValue : pointValues) { if (pointValue instanceof InfluxPointValueSimPOJO) { InfluxPointValueSimPOJO pvo = (InfluxPointValueSimPOJO) pointValue; collectValue = pvo.getValue(); } else if (pointValue instanceof InfluxPointValueDigPOJO) { InfluxPointValueDigPOJO pvo = (InfluxPointValueDigPOJO) pointValue; collectValue = pvo.getValue(); } else { continue; } switch (DataQualityEnum.getEumByValue(collectValue)) { case GOOD: listGood.add(pointValue.getPoint()); break; case BAD: listBad.add(pointValue.getPoint()); break; default: break; } } if (!CollectionUtils.isEmpty(listGood)) { QueryWrapper<DaPointCollectStatusEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.in("point_no", listGood); DaPointCollectStatusEntity entity = new DaPointCollectStatusEntity(); entity.setCollectTime(collectTime); entity.setCollectQuality(DataQualityEnum.GOOD.getCode()); baseDao.update(entity, queryWrapper); } if (!CollectionUtils.isEmpty(listBad)) { QueryWrapper<DaPointCollectStatusEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.in("point_no", listBad); DaPointCollectStatusEntity entity = new DaPointCollectStatusEntity(); entity.setCollectTime(collectTime); entity.setCollectQuality(DataQualityEnum.BAD.getCode()); baseDao.update(entity, queryWrapper); } } } iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/vo/DaPointPageReqVO.java
@@ -30,4 +30,6 @@ private String tagNo; private Integer isEnable; private String collectQuality; } iailab-module-data/iailab-module-data-biz/src/main/resources/application-dev.yaml
@@ -54,10 +54,10 @@ spring: # RabbitMQ 配置项,对应 RabbitProperties 配置类 rabbitmq: host: 172.16.8.200 # RabbitMQ 服务的地址 host: 127.0.0.1 # RabbitMQ 服务的地址 port: 5672 # RabbitMQ 服务的端口 username: admin # RabbitMQ 服务的账号 password: admin123 # RabbitMQ 服务的密码 username: guest # RabbitMQ 服务的账号 password: guest # RabbitMQ 服务的密码 # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 iailab-module-data/iailab-module-data-biz/src/main/resources/mapper/point/DaPointDao.xml
@@ -68,6 +68,9 @@ <if test="params.isEnable != null "> and t1.is_enable = #{params.isEnable} </if> <if test="params.collectQuality != null and params.collectQuality != ''"> and t6.collect_quality = #{params.collectQuality} </if> </where> order by t1.create_time desc, t1.point_no desc </select>