SpringCloud源码| eureka源码
eureka server启动流程
EurekaBootStrap#contextInitialized(ServletContextEvent) 方法进行初始化
@Override
public void contextInitialized(ServletContextEvent event) {
try {
//1、初始化eureka相关环境
initEurekaEnvironment();
//2、初始化eureka的serverContext
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
/**
* Users can override to initialize the environment themselves.
*/
protected void initEurekaEnvironment() throws Exception {
logger.info("Setting the eureka configuration..");
AbstractConfiguration configInstance = ConfigurationManager.getConfigInstance();
String dataCenter = configInstance.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
configInstance.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
} else {
configInstance.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = configInstance.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
configInstance.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
}
}
ConfigurationManager是一个double check+volatile实现的单例模式,并且其中所有的相关配置通过接口来实现。具体的实现类中,硬编码配置项名称,默认值等
初始化EurekaServerContext,包含大致7个步骤
/**
* init hook for server context. Override for custom logic.
*/
protected void initEurekaServerContext() throws Exception {
// 1、加载eureka-server.properties的数据
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
//2、初始化eureka server
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
//3、构造注册相关的组件
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
//4、构造peer节点同步组件
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
//5、完成上下文的创建
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();
logger.info("Initialized server context");
//6、同步相邻节点的信息,从相邻eureka节点拷贝注册信息
// Copy registry from neighboring eureka node
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
//7、注册所有的监控统计项
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
1、加载eureka-server.properties的数据,初始化EurekaServerConfig
2、初始化ApplicationInfoManager及eurekaClient,其中用到的InstanceInfo使用了builder模式构造复杂的实例对象
(1)读取相关配置EurekaInstanceConfig和InstanceInfo
(2)根据配置,处理是否抓取注册表
(3)初始化3个线程池:调度线程池、心跳线程池、缓存刷新线程池
3、处理注册相关的事情
(1)创建PeerAwareInstanceRegistry内部比较重要的是MeasuredRate,其lastBucket统计上一分钟的心跳,currentBucket统计当前一分钟心跳。此组件用做eureka保护模式的重要判断依据
(2)构造peer节点同步组件,用以集群同步
(3)DefaultEurekaServerContext创建,并且启动服务注册,从相邻节点同步,注册监控统计项
![](../eureka server启动的流程图.png)
再从eureka的example中例子
public class ExampleEurekaClient {
private static ApplicationInfoManager applicationInfoManager;
private static EurekaClient eurekaClient;
private static synchronized ApplicationInfoManager initializeApplicationInfoManager(EurekaInstanceConfig instanceConfig) {
if (applicationInfoManager == null) {
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
}
return applicationInfoManager;
}
private static synchronized EurekaClient initializeEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig clientConfig) {
if (eurekaClient == null) {
eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
}
return eurekaClient;
}
public void sendRequestToServiceUsingEureka(EurekaClient eurekaClient) {
// initialize the client
// this is the vip address for the example service to talk to as defined in conf/sample-eureka-service.properties
String vipAddress = "sampleservice.mydomain.net";
InstanceInfo nextServerInfo = null;
try {
nextServerInfo = eurekaClient.getNextServerFromEureka(vipAddress, false);
} catch (Exception e) {
System.err.println("Cannot get an instance of example service to talk to from eureka");
System.exit(-1);
}
System.out.println("Found an instance of example service to talk to from eureka: "
+ nextServerInfo.getVIPAddress() + ":" + nextServerInfo.getPort());
System.out.println("healthCheckUrl: " + nextServerInfo.getHealthCheckUrl());
System.out.println("override: " + nextServerInfo.getOverriddenStatus());
Socket s = new Socket();
int serverPort = nextServerInfo.getPort();
try {
s.connect(new InetSocketAddress(nextServerInfo.getHostName(), serverPort));
} catch (IOException e) {
System.err.println("Could not connect to the server :"
+ nextServerInfo.getHostName() + " at port " + serverPort);
} catch (Exception e) {
System.err.println("Could not connect to the server :"
+ nextServerInfo.getHostName() + " at port " + serverPort + "due to Exception " + e);
}
try {
String request = "FOO " + new Date();
System.out.println("Connected to server. Sending a sample request: " + request);
PrintStream out = new PrintStream(s.getOutputStream());
out.println(request);
System.out.println("Waiting for server response..");
BufferedReader rd = new BufferedReader(new InputStreamReader(s.getInputStream()));
String str = rd.readLine();
if (str != null) {
System.out.println("Received response from server: " + str);
System.out.println("Exiting the client. Demo over..");
}
rd.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExampleEurekaClient sampleClient = new ExampleEurekaClient();
// create the client
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
// use the client
sampleClient.sendRequestToServiceUsingEureka(client);
// shutdown the client
eurekaClient.shutdown();
}
}
client启动流程
(1)读取eureka-client.properties配置文件,形成一个服务实例的配置,基于接口对外提供服务实例的配置项的读取
(2)基于服务实例的配置,构造了一个服务实例(InstanceInfo)
(3)基于服务实例的配置和服务实例,构造了一个服务实例管理器(ApplicationInfoManager)
(4)读取eureka-client.properites配置文件,形成一个eureka client的配置,接口接口对外提供eureka client的配置项的读取
(5)基于eureka client配置,和服务实例管理器,来构造了一个EurekaClient(DiscoveryClient),保存了一些配置,处理服务的注册和注册表的抓取,启动了几个线程池,启动了网络通信组件,启动了一些调度任务,注册了监控项
(6)DiscoveryClient内部依赖了一个InstanceInfoReplicator,进行服务注册 服务注册表实际上就是一个ConcurrentHashMap
public class ExampleEurekaService {
private static ApplicationInfoManager applicationInfoManager;
private static EurekaClient eurekaClient;
private static synchronized ApplicationInfoManager initializeApplicationInfoManager(EurekaInstanceConfig instanceConfig) {
if (applicationInfoManager == null) {
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
}
return applicationInfoManager;
}
private static synchronized EurekaClient initializeEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig clientConfig) {
if (eurekaClient == null) {
eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
}
return eurekaClient;
}
public static void main(String[] args) {
DynamicPropertyFactory configInstance = com.netflix.config.DynamicPropertyFactory.getInstance();
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
EurekaClient eurekaClient = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
ExampleServiceBase exampleServiceBase = new ExampleServiceBase(applicationInfoManager, eurekaClient, configInstance);
try {
exampleServiceBase.start();
} finally {
// the stop calls shutdown on eurekaClient
exampleServiceBase.stop();
}
}
}
服务端启动流程,仅仅增加服务端注册同步相关代码,其余与客户端启动一致
服务注册信息的获取接口ApplicationsResource#getContainers
其使用了多级缓存以提高并发性能
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
//过期时间
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
//移除region相关的监听器
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
//从注册表中获取注册信息
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
//
Value value = generatePayload(key);
return value;
}
});
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
//定时从注册表更新readOnlyCacheMap
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
//版本不一致时,更新
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
1、首先去readOnlyCacheMap中读,读到则返回。若没有则从readWriteCacheMap获取
2、readWriteCacheMap没有,则从注册表中去获取。定时比较readWriteCacheMap和readOnlyCacheMap,不一致时将最新的值更新到readOnlyCacheMap。
3、注册表在状态改变的时候,同步将对应的key从readWriteCacheMap清空。保证缓存最终一致性。此逻辑详见AbstractInstanceRegistry中的各方法对其调用
增量注册表
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
//底层由一个队列实现
//在注册表变更的时候,写入队列
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
//定时更新过期的增量注册表
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
}
故障摘除
AbstractInstanceRegistry#evict(long additionalLeaseMs)
/**
* Evicts everything in the instance registry that has expired, if expiry is enabled.
*
* @see com.netflix.eureka.lease.LeaseManager#evict()
*/
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
//每次最多摘除15%的实例,因为renewalPercentThreshold默认值是0.85
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
//使用随机算法,随机摘除故障实例
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
eureka集群注册表同步机制
PeerEurekaNode中的batchingDispatcher和nonBatchingDispatcher 负责调度
AcceptorExecutor负责从队列中拿出任务来执行
public class PeerEurekaNode {
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
}
/* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
}
class AcceptorExecutor<ID, T> {
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
BatchWorkerRunnable(String workerName,
AtomicBoolean isShutdown,
TaskExecutorMetrics metrics,
TaskProcessor<T> processor,
AcceptorExecutor<ID, T> acceptorExecutor) {
super(workerName, isShutdown, metrics, processor, acceptorExecutor);
}
@Override
public void run() {
try {
while (!isShutdown.get()) {
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
![](../eureka server同步任务批处理机制.png)
spring-cloud-eureka-server注解式启动
@EnableEurekaServer注解,springboot启动以后,将eureka server启动起来。
EurekaServerAutoConfiguration使用spring boot的auto configuration机制,触发EurekaServerAutoConfiguration的执行。将eureka server需要的类,统统交由spring bean来进行注入
然后通过EurekaServerInitializerConfiguration来进行启动。
spring-cloud-eureka-client注解式启动
EurekaClientConfigBean配置文件读取配置
EurekaClientAutoConfiguration注入相关的类到spring容器中
EurekaAutoServiceRegistration重写了eureka client默认的注册逻辑,将原来InstanceInfoReplicator中的注册逻辑进行进一步的封装及定制。client启动以后就进行注册,不是eureka默认的40秒以后再注册。并且可以配置初始化注册状态
总结:@EnableEurekaClient,触发了一个EurekaClientAutoConfiguration类的执行,完成从application.yml中读取配置,完成DiscoveryClient的初始化和启动,通过自己额外加的一些代码,一启动,直接触发一次register()服务注册,向eureka server完成一次注册。
- 原文作者:
- 原文链接:https://leyou240.github.io/post/sc_eureka/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。