Netflix Eureka原始碼分析(19)——eureka server叢集機制原始碼剖析:登錄檔同步以及高可用
(1)eureka core的BootStrap裡面,有一塊程式碼,是PeerEurekaNodes的程式碼,其實是在處理eureka server叢集資訊的初始化,會執行PeerEurekaNodes.start()方法
public class EurekaBootStrap implements ServletContextListener { protected void initEurekaServerContext() throws Exception { PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); //將serverContext放在了一個holder中 EurekaServerContextHolder.initialize(serverContext); //serverContext初始化 serverContext.initialize(); } }
@Singleton public class DefaultEurekaServerContext implements EurekaServerContext { @PostConstruct @Override public void initialize() throws Exception { logger.info("Initializing ..."); //定時更新eureka server叢集的資訊 peerEurekaNodes.start(); //eureka server的登錄檔資訊初始化 registry.init(peerEurekaNodes); logger.info("Initialized"); } }
解析配置檔案中的其他eureka server的url地址,基於url地址構造一個一個的PeerEurekaNode,一個PeerEurekaNode就代表了一個eureka server。啟動一個後臺的執行緒,預設是每隔10分鐘,會執行一個任務,就是基於配置檔案中的url來重新整理eureka server列表。
@Singleton public class PeerEurekaNodes { public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { //基於配置檔案中的url來重新整理eureka server列表 updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { //基於配置檔案中的url來重新整理eureka server列表 updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: " + node.getServiceUrl()); } } protected void updatePeerEurekaNodes(List<String> newPeerUrls) { if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry"); return; } Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } // Remove peers no long available List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // Add new peers if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); } }
(2)registry.syncUp()
就是說,當前這個eureka server會從任何一個其他的eureka server拉取登錄檔過來放在自己本地,作為初始的登錄檔。將自己作為一個eureka client,找任意一個eureka server來拉取登錄檔,將拉取到的登錄檔放到自己本地去。
// Copy registry from neighboring eureka node
//從相鄰的eureka server節點,拷貝過來登錄檔
int registryCount = registry.syncUp();
eurekaClient.getApplications();//獲取所有當前註冊的所有服務例項
eureka server自己本身本來就是個eureka client,在初始化的時候,就會去找任意的一個eureka server拉取登錄檔到自己本地來,把這個登錄檔放到自己身上來,作為自己這個eureka server的登錄檔
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//獲取所有當前註冊的所有服務例項
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//把拉取的登錄檔服務例項全部註冊到自己身上,作為自己這個eureka server的登錄檔
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
}
(3)註冊、下線、故障、心跳
如何從一臺eureka server同步到另外一臺eureka server上去的
ApplicationResource的addInstance()方法,負責註冊,現在自己本地完成一個註冊,接著會replicateToPeers()方法,這個方法就會將這次註冊請求,同步到其他所有的eureka server上去。。。
如果是某臺eureka client來找eureka server進行註冊,isReplication是false,此時會給其他所有的你配置的eureka server都同步這個註冊請求,此時一定會基於jersey,呼叫其他所有的eureka server的restful介面,去執行這個服務例項的註冊的請求
eureka-core-jersey2的工程,ReplicationHttpClient,此時同步註冊請求給其他eureka server的時候,一定會將isReplication設定為true,這個東西可以確保說什麼呢,其他eureka server接到這個同步的請求,僅僅在自己本地執行,不會再次向其他的eureka server去進行註冊
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
//同步到其他所有的eureka server上去
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
//每傳送一次心跳,currentBucket就累加一次
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
//如果已經複製了,不要再次複製,因為這會建立一個糟糕的複製
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
}
總結:eureka server叢集機制 流程圖