ribbon的作用:一个服务部署多个实例时,负责负载均衡请求的组件
ribbon重要组件
ILoadBalancer:负载均衡器,其中包含了IRule和IPing
IRule:负责负载均衡选择服务的规则
IPing:定时ping服务器,判断其是否存活
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class MyConfiguration {
@Bean
public IRule getRule() {
return new MyRule();
}
@Bean
public IPing getPing() {
return new MyPing();
}
}
@RibbonClient(name = "ServiceB", configuration = MyConfiguration.class)
public class ServiceBConfiguration {
}
|
ribbon的入口
@LoadBalanced将将一个RestTemplate标志为底层采用LoadBalancerClient来执行实际的http请求,支持负载均衡。
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration中有相关的bean注册。LoadBalancerRequestFactory、LoadBalancerInterceptor、RestTemplateCustomizer等等
使用RestTemplateCustomizer对每个restTemplate,使用LoadBalancerInterceptor进行定制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
//配置拦截器
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//RestTemplate定制化,使用LoadBalancerInterceptor
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
|
LoadBalancerClient是谁?
LoadBalancerClient在org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration中进行注册了,RibbonLoadBalancerClient
需要注意的是,对于client来说,每个服务也可以说服务名对应一个独立的ApplicationContext。
1
2
3
4
5
|
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient(final SpringClientFactory springClientFactory) {
return new RibbonLoadBalancerClient(springClientFactory);
}
|
那么LoadBalancerClient底层使用的ILoadBalancer是谁呢?
org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration中有如下配置
1
2
3
4
5
6
7
8
9
10
11
|
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
|
ZoneAwareLoadBalancer与eureka整合,如何获取到注册表呢?
其父类DynamicServerListLoadBalancer#initWithNiwsConfig方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
|
比较坑的是RibbonClientConfiguration和EurekaRibbonClientConfiguration都配置了serverlist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
//EurekaRibbonClientConfiguration
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
|
通过调试发现,还是DiscoveryEnabledNIWSServerList的getUpdatedServerList()方法返回的server list。此处估计是根据配置,使用各自的server
那么server list怎么更新服务列表呢?
1
2
3
4
5
6
|
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
|
默认一分钟过后第一次执行,然后每隔30秒执行一次,刷新注册表
1
2
3
4
5
6
7
8
9
10
11
|
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
|
默认的负载均衡为ZoneAvoidanceRule,其基类PredicateBasedRule的choose方法。
默认是轮询算法
RibbonLoadBalancerClient调用excute执行方法调用。其中重要的调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
//方法调用
T returnVal = request.apply(serviceInstance);
public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
//底层实际调用
return (ListenableFuture)this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
}
//ServiceRequestWrapper#getURI方法,将服务名替换为选择到的服务具体ip端口
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
LoadBalancerClient loadBalancer) {
super(request);
this.instance = instance;
this.loadBalancer = loadBalancer;
}
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
}
|
最后ribbon的健康检查。调用对应服务的健康检查接口,进行判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
//NIWSDiscoveryPing#isAlive
public boolean isAlive(Server server) {
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
|