溫馨提示:
本文內容基於個人學習Nacos 2.0.1版本程式碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。
什麼是Distro協議
今天來分析Nacos中使用的一種叫作Distro的協議,Distro是阿里巴巴內部使用的一種協議,用於實現分散式環境下的資料一致性。協議約定了節點之間通訊的資料格式,資料解析規則,資料傳輸載體。它是一種臨時資料一致性協議,所管理的資料僅保留在記憶體中。
Distro協議用來做什麼
Nacos作為一個分散式服務管理平臺(其最主要的功能之一),在分散式環境下每個節點上面的服務資訊都會有不同的狀態,當服務的可用狀態變更等一系列的問題都需要通知其他節點,每個節點上的服務列表也需要進行同步。Distro協議就是用於在不同節點之間同步節點之間的服務。除了字面上的同步之外,它還負責向其他節點報告自身的服務狀態。事實上也可以看做是一種同步。
本篇內容不設計該協議的具體操作,僅從Nacos中所有關於Distro的類中來看看它能做什麼。通過在Idea內搜尋Distro開頭的類可以發現它有30個類,分別分佈在nacos-core
、nacos-naming
、nacos-config
模組中。本篇只分析nacos-core
模組下的內容,因為它已經覆蓋了Distro協議的完整流程。
提示:
這裡可以先記住一個關鍵詞同步
。所謂的同步無非就是從遠端獲取資料到本地,或者是從本地傳送資料到遠端,同步的資料在這裡肯定就是服務
相關的了。畢竟在官方文件中都是這樣寫的:”服務(Service)是 Nacos 世界的一等公民”。
本篇介紹的所有內容均是為了服務於同步
這個概念的。
Distro協議的核心元件
在nacos-core
模組下,定義了Distro協議的所有元件。
distro
component Distro的一些元件,例如資料儲存物件、資料處理器、資料傳輸代理等
entity 實體物件
exception 異常處理
task 任務相關
delay 延遲任務相關元件
execute 任務執行器相關元件
load 載入任務相關元件
verify 驗證任務相關元件
DistroConfig.java Distro配置資訊
DistroConstants.java Distro常量
DistroProtocol.java Distro協議入口
com.alibaba.nacos.core.distributed.distro.component
DistroCallback
Distro回撥介面,用於非同步處理之後需要回調的場景。
public interface DistroCallback {
/**
* Callback when distro task execute successfully.
*/
void onSuccess();
/**
* Callback when distro task execute failed.
*
* @param throwable throwable if execute failed caused by exception
*/
void onFailed(Throwable throwable);
}
DistroComponentHolder
Distro元件持有者,它內部定義了一些容器(HashMap)來儲存Distro協議需要用到的資料,相當於一個大管家。
@Component
public class DistroComponentHolder {
// 儲存不同型別的DistroData傳輸物件
private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();
// 儲存不同型別的DistroData裝載容器
private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();
// 儲存不同型別的Distro失敗任務處理器
private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();
// 儲存不同型別的DistroData資料處理器
private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
public DistroTransportAgent findTransportAgent(String type) {
return transportAgentMap.get(type);
}
public void registerTransportAgent(String type, DistroTransportAgent transportAgent) {
transportAgentMap.put(type, transportAgent);
}
public DistroDataStorage findDataStorage(String type) {
return dataStorageMap.get(type);
}
public void registerDataStorage(String type, DistroDataStorage dataStorage) {
dataStorageMap.put(type, dataStorage);
}
public Set<String> getDataStorageTypes() {
return dataStorageMap.keySet();
}
public DistroFailedTaskHandler findFailedTaskHandler(String type) {
return failedTaskHandlerMap.get(type);
}
public void registerFailedTaskHandler(String type, DistroFailedTaskHandler failedTaskHandler) {
failedTaskHandlerMap.put(type, failedTaskHandler);
}
public void registerDataProcessor(DistroDataProcessor dataProcessor) {
dataProcessorMap.putIfAbsent(dataProcessor.processType(), dataProcessor);
}
public DistroDataProcessor findDataProcessor(String processType) {
return dataProcessorMap.get(processType);
}
}
DistroDataProcessor
用於處理Distro協議的資料物件。
/**
* Distro data processor.
*
* @author xiweng.yy
*/
public interface DistroDataProcessor {
/**
* Process type of this processor.
* 當前處理器可處理的型別
* @return type of this processor
*/
String processType();
/**
* Process received data.
* 處理接收到的資料
* @param distroData received data 接收到的資料物件
* @return true if process data successfully, otherwise false
*/
boolean processData(DistroData distroData);
/**
* Process received verify data.
* 處理接收到的驗證型別的資料
* @param distroData verify data 被處理的資料
* @param sourceAddress source server address, might be get data from source server 被處理資料的來源伺服器
* @return true if the data is available, otherwise false
*/
boolean processVerifyData(DistroData distroData, String sourceAddress);
/**
* Process snapshot data.
* 處理快照資料
* @param distroData snapshot data
* @return true if process data successfully, otherwise false
*/
boolean processSnapshot(DistroData distroData);
}
DiustroDataStorage
DistroData的儲存器
public interface DistroDataStorage {
/**
* Set this distro data storage has finished initial step.
* 設定當前儲存器已經初始化完畢它內部的DistroData
*/
void finishInitial();
/**
* Whether this distro data is finished initial.
* 當前儲存器是否已經初始化完畢內部的DistroData
* <p>If not finished, this data storage should not send verify data to other node.
*
* @return {@code true} if finished, otherwise false
*/
boolean isFinishInitial();
/**
* Get distro datum.
* 獲取內部的DistroData
* @param distroKey key of distro datum 資料對應的key
* @return need to sync datum
*/
DistroData getDistroData(DistroKey distroKey);
/**
* Get all distro datum snapshot.
* 獲取內部儲存的所有DistroData
* @return all datum
*/
DistroData getDatumSnapshot();
/**
* Get verify datum.
* 獲取所有的DistroData用於驗證
* @return verify datum
*/
List<DistroData> getVerifyData();
}
DistroFailedTaskHandler
用於Distro任務失敗重試
public interface DistroFailedTaskHandler {
/**
* Build retry task when distro task execute failed.
* 當Distro任務執行失敗可以建立重試任務
* @param distroKey distro key of failed task 失敗任務的distroKey
* @param action action of task 任務的操作型別
*/
void retry(DistroKey distroKey, DataOperation action);
}
DistroTransportAgent
DistroData的傳輸代理,用於傳送請求。
public interface DistroTransportAgent {
/**
* Whether support transport data with callback.
* 是否支援回撥
* @return true if support, otherwise false
*/
boolean supportCallbackTransport();
/**
* Sync data.
* 同步資料
* @param data data 被同步的資料
* @param targetServer target server同步的目標伺服器
* @return true is sync successfully, otherwise false
*/
boolean syncData(DistroData data, String targetServer);
/**
* Sync data with callback.
* 帶回調的同步方法
* @param data data
* @param targetServer target server
* @param callback callback
* @throws UnsupportedOperationException if method supportCallbackTransport is false, should throw {@code
* UnsupportedOperationException}
*/
void syncData(DistroData data, String targetServer, DistroCallback callback);
/**
* Sync verify data.
* 同步驗證資料
* @param verifyData verify data
* @param targetServer target server
* @return true is verify successfully, otherwise false
*/
boolean syncVerifyData(DistroData verifyData, String targetServer);
/**
* Sync verify data.
* 帶回調的同步驗證資料
* @param verifyData verify data
* @param targetServer target server
* @param callback callback
* @throws UnsupportedOperationException if method supportCallbackTransport is false, should throw {@code
* UnsupportedOperationException}
*/
void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
/**
* get Data from target server.
* 從遠端節點獲取指定資料
* @param key key of data 需要獲取資料的key
* @param targetServer target server遠端節點地址
* @return distro data
*/
DistroData getData(DistroKey key, String targetServer);
/**
* Get all datum snapshot from target server.
* 從遠端節點獲取全量快照資料
* @param targetServer target server.
* @return distro data
*/
DistroData getDatumSnapshot(String targetServer);
}
com.alibaba.nacos.core.distributed.distro.entity
這裡存放了Distro協議的資料物件。
DistroData
Distro協議的核心物件,協議互動過程中的資料傳輸將使用此物件,它的設計也可以看做是一個容器,後期將會經常看見他。
public class DistroData {
// 資料的key
private DistroKey distroKey;
// 資料的操作型別,也可以理解為是什麼操作產生了此資料,或此資料用於什麼操作
private DataOperation type;
// 資料的位元組陣列
private byte[] content;
public DistroData() {
}
public DistroData(DistroKey distroKey, byte[] content) {
this.distroKey = distroKey;
this.content = content;
}
public DistroKey getDistroKey() {
return distroKey;
}
public void setDistroKey(DistroKey distroKey) {
this.distroKey = distroKey;
}
public DataOperation getType() {
return type;
}
public void setType(DataOperation type) {
this.type = type;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
DistroKey
DistroData的key物件,可以包含較多的屬性。
public class DistroKey {
// 資料本身的key
private String resourceKey;
// 資料的型別
private String resourceType;
// 資料傳輸的目標伺服器
private String targetServer;
public DistroKey() {
}
public DistroKey(String resourceKey, String resourceType) {
this.resourceKey = resourceKey;
this.resourceType = resourceType;
}
public DistroKey(String resourceKey, String resourceType, String targetServer) {
this.resourceKey = resourceKey;
this.resourceType = resourceType;
this.targetServer = targetServer;
}
public String getResourceKey() {
return resourceKey;
}
public void setResourceKey(String resourceKey) {
this.resourceKey = resourceKey;
}
public String getResourceType() {
return resourceType;
}
public void setResourceType(String resourceType) {
this.resourceType = resourceType;
}
public String getTargetServer() {
return targetServer;
}
public void setTargetServer(String targetServer) {
this.targetServer = targetServer;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DistroKey distroKey = (DistroKey) o;
return Objects.equals(resourceKey, distroKey.resourceKey) && Objects
.equals(resourceType, distroKey.resourceType) && Objects.equals(targetServer, distroKey.targetServer);
}
@Override
public int hashCode() {
return Objects.hash(resourceKey, resourceType, targetServer);
}
@Override
public String toString() {
return "DistroKey{" + "resourceKey='" + resourceKey + '\'' + ", resourceType='" + resourceType + '\''
+ ", targetServer='" + targetServer + '\'' + '}';
}
}
com.alibaba.nacos.core.distributed.distro.exception
com.alibaba.nacos.core.distributed.distro.task
DistroTaskEngineHolder
Distro任務引擎持有者,用於管理不同型別的任務執行引擎。
@Component
public class DistroTaskEngineHolder {
// 延遲任務執行引擎
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
// 非延遲任務執行引擎
private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
// 為延遲任務執行引擎新增預設任務處理器
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
return delayTaskExecuteEngine;
}
public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
return executeWorkersManager;
}
/**
* 為延遲任務新增預設任務處理器
* @param key 處理器向容器儲存時的key
* @param nacosTaskProcessor 處理器物件
*/
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}
}
com.alibaba.nacos.core.distributed.distro.task.delay
DistroDelayTask
Distro延遲任務
public class DistroDelayTask extends AbstractDelayTask {
// 當前任務處理資料的key
private final DistroKey distroKey;
// 當前任務處理資料的操作型別
private DataOperation action;
// 當前任務建立的時間
private long createTime;
public DistroDelayTask(DistroKey distroKey, long delayTime) {
this(distroKey, DataOperation.CHANGE, delayTime);
}
// 構造一個延遲任務
public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
this.distroKey = distroKey;
this.action = action;
this.createTime = System.currentTimeMillis();
// 建立時設定上次處理的時間
setLastProcessTime(createTime);
// 設定間隔多久執行
setTaskInterval(delayTime);
}
public DistroKey getDistroKey() {
return distroKey;
}
public DataOperation getAction() {
return action;
}
public long getCreateTime() {
return createTime;
}
/**
* 從字面意思是合併任務,實際的操作證明它是用於更新過時的任務
* 在向任務列表新增新的任務時,使用新任務的key來從任務列表獲取,若結果不為空,表明此任務已經存在
* 相同的任務再次新增的話,就重複了,因此再此合併
* 為什麼新的任務會過時?(新任務指的是當前類)
* 想要理解此處邏輯,請參考{@link com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask(Object,
* AbstractDelayTask)}.新增任務時是帶鎖操作的。因此新增的先後順序不能保證
* @param task task 已存在的任務
*/
@Override
public void merge(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return;
}
DistroDelayTask oldTask = (DistroDelayTask) task;
// 若舊的任務和新的任務的操作型別不同,並且新任務的建立時間小於舊任務的建立時間,說明當前這個新任務還未被新增成功
// 這個新的任務已經過時了,不需要再執行這個任務的操作,因此將舊的任務的操作型別和建立時間設定給新任務
if (!action.equals(oldTask.getAction()) && createTime < oldTask.getCreateTime()) {
action = oldTask.getAction();
createTime = oldTask.getCreateTime();
}
setLastProcessTime(oldTask.getLastProcessTime());
}
}
DistroDelayTaskExecuteEngine
延遲任務執行引擎
public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
public DistroDelayTaskExecuteEngine() {
super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
}
@Override
public void addProcessor(Object key, NacosTaskProcessor taskProcessor) {
// 構建當前任務的key
Object actualKey = getActualKey(key);
super.addProcessor(actualKey, taskProcessor);
}
@Override
public NacosTaskProcessor getProcessor(Object key) {
Object actualKey = getActualKey(key);
return super.getProcessor(actualKey);
}
private Object getActualKey(Object key) {
return key instanceof DistroKey ? ((DistroKey) key).getResourceType() : key;
}
}
DistroDelayTaskProcessor
延遲任務處理器
/**
* Distro delay task processor.
*
* @author xiweng.yy
*/
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
// Distro任務引擎持有者
private final DistroTaskEngineHolder distroTaskEngineHolder;
// Distro元件持有者
private final DistroComponentHolder distroComponentHolder;
public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder,
DistroComponentHolder distroComponentHolder) {
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroComponentHolder = distroComponentHolder;
}
@Override
public boolean process(NacosTask task) {
// 不處理非延遲任務
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
// 根據不同的操作型別建立具體的任務
switch (distroDelayTask.getAction()) {
case DELETE:
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
}
com.alibaba.nacos.core.distributed.distro.task.execute
AbstractDistroExecuteTask
抽象的執行任務,定義了任務處理流程。
public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
private final DistroKey distroKey;
private final DistroComponentHolder distroComponentHolder;
protected AbstractDistroExecuteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
this.distroKey = distroKey;
this.distroComponentHolder = distroComponentHolder;
}
protected DistroKey getDistroKey() {
return distroKey;
}
protected DistroComponentHolder getDistroComponentHolder() {
return distroComponentHolder;
}
@Override
public void run() {
// 獲取被處理的資料資源型別
String type = getDistroKey().getResourceType();
// 根據型別獲取資料傳輸代理
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
if (null == transportAgent) {
Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
return;
}
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
// 判斷代理物件是否支援回撥
if (transportAgent.supportCallbackTransport()) {
doExecuteWithCallback(new DistroExecuteCallback());
} else {
executeDistroTask();
}
}
// 執行任務
private void executeDistroTask() {
try {
boolean result = doExecute();
if (!result) {
// 執行失敗之後,進行失敗處理
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
// 執行失敗任務,進行失敗處理
handleFailedTask();
}
}
/**
* Get {@link DataOperation} for current task.
*
* @return data operation
*/
protected abstract DataOperation getDataOperation();
/**
* Do execute for different sub class.
*
* @return result of execute
*/
protected abstract boolean doExecute();
/**
* Do execute with callback for different sub class.
*
* @param callback callback
*/
protected abstract void doExecuteWithCallback(DistroCallback callback);
/**
* Handle failed task.
* 處理失敗的任務
*/
protected void handleFailedTask() {
String type = getDistroKey().getResourceType();
// 使用失敗任務處理器進行重試
DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
if (null == failedTaskHandler) {
Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
return;
}
failedTaskHandler.retry(getDistroKey(), getDataOperation());
}
private class DistroExecuteCallback implements DistroCallback {
@Override
public void onSuccess() {
Loggers.DISTRO.info("[DISTRO-END] {} result: true", getDistroKey().toString());
}
@Override
public void onFailed(Throwable throwable) {
if (null == throwable) {
Loggers.DISTRO.info("[DISTRO-END] {} result: false", getDistroKey().toString());
} else {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", throwable);
}
handleFailedTask();
}
}
}
DistroExecuteTaskExecuteEngine
Distro協議負責執行任務的執行引擎
package com.alibaba.nacos.common.task.engine;
public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine {
// 直接建立了一個新的NacosExecuteTaskExecuteEngine執行引擎
public DistroExecuteTaskExecuteEngine() {
super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO);
}
}
NacosExecuteTaskExecuteEngine
package com.alibaba.nacos.common.task.engine;
/**
* Nacos execute task execute engine.
* Nacos負責執行任務的執行引擎
* @author xiweng.yy
*/
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
// 任務執行者
private final TaskExecuteWorker[] executeWorkers;
public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
// 任務執行者的數量,取決於CPU的核數,預設為CPU核數的1.5~2倍,傳遞的引數是表示需要產生的執行緒數量是CPU核數的多少倍
this(name, logger, ThreadUtils.getSuitableThreadCount(1));
}
public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
super(logger);
// 建立一組任務執行者
executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
}
}
@Override
public int size() {
int result = 0;
for (TaskExecuteWorker each : executeWorkers) {
result += each.pendingTaskCount();
}
return result;
}
@Override
public boolean isEmpty() {
return 0 == size();
}
@Override
public void addTask(Object tag, AbstractExecuteTask task) {
// 從父類獲取任務處理器
NacosTaskProcessor processor = getProcessor(tag);
// 若存在處理器,則用處理器來處理
if (null != processor) {
processor.process(task);
return;
}
// 不存在處理器則使用worker處理
TaskExecuteWorker worker = getWorker(tag);
worker.process(task);
}
private TaskExecuteWorker getWorker(Object tag) {
// 計算當前任務應該由哪個worker處理
int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
return executeWorkers[idx];
}
private int workersCount() {
return executeWorkers.length;
}
@Override
public AbstractExecuteTask removeTask(Object key) {
throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
}
@Override
public Collection<Object> getAllTaskKeys() {
throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
}
@Override
public void shutdown() throws NacosException {
for (TaskExecuteWorker each : executeWorkers) {
each.shutdown();
}
}
/**
* Get workers status.
*
* @return workers status string
*/
public String workersStatus() {
StringBuilder sb = new StringBuilder();
for (TaskExecuteWorker worker : executeWorkers) {
sb.append(worker.status()).append("\n");
}
return sb.toString();
}
}
TaskExecuteWorker
package com.alibaba.nacos.common.task.engine;
/**
* Nacos execute task execute worker.
* Nacos任務執行者,每個執行者在建立的時候會同時啟動一個執行緒InnerWorker,持續從內部佇列中獲取需要處理的任務
* @author xiweng.yy
*/
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
/**
* Max task queue size 32768.
* 佇列最大數量為32768
*/
private static final int QUEUE_CAPACITY = 1 << 15;
private final Logger log;
/**
* 當前執行者執行緒的名稱
*/
private final String name;
/**
* 負責處理的執行緒佇列
*/
private final BlockingQueue<Runnable> queue;
/**
* 工作狀態
*/
private final AtomicBoolean closed;
public TaskExecuteWorker(final String name, final int mod, final int total) {
this(name, mod, total, null);
}
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
/**
* 執行執行緒的名稱,以DistroExecuteTaskExecuteEngine舉例:
* DistroExecuteTaskExecuteEngine_0%8
* DistroExecuteTaskExecuteEngine_1%8
* DistroExecuteTaskExecuteEngine_2%8
* DistroExecuteTaskExecuteEngine_3%8
* DistroExecuteTaskExecuteEngine_4%8
* DistroExecuteTaskExecuteEngine_5%8
* DistroExecuteTaskExecuteEngine_6%8
* DistroExecuteTaskExecuteEngine_7%8
*/
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
// 啟動一個新執行緒來消費佇列
new InnerWorker(name).start();
}
public String getName() {
return name;
}
@Override
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
putTask((Runnable) task);
}
return true;
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
log.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
/**
* Worker status.
*/
public String status() {
return name + ", pending tasks: " + pendingTaskCount();
}
@Override
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
}
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
// 若執行緒還未中斷,則持續執行
while (!closed.get()) {
try {
// 從佇列獲取任務
Runnable task = queue.take();
long begin = System.currentTimeMillis();
// 在當前InnerWorker執行緒內執行任務
task.run();
long duration = System.currentTimeMillis() - begin;
// 若任務執行時間超過1秒,則警告
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
}
DistroSyncChangeTask
Distro同步變更任務,此任務用於向其他節點發送本機資料
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
// 此任務操作型別為變更
private static final DataOperation OPERATION = DataOperation.CHANGE;
public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
super(distroKey, distroComponentHolder);
}
@Override
protected DataOperation getDataOperation() {
return OPERATION;
}
/**
* 執行不帶回調的任務
* @return
*/
@Override
protected boolean doExecute() {
// 獲取同步的資料型別
String type = getDistroKey().getResourceType();
// 獲取同步資料
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return true;
}
// 使用DistroTransportAgent同步資料
return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
}
/**
* 執行帶回調的任務
* @param callback callback
*/
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return;
}
getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
}
@Override
public String toString() {
return "DistroSyncChangeTask for " + getDistroKey().toString();
}
private DistroData getDistroData(String type) {
DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
if (null != result) {
result.setType(OPERATION);
}
return result;
}
}
DistroSyncDeleteTask
Distro同步刪除任務,用於向其他節點發送本機刪除的資料
public class DistroSyncDeleteTask extends AbstractDistroExecuteTask {
// 此任務操作型別為刪除
private static final DataOperation OPERATION = DataOperation.DELETE;
public DistroSyncDeleteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
super(distroKey, distroComponentHolder);
}
@Override
protected DataOperation getDataOperation() {
return OPERATION;
}
/**
* 執行不帶回調的任務
* @return
*/
@Override
protected boolean doExecute() {
// 構建請求引數
String type = getDistroKey().getResourceType();
DistroData distroData = new DistroData();
distroData.setDistroKey(getDistroKey());
distroData.setType(OPERATION);
// 使用DistroTransportAgent同步資料
return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
}
/**
* 執行帶回調的任務
* @param callback callback
*/
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
DistroData distroData = new DistroData();
distroData.setDistroKey(getDistroKey());
distroData.setType(OPERATION);
getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
}
@Override
public String toString() {
return "DistroSyncDeleteTask for " + getDistroKey().toString();
}
}
提示:
DistroSyncChangeTask
是將本機所有的服務傳送到其他節點DistroSyncDeleteTask
是將本機刪除的服務傳送到其他節點
com.alibaba.nacos.core.distributed.distro.task.load
DistroLoadDataTask
Distro全量資料同步任務,用於在節點啟動後首次從其他節點同步服務資料到當前節點。
public class DistroLoadDataTask implements Runnable {
// 節點管理器
private final ServerMemberManager memberManager;
// Distro協議元件持有者
private final DistroComponentHolder distroComponentHolder;
// Distro協議配置
private final DistroConfig distroConfig;
// 回撥函式
private final DistroCallback loadCallback;
// 已載入資料集合
private final Map<String, Boolean> loadCompletedMap;
public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroConfig distroConfig, DistroCallback loadCallback) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroConfig = distroConfig;
this.loadCallback = loadCallback;
loadCompletedMap = new HashMap<>(1);
}
@Override
public void run() {
try {
// 首次載入
load();
// 若首次載入沒有完成,繼續載入
if (!checkCompleted()) {
// 繼續建立一個新的載入任務進行載入
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
// 觸發回撥函式
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
private void load() throws Exception {
// 若出自身之外沒有其他節點,則休眠1秒,可能其他節點還未啟動完畢
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
// 若資料型別為空,說明distroComponentHolder的元件註冊器還未初始化完畢(v1版本為DistroHttpRegistry, v2版本為DistroClientComponentRegistry)
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
// 載入每個型別的資料
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
// 呼叫載入方法,並標記已處理
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
/**
* 從其他節點獲取同步資料
* @param resourceType
* @return
*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
// 獲取資料傳輸物件
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
// 獲取資料處理器
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor);
return false;
}
// 向每個節點請求資料
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
// 獲取到資料
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 解析資料
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result);
// 若解析成功,標記此型別資料已載入完畢
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
// 判斷是否完成載入
private boolean checkCompleted() {
// 若待載入的資料型別數量和已經載入完畢的資料型別數量不一致,鐵定是未載入完成
if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) {
return false;
}
// 若載入完畢列表內的狀態有false的,說明可能是解析失敗,還需要重新載入
for (Boolean each : loadCompletedMap.values()) {
if (!each) {
return false;
}
}
return true;
}
}
com.alibaba.nacos.core.distributed.distro.task.verify
DistroVerifyExecuteTask
Distro資料驗證任務執行器,用於向其他節點發送當前節點負責的Client狀態報告,通知對方此Client正常服務。它的資料處理維度是DistroData。
/**
* Execute distro verify task.
* 執行Distro協議資料驗證的任務,為每個DistroData傳送一個非同步的rpc請求
* @author xiweng.yy
*/
public class DistroVerifyExecuteTask extends AbstractExecuteTask {
/**
* 被驗證資料的傳輸物件
*/
private final DistroTransportAgent transportAgent;
/**
* 被驗證資料
*/
private final List<DistroData> verifyData;
/**
* 目標節點
*/
private final String targetServer;
/**
* 被驗證資料的型別
*/
private final String resourceType;
public DistroVerifyExecuteTask(DistroTransportAgent transportAgent, List<DistroData> verifyData,
String targetServer, String resourceType) {
this.transportAgent = transportAgent;
this.verifyData = verifyData;
this.targetServer = targetServer;
this.resourceType = resourceType;
}
@Override
public void run() {
for (DistroData each : verifyData) {
try {
// 判斷傳輸物件是否支援回撥(若是http的則不支援,實際上沒區別,當前2.0.1版本沒有實現回撥的實質內容)
if (transportAgent.supportCallbackTransport()) {
doSyncVerifyDataWithCallback(each);
} else {
doSyncVerifyData(each);
}
} catch (Exception e) {
Loggers.DISTRO
.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
}
}
}
/**
* 支援回撥的同步資料驗證
* @param data
*/
private void doSyncVerifyDataWithCallback(DistroData data) {
// 回撥實際上,也沒啥。。。基本算是空物件
transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
}
/**
* 不支援回撥的同步資料驗證
* @param data
*/
private void doSyncVerifyData(DistroData data) {
transportAgent.syncVerifyData(data, targetServer);
}
/**
* TODO add verify monitor.
*/
private class DistroVerifyCallback implements DistroCallback {
@Override
public void onSuccess() {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO] verify data for type {} to {} success", resourceType, targetServer);
}
}
@Override
public void onFailed(Throwable throwable) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer,
throwable);
}
}
}
}
DistroVerifyTimedTask
定時驗證任務,此任務在啟動時延遲5秒,間隔5秒執行。主要用於為每個節點建立一個數據驗證的執行任務DistroVerifyExecuteTask。它的資料處理維度是Member。
/**
* Timed to start distro verify task.
* 啟動Distro協議的資料驗證流程
* @author xiweng.yy
*/
public class DistroVerifyTimedTask implements Runnable {
private final ServerMemberManager serverMemberManager;
private final DistroComponentHolder distroComponentHolder;
private final DistroExecuteTaskExecuteEngine executeTaskExecuteEngine;
public DistroVerifyTimedTask(ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder,
DistroExecuteTaskExecuteEngine executeTaskExecuteEngine) {
this.serverMemberManager = serverMemberManager;
this.distroComponentHolder = distroComponentHolder;
this.executeTaskExecuteEngine = executeTaskExecuteEngine;
}
@Override
public void run() {
try {
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
if (!dataStorage.isFinishInitial()) {
Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
dataStorage.getClass().getSimpleName());
return;
}
List<DistroData> verifyData = dataStorage.getVerifyData();
if (null == verifyData || verifyData.isEmpty()) {
return;
}
for (Member member : targetServer) {
DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
if (null == agent) {
continue;
}
executeTaskExecuteEngine.addTask(member.getAddress() + type,
new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
}
}
}
DistroConfig
Distro協議的配置資訊。
public class DistroConfig {
private static final DistroConfig INSTANCE = new DistroConfig();
// 同步任務延遲時長(單位:毫秒)
private long syncDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS;
// 同步任務超時時長(單位:毫秒)
private long syncTimeoutMillis = DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS;
// 同步任務重試延遲時長(單位:毫秒)
private long syncRetryDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS;
// 驗證任務執行間隔時長(單位:毫秒)
private long verifyIntervalMillis = DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS;
// 驗證任務超時時長(單位:毫秒)
private long verifyTimeoutMillis = DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS;
// 首次同步資料重試延遲時長(單位:毫秒)
private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS;
private DistroConfig() {
try {
// 嘗試從環境資訊中獲取配置
getDistroConfigFromEnv();
} catch (Exception e) {
Loggers.CORE.warn("Get Distro config from env failed, will use default value", e);
}
}
/**
* 從環境資訊中獲取配置,若沒有,則使用預設值
*/
private void getDistroConfigFromEnv() {
// 從常量物件中獲取key和default value
syncDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_DELAY_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS);
syncTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_TIMEOUT_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS);
syncRetryDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_SYNC_RETRY_DELAY_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS);
verifyIntervalMillis = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_INTERVAL_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS);
verifyTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_TIMEOUT_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS);
loadDataRetryDelayMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_RETRY_DELAY_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS);
}
public static DistroConfig getInstance() {
return INSTANCE;
}
public long getSyncDelayMillis() {
return syncDelayMillis;
}
public void setSyncDelayMillis(long syncDelayMillis) {
this.syncDelayMillis = syncDelayMillis;
}
public long getSyncTimeoutMillis() {
return syncTimeoutMillis;
}
public void setSyncTimeoutMillis(long syncTimeoutMillis) {
this.syncTimeoutMillis = syncTimeoutMillis;
}
public long getSyncRetryDelayMillis() {
return syncRetryDelayMillis;
}
public void setSyncRetryDelayMillis(long syncRetryDelayMillis) {
this.syncRetryDelayMillis = syncRetryDelayMillis;
}
public long getVerifyIntervalMillis() {
return verifyIntervalMillis;
}
public void setVerifyIntervalMillis(long verifyIntervalMillis) {
this.verifyIntervalMillis = verifyIntervalMillis;
}
public long getVerifyTimeoutMillis() {
return verifyTimeoutMillis;
}
public void setVerifyTimeoutMillis(long verifyTimeoutMillis) {
this.verifyTimeoutMillis = verifyTimeoutMillis;
}
public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}
public void setLoadDataRetryDelayMillis(long loadDataRetryDelayMillis) {
this.loadDataRetryDelayMillis = loadDataRetryDelayMillis;
}
}
DistroConstants
Distro常量配置,主要定義了一些關於任務執行時長的可配置的配置名稱和對應的預設值。具體的使用,可以參考DistroConfig
。
public class DistroConstants {
public static final String DATA_SYNC_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.sync.delayMs";
public static final long DEFAULT_DATA_SYNC_DELAY_MILLISECONDS = 1000L;
public static final String DATA_SYNC_TIMEOUT_MILLISECONDS = "nacos.core.protocol.distro.data.sync.timeoutMs";
public static final long DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS = 3000L;
public static final String DATA_SYNC_RETRY_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.sync.retryDelayMs";
public static final long DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS = 3000L;
public static final String DATA_VERIFY_INTERVAL_MILLISECONDS = "nacos.core.protocol.distro.data.verify.intervalMs";
public static final long DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS = 5000L;
public static final String DATA_VERIFY_TIMEOUT_MILLISECONDS = "nacos.core.protocol.distro.data.verify.timeoutMs";
public static final long DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS = 3000L;
public static final String DATA_LOAD_RETRY_DELAY_MILLISECONDS = "nacos.core.protocol.distro.data.load.retryDelayMs";
public static final long DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS = 30000L;
}
DistroProtocol
Distro協議的真正入口,這裡將使用上面定義的所有元件來共同完實現Distro協議。可以看到它使用了Spring的@Componet
註解,意味著它將被Spring容器管理,執行到構造方法的時候將會啟動Distro協議的工作。
@Component
public class DistroProtocol {
private Logger logger = LoggerFactory.getLogger(DistroProtocol.class);
/**
* 節點管理器
*/
private final ServerMemberManager memberManager;
/**
* Distro元件持有者
*/
private final DistroComponentHolder distroComponentHolder;
/**
* Distro任務引擎持有者
*/
private final DistroTaskEngineHolder distroTaskEngineHolder;
private volatile boolean isInitialized = false;
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
// 啟動Distro協議
startDistroTask();
}
private void startDistroTask() {
// 單機模式不進行資料同步操作
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
// 開啟節點Client狀態報告任務
startVerifyTask();
// 啟動資料同步任務
startLoadTask();
}
/**
* 從其他節點獲取資料到當前節點
*/
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
// 提交資料載入任務
GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}
private void startVerifyTask() {
// 啟動資料報告的定時任務
GlobalExecutor.schedulePartitionDataTimedSync(
new DistroVerifyTimedTask(
memberManager,
distroComponentHolder,
distroTaskEngineHolder.getExecuteWorkersManager()
),
DistroConfig.getInstance().getVerifyIntervalMillis());
}
public boolean isInitialized() {
return isInitialized;
}
/**
* Start to sync by configured delay.
* 按配置的延遲開始同步
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
/**
* Start to sync data to all remote server.
* 開始將資料同步到其他節點
* @param distroKey distro key of sync data
* @param action the action of data operation
* @param delay delay time for sync
*/
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
/**
* Start to sync to target server.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
* @param targetServer target server
* @param delay delay time for sync
*/
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
/**
* Query data from specified server.
* 從指定節點查詢資料
* @param distroKey data key
* @return data
*/
public DistroData queryFromRemote(DistroKey distroKey) {
if (null == distroKey.getTargetServer()) {
Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
return null;
}
String resourceType = distroKey.getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
if (null == transportAgent) {
Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
return null;
}
return transportAgent.getData(distroKey, distroKey.getTargetServer());
}
/**
* Receive synced distro data, find processor to process.
* 接收到同步資料,並查詢處理器進行處理
* @param distroData Received data
* @return true if handle receive data successfully, otherwise false
*/
public boolean onReceive(DistroData distroData) {
Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
distroData.getDistroKey());
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
return dataProcessor.processData(distroData);
}
/**
* Receive verify data, find processor to process.
* 接收到驗證資料,並查詢處理器進行處理
* @param distroData verify data
* @param sourceAddress source server address, might be get data from source server
* @return true if verify data successfully, otherwise false
*/
public boolean onVerify(DistroData distroData, String sourceAddress) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
}
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
return dataProcessor.processVerifyData(distroData, sourceAddress);
}
/**
* Query data of input distro key.
* 根據條件查詢資料
* @param distroKey key of data
* @return data
*/
public DistroData onQuery(DistroKey distroKey) {
String resourceType = distroKey.getResourceType();
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
return new DistroData(distroKey, new byte[0]);
}
return distroDataStorage.getDistroData(distroKey);
}
/**
* Query all datum snapshot.
* 查詢所有快照資料
* @param type datum type
* @return all datum snapshot
*/
public DistroData onSnapshot(String type) {
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
return distroDataStorage.getDatumSnapshot();
}
}
如果您認真從頭看到這裡,相信您腦海中會記住一些關鍵字,比如Task
、sync
、processor
、DistroData
。所謂的Distro協議,不就是同步資料嘛,沒錯,它就是同步資料,在多個節點之間同步資料。通過DistroProtocol
這個類不難發現,它實現了定時向其他節點報告狀態、首次從其他節點載入資料、同步資料到指定節點、獲取當前節點的快照資料。將這些功能組合在一起便可以實現多節點同步。因為所有節點都會做這些操作。
Distro協議資料物件
在整個互動過程中,是使用DistroData
物件作為資料載體,它可以儲存多種操作型別的任意資料。結構如圖:
在DistroKey中,包含了資源的標識、資源的型別,以及該資源所屬的節點。因此任何DistroData資料都能夠確定它是來自於那臺機器的什麼型別的資料,在DataOperation中則定義了該資料將被用於什麼操作。至於真正的資料型別,位元組陣列保證了它的相容性,實際上DistroKey和Operation也能確定它將會是什麼型別。
Distro協議重要角色
我們知道DistroData
是作為Distro協議的互動物件,剩下的還有負責儲存資料的元件、處理資料的元件、傳送資料的元件,它們共同協作來完成整個協議流程。
儲存DistroData
DistroDataStorage
用於儲存DistroData, 它有多種實現,用於處理不同型別的資料。實際上就是處理不同版本中的資料。
- v1版本的實現:DistroDataStorageImpl
- v2版本的實現:DistroClientDataProcessor
提示:
後續將不再刻意提及v1或者是v2的實現,預設以v2實現來分析。
資料的獲取發生在DistroDataStorage
介面的getDistroData(DistroKey distroKey)
、getDatumSnapshot()
、getVerifyData()
三個方法中。在v2版本中DistroClientDataProcessor
實現了DistroDataStorage介面,提供DistroData的獲取功能。
// DistroClientDataProcessor.java
@Override
public DistroData getDistroData(DistroKey distroKey) {
// 從Client管理器中獲取指定Client
Client client = clientManager.getClient(distroKey.getResourceKey());
if (null == client) {
return null;
}
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
return new DistroData(distroKey, data);
}
@Override
public DistroData getDatumSnapshot() {
List<ClientSyncData> datum = new LinkedList<>();
// 從Client管理器中獲取所有Client
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
datum.add(client.generateSyncData());
}
ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
snapshot.setClientSyncDataList(datum);
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>();
// 從Client管理器中獲取所有Client
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
if (clientManager.isResponsibleClient(client)) {
// TODO add revision for client.
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
data.setType(DataOperation.VERIFY);
result.add(data);
}
}
return result;
}
通過v2版本的資料儲存實現可以發現,它並沒有直接去儲存資料,而是從ClientManager
內部獲取。
處理DistroData
DistroDataProcessor
用於處理DistroData。資料的處理髮生在processData(DistroData distroData)
、processVerifyData(DistroData distroData, String sourceAddress)
、processSnapshot(DistroData distroData)
三個方法中。在v2版本中DistroClientDataProcessor
實現了DistroDataProcessor
介面,提供DistroData的處理能力。
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);
handlerClientSyncData(clientSyncData);
return true;
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);
return true;
default:
return false;
}
}
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
if (clientManager.verifyClient(verifyData.getClientId())) {
return true;
}
Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
return false;
}
@Override
public boolean processSnapshot(DistroData distroData) {
ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
for (ClientSyncData each : snapshot.getClientSyncDataList()) {
handlerClientSyncData(each);
}
return true;
}
傳送DistroData
DistroTransportAgent
用於傳輸DistroData,v2版本中DistroClientTransportAgent
實現了DistroTransportAgent
介面,提供DistroData的傳送能力。
/**
* Distro transport agent for v2.
* v2版本的DistroData傳輸代理
* @author xiweng.yy
*/
public class DistroClientTransportAgent implements DistroTransportAgent {
private final ClusterRpcClientProxy clusterRpcClientProxy;
private final ServerMemberManager memberManager;
public DistroClientTransportAgent(ClusterRpcClientProxy clusterRpcClientProxy,
ServerMemberManager serverMemberManager) {
this.clusterRpcClientProxy = clusterRpcClientProxy;
this.memberManager = serverMemberManager;
}
/**
* 當前實現支援回撥
* @return
*/
@Override
public boolean supportCallbackTransport() {
return true;
}
/**
* 向指定節點發送同步資料
* @param data data
* @param targetServer target server
* @return
*/
@Override
public boolean syncData(DistroData data, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
}
return false;
}
/**
* 向指定節點發送回同步資料(支援回撥)
* @param data data
* @param targetServer target server
* @param callback callback
*/
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
try {
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
/**
* 向指定節點發送驗證資料
* @param verifyData verify data
* @param targetServer target server
* @return
*/
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
// replace target server as self server so that can callback.
verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
}
return false;
}
/**
* 向指定節點發送驗證資料(支援回撥)
* @param verifyData verify data
* @param targetServer target server
* @param callback callback
*/
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
// 若此節點不在當前節點快取中,直接返回,因為可能下線、或者過期,不需要驗證了
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
// 構建請求物件
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
// 建立一個回撥物件(Wrapper實現了RequestCallBack介面)
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
// 使用叢集Rpc請求物件傳送非同步任務
clusterRpcClientProxy.asyncRequest(member, request, wrapper);
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
/**
* 從指定節點獲取資料
* @param key key of data
* @param targetServer target server
* @return
*/
@Override
public DistroData getData(DistroKey key, String targetServer) {
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
DistroDataRequest request = new DistroDataRequest();
DistroData distroData = new DistroData();
distroData.setDistroKey(key);
distroData.setType(DataOperation.QUERY);
request.setDistroData(distroData);
request.setDataOperation(DataOperation.QUERY);
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get data request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro data failed! ", e);
}
}
/**
* 從指定節點獲取快照資料
* @param targetServer target server.
* @return
*/
@Override
public DistroData getDatumSnapshot(String targetServer) {
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
DistroDataRequest request = new DistroDataRequest();
request.setDataOperation(DataOperation.SNAPSHOT);
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
private boolean isNoExistTarget(String target) {
return !memberManager.hasMember(target);
}
private boolean checkTargetServerStatusUnhealthy(Member member) {
return null == member || !NodeState.UP.equals(member.getState());
}
private boolean checkResponse(Response response) {
return ResponseCode.SUCCESS.getCode() == response.getResultCode();
}
/**
* rpc請求回撥包裝器
*/
private class DistroRpcCallbackWrapper implements RequestCallBack<Response> {
private final DistroCallback distroCallback;
private final Member member;
public DistroRpcCallbackWrapper(DistroCallback distroCallback, Member member) {
this.distroCallback = distroCallback;
this.member = member;
}
@Override
public Executor getExecutor() {
return GlobalExecutor.getCallbackExecutor();
}
@Override
public long getTimeout() {
return DistroConfig.getInstance().getSyncTimeoutMillis();
}
@Override
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroSyncSuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
NamingTpsMonitor.distroSyncFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}
@Override
public void onException(Throwable e) {
distroCallback.onFailed(e);
}
}
/**
* 驗證資料回撥包裝器
*/
private class DistroVerifyCallbackWrapper implements RequestCallBack<Response> {
private final String targetServer;
private final String clientId;
private final DistroCallback distroCallback;
private final Member member;
private DistroVerifyCallbackWrapper(String targetServer, String clientId, DistroCallback distroCallback,
Member member) {
this.targetServer = targetServer;
this.clientId = clientId;
this.distroCallback = distroCallback;
this.member = member;
}
@Override
public Executor getExecutor() {
return GlobalExecutor.getCallbackExecutor();
}
@Override
public long getTimeout() {
return DistroConfig.getInstance().getVerifyTimeoutMillis();
}
@Override
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
// 驗證失敗之後釋出事件
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}
@Override
public void onException(Throwable e) {
distroCallback.onFailed(e);
}
}
}
不管同步資料的操作型別是什麼,最終傳送資料使用的是ClusterRpcClientProxy
物件。
以上3個元件是實現Distro協議中重要的一環,後續關於Distro協議的邏輯將全部圍繞這三個元件進行