| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.google.gson.Gson; |
| | | import com.iailab.framework.common.constant.CommonConstant; |
| | | import com.iailab.framework.common.util.http.HttpUtils; |
| | | import com.iailab.module.data.channel.http.entity.HttpApiEntity; |
| | | import com.iailab.module.data.channel.http.service.HttpApiService; |
| | | import com.iailab.module.data.common.enums.DataSourceType; |
| | | import com.iailab.module.data.common.utils.DateUtils; |
| | | import com.iailab.module.data.common.utils.HttpRequest; |
| | | import com.iailab.module.data.common.utils.TagUtils; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | |
| | | |
| | | import java.math.BigDecimal; |
| | | import java.math.RoundingMode; |
| | | import java.sql.Timestamp; |
| | | import java.text.ParseException; |
| | | import java.text.SimpleDateFormat; |
| | | import java.time.LocalDateTime; |
| | | import java.util.*; |
| | | import java.util.concurrent.*; |
| | | import java.util.function.Function; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * iHyperDB采集 |
| | |
| | | |
| | | private static final String STA_TRUE = "true"; |
| | | |
| | | private static final int GROUP_MAX_COUNT = 90; |
| | | private static final int GROUP_MAX_COUNT = 300; |
| | | |
| | | private static final int MAX_WAIT = 30; |
| | | private static final int MAX_WAIT = 45; |
| | | |
| | | private static final String pattern = "yyyyMMddHHmm00"; |
| | | |
| | | private static final String IS_SUCCESS = "isSuccess"; |
| | | |
| | | private static final String RESP_DATA = "data"; |
| | | |
| | | public static final String TIV = "TagIdValue:"; |
| | | |
| | | public static final long offset = 60 * 10L; |
| | | |
| | | /** |
| | | * tagName |
| | |
| | | */ |
| | | private static final String T = "t"; |
| | | |
| | | /** |
| | | * 数据质量G:good,B:bad |
| | | */ |
| | | private static final String Q = "q"; |
| | | |
| | | private HttpApiEntity getHttpApi(String id) { |
| | | if (apiMap.containsKey(id)) { |
| | | return apiMap.get(id); |
| | |
| | | tagSb.append(jsonString); |
| | | tagSb.append("]"); |
| | | String currentDate = DateUtils.format(new Date(), pattern); |
| | | String responseStr = HttpRequest.sendPost(httpApi.getUrl() + "/" + currentDate, tagSb.toString()); |
| | | String responseStr = HttpUtils.sendPost(httpApi.getUrl() + "/" + currentDate, tagSb.toString()); |
| | | JSONObject responseObj = JSON.parseObject(responseStr); |
| | | if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { |
| | | JSONArray tagValueList = responseObj.getJSONArray("data"); |
| | | JSONArray tagValueList = responseObj.getJSONArray(RESP_DATA); |
| | | if (!CollectionUtils.isEmpty(tagValueList)) { |
| | | for (int i = 0; i < tagValueList.size(); i++) { |
| | | JSONObject item = tagValueList.getJSONObject(i); |
| | |
| | | return result; |
| | | } |
| | | List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag |
| | | for (int i = 0; i < tagNames.size(); i++) { |
| | | for (String tagName : tagNames) { |
| | | //先查缓存 |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i)); |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps("IhdTag:" + tagName); |
| | | if (ops.get(V) != null) { |
| | | BigDecimal value = new BigDecimal(ops.get(V).toString()); |
| | | result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP)); |
| | | result.put(tagName, value.setScale(3, RoundingMode.HALF_UP)); |
| | | } else { |
| | | noCacheTagNames.add(tagNames.get(i)); |
| | | noCacheTagNames.add(tagName); |
| | | } |
| | | } |
| | | if (CollectionUtils.isEmpty(noCacheTagNames)) { |
| | |
| | | log.info("查询未缓存的数据"); |
| | | Gson gson = new Gson(); |
| | | String tagSb = gson.toJson(noCacheTagNames); |
| | | log.info("body=====" + tagSb); |
| | | String currentDate = DateUtils.format(new Date(), pattern); |
| | | String responseStr = ""; |
| | | responseStr = HttpRequest.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb); |
| | | responseStr = HttpUtils.sendPost(httpApi.getUrl().replace("getPointdatasAvg", "getPointslast") + "/" + currentDate, tagSb); |
| | | JSONObject responseObj = JSON.parseObject(responseStr); |
| | | if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { |
| | | JSONArray tagValueList = responseObj.getJSONArray("data"); |
| | | if (!CollectionUtils.isEmpty(tagValueList)) { |
| | | for (int i = 0; i < tagValueList.size(); i++) { |
| | | JSONObject item = tagValueList.getJSONObject(i); |
| | | if (item.get(V) != null) { |
| | | //存缓存 |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(item.get(V).toString()); |
| | | ops.put(V, item.get(V).toString()); |
| | | //设置过期时间 |
| | | redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS); |
| | | //把查询到的数据插入结果集 |
| | | BigDecimal value = new BigDecimal(item.get(V).toString()); |
| | | result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP)); |
| | | } else { |
| | | result.put(item.get(N).toString(), CommonConstant.BAD_VALUE); |
| | | } |
| | | } |
| | | } |
| | | if (!STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { |
| | | return result; |
| | | } |
| | | |
| | | JSONArray tagValueList = responseObj.getJSONArray(RESP_DATA); |
| | | if (CollectionUtils.isEmpty(tagValueList)) { |
| | | return result; |
| | | } |
| | | for (int i = 0; i < tagValueList.size(); i++) { |
| | | JSONObject item = tagValueList.getJSONObject(i); |
| | | if (item.get(V) == null) { |
| | | continue; |
| | | } |
| | | //存缓存 |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps("IhdTag:" + item.get(N).toString()); |
| | | ops.put(V, item.get(V).toString()); |
| | | //设置过期时间 |
| | | redisTemplate.expire(item.get(N).toString(), 10, TimeUnit.SECONDS); |
| | | //把查询到的数据插入结果集 |
| | | BigDecimal value = new BigDecimal(item.get(V).toString()); |
| | | result.put(item.get(N).toString(), value.setScale(3, RoundingMode.HALF_UP)); |
| | | } |
| | | } catch (Exception ex) { |
| | | log.info("getCurrentValue异常"); |
| | | ex.printStackTrace(); |
| | |
| | | log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size()); |
| | | result = new ConcurrentHashMap<>(params.size()); |
| | | CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size()); |
| | | List<Integer> finishedGroup = new CopyOnWriteArrayList<>(); |
| | | List<Integer> notFinish = new CopyOnWriteArrayList<>(); |
| | | for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { |
| | | HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); |
| | | // 并发 |
| | | Thread.sleep(200); |
| | | Thread.sleep(500); |
| | | threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), |
| | | collectTime, countDownLatch)); |
| | | collectTime, countDownLatch, finishedGroup, measurePointsItem.getKey())); |
| | | // 顺序 |
| | | //this.getByHtp(result, measurePointsItem.getValue(), collectTime); |
| | | } |
| | | countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS); |
| | | for (Map.Entry<String, Object> itemValue : result.entrySet()) { |
| | | redisTemplate.opsForValue().set(TIV + itemValue.getKey(), itemValue.getValue(), offset, TimeUnit.SECONDS); |
| | | } |
| | | |
| | | //超时使用LastValue |
| | | for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { |
| | | if (finishedGroup.contains(measurePointsItem.getKey())) { |
| | | continue; |
| | | } |
| | | notFinish.add(measurePointsItem.getKey()); |
| | | HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); |
| | | this.setOut(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), |
| | | collectTime); |
| | | } |
| | | log.info("notFinish=" + JSONArray.toJSONString(notFinish)); |
| | | } catch (Exception ex) { |
| | | ex.printStackTrace(); |
| | | } |
| | |
| | | Map<String, Object> result; |
| | | List<Object[]> params; |
| | | CountDownLatch countDownLatch; |
| | | List<Integer> finishedGroup; |
| | | Integer groupKey; |
| | | |
| | | public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params, |
| | | Date collectTime, CountDownLatch countDownLatch) { |
| | | Date collectTime, CountDownLatch countDownLatch, List<Integer> finishedGroup, Integer groupKey) { |
| | | this.url = url; |
| | | this.sourceName = sourceName; |
| | | this.result = result; |
| | | this.collectTime = collectTime; |
| | | this.params = params; |
| | | this.countDownLatch = countDownLatch; |
| | | this.finishedGroup = finishedGroup; |
| | | this.groupKey = groupKey; |
| | | } |
| | | |
| | | @Override |
| | |
| | | ex.printStackTrace(); |
| | | } finally { |
| | | countDownLatch.countDown(); |
| | | finishedGroup.add(groupKey); |
| | | } |
| | | } |
| | | |
| | |
| | | StringBuilder tagSb = new StringBuilder(); |
| | | tagSb.append("["); |
| | | for (int i = 0; i < params.size(); i++) { |
| | | Map<String, Object> queryParams = new HashMap<>(); |
| | | Map<String, Object> queryParams = new HashMap<>(3); |
| | | queryParams.put(N, params.get(i)[1]); |
| | | queryParams.put(D, params.get(i)[2]); |
| | | queryParams.put(P, params.get(i)[3]); |
| | |
| | | tagSb.append("]"); |
| | | log.info("body=====" + tagSb); |
| | | String currentDate = DateUtils.format(collectTime, pattern); |
| | | String responseStr = HttpRequest.sendPost(url + "/" + currentDate, tagSb.toString()); |
| | | String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString()); |
| | | JSONObject responseObj = JSON.parseObject(responseStr); |
| | | log.info("responseObj=====" + responseObj.toJSONString()); |
| | | if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { |
| | | JSONArray tagValueList = responseObj.getJSONArray("data"); |
| | | JSONArray tagValueList = responseObj.getJSONArray(RESP_DATA); |
| | | if (!CollectionUtils.isEmpty(tagValueList)) { |
| | | for (int i = 0; i < tagValueList.size(); i++) { |
| | | JSONObject item = tagValueList.getJSONObject(i); |
| | | result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V)); |
| | | if (item.get(Q).toString().equals("G")) { |
| | | result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, item.get(N).toString()), item.get(V)); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | tagSb.append("]"); |
| | | log.info("body=====" + tagSb); |
| | | String currentDate = DateUtils.format(collectTime, pattern); |
| | | String responseStr = HttpRequest.sendPost(url + "/" + currentDate, tagSb.toString()); |
| | | String responseStr = HttpUtils.sendPost(url + "/" + currentDate, tagSb.toString()); |
| | | JSONObject responseObj = JSON.parseObject(responseStr); |
| | | log.info("responseObj=====" + responseObj.toJSONString()); |
| | | if (STA_TRUE.equals(responseObj.get(IS_SUCCESS).toString())) { |
| | | JSONArray tagValueList = responseObj.getJSONArray("data"); |
| | | JSONArray tagValueList = responseObj.getJSONArray(RESP_DATA); |
| | | if (!CollectionUtils.isEmpty(tagValueList)) { |
| | | for (int i = 0; i < tagValueList.size(); i++) { |
| | | JSONObject item = tagValueList.getJSONObject(i); |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Back-fills {@code result} from the Redis last-value cache for every entry in {@code params}. |
| | |  * <p> |
| | |  * For each entry the tag id is generated from the HTTP data-source code, {@code sourceName} and |
| | |  * element {@code [1]} of the entry. If a value is stored in Redis under {@code TIV + tagId}, it is |
| | |  * copied into {@code result} under that tag id; entries without a cached value are left untouched. |
| | |  * <p> |
| | |  * The cached value is read with a single {@code get} instead of {@code hasKey} followed by |
| | |  * {@code get}: the cache entries are written with an expiry, so a key may disappear between the |
| | |  * two calls and {@code get} would then return {@code null}. The caller visible in this file passes |
| | |  * a {@code ConcurrentHashMap} as {@code result}, which rejects {@code null} values with a |
| | |  * {@code NullPointerException}. |
| | |  * |
| | |  * @param url         not used by this method; kept so existing callers compile unchanged |
| | |  * @param sourceName  data-source name used when generating each tag id |
| | |  * @param result      map that receives {@code tagId -> cached value} for every cache hit |
| | |  * @param params      query rows; element {@code [1]} of each row is used as the tag name |
| | |  * @param collectTime not used by this method; kept so existing callers compile unchanged |
| | |  */ |
| | | private void setOut(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) { |
| | |     for (Object[] param : params) { |
| | |         String tagId = TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, param[1].toString()); |
| | |         // One round-trip: a missing or just-expired key simply yields null here. |
| | |         Object lastValue = redisTemplate.opsForValue().get(TIV + tagId); |
| | |         if (lastValue != null) { |
| | |             result.put(tagId, lastValue); |
| | |         } |
| | |     } |
| | | } |
| | | } |