package com.iailab.gateway.filter.grey;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.Map;
import java.util.Set;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;

/**
 * A {@link ReactiveLoadBalancerClientFilter} variant that supports gray (canary) routing.
 *
 * <p>{@link ReactiveLoadBalancerClientFilter#choose(Request, String, Set)} is private and cannot
 * be overridden, so this class copies the upstream filter's code and supplies its own
 * {@code choose} implementation that delegates to {@link GrayLoadBalancer}. The body is kept
 * deliberately close to the upstream filter so that the two can be compared easily.
 *
 * <p>Routes opt in by using the {@code grayLb} URI scheme instead of {@code lb}.
 *
 * <p>Background reading on usage and implementation:
 * <ol>
 *   <li>https://www.jianshu.com/p/6db15bc0be8f</li>
 *   <li>https://cloud.tencent.com/developer/article/1620795</li>
 * </ol>
 *
 * @author iailab
 */
@Component
@AllArgsConstructor
@Slf4j
@SuppressWarnings({"JavadocReference", "rawtypes", "unchecked", "ConstantConditions"})
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    /** URI scheme (or scheme prefix) that marks a route as gray load-balanced. */
    private static final String GRAY_LB_SCHEME = "grayLb";

    private final LoadBalancerClientFactory clientFactory;

    private final GatewayLoadBalancerProperties properties;

    /**
     * Uses the same order value as the stock {@link ReactiveLoadBalancerClientFilter}.
     *
     * @return {@link ReactiveLoadBalancerClientFilter#LOAD_BALANCER_CLIENT_FILTER_ORDER}
     */
    @Override
    public int getOrder() {
        return ReactiveLoadBalancerClientFilter.LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    /**
     * Resolves a {@code grayLb://service} request URL to a concrete service instance and
     * continues the filter chain with the rewritten URL.
     *
     * <p>Requests whose URL is absent, or whose scheme and scheme prefix are both different from
     * {@code grayLb}, are passed straight to the next filter untouched.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining gateway filter chain
     * @return a {@link Mono} that completes when the rest of the chain has completed; it fails
     *         with a {@link NotFoundException} when the load balancer returns no instance
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        // Changed by iailab: match "grayLb" instead of "lb" to select gray load balancing.
        if (url == null
                || (!GRAY_LB_SCHEME.equals(url.getScheme()) && !GRAY_LB_SCHEME.equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        log.trace("{} url before: {}", ReactiveLoadBalancerClientFilter.class.getSimpleName(), url);

        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String serviceId = requestUri.getHost();
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
                new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));

        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
            if (!response.hasServer()) {
                // No instance was chosen: notify lifecycle processors, then abort the request.
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                        new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(),
                        "Unable to find instance for " + url.getHost());
            }

            ServiceInstance retrievedInstance = response.getServer();
            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            DelegatingServiceInstance serviceInstance =
                    new DelegatingServiceInstance(retrievedInstance, overrideScheme);
            URI requestUrl = reconstructURI(serviceInstance, uri);

            log.trace("LoadBalancerClientFilter url chosen: {}", requestUrl);

            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(),
                                        new RequestData(exchange.getRequest()))))));
    }

    /**
     * Rebuilds the request URI so that it targets the chosen service instance.
     *
     * @param serviceInstance the instance selected by the load balancer
     * @param original        the URI of the incoming request
     * @return the result of {@link LoadBalancerUriTools#reconstructURI(ServiceInstance, URI)}
     */
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    /**
     * Picks a service instance for the request using a {@link GrayLoadBalancer}.
     *
     * @param lbRequest                    the load-balancer request built from the exchange
     * @param serviceId                    the target service id (host part of the request URL)
     * @param supportedLifecycleProcessors lifecycle processors to notify via {@code onStart}
     * @return the load balancer's response, as returned by {@link GrayLoadBalancer#choose}
     */
    private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
                                                   Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        // Changed by iailab: create a GrayLoadBalancer directly instead of looking one up
        // through the client factory.
        GrayLoadBalancer loadBalancer = new GrayLoadBalancer(
                clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        return loadBalancer.choose(lbRequest);
    }

    /**
     * Looks up the load-balancer hint configured for a service.
     *
     * @param serviceId the service id to look up
     * @return the hint stored under {@code serviceId}; otherwise the hint stored under the key
     *         {@code "default"}; otherwise the literal {@code "default"}
     */
    private String getHint(String serviceId) {
        LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
        Map<String, String> hints = loadBalancerProperties.getHint();
        String defaultHint = hints.getOrDefault("default", "default");
        String hintPropertyValue = hints.get(serviceId);
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
    }

}