瞌睡龙

技术杂货铺

0%

全链路灰度 - Spring Cloud Gateway

全链路灰度的做法

1.链路上各个组件和服务能够根据请求流量特征进行动态路由。
2.需要对服务下的所有节点进行分组,能够区分版本。
3.需要对流量进行灰度标识、版本标识。
4.需要识别出不同版本的灰度流量。

思路:
使用 Spring Cloud LoadBalancer 进行流量路由。
服务注册时在 Metadata 中以 lane 作为泳道标记,为空则表示基线版本。
规定 Http Header 中 laneTag 为泳道标识,由上游带入。
使用 Filter 识别出不同版本的灰度流量并调用 LoadBalancer。

例子:github AppVeyor

ps:Spring Cloud 没有很好的整合 Ribbon,Ribbon 是脱离 Spring Cloud 实现的

动态路由

Spring Cloud LoadBalancer 是 Spring Cloud 官方自己提供的客户端负载均衡器,用来代替 Ribbon。

核心接口 ReactiveLoadBalancer,核心方法 choose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Contract for choosing a server of type {@code T} for a request; the choice is
* delivered asynchronously through a {@code Publisher}.
*
* @param <T> type of the server carried by the emitted {@code Response}
*/
public interface ReactiveLoadBalancer<T> {

/**
* Default implementation of a request. It is the request passed to
* {@link #choose(Request)} by the no-argument {@link #choose()}.
*/
Request<DefaultRequestContext> REQUEST = new DefaultRequest();

/**
* Choose the next server based on the load balancing algorithm.
* @param request - incoming request
* @return publisher for the response
*/
@SuppressWarnings("rawtypes")
Publisher<Response<T>> choose(Request request);

/**
* Convenience overload that chooses a server for the shared default {@link #REQUEST}.
* @return publisher for the response, as produced by {@link #choose(Request)}
*/
default Publisher<Response<T>> choose() { // conflicting name
return choose(REQUEST);
}

/**
* Factory that hands out the load balancer responsible for a given service.
*
* @param <T> type of the server the produced load balancer chooses
*/
@FunctionalInterface
interface Factory<T> {

/**
* @param serviceId identifier of the service to balance
* @return the load balancer for that service
*/
ReactiveLoadBalancer<T> getInstance(String serviceId);

}

}

实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package com.example.demogw.cloud.lb;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * Lane-aware load balancer used for full-link gray releases.
 *
 * <p>An instance whose {@code lane} metadata entry is empty or missing belongs to the
 * baseline. A request carrying a {@code laneTag} header is routed to an instance whose
 * {@code lane} metadata equals that header value; when no such instance is registered
 * the request falls back to the baseline instances.
 *
 * @author cui
 */
public class LaneLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private static final Log log = LogFactory.getLog(LaneLoadBalancer.class);

    /** Metadata key under which an instance registers its lane; empty or absent means baseline. */
    private static final String LANE_METADATA_KEY = "lane";

    /** HTTP header, set by the upstream caller, naming the lane the request belongs to. */
    private static final String LANE_HEADER = "laneTag";

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final String serviceId;

    /**
     * @param serviceInstanceListSupplierProvider provider of the supplier that lists the service's instances
     * @param serviceId                           id of the service this balancer selects instances for
     */
    public LaneLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        log.info("GrayLoadBalancer init " + serviceId);
    }

    /**
     * Chooses an instance for the given request.
     *
     * <p>The request context is expected to be the request's {@link HttpHeaders}. Any other
     * context (or a {@code null} request) is treated as a request without a lane tag, so it
     * is routed to the baseline instead of producing a {@code null} result.
     *
     * @param request load balancer request whose context carries the HTTP headers
     * @return a {@link Mono} of the selection result; never {@code null}
     */
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        if (this.serviceInstanceListSupplierProvider == null) {
            // Without a supplier there is nothing to pick from; answer "no server" rather than null.
            return Mono.just(getServiceInstanceEmptyResponse());
        }
        final Object context = request == null ? null : request.getContext();
        final HttpHeaders headers = context instanceof HttpHeaders ? (HttpHeaders) context : new HttpHeaders();
        final ServiceInstanceListSupplier supplier =
                this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get().next().map(instances -> getInstanceResponse(instances, headers));
    }

    /**
     * Picks the instance to use from the currently known instances.
     *
     * @param instances all known instances of the service
     * @param headers   headers of the current request
     * @return the selected instance, or an empty response when none is usable
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        if (instances == null || instances.isEmpty()) {
            return getServiceInstanceEmptyResponse();
        }
        return getServiceInstanceResponseWithLane(instances, headers);
    }

    /**
     * Selects a random instance from the lane named by the {@code laneTag} header, falling
     * back to the baseline instances when that lane has no instance.
     *
     * @param instances all known instances of the service
     * @param headers   headers of the current request
     * @return the selected instance, or an empty response when neither the requested lane
     *         nor the baseline has an instance
     */
    private Response<ServiceInstance> getServiceInstanceResponseWithLane(List<ServiceInstance> instances, HttpHeaders headers) {
        // Traffic carrying a laneTag header is lane traffic.
        final String currentEnvironment = headers.getFirst(LANE_HEADER);
        final boolean laneRequested = currentEnvironment != null && !currentEnvironment.isEmpty();

        final List<ServiceInstance> laneServiceInstances = new ArrayList<>();
        final List<ServiceInstance> normalServiceInstances = new ArrayList<>();
        for (ServiceInstance serviceInstance : instances) {
            final String laneName = laneOf(serviceInstance);
            if (laneName.isEmpty()) {
                normalServiceInstances.add(serviceInstance);
            } else if (laneRequested && currentEnvironment.equals(laneName)) {
                laneServiceInstances.add(serviceInstance);
            }
            // Instances of other lanes are never candidates for this request.
        }

        if (log.isTraceEnabled()) {
            log.trace("laneServiceInstances = " + laneServiceInstances.stream().map(ServiceInstance::getUri).collect(Collectors.toList()));
            log.trace("normalServiceInstances = " + normalServiceInstances.stream().map(ServiceInstance::getUri).collect(Collectors.toList()));
        }

        // Fall back to the baseline when the requested lane has no instance.
        final List<ServiceInstance> candidates = laneServiceInstances.isEmpty() ? normalServiceInstances : laneServiceInstances;
        if (candidates.isEmpty()) {
            // Every registered instance belongs to some other lane; nextInt(0) would throw here.
            return getServiceInstanceEmptyResponse();
        }

        // ThreadLocalRandom avoids allocating a new Random on every request.
        final int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(candidates.size());
        final ServiceInstance instance = candidates.get(index);
        if (log.isTraceEnabled()) {
            log.trace("use instance = " + instance.getUri());
        }
        return new DefaultResponse(instance);
    }

    /**
     * Reads the lane name an instance registered with.
     *
     * @param serviceInstance instance to inspect
     * @return the lane name, or an empty string when the instance has no metadata or no lane entry
     */
    private static String laneOf(ServiceInstance serviceInstance) {
        final Map<String, String> metadata = serviceInstance.getMetadata();
        if (metadata == null) {
            return "";
        }
        final String laneName = metadata.get(LANE_METADATA_KEY);
        return laneName == null ? "" : laneName;
    }

    /**
     * Logs that the service has no usable instance and builds the matching empty response.
     *
     * @return a response that carries no server
     */
    private Response<ServiceInstance> getServiceInstanceEmptyResponse() {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
}

使用 Filter 调用 Balancer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.example.gateway.cloud.lb;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;

/**
 * Gateway global filter that resolves load-balanced route URIs ({@code lb://service} or
 * the {@code lb:<scheme>://service} prefix form) to a concrete instance chosen by
 * {@link LaneLoadBalancer}, passing the request headers along so the lane tag can be read.
 *
 * @author Cui
 */
public class LaneReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    // Log under this class so trace output is attributed to the lane filter.
    private static final Log log = LogFactory.getLog(LaneReactiveLoadBalancerClientFilter.class);
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
    private final LoadBalancerClientFactory clientFactory;
    private final LoadBalancerProperties properties;

    /**
     * @param clientFactory factory providing the per-service instance list supplier
     * @param properties    gateway load balancer properties (controls the 404 behaviour)
     */
    public LaneReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    /**
     * @return the fixed position of this filter in the gateway filter chain
     */
    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    /**
     * Replaces a load-balanced request URL with the URL of the chosen instance, then
     * continues the chain. Requests whose URL is not load-balanced pass through untouched.
     *
     * @param exchange current server exchange
     * @param chain    remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }

        if (log.isTraceEnabled()) {
            log.trace(LaneReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        return this.choose(exchange).doOnNext((response) -> {
            if (!response.hasServer()) {
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }
            URI uri = exchange.getRequest().getURI();

            // With the `lb:<scheme>` form the route's own scheme (e.g. ws) must win over
            // the scheme derived from the chosen instance.
            String overrideScheme = schemePrefix != null ? url.getScheme() : null;

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);
            URI requestUrl = this.reconstructURI(serviceInstance, uri);
            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }

            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
        }).then(chain.filter(exchange));
    }

    /**
     * Rebuilds the request URI so that it targets the chosen instance.
     *
     * @param serviceInstance chosen instance
     * @param original        URI of the incoming request
     * @return the URI as reconstructed by {@link LoadBalancerUriTools}
     */
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    /**
     * Asks a {@link LaneLoadBalancer} for an instance of the service named by the host of
     * the route URL.
     *
     * @param exchange current server exchange
     * @return the load balancer's selection result
     */
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String serviceId = uri.getHost();
        LaneLoadBalancer loadBalancer = new LaneLoadBalancer(
                clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
        return loadBalancer.choose(this.createRequest(exchange));
    }

    /**
     * Wraps the request headers as the load balancer request context.
     *
     * @param exchange current server exchange
     * @return request whose context is the exchange's {@link HttpHeaders}
     */
    private Request createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }
}
package com.example.gateway.cloud.lb;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;

/**
 * Gateway global filter that resolves load-balanced route URIs ({@code lb://service} or
 * the {@code lb:<scheme>://service} prefix form) to a concrete instance chosen by
 * {@link LaneLoadBalancer}, passing the request headers along so the lane tag can be read.
 *
 * @author Cui
 */
public class LaneReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    // Log under this class so trace output is attributed to the lane filter.
    private static final Log log = LogFactory.getLog(LaneReactiveLoadBalancerClientFilter.class);
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
    private final LoadBalancerClientFactory clientFactory;
    private final LoadBalancerProperties properties;

    /**
     * @param clientFactory factory providing the per-service instance list supplier
     * @param properties    gateway load balancer properties (controls the 404 behaviour)
     */
    public LaneReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    /**
     * @return the fixed position of this filter in the gateway filter chain
     */
    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    /**
     * Replaces a load-balanced request URL with the URL of the chosen instance, then
     * continues the chain. Requests whose URL is not load-balanced pass through untouched.
     *
     * @param exchange current server exchange
     * @param chain    remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }

        if (log.isTraceEnabled()) {
            log.trace(LaneReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        return this.choose(exchange).doOnNext((response) -> {
            if (!response.hasServer()) {
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }
            URI uri = exchange.getRequest().getURI();

            // With the `lb:<scheme>` form the route's own scheme (e.g. ws) must win over
            // the scheme derived from the chosen instance.
            String overrideScheme = schemePrefix != null ? url.getScheme() : null;

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);
            URI requestUrl = this.reconstructURI(serviceInstance, uri);
            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }

            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
        }).then(chain.filter(exchange));
    }

    /**
     * Rebuilds the request URI so that it targets the chosen instance.
     *
     * @param serviceInstance chosen instance
     * @param original        URI of the incoming request
     * @return the URI as reconstructed by {@link LoadBalancerUriTools}
     */
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    /**
     * Asks a {@link LaneLoadBalancer} for an instance of the service named by the host of
     * the route URL.
     *
     * @param exchange current server exchange
     * @return the load balancer's selection result
     */
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String serviceId = uri.getHost();
        LaneLoadBalancer loadBalancer = new LaneLoadBalancer(
                clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
        return loadBalancer.choose(this.createRequest(exchange));
    }

    /**
     * Wraps the request headers as the load balancer request context.
     *
     * @param exchange current server exchange
     * @return request whose context is the exchange's {@link HttpHeaders}
     */
    private Request createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }
}

节点进行分组

使用 ServiceInstance 中的 Metadata

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Describes a single instance of a service: its identifiers, network location (host,
* port, HTTPS flag, URI), scheme and free-form key / value metadata.
*/
public interface ServiceInstance {

/**
* This default implementation returns {@code null}.
* @return The unique instance ID as registered.
*/
default String getInstanceId() {
return null;
}

/**
* @return The service ID as registered.
*/
String getServiceId();

/**
* @return The hostname of the registered service instance.
*/
String getHost();

/**
* @return The port of the registered service instance.
*/
int getPort();

/**
* @return Whether the port of the registered service instance uses HTTPS.
*/
boolean isSecure();

/**
* @return The service URI address.
*/
URI getUri();

/**
* The lane-routing load balancer shown in this article reads the {@code lane} entry
* of this map to group instances.
* @return The key / value pair metadata associated with the service instance.
*/
Map<String, String> getMetadata();

/**
* This default implementation returns {@code null}.
* @return The scheme of the service instance.
*/
default String getScheme() {
return null;
}

}

欢迎关注我的其它发布渠道