Jay
2024-11-25 ee9f604388a3e77d3f4654e326f3976552e7f532
提交 | 用户 | 时间
52487d 1 package com.iailab.module.data.channel.http.collector.ihdb;
L 2
3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONArray;
5 import com.alibaba.fastjson.JSONObject;
e8ad66 6 import com.google.gson.Gson;
52487d 7 import com.iailab.framework.common.constant.CommonConstant;
L 8 import com.iailab.module.data.channel.http.entity.HttpApiEntity;
9 import com.iailab.module.data.channel.http.service.HttpApiService;
10 import com.iailab.module.data.common.enums.DataSourceType;
11 import com.iailab.module.data.common.utils.DateUtils;
d41f14 12 import com.iailab.module.data.common.utils.HttpRequest;
52487d 13 import com.iailab.module.data.common.utils.TagUtils;
L 14 import lombok.extern.slf4j.Slf4j;
15 import org.springframework.beans.factory.annotation.Autowired;
2e7f34 16 import org.springframework.data.redis.core.BoundHashOperations;
D 17 import org.springframework.data.redis.core.RedisTemplate;
52487d 18 import org.springframework.stereotype.Component;
L 19 import org.springframework.util.CollectionUtils;
20
21 import java.math.BigDecimal;
e8ad66 22 import java.math.RoundingMode;
0a2804 23 import java.sql.Timestamp;
24 import java.text.ParseException;
25 import java.text.SimpleDateFormat;
26 import java.time.LocalDateTime;
52487d 27 import java.util.*;
0a2804 28 import java.util.concurrent.*;
29 import java.util.function.Function;
30 import java.util.stream.Collectors;
52487d 31
L 32 /**
33  * iHyperDB采集
d41f14 34  *
52487d 35  * @author lirm
L 36  * @Description
37  * @createTime 2024年10月16日
38  */
39 @Slf4j
40 @Component
d41f14 41 public class HttpCollectorForIhd {
0a2804 42     private static Map<String, HttpApiEntity> apiMap = new HashMap<>();
52487d 43
L 44     @Autowired
45     private HttpApiService httpApiService;
2e7f34 46
D 47     @Autowired
48     private RedisTemplate redisTemplate;
e8ad66 49
0a2804 50     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(18, 36, 30, TimeUnit.SECONDS,
51             new ArrayBlockingQueue<Runnable>(36), new ThreadPoolExecutor.AbortPolicy());
52
d41f14 53     private static final String STA_TRUE = "true";
52487d 54
d338b5 55     private static final int GROUP_MAX_COUNT = 90;
0a2804 56
57     private static final int MAX_WAIT = 30;
2f03e2 58
59     private static final String pattern = "yyyyMMddHHmm00";
60
61     private static final String IS_SUCCESS = "isSuccess";
62
63     /**
64      * tagName
65      */
66     private static final String N = "n";
67
68     /**
69      * dimension
70      */
71     private static final String D = "d";
72
73     /**
74      * 类型
75      */
76     private static final String P = "p";
77
78     /**
79      * dataValue
80      */
81     private static final String V = "v";
82
83     /**
84      * dataTime
85      */
86     private static final String T = "t";
52487d 87
L 88     private HttpApiEntity getHttpApi(String id) {
89         if (apiMap.containsKey(id)) {
90             return apiMap.get(id);
91         }
92         HttpApiEntity httpApi = httpApiService.info(id);
93         apiMap.put(id, httpApi);
94         return httpApi;
95     }
96
97     public BigDecimal getTagValue(String sourceId, String tagNo, Integer dimension, String valueType) {
98         BigDecimal value = CommonConstant.BAD_VALUE;
99         HttpApiEntity httpApi = this.getHttpApi(sourceId);
100         StringBuilder tagSb = new StringBuilder();
101         tagSb.append("[");
102         Map<String, Object> queryParams = new HashMap<>();
2f03e2 103         queryParams.put(P, valueType);
104         queryParams.put(D, dimension);
105         queryParams.put(N, tagNo);
d41f14 106         String jsonString = JSON.toJSONString(queryParams);
52487d 107         tagSb.append(jsonString);
L 108         tagSb.append("]");
2f03e2 109         String currentDate = DateUtils.format(new Date(), pattern);
d41f14 110         String responseStr = HttpRequest.sendPost(httpApi.getUrl() + "/" + currentDate, tagSb.toString());
52487d 111         JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 112         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 113             JSONArray tagValueList = responseObj.getJSONArray("data");
L 114             if (!CollectionUtils.isEmpty(tagValueList)) {
115                 for (int i = 0; i < tagValueList.size(); i++) {
116                     JSONObject item = tagValueList.getJSONObject(i);
2f03e2 117                     value = new BigDecimal(item.get(V).toString());
52487d 118                 }
L 119             }
120         }
121         return value;
122     }
123
2f03e2 124     public Map<String, Object> getLastValues(String sourceId, List<String> tagNames) {
e8ad66 125         Map<String, Object> result = new HashMap<>();
2f03e2 126         HttpApiEntity httpApi = this.getHttpApi(sourceId);
2e7f34 127         try {
D 128             if (CollectionUtils.isEmpty(tagNames)) {
129                 return result;
130             }
131             List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag
132             for (int i = 0; i < tagNames.size(); i++) {
133                 //先查缓存
134                 BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i));
2f03e2 135                 if (ops.get(V) != null) {
136                     BigDecimal value = new BigDecimal(ops.get(V).toString());
2e7f34 137                     result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP));
D 138                 } else {
139                     noCacheTagNames.add(tagNames.get(i));
140                 }
141             }
142             if (CollectionUtils.isEmpty(noCacheTagNames)) {
143                 log.info("全部读取缓存");
144                 return result;
145             }
146             log.info("查询未缓存的数据");
147             Gson gson = new Gson();
148             String tagSb = gson.toJson(noCacheTagNames);
149             log.info("body=====" + tagSb);
2f03e2 150             String currentDate = DateUtils.format(new Date(), pattern);
2e7f34 151             String responseStr = "";
2f03e2 152             responseStr = HttpRequest.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb);
2e7f34 153             JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 154             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
2e7f34 155                 JSONArray tagValueList = responseObj.getJSONArray("data");
D 156                 if (!CollectionUtils.isEmpty(tagValueList)) {
157                     for (int i = 0; i < tagValueList.size(); i++) {
158                         JSONObject item = tagValueList.getJSONObject(i);
2f03e2 159                         if (item.get(V) != null) {
2e7f34 160                             //存缓存
2f03e2 161                             BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(item.get(V).toString());
162                             ops.put(V, item.get(V).toString());
2e7f34 163                             //设置过期时间
2f03e2 164                             redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS);
2e7f34 165                             //把查询到的数据插入结果集
2f03e2 166                             BigDecimal value = new BigDecimal(item.get(V).toString());
167                             result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP));
2e7f34 168                         } else {
2f03e2 169                             result.put(item.get(N).toString(), CommonConstant.BAD_VALUE);
2e7f34 170                         }
e8ad66 171                     }
D 172                 }
173             }
2e7f34 174
D 175         } catch (Exception ex) {
176             log.info("getCurrentValue异常");
177             ex.printStackTrace();
178             throw ex;
e8ad66 179         }
D 180         return result;
181     }
182
0a2804 183     public Map<String, Object> getTagValues(List<Object[]> params, Date collectTime) {
184         Map<String, Object> result = new HashMap<>();
52487d 185         if (CollectionUtils.isEmpty(params)) {
L 186             return new HashMap<>();
187         }
0a2804 188         try {
189             Map<Integer, List<Object[]>> measurePointsCountGroup = new HashMap<>();
190             int pointListSize = params.size();
191             int groupCount = pointListSize / GROUP_MAX_COUNT + ((pointListSize % GROUP_MAX_COUNT) > 0 ? 1 : 0);
192             log.info("groupCount=" + groupCount);
193             for (int i = 0; i < groupCount; i++) {
194                 int end = (i + 1) * GROUP_MAX_COUNT;
195                 if (end > pointListSize) {
196                     end = pointListSize;
197                 }
198                 measurePointsCountGroup.put(i, params.subList(i * GROUP_MAX_COUNT, end));
199             }
200             log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size());
201             result = new ConcurrentHashMap<>(params.size());
202             CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size());
203             for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) {
204                 HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString());
205                 // 并发
8e2590 206                 Thread.sleep(200);
0a2804 207                 threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(),
208                         collectTime, countDownLatch));
209                 // 顺序
210                 //this.getByHtp(result, measurePointsItem.getValue(), collectTime);
52487d 211             }
0a2804 212             countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS);
213
214         } catch (Exception ex) {
215             ex.printStackTrace();
52487d 216         }
L 217         return result;
218     }
219
0a2804 220     /**
221      * 异步采集任务
222      */
223     private class Task implements Runnable {
224         String url;
225         String sourceName;
226         Date collectTime;
227         Map<String, Object> result;
228         List<Object[]> params;
229         CountDownLatch countDownLatch;
230
231         public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params,
232                     Date collectTime, CountDownLatch countDownLatch) {
233             this.url = url;
234             this.sourceName = sourceName;
235             this.result = result;
236             this.collectTime = collectTime;
237             this.params = params;
238             this.countDownLatch = countDownLatch;
239         }
240
241         @Override
242         public void run() {
243             try {
244                 log.info("请求的Tag数量:" + params.size());
245                 this.getByHtp(url, sourceName, result, params, collectTime);
246                 log.info("请求结束:url=" + url);
247             } catch (Exception ex) {
248                 log.info("获取采集值失败," + ex.getMessage());
249                 ex.printStackTrace();
250             } finally {
251                 countDownLatch.countDown();
252             }
253         }
254
255         /**
256          * getTagDataByHttp
257          *
258          * @param url
259          * @param sourceName
260          * @param result
261          * @param params
262          * @param collectTime
263          */
264         private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
265             StringBuilder tagSb = new StringBuilder();
266             tagSb.append("[");
267             for (int i = 0; i < params.size(); i++) {
268                 Map<String, Object> queryParams = new HashMap<>();
269                 queryParams.put(N, params.get(i)[1]);
270                 queryParams.put(D, params.get(i)[2]);
271                 queryParams.put(P, params.get(i)[3]);
272                 String jsonString = JSON.toJSONString(queryParams);
273                 tagSb.append(jsonString);
274                 if (i < params.size() - 1) {
275                     tagSb.append(",");
276                 }
277             }
278             tagSb.append("]");
279             log.info("body=====" + tagSb);
280             String currentDate = DateUtils.format(collectTime, pattern);
281             String responseStr = HttpRequest.sendPost(url + "/" + currentDate, tagSb.toString());
282             JSONObject responseObj = JSON.parseObject(responseStr);
283             log.info("responseObj=====" + responseObj.toJSONString());
284             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
285                 JSONArray tagValueList = responseObj.getJSONArray("data");
286                 if (!CollectionUtils.isEmpty(tagValueList)) {
287                     for (int i = 0; i < tagValueList.size(); i++) {
288                         JSONObject item = tagValueList.getJSONObject(i);
289                         result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
290                     }
291                 }
292             }
293         }
294     }
295
296     private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
52487d 297         StringBuilder tagSb = new StringBuilder();
L 298         tagSb.append("[");
d41f14 299         for (int i = 0; i < params.size(); i++) {
52487d 300             Map<String, Object> queryParams = new HashMap<>();
2f03e2 301             queryParams.put(N, params.get(i)[1]);
302             queryParams.put(D, params.get(i)[2]);
303             queryParams.put(P, params.get(i)[3]);
d41f14 304             String jsonString = JSON.toJSONString(queryParams);
52487d 305             tagSb.append(jsonString);
L 306             if (i < params.size() - 1) {
307                 tagSb.append(",");
308             }
309         }
310         tagSb.append("]");
0a2804 311         log.info("body=====" + tagSb);
312         String currentDate = DateUtils.format(collectTime, pattern);
313         String responseStr = HttpRequest.sendPost(url + "/" + currentDate, tagSb.toString());
52487d 314         JSONObject responseObj = JSON.parseObject(responseStr);
L 315         log.info("responseObj=====" + responseObj.toJSONString());
2f03e2 316         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 317             JSONArray tagValueList = responseObj.getJSONArray("data");
L 318             if (!CollectionUtils.isEmpty(tagValueList)) {
319                 for (int i = 0; i < tagValueList.size(); i++) {
320                     JSONObject item = tagValueList.getJSONObject(i);
0a2804 321                     result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
52487d 322                 }
L 323             }
324         }
325     }
326 }