dongyukun
2024-12-31 6eeac9efdb16f92d19536bf23a2d1471705fe752
提交 | 用户 | 时间
e7c126 1 package com.iailab.gateway.filter.grey;
H 2
3 import lombok.AllArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.cloud.client.ServiceInstance;
6 import org.springframework.cloud.client.loadbalancer.*;
7 import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties;
8 import org.springframework.cloud.gateway.filter.GatewayFilterChain;
9 import org.springframework.cloud.gateway.filter.GlobalFilter;
10 import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
11 import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
12 import org.springframework.cloud.gateway.support.NotFoundException;
13 import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
14 import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
15 import org.springframework.core.Ordered;
16 import org.springframework.stereotype.Component;
17 import org.springframework.web.server.ServerWebExchange;
18 import reactor.core.publisher.Mono;
19
20 import java.net.URI;
21 import java.util.Map;
22 import java.util.Set;
23
24 import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
25
26 /**
27  * 支持灰度功能的 {@link ReactiveLoadBalancerClientFilter} 实现类
28  *
29  * 由于 {@link ReactiveLoadBalancerClientFilter#choose(Request, String, Set)} 是 private 方法,无法进行重写。
30  * 因此,这里只好 copy 它所有的代码,手动重写 choose 方法
31  *
32  * 具体的使用与实现原理,可阅读如下两个文章:
33  * 1. https://www.jianshu.com/p/6db15bc0be8f
34  * 2. https://cloud.tencent.com/developer/article/1620795
35  *
36  * @author iailab
37  */
38 @Component
39 @AllArgsConstructor
40 @Slf4j
41 @SuppressWarnings({"JavadocReference", "rawtypes", "unchecked", "ConstantConditions"})
42 public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
43
44     private final LoadBalancerClientFactory clientFactory;
45
46     private final GatewayLoadBalancerProperties properties;
47
48     @Override
49     public int getOrder() {
50         return ReactiveLoadBalancerClientFilter.LOAD_BALANCER_CLIENT_FILTER_ORDER;
51     }
52
53     @Override
54     public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
55         URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
56         String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
57         // 修改 by iailab:将 lb 替换成 grayLb,表示灰度负载均衡
58         if (url == null || (!"grayLb".equals(url.getScheme()) && !"grayLb".equals(schemePrefix))) {
59             return chain.filter(exchange);
60         }
61         // preserve the original url
62         addOriginalRequestUrl(exchange, url);
63
64         if (log.isTraceEnabled()) {
65             log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
66         }
67
68         URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
69         String serviceId = requestUri.getHost();
70         Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
71                 .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
72                         RequestDataContext.class, ResponseData.class, ServiceInstance.class);
73         DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
74                 new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));
75         return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
76
77                     if (!response.hasServer()) {
78                         supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
79                                 .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
80                         throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
81                     }
82
83                     ServiceInstance retrievedInstance = response.getServer();
84
85                     URI uri = exchange.getRequest().getURI();
86
87                     // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
88                     // if the loadbalancer doesn't provide one.
89                     String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
90                     if (schemePrefix != null) {
91                         overrideScheme = url.getScheme();
92                     }
93
94                     DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
95                             overrideScheme);
96
97                     URI requestUrl = reconstructURI(serviceInstance, uri);
98
99                     if (log.isTraceEnabled()) {
100                         log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
101                     }
102                     exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
103                     exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
104                     supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
105                 }).then(chain.filter(exchange))
106                 .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
107                         .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
108                                 CompletionContext.Status.FAILED, throwable, lbRequest,
109                                 exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
110                 .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
111                         .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
112                                 CompletionContext.Status.SUCCESS, lbRequest,
113                                 exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
114                                 new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
115     }
116
117     protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
118         return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
119     }
120
121     private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
122                                                    Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
123         // 修改 by iailab:直接创建 GrayLoadBalancer 对象
124         GrayLoadBalancer loadBalancer = new GrayLoadBalancer(
125                 clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
126         supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
127         return loadBalancer.choose(lbRequest);
128     }
129
130     private String getHint(String serviceId) {
131         LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
132         Map<String, String> hints = loadBalancerProperties.getHint();
133         String defaultHint = hints.getOrDefault("default", "default");
134         String hintPropertyValue = hints.get(serviceId);
135         return hintPropertyValue != null ? hintPropertyValue : defaultHint;
136     }
137
138 }