潘志宝
2024-12-30 af012402d448313b0888868b9e0230ff3a8f0d49
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.iailab.module.data.point.collection;
 
import com.iailab.module.data.common.enums.DataSourceType;
import com.iailab.module.data.common.utils.R;
import com.iailab.module.data.channel.kio.collector.KingIOCollector;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueBoolPOJO;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueDigPOJO;
import com.iailab.module.data.influxdb.pojo.InfluxPointValueSimPOJO;
import com.iailab.module.data.point.collection.handler.CalculateHandle;
import com.iailab.module.data.point.collection.handler.CumulateHandle;
import com.iailab.module.data.point.common.PointTypeEnum;
import com.iailab.module.data.point.dto.DaPointDTO;
import com.iailab.module.data.point.service.DaPointCollectStatusService;
import com.iailab.module.data.point.service.DaPointService;
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
import com.iailab.module.data.channel.modbus.collector.ModBusCollector;
import com.iailab.module.data.channel.opcua.collector.OpcUaCollector;
import com.iailab.module.data.point.collection.handler.ConstantHandle;
import com.iailab.module.data.point.collection.handler.MeasureHandle;
import com.iailab.module.data.point.dto.DaPointWriteValueDTO;
import com.iailab.module.data.influxdb.service.InfluxDBService;
import lombok.extern.slf4j.Slf4j;
 
import javax.annotation.Resource;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
 
import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
 
/**
 * @author PanZhibao
 * @Description
 * @createTime 2023年04月25日 16:16:00
 */
@Slf4j
@Component
public class PointCollector {
 
    @Resource
    private DaPointService daPointService;
 
    @Resource
    private ConstantHandle constantHandle;
 
    @Resource
    private MeasureHandle measureHandle;
 
    @Resource
    private CalculateHandle calculateHandle;
 
    @Resource
    private KingIOCollector kingIOCollector;
 
    @Resource
    private InfluxDBService influxDBService;
 
    @Resource
    private ModBusCollector modBusCollector;
 
    @Resource
    private OpcUaCollector opcUaCollector;
 
    @Resource
    private CumulateHandle cumulateHandle;
 
    @Autowired
    private DaPointCollectStatusService daPointCollectStatusService;
 
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    public static final String PV = "point_value:";
 
    public static final long offset = 60 * 3L;
 
    /**
     * 采集
     *
     * @param collectTime
     * @param minfreq
     */
    public void collect(Date collectTime, String minfreq) {
        try {
            Map<String, Object> dataMap = new HashMap<>();
            List<InfluxPointValuePOJO> pointValues = new ArrayList<>();
 
            log.info("读取常量点");
            List<DaPointDTO> pointConstantList = daPointService.getConstantPoint(minfreq);
            pointValues.addAll(constantHandle.handle(collectTime, pointConstantList, dataMap));
 
            log.info("读取测量点");
            List<DaPointDTO> pointMeasureList = daPointService.getMeasurePoint(minfreq);
            pointValues.addAll(measureHandle.handle(collectTime, pointMeasureList, dataMap));
 
            log.info("读取计算点");
            List<DaPointDTO> pointCalculateList = daPointService.getMathPoint(minfreq);
            pointValues.addAll(calculateHandle.handle(collectTime, pointCalculateList, dataMap));
 
            log.info("读取累计点");
            List<DaPointDTO> pointCumulateList = daPointService.getCumulatePoint(minfreq);
            pointValues.addAll(cumulateHandle.handle(collectTime, pointCumulateList));
 
            log.info("存入时序库");
            influxDBService.asyncWritePointValues(pointValues);
 
            log.info("存入缓存");
            for (InfluxPointValuePOJO pointValue : pointValues) {
                if (pointValue instanceof InfluxPointValueSimPOJO) {
                    InfluxPointValueSimPOJO simPOJO = (InfluxPointValueSimPOJO) pointValue;
                    redisTemplate.opsForValue().set(PV + simPOJO.getPoint(), simPOJO.getValue().doubleValue(), offset, TimeUnit.SECONDS);
                } else if (pointValue instanceof InfluxPointValueDigPOJO) {
                    InfluxPointValueDigPOJO digPOJO = (InfluxPointValueDigPOJO) pointValue;
                    redisTemplate.opsForValue().set(PV + digPOJO.getPoint(), digPOJO.getValue().intValue(), offset, TimeUnit.SECONDS);
                } else if (pointValue instanceof InfluxPointValueBoolPOJO) {
                    InfluxPointValueBoolPOJO boolPOJO = (InfluxPointValueBoolPOJO) pointValue;
                    redisTemplate.opsForValue().set(PV + boolPOJO.getPoint(), boolPOJO.getValue().booleanValue(), offset, TimeUnit.SECONDS);
                }
            }
            log.info("更新采集状态");
            daPointCollectStatusService.recordStatusList(pointValues, collectTime);
            log.info("采集完成");
        } catch (Exception ex) {
            log.info("采集异常!");
            ex.printStackTrace();
        }
    }
 
    public Map<String, Object> getCurrentValue(List<String> pointNos) {
        try {
            Map<String, Object> data = new HashMap<>();
            if (CollectionUtils.isEmpty(pointNos)) {
                return data;
            }
            data.putAll(constantHandle.getCurrent(pointNos));
            data.putAll(measureHandle.getCurrent(pointNos));
            data.putAll(calculateHandle.getCurrent(pointNos));
            data.putAll(cumulateHandle.getCurrent(pointNos));
            return data;
        } catch (Exception ex) {
            return R.error(ex.getMessage());
        }
 
    }
 
    public void setValue(DaPointWriteValueDTO writeValue) throws Exception {
        DaPointDTO daPointDTO = daPointService.getByNo(writeValue.getPointNo());
        if (daPointDTO == null) {
            throw new Exception("点位不存在");
        }
        if (PointTypeEnum.CONSTANT.getCode().equals(daPointDTO.getPointType())) {
            daPointDTO.setDefaultValue(new BigDecimal(writeValue.getPointValue().toString()));
            daPointService.updateDefaultValue(daPointDTO);
        } else if (PointTypeEnum.MEASURE_POINT.getCode().equals(daPointDTO.getPointType())) {
            DaPointDTO mPoint = daPointService.getMeasurePointByNo(daPointDTO.getPointNo());
            if (DataSourceType.OPCUA.getCode().equals(mPoint.getSourceType())) {
                opcUaCollector.setTagData(mPoint.getSourceId(), mPoint.getTagNo(), writeValue.getPointValue().toString(), mPoint.getDataType());
            } else if (DataSourceType.ModBus.getCode().equals(mPoint.getSourceType())) {
                modBusCollector.setTagValue(mPoint.getSourceId(), mPoint.getTagNo(), writeValue.getPointValue().toString());
            } else if (DataSourceType.KIO.getCode().equals(mPoint.getSourceType())) {
                kingIOCollector.setTagValue(mPoint.getSourceId(), mPoint.getTagNo(), writeValue.getPointValue().toString(), mPoint.getDataType());
            } else {
                log.info("没有匹配的TagNo=" + mPoint.getTagNo());
            }
        }
 
    }
}