潘志宝
2025-02-28 1673913c59fbe4458629ffa4093acc007ae9e749
提交 | 用户 | 时间
52487d 1 package com.iailab.module.data.channel.http.collector.ihdb;
L 2
3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONArray;
5 import com.alibaba.fastjson.JSONObject;
e8ad66 6 import com.google.gson.Gson;
52487d 7 import com.iailab.framework.common.constant.CommonConstant;
d7fd46 8 import com.iailab.framework.common.util.http.HttpUtils;
52487d 9 import com.iailab.module.data.channel.http.entity.HttpApiEntity;
L 10 import com.iailab.module.data.channel.http.service.HttpApiService;
11 import com.iailab.module.data.common.enums.DataSourceType;
12 import com.iailab.module.data.common.utils.DateUtils;
d41f14 13 import com.iailab.module.data.common.utils.HttpRequest;
52487d 14 import com.iailab.module.data.common.utils.TagUtils;
L 15 import lombok.extern.slf4j.Slf4j;
2228b6 16 import org.apache.commons.lang3.StringUtils;
52487d 17 import org.springframework.beans.factory.annotation.Autowired;
2e7f34 18 import org.springframework.data.redis.core.BoundHashOperations;
D 19 import org.springframework.data.redis.core.RedisTemplate;
52487d 20 import org.springframework.stereotype.Component;
L 21 import org.springframework.util.CollectionUtils;
22
23 import java.math.BigDecimal;
e8ad66 24 import java.math.RoundingMode;
0a2804 25 import java.sql.Timestamp;
26 import java.text.ParseException;
27 import java.text.SimpleDateFormat;
28 import java.time.LocalDateTime;
52487d 29 import java.util.*;
0a2804 30 import java.util.concurrent.*;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
52487d 33
L 34 /**
35  * iHyperDB采集
d41f14 36  *
52487d 37  * @author lirm
L 38  * @Description
39  * @createTime 2024年10月16日
40  */
41 @Slf4j
42 @Component
d41f14 43 public class HttpCollectorForIhd {
0a2804 44     private static Map<String, HttpApiEntity> apiMap = new HashMap<>();
52487d 45
L 46     @Autowired
47     private HttpApiService httpApiService;
2e7f34 48
D 49     @Autowired
50     private RedisTemplate redisTemplate;
e8ad66 51
0a2804 52     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(18, 36, 30, TimeUnit.SECONDS,
53             new ArrayBlockingQueue<Runnable>(36), new ThreadPoolExecutor.AbortPolicy());
54
d41f14 55     private static final String STA_TRUE = "true";
52487d 56
1e563e 57     private static final int GROUP_MAX_COUNT = 300;
0a2804 58
167391 59     private static final int MAX_WAIT = 40;
2f03e2 60
61     private static final String pattern = "yyyyMMddHHmm00";
62
63     private static final String IS_SUCCESS = "isSuccess";
64
65     /**
66      * tagName
67      */
68     private static final String N = "n";
69
70     /**
71      * dimension
72      */
73     private static final String D = "d";
74
75     /**
76      * 类型
77      */
78     private static final String P = "p";
79
80     /**
81      * dataValue
82      */
83     private static final String V = "v";
84
85     /**
86      * dataTime
87      */
88     private static final String T = "t";
52487d 89
c47a86 90     /**
D 91      * 数据质量G:good,B:bad
92      */
93     private static final String Q = "q";
94
52487d 95     private HttpApiEntity getHttpApi(String id) {
L 96         if (apiMap.containsKey(id)) {
97             return apiMap.get(id);
98         }
99         HttpApiEntity httpApi = httpApiService.info(id);
100         apiMap.put(id, httpApi);
101         return httpApi;
102     }
103
104     public BigDecimal getTagValue(String sourceId, String tagNo, Integer dimension, String valueType) {
105         BigDecimal value = CommonConstant.BAD_VALUE;
106         HttpApiEntity httpApi = this.getHttpApi(sourceId);
107         StringBuilder tagSb = new StringBuilder();
108         tagSb.append("[");
109         Map<String, Object> queryParams = new HashMap<>();
2f03e2 110         queryParams.put(P, valueType);
111         queryParams.put(D, dimension);
112         queryParams.put(N, tagNo);
d41f14 113         String jsonString = JSON.toJSONString(queryParams);
52487d 114         tagSb.append(jsonString);
L 115         tagSb.append("]");
2f03e2 116         String currentDate = DateUtils.format(new Date(), pattern);
d7fd46 117         String responseStr = HttpUtils.sendPost(httpApi.getUrl() + "/" + currentDate, tagSb.toString());
52487d 118         JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 119         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 120             JSONArray tagValueList = responseObj.getJSONArray("data");
L 121             if (!CollectionUtils.isEmpty(tagValueList)) {
122                 for (int i = 0; i < tagValueList.size(); i++) {
123                     JSONObject item = tagValueList.getJSONObject(i);
2f03e2 124                     value = new BigDecimal(item.get(V).toString());
52487d 125                 }
L 126             }
127         }
128         return value;
129     }
130
2f03e2 131     public Map<String, Object> getLastValues(String sourceId, List<String> tagNames) {
e8ad66 132         Map<String, Object> result = new HashMap<>();
2f03e2 133         HttpApiEntity httpApi = this.getHttpApi(sourceId);
2e7f34 134         try {
D 135             if (CollectionUtils.isEmpty(tagNames)) {
136                 return result;
137             }
138             List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag
139             for (int i = 0; i < tagNames.size(); i++) {
140                 //先查缓存
141                 BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i));
2f03e2 142                 if (ops.get(V) != null) {
143                     BigDecimal value = new BigDecimal(ops.get(V).toString());
2e7f34 144                     result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP));
D 145                 } else {
146                     noCacheTagNames.add(tagNames.get(i));
147                 }
148             }
149             if (CollectionUtils.isEmpty(noCacheTagNames)) {
150                 log.info("全部读取缓存");
151                 return result;
152             }
153             log.info("查询未缓存的数据");
154             Gson gson = new Gson();
155             String tagSb = gson.toJson(noCacheTagNames);
156             log.info("body=====" + tagSb);
2f03e2 157             String currentDate = DateUtils.format(new Date(), pattern);
2e7f34 158             String responseStr = "";
d7fd46 159             responseStr = HttpUtils.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb);
2e7f34 160             JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 161             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
2e7f34 162                 JSONArray tagValueList = responseObj.getJSONArray("data");
D 163                 if (!CollectionUtils.isEmpty(tagValueList)) {
164                     for (int i = 0; i < tagValueList.size(); i++) {
165                         JSONObject item = tagValueList.getJSONObject(i);
2f03e2 166                         if (item.get(V) != null) {
2e7f34 167                             //存缓存
2f03e2 168                             BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(item.get(V).toString());
169                             ops.put(V, item.get(V).toString());
2e7f34 170                             //设置过期时间
2f03e2 171                             redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS);
2e7f34 172                             //把查询到的数据插入结果集
2f03e2 173                             BigDecimal value = new BigDecimal(item.get(V).toString());
174                             result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP));
2e7f34 175                         } else {
2f03e2 176                             result.put(item.get(N).toString(), CommonConstant.BAD_VALUE);
2e7f34 177                         }
e8ad66 178                     }
D 179                 }
180             }
2e7f34 181
D 182         } catch (Exception ex) {
183             log.info("getCurrentValue异常");
184             ex.printStackTrace();
185             throw ex;
e8ad66 186         }
D 187         return result;
188     }
189
0a2804 190     public Map<String, Object> getTagValues(List<Object[]> params, Date collectTime) {
191         Map<String, Object> result = new HashMap<>();
52487d 192         if (CollectionUtils.isEmpty(params)) {
L 193             return new HashMap<>();
194         }
0a2804 195         try {
196             Map<Integer, List<Object[]>> measurePointsCountGroup = new HashMap<>();
197             int pointListSize = params.size();
198             int groupCount = pointListSize / GROUP_MAX_COUNT + ((pointListSize % GROUP_MAX_COUNT) > 0 ? 1 : 0);
199             log.info("groupCount=" + groupCount);
200             for (int i = 0; i < groupCount; i++) {
201                 int end = (i + 1) * GROUP_MAX_COUNT;
202                 if (end > pointListSize) {
203                     end = pointListSize;
204                 }
205                 measurePointsCountGroup.put(i, params.subList(i * GROUP_MAX_COUNT, end));
206             }
207             log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size());
208             result = new ConcurrentHashMap<>(params.size());
209             CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size());
210             for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) {
211                 HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString());
212                 // 并发
167391 213                 Thread.sleep(500);
0a2804 214                 threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(),
215                         collectTime, countDownLatch));
216                 // 顺序
217                 //this.getByHtp(result, measurePointsItem.getValue(), collectTime);
52487d 218             }
0a2804 219             countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS);
220
221         } catch (Exception ex) {
222             ex.printStackTrace();
52487d 223         }
L 224         return result;
225     }
226
0a2804 227     /**
228      * 异步采集任务
229      */
230     private class Task implements Runnable {
231         String url;
232         String sourceName;
233         Date collectTime;
234         Map<String, Object> result;
235         List<Object[]> params;
236         CountDownLatch countDownLatch;
237
238         public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params,
239                     Date collectTime, CountDownLatch countDownLatch) {
240             this.url = url;
241             this.sourceName = sourceName;
242             this.result = result;
243             this.collectTime = collectTime;
244             this.params = params;
245             this.countDownLatch = countDownLatch;
246         }
247
248         @Override
249         public void run() {
250             try {
251                 log.info("请求的Tag数量:" + params.size());
252                 this.getByHtp(url, sourceName, result, params, collectTime);
253                 log.info("请求结束:url=" + url);
254             } catch (Exception ex) {
255                 log.info("获取采集值失败," + ex.getMessage());
256                 ex.printStackTrace();
257             } finally {
258                 countDownLatch.countDown();
259             }
260         }
261
262         /**
263          * getTagDataByHttp
264          *
265          * @param url
266          * @param sourceName
267          * @param result
268          * @param params
269          * @param collectTime
270          */
271         private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
272             StringBuilder tagSb = new StringBuilder();
273             tagSb.append("[");
274             for (int i = 0; i < params.size(); i++) {
2228b6 275                 Map<String, Object> queryParams = new HashMap<>(3);
0a2804 276                 queryParams.put(N, params.get(i)[1]);
277                 queryParams.put(D, params.get(i)[2]);
278                 queryParams.put(P, params.get(i)[3]);
279                 String jsonString = JSON.toJSONString(queryParams);
280                 tagSb.append(jsonString);
281                 if (i < params.size() - 1) {
282                     tagSb.append(",");
283                 }
284             }
285             tagSb.append("]");
286             log.info("body=====" + tagSb);
287             String currentDate = DateUtils.format(collectTime, pattern);
d7fd46 288             String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString());
0a2804 289             JSONObject responseObj = JSON.parseObject(responseStr);
290             log.info("responseObj=====" + responseObj.toJSONString());
291             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
292                 JSONArray tagValueList = responseObj.getJSONArray("data");
293                 if (!CollectionUtils.isEmpty(tagValueList)) {
294                     for (int i = 0; i < tagValueList.size(); i++) {
295                         JSONObject item = tagValueList.getJSONObject(i);
c47a86 296                         if (item.get(Q).toString().equals("G")) {
D 297                             result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
298                         }
0a2804 299                     }
300                 }
301             }
302         }
303     }
304
305     private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
52487d 306         StringBuilder tagSb = new StringBuilder();
L 307         tagSb.append("[");
d41f14 308         for (int i = 0; i < params.size(); i++) {
52487d 309             Map<String, Object> queryParams = new HashMap<>();
2f03e2 310             queryParams.put(N, params.get(i)[1]);
311             queryParams.put(D, params.get(i)[2]);
312             queryParams.put(P, params.get(i)[3]);
d41f14 313             String jsonString = JSON.toJSONString(queryParams);
52487d 314             tagSb.append(jsonString);
L 315             if (i < params.size() - 1) {
316                 tagSb.append(",");
317             }
318         }
319         tagSb.append("]");
0a2804 320         log.info("body=====" + tagSb);
321         String currentDate = DateUtils.format(collectTime, pattern);
d7fd46 322         String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString());
52487d 323         JSONObject responseObj = JSON.parseObject(responseStr);
L 324         log.info("responseObj=====" + responseObj.toJSONString());
2f03e2 325         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 326             JSONArray tagValueList = responseObj.getJSONArray("data");
L 327             if (!CollectionUtils.isEmpty(tagValueList)) {
328                 for (int i = 0; i < tagValueList.size(); i++) {
329                     JSONObject item = tagValueList.getJSONObject(i);
0a2804 330                     result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
52487d 331                 }
L 332             }
333         }
334     }
335 }