提交 | 用户 | 时间
|
e30eca
|
1 |
package com.iailab.module.data.channel.http.collector.asdb; |
J |
2 |
|
|
3 |
import com.alibaba.fastjson.JSON; |
|
4 |
import com.iailab.framework.common.constant.CommonConstant; |
|
5 |
import com.iailab.framework.common.util.http.HttpUtils; |
|
6 |
import com.iailab.module.data.channel.http.collector.asdb.vo.HttpAsdbRespDataVO; |
|
7 |
import com.iailab.module.data.channel.http.entity.HttpApiEntity; |
|
8 |
import com.iailab.module.data.channel.http.service.HttpApiService; |
|
9 |
import com.iailab.module.data.common.enums.DataSourceType; |
|
10 |
import com.iailab.module.data.common.utils.TagUtils; |
|
11 |
import lombok.extern.slf4j.Slf4j; |
|
12 |
import org.springframework.beans.factory.annotation.Autowired; |
|
13 |
import org.springframework.data.redis.core.BoundHashOperations; |
|
14 |
import org.springframework.data.redis.core.RedisTemplate; |
|
15 |
import org.springframework.stereotype.Component; |
|
16 |
import org.springframework.util.CollectionUtils; |
|
17 |
|
|
18 |
import java.math.BigDecimal; |
|
19 |
import java.math.RoundingMode; |
|
20 |
import java.util.*; |
|
21 |
import java.util.concurrent.*; |
|
22 |
import java.util.stream.Collectors; |
|
23 |
|
|
24 |
/** |
|
25 |
* AnSteelDB采集 |
|
26 |
* |
|
27 |
* @author Jay |
|
28 |
*/ |
|
29 |
@Slf4j |
|
30 |
@Component |
|
31 |
public class HttpCollectorForAsdb { |
|
32 |
private static Map<String, HttpApiEntity> apiMap = new HashMap<>(); |
|
33 |
|
|
34 |
@Autowired |
|
35 |
private HttpApiService httpApiService; |
|
36 |
|
|
37 |
@Autowired |
|
38 |
private RedisTemplate redisTemplate; |
|
39 |
|
|
40 |
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(18, 36, 30, TimeUnit.SECONDS, |
|
41 |
new ArrayBlockingQueue<Runnable>(36), new ThreadPoolExecutor.AbortPolicy()); |
|
42 |
|
|
43 |
private static final String STA_TRUE = "true"; |
|
44 |
|
|
45 |
private static final int GROUP_MAX_COUNT = 300; |
|
46 |
|
|
47 |
private static final int MAX_WAIT = 30; |
|
48 |
|
|
49 |
private static final String pattern = "yyyyMMddHHmm00"; |
|
50 |
|
|
51 |
private static final String IS_SUCCESS = "isSuccess"; |
|
52 |
|
|
53 |
/** |
|
54 |
* tagName |
|
55 |
*/ |
|
56 |
private static final String N = "n"; |
|
57 |
|
|
58 |
/** |
|
59 |
* dimension |
|
60 |
*/ |
|
61 |
private static final String D = "d"; |
|
62 |
|
|
63 |
/** |
|
64 |
* 类型 |
|
65 |
*/ |
|
66 |
private static final String P = "p"; |
|
67 |
|
|
68 |
/** |
|
69 |
* dataValue |
|
70 |
*/ |
|
71 |
private static final String V = "v"; |
|
72 |
|
|
73 |
/** |
|
74 |
* dataTime |
|
75 |
*/ |
|
76 |
private static final String T = "t"; |
|
77 |
|
|
78 |
/** |
|
79 |
* 数据质量G:good,B:bad |
|
80 |
*/ |
|
81 |
private static final String Q = "q"; |
|
82 |
|
|
83 |
private HttpApiEntity getHttpApi(String id) { |
|
84 |
if (apiMap.containsKey(id)) { |
|
85 |
return apiMap.get(id); |
|
86 |
} |
|
87 |
HttpApiEntity httpApi = httpApiService.info(id); |
|
88 |
apiMap.put(id, httpApi); |
|
89 |
return httpApi; |
|
90 |
} |
|
91 |
|
|
92 |
public BigDecimal getTagValue(String sourceId, String tagNo, Integer dimension, String valueType) { |
|
93 |
BigDecimal value = CommonConstant.BAD_VALUE; |
|
94 |
HttpApiEntity httpApi = this.getHttpApi(sourceId); |
|
95 |
String responseStr = HttpUtils.sendGet(httpApi.getUrl(), null, ""); |
|
96 |
if (responseStr == null) { |
|
97 |
return value; |
|
98 |
} |
|
99 |
List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
|
100 |
value = Objects.requireNonNull(dataList.stream().filter(data -> tagNo.equals(data.getPoint())).findFirst().orElse(null)).getValue(); |
|
101 |
return value; |
|
102 |
} |
|
103 |
|
|
104 |
public Map<String, Object> getLastValues(String sourceId, List<String> tagNames) { |
|
105 |
Map<String, Object> result = new HashMap<>(); |
|
106 |
HttpApiEntity httpApi = this.getHttpApi(sourceId); |
|
107 |
try { |
|
108 |
if (CollectionUtils.isEmpty(tagNames)) { |
|
109 |
return result; |
|
110 |
} |
|
111 |
List<String> noCacheTagNames = new ArrayList<>();//未缓存的tag |
|
112 |
for (int i = 0; i < tagNames.size(); i++) { |
|
113 |
//先查缓存 |
|
114 |
BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(tagNames.get(i)); |
|
115 |
if (ops.get(V) != null) { |
|
116 |
BigDecimal value = new BigDecimal(ops.get(V).toString()); |
|
117 |
result.put(tagNames.get(i), value.setScale(3, RoundingMode.HALF_UP)); |
|
118 |
} else { |
|
119 |
noCacheTagNames.add(tagNames.get(i)); |
|
120 |
} |
|
121 |
} |
|
122 |
if (CollectionUtils.isEmpty(noCacheTagNames)) { |
|
123 |
log.info("全部读取缓存"); |
|
124 |
return result; |
|
125 |
} |
|
126 |
log.info("查询未缓存的数据"); |
|
127 |
String responseStr = HttpUtils.sendGet(httpApi.getUrl(), null, ""); |
|
128 |
List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
|
129 |
List<HttpAsdbRespDataVO> noCacheDataList = dataList.stream().filter(data -> noCacheTagNames.contains(data.getPoint())).collect(Collectors.toList()); |
|
130 |
for (HttpAsdbRespDataVO data : noCacheDataList ){ |
|
131 |
if (data.getValue() != null){ |
|
132 |
//存缓存 |
|
133 |
BoundHashOperations<String, String, Object> ops = redisTemplate.boundHashOps(data.getValue().toString()); |
|
134 |
ops.put(V, data.getValue().toString()); |
|
135 |
//设置过期时间 |
|
136 |
redisTemplate.expire(data.getValue().toString(), 10, TimeUnit.SECONDS); |
|
137 |
//把查询到的数据插入结果集 |
|
138 |
BigDecimal value = new BigDecimal(data.getValue().toString()); |
|
139 |
result.put(data.getPoint(), value.setScale(3, RoundingMode.HALF_UP)); |
|
140 |
} else { |
|
141 |
result.put(data.getPoint(), CommonConstant.BAD_VALUE); |
|
142 |
} |
|
143 |
} |
|
144 |
} catch (Exception ex) { |
|
145 |
log.info("getCurrentValue异常"); |
|
146 |
ex.printStackTrace(); |
|
147 |
throw ex; |
|
148 |
} |
|
149 |
return result; |
|
150 |
} |
|
151 |
|
|
152 |
public Map<String, Object> getTagValues(List<Object[]> params, Date collectTime) { |
|
153 |
Map<String, Object> result = new HashMap<>(); |
|
154 |
if (CollectionUtils.isEmpty(params)) { |
|
155 |
return new HashMap<>(); |
|
156 |
} |
|
157 |
try { |
|
158 |
Map<Integer, List<Object[]>> measurePointsCountGroup = new HashMap<>(); |
|
159 |
int pointListSize = params.size(); |
|
160 |
int groupCount = pointListSize / GROUP_MAX_COUNT + ((pointListSize % GROUP_MAX_COUNT) > 0 ? 1 : 0); |
|
161 |
log.info("groupCount=" + groupCount); |
|
162 |
for (int i = 0; i < groupCount; i++) { |
|
163 |
int end = (i + 1) * GROUP_MAX_COUNT; |
|
164 |
if (end > pointListSize) { |
|
165 |
end = pointListSize; |
|
166 |
} |
|
167 |
measurePointsCountGroup.put(i, params.subList(i * GROUP_MAX_COUNT, end)); |
|
168 |
} |
|
169 |
log.info("measurePointsCountGroup.size()=" + measurePointsCountGroup.size()); |
|
170 |
result = new ConcurrentHashMap<>(params.size()); |
|
171 |
CountDownLatch countDownLatch = new CountDownLatch(measurePointsCountGroup.size()); |
|
172 |
for (Map.Entry<Integer, List<Object[]>> measurePointsItem : measurePointsCountGroup.entrySet()) { |
|
173 |
HttpApiEntity httpApi = this.getHttpApi(measurePointsItem.getValue().get(0)[0].toString()); |
|
174 |
// 并发 |
|
175 |
Thread.sleep(1000); |
|
176 |
threadPool.submit(new Task(httpApi.getUrl(), httpApi.getCode(), result, measurePointsItem.getValue(), |
|
177 |
collectTime, countDownLatch)); |
|
178 |
// 顺序 |
|
179 |
//this.getByHtp(result, measurePointsItem.getValue(), collectTime); |
|
180 |
} |
|
181 |
countDownLatch.await(MAX_WAIT, TimeUnit.SECONDS); |
|
182 |
|
|
183 |
} catch (Exception ex) { |
|
184 |
ex.printStackTrace(); |
|
185 |
} |
|
186 |
return result; |
|
187 |
} |
|
188 |
|
|
189 |
/** |
|
190 |
* 异步采集任务 |
|
191 |
*/ |
|
192 |
private class Task implements Runnable { |
|
193 |
String url; |
|
194 |
String sourceName; |
|
195 |
Date collectTime; |
|
196 |
Map<String, Object> result; |
|
197 |
List<Object[]> params; |
|
198 |
CountDownLatch countDownLatch; |
|
199 |
|
|
200 |
public Task(String url, String sourceName, Map<String, Object> result, List<Object[]> params, |
|
201 |
Date collectTime, CountDownLatch countDownLatch) { |
|
202 |
this.url = url; |
|
203 |
this.sourceName = sourceName; |
|
204 |
this.result = result; |
|
205 |
this.collectTime = collectTime; |
|
206 |
this.params = params; |
|
207 |
this.countDownLatch = countDownLatch; |
|
208 |
} |
|
209 |
|
|
210 |
@Override |
|
211 |
public void run() { |
|
212 |
try { |
|
213 |
log.info("请求的Tag数量:" + params.size()); |
|
214 |
this.getByHtp(url, sourceName, result, params, collectTime); |
|
215 |
log.info("请求结束:url=" + url); |
|
216 |
} catch (Exception ex) { |
|
217 |
log.info("获取采集值失败," + ex.getMessage()); |
|
218 |
ex.printStackTrace(); |
|
219 |
} finally { |
|
220 |
countDownLatch.countDown(); |
|
221 |
} |
|
222 |
} |
|
223 |
|
|
224 |
/** |
|
225 |
* getTagDataByHttp |
|
226 |
* |
|
227 |
* @param url |
|
228 |
* @param sourceName |
|
229 |
* @param result |
|
230 |
* @param params |
|
231 |
* @param collectTime |
|
232 |
*/ |
|
233 |
private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) { |
|
234 |
String responseStr = HttpUtils.sendGet(url, null, ""); |
|
235 |
List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
|
236 |
for (HttpAsdbRespDataVO data : dataList){ |
|
237 |
result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, data.getPoint()), data.getValue()); |
|
238 |
} |
|
239 |
} |
|
240 |
} |
|
241 |
|
|
242 |
private void getByHtp(String url, String sourceName, Map<String, Object> result, List<Object[]> params, Date collectTime) {String responseStr = HttpUtils.sendGet(url, null, ""); |
|
243 |
List<HttpAsdbRespDataVO> dataList = JSON.parseArray(responseStr, HttpAsdbRespDataVO.class); |
|
244 |
for (HttpAsdbRespDataVO data : dataList){ |
|
245 |
result.put(TagUtils.genTagId(DataSourceType.HTTP.getCode(), sourceName, data.getPoint()), data.getValue()); |
|
246 |
}} |
|
247 |
} |