houzhongjian
2025-01-10 d7fd4674d6fd40f6f7561e7e1eaac4b84bd9e974
提交 | 用户 | 时间
52487d 1 package com.iailab.module.data.channel.http.collector.ihdb;
L 2
3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONArray;
5 import com.alibaba.fastjson.JSONObject;
e8ad66 6 import com.google.gson.Gson;
52487d 7 import com.iailab.framework.common.constant.CommonConstant;
d7fd46 8 import com.iailab.framework.common.util.http.HttpUtils;
52487d 9 import com.iailab.module.data.channel.http.entity.HttpApiEntity;
L 10 import com.iailab.module.data.channel.http.service.HttpApiService;
11 import com.iailab.module.data.common.enums.DataSourceType;
12 import com.iailab.module.data.common.utils.DateUtils;
d41f14 13 import com.iailab.module.data.common.utils.HttpRequest;
52487d 14 import com.iailab.module.data.common.utils.TagUtils;
L 15 import lombok.extern.slf4j.Slf4j;
2228b6 16 import org.apache.commons.lang3.StringUtils;
52487d 17 import org.springframework.beans.factory.annotation.Autowired;
2e7f34 18 import org.springframework.data.redis.core.BoundHashOperations;
D 19 import org.springframework.data.redis.core.RedisTemplate;
52487d 20 import org.springframework.stereotype.Component;
L 21 import org.springframework.util.CollectionUtils;
22
23 import java.math.BigDecimal;
e8ad66 24 import java.math.RoundingMode;
0a2804 25 import java.sql.Timestamp;
26 import java.text.ParseException;
27 import java.text.SimpleDateFormat;
28 import java.time.LocalDateTime;
52487d 29 import java.util.*;
0a2804 30 import java.util.concurrent.*;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
52487d 33
L 34 /**
35  * iHyperDB采集
d41f14 36  *
52487d 37  * @author lirm
L 38  * @Description
39  * @createTime 2024年10月16日
40  */
41 @Slf4j
42 @Component
d41f14 43 public class HttpCollectorForIhd {
0a2804 44     private static Map<String, HttpApiEntity> apiMap = new HashMap<>();
52487d 45
L 46     @Autowired
47     private HttpApiService httpApiService;
2e7f34 48
D 49     @Autowired
50     private RedisTemplate redisTemplate;
e8ad66 51
0a2804 52     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(18, 36, 30, TimeUnit.SECONDS,
53             new ArrayBlockingQueue<Runnable>(36), new ThreadPoolExecutor.AbortPolicy());
54
d41f14 55     private static final String STA_TRUE = "true";
52487d 56
1e563e 57     private static final int GROUP_MAX_COUNT = 300;
0a2804 58
59     private static final int MAX_WAIT = 30;
2f03e2 60
61     private static final String pattern = "yyyyMMddHHmm00";
62
63     private static final String IS_SUCCESS = "isSuccess";
64
65     /**
66      * tagName
67      */
68     private static final String N = "n";
69
70     /**
71      * dimension
72      */
73     private static final String D = "d";
74
75     /**
76      * 类型
77      */
78     private static final String P = "p";
79
80     /**
81      * dataValue
82      */
83     private static final String V = "v";
84
85     /**
86      * dataTime
87      */
88     private static final String T = "t";
52487d 89
L 90     private HttpApiEntity getHttpApi(String id) {
91         if (apiMap.containsKey(id)) {
92             return apiMap.get(id);
93         }
94         HttpApiEntity httpApi = httpApiService.info(id);
95         apiMap.put(id, httpApi);
96         return httpApi;
97     }
98
99     public BigDecimal getTagValue(String sourceId, String tagNo, Integer dimension, String valueType) {
100         BigDecimal value = CommonConstant.BAD_VALUE;
101         HttpApiEntity httpApi = this.getHttpApi(sourceId);
102         StringBuilder tagSb = new StringBuilder();
103         tagSb.append("[");
104         Map<String, Object> queryParams = new HashMap<>();
2f03e2 105         queryParams.put(P, valueType);
106         queryParams.put(D, dimension);
107         queryParams.put(N, tagNo);
d41f14 108         String jsonString = JSON.toJSONString(queryParams);
52487d 109         tagSb.append(jsonString);
L 110         tagSb.append("]");
2f03e2 111         String currentDate = DateUtils.format(new Date(), pattern);
d7fd46 112         String responseStr = HttpUtils.sendPost(httpApi.getUrl() + "/" + currentDate, tagSb.toString());
52487d 113         JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 114         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 115             JSONArray tagValueList = responseObj.getJSONArray("data");
L 116             if (!CollectionUtils.isEmpty(tagValueList)) {
117                 for (int i = 0; i < tagValueList.size(); i++) {
118                     JSONObject item = tagValueList.getJSONObject(i);
2f03e2 119                     value = new BigDecimal(item.get(V).toString());
52487d 120                 }
L 121             }
122         }
123         return value;
124     }
125
2f03e2 126     public Map<String, Object> getLastValues(String sourceId, List<String> tagNames) {
e8ad66 127         Map<String, Object> result = new HashMap<>();
2f03e2 128         HttpApiEntity httpApi = this.getHttpApi(sourceId);
2e7f34 129         try {
D 130             if (CollectionUtils.isEmpty(tagNames)) {
131                 return result;
132             }
133             List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag
134             for (int i = 0; i < tagNames.size(); i++) {
135                 //先查缓存
136                 BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i));
2f03e2 137                 if (ops.get(V) != null) {
138                     BigDecimal value = new BigDecimal(ops.get(V).toString());
2e7f34 139                     result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP));
D 140                 } else {
141                     noCacheTagNames.add(tagNames.get(i));
142                 }
143             }
144             if (CollectionUtils.isEmpty(noCacheTagNames)) {
145                 log.info("全部读取缓存");
146                 return result;
147             }
148             log.info("查询未缓存的数据");
149             Gson gson = new Gson();
150             String tagSb = gson.toJson(noCacheTagNames);
151             log.info("body=====" + tagSb);
2f03e2 152             String currentDate = DateUtils.format(new Date(), pattern);
2e7f34 153             String responseStr = "";
d7fd46 154             responseStr = HttpUtils.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb);
2e7f34 155             JSONObject responseObj = JSON.parseObject(responseStr);
2f03e2 156             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
2e7f34 157                 JSONArray tagValueList = responseObj.getJSONArray("data");
D 158                 if (!CollectionUtils.isEmpty(tagValueList)) {
159                     for (int i = 0; i < tagValueList.size(); i++) {
160                         JSONObject item = tagValueList.getJSONObject(i);
2f03e2 161                         if (item.get(V) != null) {
2e7f34 162                             //存缓存
2f03e2 163                             BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(item.get(V).toString());
164                             ops.put(V, item.get(V).toString());
2e7f34 165                             //设置过期时间
2f03e2 166                             redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS);
2e7f34 167                             //把查询到的数据插入结果集
2f03e2 168                             BigDecimal value = new BigDecimal(item.get(V).toString());
169                             result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP));
2e7f34 170                         } else {
2f03e2 171                             result.put(item.get(N).toString(), CommonConstant.BAD_VALUE);
2e7f34 172                         }
e8ad66 173                     }
D 174                 }
175             }
2e7f34 176
D 177         } catch (Exception ex) {
178             log.info("getCurrentValue异常");
179             ex.printStackTrace();
180             throw ex;
e8ad66 181         }
D 182         return result;
183     }
184
0a2804 185     public Map<String, Object> getTagValues(List<Object[]> params, Date collectTime) {
186         Map<String, Object> result = new HashMap<>();
52487d 187         if (CollectionUtils.isEmpty(params)) {
L 188             return new HashMap<>();
189         }
0a2804 190         try {
191             Map<Integer, List<Object[]>> measurePointsCountGroup = new HashMap<>();
192             int pointListSize = params.size();
193             int groupCount = pointListSize / GROUP_MAX_COUNT + ((pointListSize % GROUP_MAX_COUNT) > 0 ? 1 : 0);
194             log.info("groupCount=" + groupCount);
195             for (int i = 0; i < groupCount; i++) {
196                 int end = (i + 1) * GROUP_MAX_COUNT;
197                 if (end > pointListSize) {
198                     end = pointListSize;
199                 }
200                 measurePointsCountGroup.put(i, params.subList(i * GROUP_MAX_COUNT, end));
201             }
202             log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size());
203             result = new ConcurrentHashMap<>(params.size());
204             CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size());
205             for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) {
206                 HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString());
207                 // 并发
1e563e 208                 Thread.sleep(1000);
0a2804 209                 threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(),
210                         collectTime, countDownLatch));
211                 // 顺序
212                 //this.getByHtp(result, measurePointsItem.getValue(), collectTime);
52487d 213             }
0a2804 214             countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS);
215
216         } catch (Exception ex) {
217             ex.printStackTrace();
52487d 218         }
L 219         return result;
220     }
221
0a2804 222     /**
223      * 异步采集任务
224      */
225     private class Task implements Runnable {
226         String url;
227         String sourceName;
228         Date collectTime;
229         Map<String, Object> result;
230         List<Object[]> params;
231         CountDownLatch countDownLatch;
232
233         public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params,
234                     Date collectTime, CountDownLatch countDownLatch) {
235             this.url = url;
236             this.sourceName = sourceName;
237             this.result = result;
238             this.collectTime = collectTime;
239             this.params = params;
240             this.countDownLatch = countDownLatch;
241         }
242
243         @Override
244         public void run() {
245             try {
246                 log.info("请求的Tag数量:" + params.size());
247                 this.getByHtp(url, sourceName, result, params, collectTime);
248                 log.info("请求结束:url=" + url);
249             } catch (Exception ex) {
250                 log.info("获取采集值失败," + ex.getMessage());
251                 ex.printStackTrace();
252             } finally {
253                 countDownLatch.countDown();
254             }
255         }
256
257         /**
258          * getTagDataByHttp
259          *
260          * @param url
261          * @param sourceName
262          * @param result
263          * @param params
264          * @param collectTime
265          */
266         private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
267             StringBuilder tagSb = new StringBuilder();
268             tagSb.append("[");
269             for (int i = 0; i < params.size(); i++) {
2228b6 270                 Map<String, Object> queryParams = new HashMap<>(3);
0a2804 271                 queryParams.put(N, params.get(i)[1]);
272                 queryParams.put(D, params.get(i)[2]);
273                 queryParams.put(P, params.get(i)[3]);
274                 String jsonString = JSON.toJSONString(queryParams);
275                 tagSb.append(jsonString);
276                 if (i < params.size() - 1) {
277                     tagSb.append(",");
278                 }
279             }
280             tagSb.append("]");
281             log.info("body=====" + tagSb);
282             String currentDate = DateUtils.format(collectTime, pattern);
d7fd46 283             String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString());
0a2804 284             JSONObject responseObj = JSON.parseObject(responseStr);
285             log.info("responseObj=====" + responseObj.toJSONString());
286             if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
287                 JSONArray tagValueList = responseObj.getJSONArray("data");
288                 if (!CollectionUtils.isEmpty(tagValueList)) {
289                     for (int i = 0; i < tagValueList.size(); i++) {
290                         JSONObject item = tagValueList.getJSONObject(i);
291                         result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
292                     }
293                 }
294             }
295         }
296     }
297
298     private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {
52487d 299         StringBuilder tagSb = new StringBuilder();
L 300         tagSb.append("[");
d41f14 301         for (int i = 0; i < params.size(); i++) {
52487d 302             Map<String, Object> queryParams = new HashMap<>();
2f03e2 303             queryParams.put(N, params.get(i)[1]);
304             queryParams.put(D, params.get(i)[2]);
305             queryParams.put(P, params.get(i)[3]);
d41f14 306             String jsonString = JSON.toJSONString(queryParams);
52487d 307             tagSb.append(jsonString);
L 308             if (i < params.size() - 1) {
309                 tagSb.append(",");
310             }
311         }
312         tagSb.append("]");
0a2804 313         log.info("body=====" + tagSb);
314         String currentDate = DateUtils.format(collectTime, pattern);
d7fd46 315         String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString());
52487d 316         JSONObject responseObj = JSON.parseObject(responseStr);
L 317         log.info("responseObj=====" + responseObj.toJSONString());
2f03e2 318         if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) {
52487d 319             JSONArray tagValueList = responseObj.getJSONArray("data");
L 320             if (!CollectionUtils.isEmpty(tagValueList)) {
321                 for (int i = 0; i < tagValueList.size(); i++) {
322                     JSONObject item = tagValueList.getJSONObject(i);
0a2804 323                     result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V));
52487d 324                 }
L 325             }
326         }
327     }
328 }