| | |
| | | import com.iailab.module.data.common.utils.TagUtils; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.BoundHashOperations; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.CollectionUtils; |
| | | |
| | | import java.math.BigDecimal; |
| | | import java.math.RoundingMode; |
| | | import java.util.*; |
| | | import java.util.concurrent.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * AnSteelDB采集 |
| | |
| | | @Autowired |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | ThreadPoolExecutor threadPool = new ThreadPoolExecutor(18, 36, 30, TimeUnit.SECONDS, |
| | | new ArrayBlockingQueue<Runnable>(36), new ThreadPoolExecutor.AbortPolicy()); |
| | | |
| | | private static final String STA_TRUE = "true"; |
| | | |
| | | private static final int GROUP_MAX_COUNT = 300; |
| | | |
| | | private static final int MAX_WAIT = 30; |
| | | |
| | | private static final String pattern = "yyyyMMddHHmm00"; |
| | | |
| | | private static final String IS_SUCCESS = "isSuccess"; |
| | | |
| | | /** |
| | | * tagName |
| | | */ |
| | | private static final String N = "n"; |
| | | |
| | | /** |
| | | * dimension |
| | | */ |
| | | private static final String D = "d"; |
| | | |
| | | /** |
| | | * 类型 |
| | | */ |
| | | private static final String P = "p"; |
| | | |
| | | /** |
| | | * dataValue |
| | | */ |
| | | private static final String V = "v"; |
| | | |
| | | /** |
| | | * dataTime |
| | | */ |
| | | private static final String T = "t"; |
| | | |
| | | /** |
| | | * 数据质量G:good,B:bad |
| | | */ |
| | | private static final String Q = "q"; |
| | | public static final long offset = 10; |
| | | |
| | | private HttpApiEntity getHttpApi(String id) { |
| | | if (apiMap.containsKey(id)) { |
| | |
| | | return httpApi; |
| | | } |
| | | |
| | | public BigDecimal getTagValue(String sourceId, String tagNo, Integer dimension, String valueType) { |
| | | public BigDecimal getTagValue(String sourceId, String tagNo) { |
| | | BigDecimal value = CommonConstant.BAD_VALUE; |
| | | HttpApiEntity httpApi = this.getHttpApi(sourceId); |
| | | String responseStr = HttpUtils.sendGet(httpApi.getUrl(), null, ""); |
| | | if (responseStr == null) { |
| | | return value; |
| | | //先查缓存 |
| | | String catchKey = "IailabData:" + httpApi.getCode() + ":" + tagNo; |
| | | if (redisTemplate.hasKey(catchKey)) { |
| | | log.info("查找IailabData缓存: " + catchKey); |
| | | return new BigDecimal(redisTemplate.opsForValue().get(catchKey).toString()); |
| | | } |
| | | String responseStr = HttpUtils.sendGet(httpApi.getUrl(), null, ""); |
| | | List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
| | | value = Objects.requireNonNull(dataList.stream().filter(data -> tagNo.equals(data.getPoint())).findFirst().orElse(null)).getValue(); |
| | | log.info("存入IailabData缓存: " + catchKey); |
| | | dataList.forEach(item -> { |
| | | redisTemplate.opsForValue().set(catchKey, item.getValue().toString(), offset, TimeUnit.SECONDS); |
| | | }); |
| | | for (HttpAsdbRespDataVO data : dataList){ |
| | | if (tagNo.equals(data.getPoint())){ |
| | | value = data.getValue(); |
| | | break; |
| | | } |
| | | } |
| | | return value; |
| | | } |
| | | |
| | | public Map<String, Object> getLastValues(String sourceId, List<String> tagNames) { |
| | | public Map<String, Object> getTagValues(String sourceId, List<String> tagNames) { |
| | | Map<String, Object> result = new HashMap<>(); |
| | | HttpApiEntity httpApi = this.getHttpApi(sourceId); |
| | | try { |
| | | if (CollectionUtils.isEmpty(tagNames)) { |
| | | return result; |
| | | } |
| | | List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag |
| | | for (int i = 0; i < tagNames.size(); i++) { |
| | | //先查缓存 |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i)); |
| | | if (ops.get(V) != null) { |
| | | BigDecimal value = new BigDecimal(ops.get(V).toString()); |
| | | result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP)); |
| | | } else { |
| | | noCacheTagNames.add(tagNames.get(i)); |
| | | } |
| | | } |
| | | if (CollectionUtils.isEmpty(noCacheTagNames)) { |
| | | log.info("全部读取缓存"); |
| | | return result; |
| | | } |
| | | log.info("查询未缓存的数据"); |
| | | String responseStr = HttpUtils.sendGet(httpApi.getUrl(), null, ""); |
| | | List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
| | | List<HttpAsdbRespDataVO> noCacheDataList = dataList.stream().filter(data -> noCacheTagNames.contains(data.getPoint())).collect(Collectors.toList()); |
| | | for (HttpAsdbRespDataVO data : noCacheDataList ){ |
| | | if (data.getValue() != null){ |
| | | //存缓存 |
| | | BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(data.getValue().toString()); |
| | | ops.put(V, data.getValue().toString()); |
| | | //设置过期时间 |
| | | redisTemplate.expire(data.getValue().toString(), 10, TimeUnit.SECONDS); |
| | | //把查询到的数据插入结果集 |
| | | BigDecimal value = new BigDecimal(data.getValue().toString()); |
| | | result.put(data.getPoint(), value.setScale(3, RoundingMode.HALF_UP)); |
| | | } else { |
| | | result.put(data.getPoint(), CommonConstant.BAD_VALUE); |
| | | } |
| | | for (String tagName : tagNames) { |
| | | result.put(tagName, getTagValue(sourceId, tagName)); |
| | | } |
| | | } catch (Exception ex) { |
| | | log.info("getCurrentValue异常"); |
| | |
| | | return new HashMap<>(); |
| | | } |
| | | try { |
| | | Map<Integer, List<Object[]>> measurePointsCountGroup = new HashMap<>(); |
| | | int pointListSize = params.size(); |
| | | int groupCount = pointListSize / GROUP_MAX_COUNT + ((pointListSize % GROUP_MAX_COUNT) > 0 ? 1 : 0); |
| | | log.info("groupCount=" + groupCount); |
| | | for (int i = 0; i < groupCount; i++) { |
| | | int end = (i + 1) * GROUP_MAX_COUNT; |
| | | if (end > pointListSize) { |
| | | end = pointListSize; |
| | | } |
| | | measurePointsCountGroup.put(i, params.subList(i * GROUP_MAX_COUNT, end)); |
| | | } |
| | | log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size()); |
| | | result = new ConcurrentHashMap<>(params.size()); |
| | | CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size()); |
| | | for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { |
| | | HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); |
| | | // 并发 |
| | | Thread.sleep(1000); |
| | | threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), |
| | | collectTime, countDownLatch)); |
| | | // 顺序 |
| | | //this.getByHtp(result, measurePointsItem.getValue(), collectTime); |
| | | } |
| | | countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS); |
| | | |
| | | HttpApiEntity httpApi = this.getHttpApi(params.get(0)[0].toString()); |
| | | this.getByHtp(httpApi.getUrl(), httpApi.getCode(), result, params); |
| | | } catch (Exception ex) { |
| | | ex.printStackTrace(); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * 异步采集任务 |
| | | */ |
| | | private class Task implements Runnable { |
| | | String url; |
| | | String sourceName; |
| | | Date collectTime; |
| | | Map<String, Object> result; |
| | | List<Object[]> params; |
| | | CountDownLatch countDownLatch; |
| | | |
| | | public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params, |
| | | Date collectTime, CountDownLatch countDownLatch) { |
| | | this.url = url; |
| | | this.sourceName = sourceName; |
| | | this.result = result; |
| | | this.collectTime = collectTime; |
| | | this.params = params; |
| | | this.countDownLatch = countDownLatch; |
| | | private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params) { |
| | | String responseStr = HttpUtils.sendGet(url, null, ""); |
| | | List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
| | | Map<String, HttpAsdbRespDataVO> valueGroup = new HashMap<>(); |
| | | for (HttpAsdbRespDataVO data : dataList) { |
| | | valueGroup.put(data.getPoint(), data); |
| | | } |
| | | |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | log.info("请求的Tag数量:" + params.size()); |
| | | this.getByHtp(url, sourceName, result, params, collectTime); |
| | | log.info("请求结束:url=" + url); |
| | | } catch (Exception ex) { |
| | | log.info("获取采集值失败," + ex.getMessage()); |
| | | ex.printStackTrace(); |
| | | } finally { |
| | | countDownLatch.countDown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * getTagDataByHttp |
| | | * |
| | | * @param url |
| | | * @param sourceName |
| | | * @param result |
| | | * @param params |
| | | * @param collectTime |
| | | */ |
| | | private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) { |
| | | String responseStr = HttpUtils.sendGet(url, null, ""); |
| | | List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
| | | for (HttpAsdbRespDataVO data : dataList){ |
| | | for (Object[] item : params) { |
| | | if (valueGroup.containsKey(item[1].toString())) { |
| | | HttpAsdbRespDataVO data = valueGroup.get(item[1].toString()); |
| | | result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, data.getPoint()), data.getValue()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {String responseStr = HttpUtils.sendGet(url, null, ""); |
| | | List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
| | | for (HttpAsdbRespDataVO data : dataList){ |
| | | result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, data.getPoint()), data.getValue()); |
| | | }} |
| | | } |