1. 程式人生 > >Vert.x系列(三)--ClusteredEventBus原始碼分析

Vert.x系列(三)--ClusteredEventBus原始碼分析

開發十年,就只剩下這套架構體系了! >>>   

 前言:因為ClusteredEventBus涉及叢集,有必產生網路問題,從而引入了NetServer、ServerID等涉及網路,埠的類。在之前的EventBusImpl中, 使用的資料結構是以address-List<Handler>作為k-v的map容器。作為EventBusImpl的子類,ClusteredEventBus的邏輯結構上一樣的。 不過把address-List<ServerID>作為k-v。

原理:在start方法中,利用第三方框架(預設hazelcast)實現的叢集同步map(變數subs) ,獲取已有的節點資訊。然後根據引數,對自身伺服器的埠實現監聽,把自身伺服器資訊放入前面的map,讓其他節點感知。呼叫consumer方法時,以address-List<ServerID>作為k-v存在一個map的容器中。呼叫send方法時,以address為k從map裡取出ServerID.然後把訊息利用TCP協議傳送給對應的伺服器。

程式碼:

public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 這2個欄位是為了從System.getProperty()取值,優先順序//1.System.getProperty()2.EventBusOptions

public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port";

private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1});

private static final String SERVER_ID_HA_KEY = "server_id";

private static final String SUBS_MAP_NAME = "__vertx.subs"; //叢集資料存放在叢集同步的map中,需要約定一個固定的key統一存取。

private final ClusterManager clusterManager;

private final HAManager haManager;

private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根據socket長連結

private final Context sendNoContext;

private EventBusOptions options; // 建立時的引數

private AsyncMultiMap<String, ClusterNodeInfo> subs; // 叢集核心資料 k是address,value是HazelcastClusterNodeInfo

private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身訂閱(Subscribe)的addrees

private ServerID serverID; // 自身伺服器資訊(IP和port)

private ClusterNodeInfo nodeInfo; // 自身叢集資訊(NodeID、IP和port)

private NetServer server;//

在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。

做了很多事件,很多邏輯。

1.subs = ar2.result(); 獲取叢集資料。從叢集拉取資料,ar2.succeeded() 為前置判斷。直接排除網路、配置等錯誤的可能。

2.建立底層的埠監聽。這裡埠有大坑,有2個概念:

actualPort 和 publicPort

actualPort是值真正監聽的埠,從option傳值過來,沒有則隨機產生。

publicPort是放到共享給叢集的埠,為了通知別的節點讓它們往這裡發資料。官方的解釋是為了容器情況考慮。在容器裡執行時,和主機的埠是通過代理訪問的。對於這2個port ,因為這裡有好幾個變數可以賦值,所有裡面有優先順序:

actualPort:
1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,檢視VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,發現其實就是對EventBusOptions操作。
2.隨機產生。

 

publicPort
1.系統變數CLUSTER_PUBLIC_PORT_PROP_NAME
2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost
3.上面的actualPort

這因為埠直接涉及到通訊,設定不對就無法使用。如果是叢集內多節點的情況,需要設定host,不需要設定port. 因為host預設值是 "localhost",port預設值是隨機產生的可用埠(假設為51854),host和port會產生ServerID。如果不設定host,A節點就會把 "localhost:51854"傳到叢集上。其他B節想要訪問A時,會根據這個資訊去訪問 localhost:51854,結果訪問到自身去了。

 

下面重點就是consumer 和 send/poblish方法。

呼叫consumer方法時,會依次呼叫到addRegistration(),往叢集共享的subs中放入資訊,達到傳播的目的。

@Override
protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
    // Propagate the information
    subs.add(address, nodeInfo, completionHandler);
    ownSubs.add(address);
} else {
    completionHandler.handle(Future.succeededFuture());
}
}

呼叫send/poblish方法時,會依次呼叫到sendOrPub(),

@Override

protected <T> void sendOrPub(SendContextImpl<T> sendContext) {

String address = sendContext.message.address();
// 這裡只是定義resultHandler,沒有執行,如果要執行,還需
//要resultHandler.handler(AsyncResult)
Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// 重要的 server
ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
if (metrics != null) {
metrics.messageSent(address, !sendContext.message.isSend(), true, false);
}
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};

// 這裡才是處理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo>
// get(k)就是把List<HazelcastClusterNodeInfo>取出來,交給上面的handler
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

sendToSubs()方法是包含了 send/publish 的判斷,這個邏輯本來是在deliverMessageLocally(MessageImpl msg)完成的。

protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法裡,單機版產生的是 MessageImpl, 叢集版產生ClusteredMessage。 ClusteredMessage此類包含了對Buffer 的操作,幫助