溫馨提示:

本文內容基於個人學習Nacos 2.0.1版本程式碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。

Nacos服務端在處理健康檢查和心跳檢查任務的時候它是使用攔截器鏈來執行的。攔截器鏈內部有多個攔截器,通過獲取不同的攔截器鏈例項,在例項內部指定具體的攔截器型別來組成一組攔截器。這裡使用了攔截器模式和模板模式來組織程式碼。攔截器模式體現在整體攔截機制的實現;模板模式主要體現在對攔截器鏈的抽象實現上。

攔截器模式有三個要素

  • 攔截器
  • 排程者
  • 業務邏輯

攔截器

定義一個攔截器的基本功能,同時限定了傳入的攔截物件型別必須為Interceptable。這裡只定義了基本的功能和基本的限定攔截物件。這裡將其描述為基本的功能,那就意味著它的實現將會有更高階的功能。

/**
* Nacos naming interceptor.
* 攔截器物件
* @author xiweng.yy
*/
public interface NacosNamingInterceptor<T extends Interceptable> { /**
* Judge whether the input type is intercepted by this Interceptor.
* 此攔截器的例項將會判斷傳入的物件是否是他需要處理的型別,此方法可以實現不同攔截器處理不同物件的隔離操作
* <p>This method only should judge the object type whether need be do intercept. Not the intercept logic.
* @param type type
* @return true if the input type is intercepted by this Interceptor, otherwise false
*/
boolean isInterceptType(Class<?> type); /**
* Do intercept operation.
* 執行攔截操作
* <p>This method is the actual intercept operation.
* @param object need intercepted object
* @return true if object is intercepted, otherwise false
*/
boolean intercept(T object); /**
* The order of interceptor. The lower the number, the earlier the execution.
* 攔截器排序,數字越低,優先順序越高
* @return the order number of interceptor
*/
int order();
}

被攔截的物件

Interceptable 定義了對攔截操作相關的執行方法,passIntercept()在未被攔截的時候需要執行,afterIntercept()在被攔截之後需要執行。被攔截物件的業務邏輯需要由攔截器負責排程。

/**
* Interceptable Interface.
*
* @author xiweng.yy
*/
public interface Interceptable { /**
* If no {@link NacosNamingInterceptor} intercept this object, this method will be called to execute.
*/
void passIntercept(); /**
* If one {@link NacosNamingInterceptor} intercept this object, this method will be called.
*/
void afterIntercept();
}

排程者

排程者主要是用來管理攔截器的組織方式,觸發攔截器的攔截操作。下圖展示了Naming模組的攔截器鏈的繼承關係。

整體的構成由NacosNamingInterceptorChain定義基本框架,AbstractNamingInterceptorChain實現通用邏輯,HealthCheckInterceptorChainInstanceBeatCheckTaskInterceptorChain則分別服務於健康檢查和心跳檢查。

NacosNamingInterceptorChain

定義了攔截器鏈物件應該具有的基本行為:新增攔截器、執行攔截器。

/**
* Nacos naming interceptor chain.
* Nacos Naming模組的攔截器連結口,攔截器鏈用於儲存並管理多個攔截器
* @author xiweng.yy
*/
public interface NacosNamingInterceptorChain<T extends Interceptable> { /**
* Add interceptor.
* 新增指定型別的攔截器物件
* @param interceptor interceptor
*/
void addInterceptor(NacosNamingInterceptor<T> interceptor); /**
* Do intercept by added interceptors.
* 執行攔截的業務操作
* @param object be interceptor object
*/
void doInterceptor(T object);
}

AbstractNamingInterceptorChain

AbstractNamingInterceptorChain實現了NacosNamingInterceptorChain所定義的對NacosNamingInterceptor的操作。在構造方法中提供了具體的攔截器實現類的載入,它這裡使用了SPI方式載入。預設可以載入的攔截器必須是NacosNamingInterceptor的例項。在攔截器的執行方法doInterceptor()中會按優先順序呼叫每一個攔截器,首先判斷被攔截的物件是否是此攔截器處理,接著呼叫攔截器的intercept()方法,成功後呼叫被攔截物件的afterIntercept()方法。若未攔截成功則呼叫被攔截物件的passIntercept()方法。因此在攔截器中的intercept()方法中可以定義攔截器對被攔截物件的處理邏輯,而被攔截物件則可以在afterIntercept()和passIntercept()方法中定義自身的處理邏輯。從而實現在攔截器中被處理和自身處理任務依賴於攔截器來觸發。

/**
* Abstract Naming Interceptor Chain.
* 抽象的命名服務攔截器鏈,用於定義攔截器鏈的工作流程
* @author xiweng.yy
*/
public abstract class AbstractNamingInterceptorChain<T extends Interceptable> implements NacosNamingInterceptorChain<T> { // 儲存多個攔截器
private final List<NacosNamingInterceptor<T>> interceptors; // 限制使用範圍為當前包或者其子類
protected AbstractNamingInterceptorChain(Class<? extends NacosNamingInterceptor<T>> clazz) {
this.interceptors = new LinkedList<>();
// 使用SPI模式載入指定的攔截器型別
// 而且NacosNamingInterceptor內部有判斷它需要攔截物件的型別,因此非常靈活
interceptors.addAll(NacosServiceLoader.load(clazz));
// 對攔截器的順序進行排序
interceptors.sort(Comparator.comparingInt(NacosNamingInterceptor::order));
} /**
* Get all interceptors.
*
* @return interceptors list
*/
protected List<NacosNamingInterceptor<T>> getInterceptors() {
return interceptors;
} @Override
public void addInterceptor(NacosNamingInterceptor<T> interceptor) {
// 若手動新增,則需要再次進行排序
interceptors.add(interceptor);
interceptors.sort(Comparator.comparingInt(NacosNamingInterceptor::order));
} @Override
public void doInterceptor(T object) {
// 因為內部的攔截器已經排序過了,所以直接遍歷
for (NacosNamingInterceptor<T> each : interceptors) {
// 若當前攔截的物件不是當前攔截器所要處理的型別則調過
if (!each.isInterceptType(object.getClass())) {
continue;
}
// 執行攔截操作成功之後,繼續執行攔截後操作
if (each.intercept(object)) {
object.afterIntercept();
return;
}
}
// 未攔截的操作
object.passIntercept();
}
}

doInterceptor() 方法中使用當前攔截器鏈內部的所有攔截器對被攔截物件進行處理,並且組織了被攔截物件被攔截之後的方法呼叫流程。即:攔截之後執行被攔截物件的afterIntercept()方法,未攔截時執行passIntercept()方法。

HealthCheckInterceptorChain

健康檢查攔截器鏈負責載入AbstractHealthCheckInterceptor型別的攔截器。

/**
* Health check interceptor chain.
* @author xiweng.yy
*/
public class HealthCheckInterceptorChain extends AbstractNamingInterceptorChain<NacosHealthCheckTask> { private static final HealthCheckInterceptorChain INSTANCE = new HealthCheckInterceptorChain(); private HealthCheckInterceptorChain() {
super(AbstractHealthCheckInterceptor.class);
} public static HealthCheckInterceptorChain getInstance() {
return INSTANCE;
}
}

InstanceBeatCheckTaskInterceptorChain

例項心跳檢查器鏈負責載入AbstractBeatCheckInterceptor型別的攔截器。

/**
* Instance beat check interceptor chain.
*
* @author xiweng.yy
*/
public class InstanceBeatCheckTaskInterceptorChain extends AbstractNamingInterceptorChain<InstanceBeatCheckTask> { private static final InstanceBeatCheckTaskInterceptorChain INSTANCE = new InstanceBeatCheckTaskInterceptorChain(); private InstanceBeatCheckTaskInterceptorChain() {
super(AbstractBeatCheckInterceptor.class);
} public static InstanceBeatCheckTaskInterceptorChain getInstance() {
return INSTANCE;
}
}

小結

通過模板模式來實現攔截器機制。

  • AbstractNamingInterceptorChain 抽象出聯結器鏈對攔截器載入的通用方法,定義了攔截器對被攔截物件的通用處理流程。
  • AbstractHealthCheckInterceptor 定義了健康檢查攔截器被攔截的物件型別
  • AbstractBeatCheckInterceptor 定義了心跳檢查攔截器被攔截的物件型別

通過對攔截器鏈的組織方式梳理可以看到有明顯的兩條路線,一個是健康檢查,一個是心跳檢查。分析後續具體的攔截器,以及他們所要處理的任務就很清晰了。

業務邏輯

業務邏輯是被攔截器攔截之後需要進行的操作。

健康檢查類的被攔截物件

健康檢查的抽象攔截器AbstractHealthCheckInterceptor定義了它的子類將要處理的任務型別為NacosHealthCheckTask

HealthCheckTaskV2

/**
* Health check task for v2.x.
* v2版本的健康檢查
* <p>Current health check logic is same as v1.x. TODO refactor health check for v2.x.
*
* @author nacos
*/
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask { /**
* 一個客戶端物件(此客戶端代表提供服務用於被應用訪問的客戶端)
* 從這裡可以看出,啟動一個健康檢查任務是以客戶端為維度的
*/
private final IpPortBasedClient client; private final String taskId; private final SwitchDomain switchDomain; private final NamingMetadataManager metadataManager; private long checkRtNormalized = -1;
/**
* 檢查最佳響應時間
*/
private long checkRtBest = -1; /**
* 檢查最差響應時間
*/
private long checkRtWorst = -1; /**
* 檢查上次響應時間
*/
private long checkRtLast = -1; /**
* 檢查上上次響應時間
*/
private long checkRtLastLast = -1; /**
* 開始時間
*/
private long startTime; /**
* 任務是否取消
*/
private volatile boolean cancelled = false; public HealthCheckTaskV2(IpPortBasedClient client) {
this.client = client;
this.taskId = client.getResponsibleId();
this.switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
this.metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
// 初始化響應時間檢查
initCheckRT();
} /**
* 初始化響應時間值
*/
private void initCheckRT() {
// first check time delay
// 2000 + (在5000以內的隨機數)
checkRtNormalized =
2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
// 最佳響應時間
checkRtBest = Long.MAX_VALUE;
// 最差響應時間為0
checkRtWorst = 0L;
} public IpPortBasedClient getClient() {
return client;
} @Override
public String getTaskId() {
return taskId;
} /**
* 開始執行健康檢查任務
*/
@Override
public void doHealthCheck() {
try {
// 獲取當前傳入的Client所釋出的所有Service
for (Service each : client.getAllPublishedService()) {
// 只有當Service開啟了健康檢查才執行
if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
// 獲取Service對應的InstancePublishInfo
InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
// 獲取叢集元資料
ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
// 使用Processor代理物件對任務進行處理
ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", client.getClientId());
}
}
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}", client.getClientId(), e);
} finally {
// 若任務執行狀態為已取消,則再次啟動
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
// worst == 0 means never checked
if (this.getCheckRtWorst() > 0) {
// TLog doesn't support float so we must convert it into long
long checkRtLastLast = getCheckRtLastLast();
this.setCheckRtLastLast(this.getCheckRtLast());
if (checkRtLastLast > 0) {
long diff = ((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / checkRtLastLast;
if (Loggers.CHECK_RT.isDebugEnabled()) {
Loggers.CHECK_RT.debug("{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
client.getClientId(), this.getCheckRtNormalized(), this.getCheckRtWorst(),
this.getCheckRtBest(), this.getCheckRtLast(), diff);
}
}
}
}
}
} @Override
public void passIntercept() {
doHealthCheck();
} @Override
public void afterIntercept() {
// 若任務執行狀態為已取消,則再次啟動
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
}
} @Override
public void run() {
// 呼叫健康檢查
doHealthCheck();
} /**
* 獲取叢集元資料
* @param service 服務資訊
* @param instancePublishInfo 服務對應的ip等資訊
* @return
*/
private ClusterMetadata getClusterMetadata(Service service, InstancePublishInfo instancePublishInfo) {
Optional<ServiceMetadata> serviceMetadata = metadataManager.getServiceMetadata(service);
if (!serviceMetadata.isPresent()) {
return new ClusterMetadata();
}
String cluster = instancePublishInfo.getCluster();
ClusterMetadata result = serviceMetadata.get().getClusters().get(cluster);
return null == result ? new ClusterMetadata() : result;
} public long getCheckRtNormalized() {
return checkRtNormalized;
} public long getCheckRtBest() {
return checkRtBest;
} public long getCheckRtWorst() {
return checkRtWorst;
} public void setCheckRtWorst(long checkRtWorst) {
this.checkRtWorst = checkRtWorst;
} public void setCheckRtBest(long checkRtBest) {
this.checkRtBest = checkRtBest;
} public void setCheckRtNormalized(long checkRtNormalized) {
this.checkRtNormalized = checkRtNormalized;
} public boolean isCancelled() {
return cancelled;
} public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
} public long getStartTime() {
return startTime;
} public void setStartTime(long startTime) {
this.startTime = startTime;
} public long getCheckRtLast() {
return checkRtLast;
} public void setCheckRtLast(long checkRtLast) {
this.checkRtLast = checkRtLast;
} public long getCheckRtLastLast() {
return checkRtLastLast;
} public void setCheckRtLastLast(long checkRtLastLast) {
this.checkRtLastLast = checkRtLastLast;
}
}

心跳檢查類的被攔截物件

ClientBeatCheckTaskV2

雖然它繼承了NacosHealthCheckTask,但內部只使用了InstanceBeatCheckTaskInterceptorChain,沒有使用HealthCheckInterceptorChain, 按理說應該劃分到"心跳檢查類的被攔截物件" 這個類別的。不知道為何這樣設計,已提issues。

/**
* Client beat check task of service for version 2.x.
* @author nkorange
*/
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask { private final IpPortBasedClient client; private final String taskId; /**
* 使用攔截器鏈
*/
private final InstanceBeatCheckTaskInterceptorChain interceptorChain; public ClientBeatCheckTaskV2(IpPortBasedClient client) {
this.client = client;
this.taskId = client.getResponsibleId();
this.interceptorChain = InstanceBeatCheckTaskInterceptorChain.getInstance();
} public GlobalConfig getGlobalConfig() {
return ApplicationUtils.getBean(GlobalConfig.class);
} @Override
public String taskKey() {
return KeyBuilder.buildServiceMetaKey(client.getClientId(), String.valueOf(client.isEphemeral()));
} @Override
public String getTaskId() {
return taskId;
} @Override
public void doHealthCheck() { try {
// 獲取所有的Service
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {
logger.info("開始對Service進行攔截操作,{}", each.getName());
// 獲取Service對應的InstancePublishInfo
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);
// 建立一個InstanceBeatCheckTask,並交由攔截器鏈處理
interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
} } catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
} @Override
public void run() {
doHealthCheck();
} @Override
public void passIntercept() {
doHealthCheck();
} @Override
public void afterIntercept() {
}
}

InstanceBeatCheckTask

/**
* Instance beat check task.
* Instance心跳檢查任務,此處它作為一個可被攔截器攔截的物件使用。
* @author xiweng.yy
*/
public class InstanceBeatCheckTask implements Interceptable { // 心跳檢查者列表
private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>(); // 客戶端物件(因為例項就代表的是客戶端)
private final IpPortBasedClient client; // 服務物件
private final Service service; // 健康檢查資訊
private final HealthCheckInstancePublishInfo instancePublishInfo; static {
// 新增不健康例項檢查器
CHECKERS.add(new UnhealthyInstanceChecker());
// 新增過期例項檢查器
CHECKERS.add(new ExpiredInstanceChecker());
// 新增使用者自定義的心跳檢查器
CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
} public InstanceBeatCheckTask(IpPortBasedClient client, Service service, HealthCheckInstancePublishInfo instancePublishInfo) {
this.client = client;
this.service = service;
this.instancePublishInfo = instancePublishInfo;
} @Override
public void passIntercept() {
// 未被攔截的時候執行自身邏輯
for (InstanceBeatChecker each : CHECKERS) {
each.doCheck(client, service, instancePublishInfo);
}
} @Override
public void afterIntercept() {
} public IpPortBasedClient getClient() {
return client;
} public Service getService() {
return service;
} public HealthCheckInstancePublishInfo getInstancePublishInfo() {
return instancePublishInfo;
}
}

總結

  • 攔截器鏈確定了要載入的攔截器型別
  • 攔截器確定了要攔截的物件型別
  • 被攔截的物件又建立了自己的檢查策略