package com.iailab.module.data.point.collection;

import com.iailab.module.data.common.enums.DataSourceType;
import com.iailab.module.data.common.utils.R;
import com.iailab.module.data.channel.kio.collector.KingIOCollector;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueBoolPOJO;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueDigPOJO;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueSimPOJO;
import com.iailab.module.data.point.collection.handler.CalculateHandle;
import com.iailab.module.data.point.collection.handler.CumulateHandle;
import com.iailab.module.data.point.common.PointTypeEnum;
import com.iailab.module.data.point.dto.DaPointDTO;
import com.iailab.module.data.point.service.DaPointCollectStatusService;
import com.iailab.module.data.point.service.DaPointService;
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
import com.iailab.module.data.channel.modbus.collector.ModBusCollector;
import com.iailab.module.data.channel.opcua.collector.OpcUaCollector;
import com.iailab.module.data.point.collection.handler.ConstantHandle;
import com.iailab.module.data.point.collection.handler.MeasureHandle;
import com.iailab.module.data.point.dto.DaPointWriteValueDTO;
import com.iailab.module.data.influxdb.service.InfluxDBService;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Collects values for constant, measure, calculate and cumulate points, hands them to
 * {@link InfluxDBService} and caches them in Redis. Also exposes reading the current value
 * of points and writing a value back to a point.
 *
 * @author PanZhibao
 * @createTime 2023-04-25 16:16:00
 */
@Slf4j
@Component
public class PointCollector {

    @Resource
    private DaPointService daPointService;

    @Resource
    private ConstantHandle constantHandle;

    @Resource
    private MeasureHandle measureHandle;

    @Resource
    private CalculateHandle calculateHandle;

    @Resource
    private KingIOCollector kingIOCollector;

    @Resource
    private InfluxDBService influxDBService;

    @Resource
    private ModBusCollector modBusCollector;

    @Resource
    private OpcUaCollector opcUaCollector;

    @Resource
    private CumulateHandle cumulateHandle;

    @Autowired
    private DaPointCollectStatusService daPointCollectStatusService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Redis key prefix under which point values are cached. */
    public static final String PV = "point_value:";

    /** Expiry of a cached point value, in seconds (used with {@link TimeUnit#SECONDS}). */
    public static final long offset = 60 * 2L;

    /**
     * Runs one collection cycle: reads the constant, measure, calculate and cumulate points
     * selected by {@code minfreq}, passes the resulting values to InfluxDB, caches them in
     * Redis and records the good/bad status of the points.
     *
     * <p>Any exception raised during the cycle is logged and swallowed; nothing is rethrown.
     *
     * @param collectTime time of this collection cycle, forwarded to the handlers and to the
     *                    status service
     * @param minfreq     selector passed to {@link DaPointService} when loading each point list
     *                    (presumably the collection frequency; confirm against the service)
     */
    public void collect(Date collectTime, String minfreq) {
        try {
            log.info("collectTime={}", collectTime.getTime());
            Map<String, Object> dataMap = new HashMap<>();
            List<InfluxPointValuePOJO> pointValues = new ArrayList<>();

            // Point numbers gathered by the handlers for status recording.
            List<String> listGood = new ArrayList<>();
            List<String> listBad = new ArrayList<>();

            log.info("读取常量点");
            List<DaPointDTO> pointConstantList = daPointService.getConstantPoint(minfreq);
            pointValues.addAll(constantHandle.handle(collectTime, pointConstantList, dataMap, listGood, listBad));

            log.info("读取测量点");
            List<DaPointDTO> pointMeasureList = daPointService.getMeasurePoint(minfreq);
            pointValues.addAll(measureHandle.handle(collectTime, pointMeasureList, dataMap, listGood, listBad));

            log.info("读取计算点");
            List<DaPointDTO> pointCalculateList = daPointService.getMathPoint(minfreq);
            pointValues.addAll(calculateHandle.handle(collectTime, pointCalculateList, dataMap, listGood, listBad));

            log.info("读取累计点");
            List<DaPointDTO> pointCumulateList = daPointService.getCumulatePoint(minfreq);
            pointValues.addAll(cumulateHandle.handle(collectTime, pointCumulateList, listGood, listBad));

            log.info("存入时序库");
            // Guard against an empty result: get(0) on an empty list would throw and abort
            // the whole cycle, including the status update below.
            InfluxPointValuePOJO firstValue = pointValues.isEmpty() ? null : pointValues.get(0);
            log.info("pointValueTimestamp={}", firstValue == null ? 0 : firstValue.getTimestamp().getNano());
            if (!pointValues.isEmpty()) {
                influxDBService.asyncWritePointValues(pointValues);
            }

            log.info("存入缓存");
            for (InfluxPointValuePOJO pointValue : pointValues) {
                cachePointValue(pointValue);
            }

            log.info("更新采集状态");
            daPointCollectStatusService.recordStatusList(listGood, listBad, collectTime);
            log.info("采集完成");
        } catch (Exception ex) {
            // Log at error level with the throwable so the stack trace reaches the log output.
            log.error("采集异常！", ex);
        }
    }

    /**
     * Caches a single point value in Redis under {@code PV + point} with an expiry of
     * {@link #offset} seconds. Values that are not one of the three known POJO subtypes
     * (including {@code null}) are ignored.
     */
    private void cachePointValue(InfluxPointValuePOJO pointValue) {
        if (pointValue instanceof InfluxPointValueSimPOJO) {
            InfluxPointValueSimPOJO simPOJO = (InfluxPointValueSimPOJO) pointValue;
            redisTemplate.opsForValue().set(PV + simPOJO.getPoint(), simPOJO.getValue().doubleValue(), offset, TimeUnit.SECONDS);
        } else if (pointValue instanceof InfluxPointValueDigPOJO) {
            InfluxPointValueDigPOJO digPOJO = (InfluxPointValueDigPOJO) pointValue;
            redisTemplate.opsForValue().set(PV + digPOJO.getPoint(), digPOJO.getValue().intValue(), offset, TimeUnit.SECONDS);
        } else if (pointValue instanceof InfluxPointValueBoolPOJO) {
            InfluxPointValueBoolPOJO boolPOJO = (InfluxPointValueBoolPOJO) pointValue;
            redisTemplate.opsForValue().set(PV + boolPOJO.getPoint(), boolPOJO.getValue().booleanValue(), offset, TimeUnit.SECONDS);
        }
    }

    /**
     * Returns the current values of the given points by merging the results of the constant,
     * measure, calculate and cumulate handlers, in that order (later handlers overwrite
     * earlier entries with the same key).
     *
     * @param pointNos point numbers to look up; {@code null} or empty yields an empty map
     * @return map of current values; on failure, the map produced by {@code R.error(message)}
     */
    public Map<String, Object> getCurrentValue(List<String> pointNos) {
        try {
            Map<String, Object> data = new HashMap<>();
            if (CollectionUtils.isEmpty(pointNos)) {
                return data;
            }
            data.putAll(constantHandle.getCurrent(pointNos));
            data.putAll(measureHandle.getCurrent(pointNos));
            data.putAll(calculateHandle.getCurrent(pointNos));
            data.putAll(cumulateHandle.getCurrent(pointNos));
            return data;
        } catch (Exception ex) {
            log.error("getCurrentValue error, pointNos={}", pointNos, ex);
            // NOTE(review): the error map is returned in place of point values, so callers
            // cannot tell it apart from data by type alone; kept for backward compatibility.
            return R.error(ex.getMessage());
        }
    }

    /**
     * Writes a value to a point. For a constant point the value is stored as the point's
     * default value; for a measure point it is written to the tag through the collector that
     * matches the point's data source type (OPC UA, ModBus or KIO). Other point types are
     * left untouched.
     *
     * @param writeValue point number and value to write
     * @throws Exception if the point (or its measure-point record) does not exist, or if the
     *                   underlying service or collector fails
     */
    public void setValue(DaPointWriteValueDTO writeValue) throws Exception {
        DaPointDTO daPointDTO = daPointService.getByNo(writeValue.getPointNo());
        if (daPointDTO == null) {
            throw new Exception("点位不存在");
        }
        if (PointTypeEnum.CONSTANT.getCode().equals(daPointDTO.getPointType())) {
            daPointDTO.setDefaultValue(new BigDecimal(writeValue.getPointValue().toString()));
            daPointService.updateDefaultValue(daPointDTO);
        } else if (PointTypeEnum.MEASURE_POINT.getCode().equals(daPointDTO.getPointType())) {
            DaPointDTO mPoint = daPointService.getMeasurePointByNo(daPointDTO.getPointNo());
            if (mPoint == null) {
                // Fail with the method's checked exception instead of a bare NullPointerException.
                throw new Exception("点位不存在");
            }
            String value = writeValue.getPointValue().toString();
            if (DataSourceType.OPCUA.getCode().equals(mPoint.getSourceType())) {
                opcUaCollector.setTagData(mPoint.getSourceId(), mPoint.getTagNo(), value, mPoint.getDataType());
            } else if (DataSourceType.ModBus.getCode().equals(mPoint.getSourceType())) {
                modBusCollector.setTagValue(mPoint.getSourceId(), mPoint.getTagNo(), value);
            } else if (DataSourceType.KIO.getCode().equals(mPoint.getSourceType())) {
                kingIOCollector.setTagValue(mPoint.getSourceId(), mPoint.getTagNo(), value, mPoint.getDataType());
            } else {
                log.info("没有匹配的TagNo={}", mPoint.getTagNo());
            }
        }
    }
}