Nacos registry initialization
The registry's startup class is just a plain Spring Boot application, so it reveals very little on its own:
@EnableScheduling
@SpringBootApplication(scanBasePackages = {"com.alibaba.nacos.naming", "com.alibaba.nacos.core"})
public class NamingApp {
    public static void main(String[] args) {
        SpringApplication.run(NamingApp.class, args);
    }
}
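From the official documentation and some analysis of the code, we learn that the underlying layer uses the Raft protocol for cluster synchronization.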
// PersistentConsistencyServiceDelegateImpl
private BasePersistentServiceProcessor createNewPersistentServiceProcessor(ProtocolManager protocolManager,
        ClusterVersionJudgement versionJudgement) throws Exception {
    // Based on the configuration, decide between standalone mode and Raft mode
    final BasePersistentServiceProcessor processor =
            EnvUtil.getStandaloneMode() ? new StandalonePersistentServiceProcessor(versionJudgement)
                    : new PersistentServiceProcessor(protocolManager, versionJudgement);
    // Call the method declared on the base class
    processor.afterConstruct();
    return processor;
}
The concrete logic looks like this:
// PersistentServiceProcessor
@Override
public void afterConstruct() {
    super.afterConstruct();
    this.protocol.addRequestProcessors(Collections.singletonList(this));
    // Subscribe to the leader metadata so that hasLeader tracks its state
    this.protocol.protocolMetaData()
            .subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA,
                    (o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg)));
    // If you choose to use the new RAFT protocol directly, there will be no compatible logical execution
    if (EnvUtil.getProperty(Constants.NACOS_NAMING_USE_NEW_RAFT_FIRST, Boolean.class, false)) {
        NotifyCenter.registerSubscriber(notifier);
        waitLeader();
        startNotify = true;
    }
}

// Block until leader election has completed (or an error has been flagged)
private void waitLeader() {
    while (!hasLeader && !hasError) {
        Loggers.RAFT.info("Waiting Jraft leader vote ...");
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException ignored) {
        }
    }
}
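The waitLeader loop above polls a flag every 500 ms and swallows InterruptedException. As a rough sketch only, and not what Nacos does, the same polling idea can be written with a timeout and with the interrupt flag restored. The class LeaderWaitSketch, the method awaitLeader and its parameters are hypothetical names introduced for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Nacos: polls a "has leader" flag with a deadline.
final class LeaderWaitSketch {

    // Returns true once hasLeader reports true, false on timeout or interruption.
    static boolean awaitLeader(BooleanSupplier hasLeader, long pollMillis, long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!hasLeader.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: no leader within the timeout
            }
            try {
                TimeUnit.MILLISECONDS.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of ignoring it, then stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A caller could, for example, use `LeaderWaitSketch.awaitLeader(() -> hasLeader, 500, 30_000)` and decide how to proceed when it returns false. Whether a timeout is appropriate during startup is a design choice this sketch does not settle.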
Next, the Raft-related code:
@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();
    raftStore.loadDatums(notifier, datums);
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    // Register the leader election task
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    // Register the heartbeat task
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);
    NotifyCenter.registerSubscriber(notifier);
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
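The election and heartbeat tasks are started from this post-construct hook. A leader is elected once a candidate receives votes from more than half of the nodes in the cluster.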
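To make the "more than half" rule concrete, here is a minimal sketch of a vote tally. It is an illustration under stated assumptions, not the Nacos implementation: MajorityTally, majority and decideLeader are hypothetical names, and votes are represented simply as the IP string each node voted for.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Nacos: tallies votes and applies the majority rule.
final class MajorityTally {

    // Smallest vote count that is strictly more than half of clusterSize.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Returns the first candidate whose votes reach a majority, or null if none does.
    static String decideLeader(List<String> votes, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        for (String candidate : votes) {
            int total = counts.merge(candidate, 1, Integer::sum);
            if (total >= majority(clusterSize)) {
                return candidate;
            }
        }
        return null;
    }
}
```

With three nodes the majority is 2 and with four or five nodes it is 3, which is why an even-sized cluster tolerates no more failures than the odd-sized cluster one node smaller.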
All votes are submitted over HTTP; see the /vote endpoint in RaftController.
public synchronized RaftPeer receivedVote(RaftPeer remote) {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }
    RaftPeer local = peers.get(NetUtils.localServer());
    // Illegitimate request (remote term is not newer than ours): vote for ourselves if we have not voted yet
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        return local;
    }
    // Reset our own next election deadline, update our state and term, and vote for remote
    local.resetLeaderDue();
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    local.term.set(remote.term.get());
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    return local;
}
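The two branches of receivedVote can be exercised in isolation with a stripped-down model. This is only a sketch: PeerSketch and VoteRuleSketch are hypothetical stand-ins for RaftPeer and the surrounding class, the state is a plain string, and the stopWork check, the peer-membership check, the logging and resetLeaderDue() are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RaftPeer, keeping only the fields the vote rule touches.
final class PeerSketch {
    final String ip;
    final AtomicLong term = new AtomicLong(0L);
    String voteFor;
    String state = "CANDIDATE";

    PeerSketch(String ip, long term) {
        this.ip = ip;
        this.term.set(term);
    }
}

// Hypothetical helper mirroring the branch structure of receivedVote above.
final class VoteRuleSketch {

    static PeerSketch handleVote(PeerSketch local, PeerSketch remote) {
        // Remote term is not newer: reject, and vote for ourselves if we have not voted yet.
        if (remote.term.get() <= local.term.get()) {
            if (local.voteFor == null || local.voteFor.isEmpty()) {
                local.voteFor = local.ip;
            }
            return local;
        }
        // Remote term is newer: become a follower, adopt its term and vote for it.
        local.state = "FOLLOWER";
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        return local;
    }
}
```

For example, a local peer at term 3 that receives a request from a peer at term 3 ends up voting for itself, while a request from a peer at term 4 makes it a follower of that peer at term 4. An earlier vote is never overwritten by a stale request.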
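- Original author:
- Original link: https://leyou240.github.io/post/sca_01_nacos%E6%B3%A8%E5%86%8C%E4%B8%AD%E5%BF%83%E5%88%9D%E5%A7%8B%E5%8C%96/
- Copyright notice: This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. For non-commercial reposting, please credit the source (author and original link); for commercial reposting, please contact the author for permission.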