iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java
@@ -99,26 +99,10 @@ influxDBService.asyncWritePointValues(pointValues); log.info("更新采集状态"); updateCollectStatus(pointValues, collectTime); daPointCollectStatusService.recordStatusList(pointValues, collectTime); log.info("采集完成"); } catch (Exception ex) { log.info("采集异常!"); ex.printStackTrace(); } } private void updateCollectStatus(List<InfluxPointValuePOJO> pointValues, Date collectTime) { try { for (InfluxPointValuePOJO pointValue : pointValues) { if (pointValue instanceof InfluxPointValueSimPOJO) { InfluxPointValueSimPOJO pvo = (InfluxPointValueSimPOJO) pointValue; daPointCollectStatusService.recordStatus(pvo.getPoint(), pvo.getValue().toString(), collectTime); } else if (pointValue instanceof InfluxPointValueDigPOJO) { InfluxPointValueDigPOJO pvo = (InfluxPointValueDigPOJO) pointValue; daPointCollectStatusService.recordStatus(pvo.getPoint(), pvo.getValue().toString(), collectTime); } } } catch (Exception ex) { ex.printStackTrace(); } } iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/service/DaPointCollectStatusService.java
/**
 * Records the collect status of a single point.
 *
 * <p>Declared with Spring's {@code @Async}. Whether a call really runs asynchronously
 * depends on async support being enabled and on the call going through the Spring
 * proxy; neither is visible from this interface. The method returns nothing, so
 * callers get no result or completion signal.
 *
 * <p>NOTE(review): the implementation is not visible here; presumably it stores the
 * quality and time for the given point. Confirm against the implementing class.
 *
 * @param pointNo      point number identifying the point
 * @param collectValue collected value, as a string
 * @param collectTime  time of the collection
 */
@Async
void recordStatus(String pointNo, String collectValue, Date collectTime);

/**
 * Records the collect status for a batch of point values in one call.
 *
 * <p>In {@code DaPointCollectStatusServiceImpl}, each
 * {@code InfluxPointValueSimPOJO} / {@code InfluxPointValueDigPOJO} value is mapped to a
 * {@code DataQualityEnum}; point numbers are grouped into GOOD and BAD, and the collect
 * time and quality are then updated once per non-empty group. Elements of any other
 * type are skipped.
 *
 * <p>Declared with Spring's {@code @Async}; see the caveats on asynchronous execution
 * in the Spring documentation. The method returns nothing.
 *
 * @param pointValues collected point values to record
 * @param collectTime time of the collection, applied to every updated point
 */
@Async
void recordStatusList(List<InfluxPointValuePOJO> pointValues, Date collectTime);
@@ -3,12 +3,18 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.iailab.framework.common.service.impl.BaseServiceImpl; import com.iailab.module.data.common.enums.DataQualityEnum; import com.iailab.module.data.influxdb.pojo.InfluxPointValueDigPOJO; import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO; import com.iailab.module.data.influxdb.pojo.InfluxPointValueSimPOJO; import com.iailab.module.data.point.dao.DaPointCollectStatusDao; import com.iailab.module.data.point.entity.DaPointCollectStatusEntity; import com.iailab.module.data.point.service.DaPointCollectStatusService; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.UUID; /** @@ -40,4 +46,47 @@ } } public void recordStatusList(List<InfluxPointValuePOJO> pointValues, Date collectTime) { List<String> listGood = new ArrayList<>(); List<String> listBad = new ArrayList<>(); Object collectValue = null; for (InfluxPointValuePOJO pointValue : pointValues) { if (pointValue instanceof InfluxPointValueSimPOJO) { InfluxPointValueSimPOJO pvo = (InfluxPointValueSimPOJO) pointValue; collectValue = pvo.getValue(); } else if (pointValue instanceof InfluxPointValueDigPOJO) { InfluxPointValueDigPOJO pvo = (InfluxPointValueDigPOJO) pointValue; collectValue = pvo.getValue(); } else { continue; } switch (DataQualityEnum.getEumByValue(collectValue)) { case GOOD: listGood.add(pointValue.getPoint()); break; case BAD: listBad.add(pointValue.getPoint()); break; default: break; } } if (!CollectionUtils.isEmpty(listGood)) { QueryWrapper<DaPointCollectStatusEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.in("point_no", listGood); DaPointCollectStatusEntity entity = new DaPointCollectStatusEntity(); entity.setCollectTime(collectTime); entity.setCollectQuality(DataQualityEnum.GOOD.getCode()); baseDao.update(entity, queryWrapper); } if 
(!CollectionUtils.isEmpty(listBad)) { QueryWrapper<DaPointCollectStatusEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.in("point_no", listBad); DaPointCollectStatusEntity entity = new DaPointCollectStatusEntity(); entity.setCollectTime(collectTime); entity.setCollectQuality(DataQualityEnum.BAD.getCode()); baseDao.update(entity, queryWrapper); } } } iailab-module-data/iailab-module-data-biz/src/main/resources/application-dev.yaml
spring:
  # RabbitMQ settings, bound to the RabbitProperties configuration class.
  # Each key appears exactly once: duplicate mapping keys are invalid YAML and are
  # rejected by strict loaders. The values kept are the later occurrences.
  rabbitmq:
    host: 127.0.0.1 # RabbitMQ server address
    port: 5672 # RabbitMQ server port
    username: guest # RabbitMQ account
    password: guest # RabbitMQ password
  # Kafka settings, bound to the KafkaProperties configuration class
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka broker address(es); separate multiple entries with commas