SpringCloud源码| ribbon源码

ribbon的作用:一个服务部署多个实例时,负责负载均衡请求的组件

ribbon重要组件

ILoadBalancer:负载均衡器,其中包含了IRule和IPing

IRule:负责负载均衡选择服务的规则

IPing:定时ping服务器,判断其是否存活

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class MyConfiguration {
@Bean
public IRule getRule() {
return new MyRule();
}

@Bean
public IPing getPing() {
return new MyPing();
}
}

@RibbonClient(name = "ServiceB", configuration = MyConfiguration.class)
public class ServiceBConfiguration {

}

ribbon的入口

@LoadBalanced将将一个RestTemplate标志为底层采用LoadBalancerClient来执行实际的http请求,支持负载均衡。

org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration中有相关的bean注册。LoadBalancerRequestFactory、LoadBalancerInterceptor、RestTemplateCustomizer等等

使用RestTemplateCustomizer对每个restTemplate,使用LoadBalancerInterceptor进行定制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}

	@Configuration(proxyBeanMethods = false)
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		//配置拦截器
		@Bean
		public LoadBalancerInterceptor loadBalancerInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}
		//RestTemplate定制化,使用LoadBalancerInterceptor
		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

LoadBalancerClient是谁?

LoadBalancerClient在org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration中进行注册了,RibbonLoadBalancerClient

需要注意的是,对于client来说,每个服务也可以说服务名对应一个独立的ApplicationContext。

1
2
3
4
5
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient(final SpringClientFactory springClientFactory) {
   return new RibbonLoadBalancerClient(springClientFactory);
}

那么LoadBalancerClient底层使用的ILoadBalancer是谁呢?

org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration中有如下配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

ZoneAwareLoadBalancer与eureka整合,如何获取到注册表呢?

其父类DynamicServerListLoadBalancer#initWithNiwsConfig方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
    try {
        super.initWithNiwsConfig(clientConfig);
        String niwsServerListClassName = clientConfig.getPropertyAsString(
                CommonClientConfigKey.NIWSServerListClassName,
                DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);

        ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
        this.serverListImpl = niwsServerListImpl;

        if (niwsServerListImpl instanceof AbstractServerList) {
            AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                    .getFilterImpl(clientConfig);
            niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
            this.filter = niwsFilter;
        }

        String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                CommonClientConfigKey.ServerListUpdaterClassName,
                DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
        );

        this.serverListUpdater = (ServerListUpdater) ClientFactory
                .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

        restOfInit(clientConfig);
    } catch (Exception e) {
        throw new RuntimeException(
                "Exception while initializing NIWSDiscoveryLoadBalancer:"
                        + clientConfig.getClientName()
                        + ", niwsClientConfig:" + clientConfig, e);
    }
}

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

比较坑的是RibbonClientConfiguration和EurekaRibbonClientConfiguration都配置了serverlist

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
   if (this.propertiesFactory.isSet(ServerList.class, name)) {
      return this.propertiesFactory.get(ServerList.class, config, name);
   }
   ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
   serverList.initWithNiwsConfig(config);
   return serverList;
}
//EurekaRibbonClientConfiguration
@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(IClientConfig config,
			Provider<EurekaClient> eurekaClientProvider) {
		if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
			return this.propertiesFactory.get(ServerList.class, config, serviceId);
		}
		DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
				config, eurekaClientProvider);
		DomainExtractingServerList serverList = new DomainExtractingServerList(
				discoveryServerList, config, this.approximateZoneFromHostname);
		return serverList;
	}

通过调试发现,还是DiscoveryEnabledNIWSServerList的getUpdatedServerList()方法返回的server list。此处估计是根据配置,使用各自的server

那么server list怎么更新服务列表呢?

1
2
3
4
5
6
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
   return new PollingServerListUpdater(config);
}

默认一分钟过后第一次执行,然后每隔30秒执行一次,刷新注册表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//RibbonClientConfiguration配置的
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
   if (this.propertiesFactory.isSet(IRule.class, name)) {
      return this.propertiesFactory.get(IRule.class, config, name);
   }
   ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
   rule.initWithNiwsConfig(config);
   return rule;
}

默认的负载均衡为ZoneAvoidanceRule,其基类PredicateBasedRule的choose方法。

默认是轮询算法

RibbonLoadBalancerClient调用excute执行方法调用。其中重要的调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//方法调用
T returnVal = request.apply(serviceInstance);

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {
    private LoadBalancerClient loadBalancer;

    public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        //底层实际调用
        return (ListenableFuture)this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
            public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {
                HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
                return execution.executeAsync(serviceRequest, body);
            }
        });
    }
}
//ServiceRequestWrapper#getURI方法,将服务名替换为选择到的服务具体ip端口
public class ServiceRequestWrapper extends HttpRequestWrapper {

	private final ServiceInstance instance;

	private final LoadBalancerClient loadBalancer;

	public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
			LoadBalancerClient loadBalancer) {
		super(request);
		this.instance = instance;
		this.loadBalancer = loadBalancer;
	}

	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
		return uri;
	}

}

最后ribbon的健康检查。调用对应服务的健康检查接口,进行判断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
   if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
      return this.propertiesFactory.get(IPing.class, config, serviceId);
   }
   NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
   ping.initWithNiwsConfig(config);
   return ping;
}
//NIWSDiscoveryPing#isAlive
public boolean isAlive(Server server) {
		    boolean isAlive = true;
		    if (server!=null && server instanceof DiscoveryEnabledServer){
	            DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;	            
	            InstanceInfo instanceInfo = dServer.getInstanceInfo();
	            if (instanceInfo!=null){	                
	                InstanceStatus status = instanceInfo.getStatus();
	                if (status!=null){
	                    isAlive = status.equals(InstanceStatus.UP);
	                }
	            }
	        }
		    return isAlive;
		}
Talk is cheap, show me the bug/code.
使用 Hugo 构建
主题 StackJimmy 设计