From 057d17fad3bf7143d5aa07a5dbd9cd3451b8a66c Mon Sep 17 00:00:00 2001 From: 潘志宝 <979469083@qq.com> Date: 星期一, 17 三月 2025 10:19:21 +0800 Subject: [PATCH] 采集超时使用最近的值 --- iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java | 72 ++++++++++++++++++++++++++---------- iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java | 31 +-------------- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java b/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java index d8e42b4..da14d16 100644 --- a/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java +++ b/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/channel/http/collector/ihdb/HttpCollectorForIhd.java @@ -56,11 +56,15 @@ private static final int GROUP_MAX_COUNT = 300; - private static final int MAX_WAIT = 40; + private static final int MAX_WAIT = 45; private static final String pattern = "yyyyMMddHHmm00"; private static final String IS_SUCCESS = "isSuccess"; + + public static final String TIV = "TagIdValue:"; + + public static final long offset = 60 * 10L; /** * tagName @@ -138,7 +142,7 @@ List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag for (int i = 0; i < tagNames.size(); i++) { //先查缓存 - BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i)); + BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps("IhdTag:" + tagNames.get(i)); if (ops.get(V) != null) { BigDecimal value = new BigDecimal(ops.get(V).toString()); result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP)); @@ -153,7 +157,6 @@ log.info("查询未缓存的数据"); Gson gson = new Gson(); String tagSb = gson.toJson(noCacheTagNames); - log.info("body=====" + tagSb); String currentDate = DateUtils.format(new Date(), pattern); String responseStr = ""; responseStr = HttpUtils.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb); @@ -161,24 +164,23 @@ if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { JSONArray tagValueList = responseObj.getJSONArray("data"); if (!CollectionUtils.isEmpty(tagValueList)) { - for (int i = 0; i < tagValueList.size(); i++) { - JSONObject item = tagValueList.getJSONObject(i); - if (item.get(V) != null) { - //存缓存 - BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(item.get(V).toString()); - ops.put(V, item.get(V).toString()); - //设置过期时间 - redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS); - //把查询到的数据插入结果集 - BigDecimal value = new BigDecimal(item.get(V).toString()); - result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP)); - } else { - result.put(item.get(N).toString(), CommonConstant.BAD_VALUE); - } + return result; + } + for (int i = 0; i < tagValueList.size(); i++) { + JSONObject item = tagValueList.getJSONObject(i); + if (item.get(V) == null) { + continue; } + //存缓存 + BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps("IhdTag:" + item.get(N).toString()); + ops.put(V, item.get(V).toString()); + //设置过期时间 + redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS); + //把查询到的数据插入结果集 + BigDecimal value = new BigDecimal(item.get(V).toString()); + result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP)); } } - } catch (Exception ex) { log.info("getCurrentValue异常"); ex.printStackTrace(); @@ -207,17 +209,33 @@ log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size()); result = new ConcurrentHashMap<>(params.size()); CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size()); + List<Integer> finishedGroup = new CopyOnWriteArrayList<>(); + List<Integer> notFinish = new CopyOnWriteArrayList<>(); for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); // 并发 Thread.sleep(500); threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), - collectTime, countDownLatch)); + collectTime, countDownLatch, finishedGroup, measurePointsItem.getKey())); // 顺序 //this.getByHtp(result, measurePointsItem.getValue(), collectTime); } countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS); + for (Map.Entry<String, Object> itemValue : result.entrySet()) { + redisTemplate.opsForValue().set(TIV + itemValue.getKey(), itemValue.getValue(), offset, TimeUnit.SECONDS); + } + //超时使用LastValue + for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { + if (finishedGroup.contains(measurePointsItem.getKey())) { + continue; + } + notFinish.add(measurePointsItem.getKey()); + HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); + this.setOut(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), + collectTime); + } + log.info("notFinish=" + JSONArray.toJSONString(notFinish)); } catch (Exception ex) { ex.printStackTrace(); } @@ -234,15 +252,19 @@ Map<String, Object> result; List<Object[]> params; CountDownLatch countDownLatch; + List<Integer> finishedGroup; + Integer groupKey; public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params, - Date collectTime, CountDownLatch countDownLatch) { + Date collectTime, CountDownLatch countDownLatch, List<Integer> finishedGroup, Integer groupKey) { this.url = url; this.sourceName = sourceName; this.result = result; this.collectTime = collectTime; this.params = params; this.countDownLatch = countDownLatch; + this.finishedGroup = finishedGroup; + this.groupKey = groupKey; } @Override @@ -256,6 +278,7 @@ ex.printStackTrace(); } finally { countDownLatch.countDown(); + finishedGroup.add(groupKey); } } @@ -332,4 +355,13 @@ } } } + + private void setOut(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) { + for (int i = 0; i < params.size(); i++) { + String tagId = TIV + TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, params.get(i)[1].toString()); + if(redisTemplate.hasKey(tagId)) { + result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, params.get(i)[1].toString()), redisTemplate.opsForValue().get(tagId)); + } + } + } } \ No newline at end of file diff --git a/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java b/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java index 4c5eee6..69c1412 100644 --- a/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java +++ b/iailab-module-data/iailab-module-data-biz/src/main/java/com/iailab/module/data/point/collection/PointCollector.java @@ -101,40 +101,13 @@ List<DaPointDTO> pointMeasureList = daPointService.getMeasurePoint(minfreq); pointValues.addAll(measureHandle.handle(collectTime, pointMeasureList, dataMap, listGood, listBad)); - List<String> listBadNew = new ArrayList<>(); - if (!CollectionUtils.isEmpty(listBad)) { - log.info("BAD点值修复"); - List<InfluxPointValuePOJO> influxParams = new ArrayList<>(); - for (String bad : listBad) { - DaPointDTO daPointDTO = daPointService.getByNo(bad); - InfluxPointValuePOJO pojo = new InfluxPointValuePOJO(); - pojo.setPoint(bad); - pojo.setType(daPointDTO.getDataType()); - influxParams.add(pojo); - } - Map<String, Object> lastValue = influxDBService.queryPointsLastValue(influxParams); - log.info("lastValue=" + JSONObject.toJSONString(lastValue)); - if (!CollectionUtils.isEmpty(lastValue)) { - for (String bad : listBad) { - if (lastValue.containsKey(bad)) { - listGood.add(bad); - dataMap.put(bad, lastValue.get(bad)); - } else { - listBadNew.add(bad); - } - } - } else { - listBadNew = listBad; - } - } - log.info("读取计算点"); List<DaPointDTO> pointCalculateList = daPointService.getMathPoint(minfreq); - pointValues.addAll(calculateHandle.handle(collectTime, pointCalculateList, dataMap, listGood, listBadNew)); + pointValues.addAll(calculateHandle.handle(collectTime, pointCalculateList, dataMap, listGood, listBad)); log.info("读取累计点"); List<DaPointDTO> pointCumulateList = daPointService.getCumulatePoint(minfreq); - pointValues.addAll(cumulateHandle.handle(collectTime, pointCumulateList, listGood, listBadNew)); + pointValues.addAll(cumulateHandle.handle(collectTime, pointCumulateList, listGood, listBad)); log.info("存入时序库"); influxDBService.asyncWritePointValues(pointValues); -- Gitblit v1.9.3