潘志宝
8 天以前 f3de04db06bae67537d093017e28863ee685f8a3
提交 | 用户 | 时间
e7c126 1 package com.iailab.gateway.filter.logging;
H 2
3 import cn.hutool.core.date.LocalDateTimeUtil;
4 import cn.hutool.core.map.MapUtil;
5 import cn.hutool.json.JSONUtil;
6 import com.iailab.framework.common.util.json.JsonUtils;
7 import com.iailab.gateway.util.SecurityFrameworkUtils;
8 import com.iailab.gateway.util.WebFrameworkUtils;
9 import com.alibaba.nacos.common.utils.StringUtils;
10 import lombok.extern.slf4j.Slf4j;
11 import org.reactivestreams.Publisher;
12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.cloud.gateway.filter.GatewayFilterChain;
14 import org.springframework.cloud.gateway.filter.GlobalFilter;
15 import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
16 import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory;
17 import org.springframework.cloud.gateway.support.BodyInserterContext;
18 import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
19 import org.springframework.core.Ordered;
20 import org.springframework.core.io.buffer.DataBuffer;
21 import org.springframework.core.io.buffer.DataBufferFactory;
22 import org.springframework.core.io.buffer.DataBufferUtils;
23 import org.springframework.core.io.buffer.DefaultDataBufferFactory;
24 import org.springframework.http.HttpHeaders;
25 import org.springframework.http.MediaType;
26 import org.springframework.http.ReactiveHttpOutputMessage;
27 import org.springframework.http.codec.CodecConfigurer;
28 import org.springframework.http.server.reactive.ServerHttpRequest;
29 import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
30 import org.springframework.http.server.reactive.ServerHttpResponse;
31 import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
32 import org.springframework.stereotype.Component;
33 import org.springframework.web.reactive.function.BodyInserter;
34 import org.springframework.web.reactive.function.BodyInserters;
35 import org.springframework.web.reactive.function.server.ServerRequest;
36 import org.springframework.web.server.ServerWebExchange;
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 import javax.annotation.Resource;
41 import java.nio.charset.StandardCharsets;
42 import java.time.LocalDateTime;
43 import java.util.List;
44 import java.util.Map;
45
46 import static cn.hutool.core.date.DatePattern.NORM_DATETIME_MS_FORMATTER;
47
48 /**
49  * 网关的访问日志过滤器
50  *
51  * 从功能上,它类似 iailab-common-web 的 ApiAccessLogFilter 过滤器
52  *
53  * TODO iailab:如果网关执行异常,不会记录访问日志,后续研究下 https://github.com/Silvmike/webflux-demo/blob/master/tests/src/test/java/ru/hardcoders/demo/webflux/web_handler/filters/logging
54  *
55  * @author iailab
56  */
57 @Slf4j
58 @Component
59 public class AccessLogFilter implements GlobalFilter, Ordered {
60
61     @Resource
62     private CodecConfigurer codecConfigurer;
63
64     /**
65      * 打印日志
66      *
67      * @param gatewayLog 网关日志
68      */
69     private void writeAccessLog(AccessLog gatewayLog) {
70         // 方式一:打印 Logger 后,通过 ELK 进行收集
71         // log.info("[writeAccessLog][日志内容:{}]", JsonUtils.toJsonString(gatewayLog));
72
73         // 方式二:调用远程服务,记录到数据库中
74         // TODO iailab:暂未实现
75
76         // 方式三:打印到控制台,方便排查错误
77         Map<String, Object> values = MapUtil.newHashMap(15, true); // 手工拼接,保证排序;15 保证不用扩容
78         values.put("userId", gatewayLog.getUserId());
79         values.put("userType", gatewayLog.getUserType());
80         values.put("routeId", gatewayLog.getRoute() != null ? gatewayLog.getRoute().getId() : null);
81         values.put("schema", gatewayLog.getSchema());
82         values.put("requestUrl", gatewayLog.getRequestUrl());
83         values.put("queryParams", gatewayLog.getQueryParams().toSingleValueMap());
84         values.put("requestBody", JsonUtils.isJson(gatewayLog.getRequestBody()) ? // 保证 body 的展示好看
85                 JSONUtil.parse(gatewayLog.getRequestBody()) : gatewayLog.getRequestBody());
86         values.put("requestHeaders", JsonUtils.toJsonString(gatewayLog.getRequestHeaders().toSingleValueMap()));
87         values.put("userIp", gatewayLog.getUserIp());
88         values.put("responseBody", JsonUtils.isJson(gatewayLog.getResponseBody()) ? // 保证 body 的展示好看
89                 JSONUtil.parse(gatewayLog.getResponseBody()) : gatewayLog.getResponseBody());
90         values.put("responseHeaders", gatewayLog.getResponseHeaders() != null ?
91                 JsonUtils.toJsonString(gatewayLog.getResponseHeaders().toSingleValueMap()) : null);
92         values.put("httpStatus", gatewayLog.getHttpStatus());
93         values.put("startTime", LocalDateTimeUtil.format(gatewayLog.getStartTime(), NORM_DATETIME_MS_FORMATTER));
94         values.put("endTime", LocalDateTimeUtil.format(gatewayLog.getEndTime(), NORM_DATETIME_MS_FORMATTER));
95         values.put("duration", gatewayLog.getDuration() != null ? gatewayLog.getDuration() + " ms" : null);
96         log.info("[writeAccessLog][网关日志:{}]", JsonUtils.toJsonPrettyString(values));
97     }
98
99     @Override
100     public int getOrder() {
101         return Ordered.HIGHEST_PRECEDENCE;
102     }
103
104     @Override
105     public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
106         // 将 Request 中可以直接获取到的参数,设置到网关日志
107         ServerHttpRequest request = exchange.getRequest();
108         // TODO traceId
109         AccessLog gatewayLog = new AccessLog();
110         gatewayLog.setRoute(WebFrameworkUtils.getGatewayRoute(exchange));
111         gatewayLog.setSchema(request.getURI().getScheme());
112         gatewayLog.setRequestMethod(request.getMethodValue());
113         gatewayLog.setRequestUrl(request.getURI().getRawPath());
114         gatewayLog.setQueryParams(request.getQueryParams());
115         gatewayLog.setRequestHeaders(request.getHeaders());
116         gatewayLog.setStartTime(LocalDateTime.now());
117         gatewayLog.setUserIp(WebFrameworkUtils.getClientIP(exchange));
118
119         // 继续 filter 过滤
120         MediaType mediaType = request.getHeaders().getContentType();
121         if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)
122                 || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { // 适合 JSON 和 Form 提交的请求
123             return filterWithRequestBody(exchange, chain, gatewayLog);
124         }
125         return filterWithoutRequestBody(exchange, chain, gatewayLog);
126     }
127
128     private Mono<Void> filterWithoutRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog accessLog) {
129         // 包装 Response,用于记录 Response Body
130         ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
131         return chain.filter(exchange.mutate().response(decoratedResponse).build())
132                 .then(Mono.fromRunnable(() -> writeAccessLog(accessLog))); // 打印日志
133     }
134
135     /**
136      * 参考 {@link ModifyRequestBodyGatewayFilterFactory} 实现
137      *
138      * 差别主要在于使用 modifiedBody 来读取 Request Body 数据
139      */
140     private Mono<Void> filterWithRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog gatewayLog) {
141         // 设置 Request Body 读取时,设置到网关日志
142         // 此处 codecConfigurer.getReaders() 的目的,是解决 spring.codec.max-in-memory-size 不生效
143         ServerRequest serverRequest = ServerRequest.create(exchange, codecConfigurer.getReaders());
144         Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
145             gatewayLog.setRequestBody(body);
146             return Mono.just(body);
147         });
148
149         // 创建 BodyInserter 对象
150         BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
151         // 创建 CachedBodyOutputMessage 对象
152         HttpHeaders headers = new HttpHeaders();
153         headers.putAll(exchange.getRequest().getHeaders());
154         // the new content type will be computed by bodyInserter
155         // and then set in the request decorator
156         headers.remove(HttpHeaders.CONTENT_LENGTH); // 移除
157         CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
158         // 通过 BodyInserter 将 Request Body 写入到 CachedBodyOutputMessage 中
159         return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
160             // 包装 Request,用于缓存 Request Body
161             ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
162             // 包装 Response,用于记录 Response Body
163             ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
164             // 记录普通的
165             return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
166                     .then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog))); // 打印日志
167
168         }));
169     }
170
171     /**
172      * 记录响应日志
173      * 通过 DataBufferFactory 解决响应体分段传输问题。
174      */
175     private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, AccessLog gatewayLog) {
176         ServerHttpResponse response = exchange.getResponse();
177         return new ServerHttpResponseDecorator(response) {
178
179             @Override
180             public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
181                 if (body instanceof Flux) {
182                     DataBufferFactory bufferFactory = response.bufferFactory();
183                     // 计算执行时间
184                     gatewayLog.setEndTime(LocalDateTime.now());
185                     gatewayLog.setDuration((int) (LocalDateTimeUtil.between(gatewayLog.getStartTime(),
186                             gatewayLog.getEndTime()).toMillis()));
187                     // 设置其它字段
188                     gatewayLog.setUserId(SecurityFrameworkUtils.getLoginUserId(exchange));
189                     gatewayLog.setUserType(SecurityFrameworkUtils.getLoginUserType(exchange));
190                     gatewayLog.setResponseHeaders(response.getHeaders());
191                     gatewayLog.setHttpStatus(response.getStatusCode());
192
193                     // 获取响应类型,如果是 json 就打印
194                     String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
195                     if (StringUtils.isNotBlank(originalResponseContentType)
196                             && originalResponseContentType.contains("application/json")) {
197                         Flux<? extends DataBuffer> fluxBody = Flux.from(body);
198                         return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
199                             // 设置 response body 到网关日志
200                             byte[] content = readContent(dataBuffers);
201                             String responseResult = new String(content, StandardCharsets.UTF_8);
202                             gatewayLog.setResponseBody(responseResult);
203
204                             // 响应
205                             return bufferFactory.wrap(content);
206                         }));
207                     }
208                 }
209                 // if body is not a flux. never got there.
210                 return super.writeWith(body);
211             }
212         };
213     }
214
215     // ========== 参考 ModifyRequestBodyGatewayFilterFactory 中的方法 ==========
216
217     /**
218      * 请求装饰器,支持重新计算 headers、body 缓存
219      *
220      * @param exchange 请求
221      * @param headers 请求头
222      * @param outputMessage body 缓存
223      * @return 请求装饰器
224      */
225     private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
226         return new ServerHttpRequestDecorator(exchange.getRequest()) {
227
228             @Override
229             public HttpHeaders getHeaders() {
230                 long contentLength = headers.getContentLength();
231                 HttpHeaders httpHeaders = new HttpHeaders();
232                 httpHeaders.putAll(super.getHeaders());
233                 if (contentLength > 0) {
234                     httpHeaders.setContentLength(contentLength);
235                 } else {
236                     // TODO: this causes a 'HTTP/1.1 411 Length Required' // on
237                     // httpbin.org
238                     httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
239                 }
240                 return httpHeaders;
241             }
242
243             @Override
244             public Flux<DataBuffer> getBody() {
245                 return outputMessage.getBody();
246             }
247         };
248     }
249
250     // ========== 参考 ModifyResponseBodyGatewayFilterFactory 中的方法 ==========
251
252     private byte[] readContent(List<? extends DataBuffer> dataBuffers) {
253         // 合并多个流集合,解决返回体分段传输
254         DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
255         DataBuffer join = dataBufferFactory.join(dataBuffers);
256         byte[] content = new byte[join.readableByteCount()];
257         join.read(content);
258         // 释放掉内存
259         DataBufferUtils.release(join);
260         return content;
261     }
262
263 }