1. 程式人生 > >Vert.x系列(二)--EventBusImpl原始碼分析

Vert.x系列(二)--EventBusImpl原始碼分析

開發十年,就只剩下這套架構體系了! >>>   

前言:Vert.x 實現了2種完成不同的eventBus:

EventBusImpl(A local event bus implementation)和 它的子類 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。這裡介紹下EventBusImpl

 

EventBusImpl 原理:呼叫consumer方法時,以address-handler作為k-v存在一個map的容器中。接著呼叫send方法時,把message,DeploymentOptions等內容封裝成物件(MessageIml,命令模式),從以address為k從map裡取出handler.把MessageIml作為引數傳遞給handler執行。

 

一.初始化: 

初始化過程就是new  EventBusImpl,並修改狀態變數started。

首先,在VertxImpl的構造方法

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)

中進行初始化。以  options.isClustered()為判斷條件,呼叫createAndStartEventBus(options, resultHandler);

其次createAndStartEventBus中做了2件事

1.以options.isClustered()判斷條件,new出了ClusteredEventBus/ EventBusImpl. new時並沒有業務邏輯。(額外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互擁有對方的引用,是很常見的寫法。)

2.呼叫EventBusImpl的初始化方法start(),並返回結果給最外層resultHandler的。start()更沒做什麼事,只是EventBusImpl裡面有個狀態變數started。把它置為true.

 

二. consumer訂閱

EventBusImpl維護了

protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>()

成員變數。

Handlers 是一個handler的List的封裝類,上面可以理解為 ConcurrentMap<String, List<Handler>>這種資料結構。consumer方法以address為k,以handler作v的list的一員,存放在handlerMap中。

所以重點關注對handlerMap的操作。

 

呼叫vertx.eventBus().consumer("Address1", ar -> {});發生了什麼?

檢視程式碼發現,先new HandlerRegistration這裡也有相互引用。再呼叫HandlerRegistration .handler,那裡面又會呼叫eventBusImpl.addRegistration()。在HandlerRegistration這個類兜了一圈,又回到eventBusImpl裡。

(相關程式碼截斷如下:  EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);

核心邏輯在addRegistration() 和 addLocalRegistration()中。我的理解是,前個方法明顯有問題。最後一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的引數都沒有使用,應該可以省略,修改為addRegistration(registration::setResult);就可以。很少在Vert.x框架中看到這樣不合規範的程式碼。如果讀者有好的見解,歡迎留言。

 

// 呼叫 addLocalRegistration

// 註冊完成

protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
                                   boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
  addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

 

/** *

* 初始化 或 獲取原 Contex

初始化 或 獲取原 Handlers

* 新建  HandlerHolder

* Handlers 裡新增  HandlerHolder

**/

protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
                                           boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  Context context = Vertx.currentContext();
  boolean hasContext = context != null;
  if (!hasContext) {
    // Embedded
    context = vertx.getOrCreateContext();
  }
  registration.setHandlerContext(context);

  boolean newAddress = false;

  HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);

  Handlers handlers = handlerMap.get(address);
  if (handlers == null) {
    handlers = new Handlers();
    Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
    if (prevHandlers != null) {
      handlers = prevHandlers;
    }
    newAddress = true;
  }
  handlers.list.add(holder);

  if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  return newAddress;
}

新出現的幾個類的作用:

Context 執行緒排程--Vert.x框架的優點是執行緒安全,就是通過Context實現。

HandlerHolder--對HandlerRegistration的封裝,外加Context。

Handlers--上面HandlerHolder 的集合封裝,外加平衡輪詢邏輯。

 

handlers.list.add(holder);這句作為壓軸(戲曲名詞,指一場摺子戲演出的倒數第二個劇目)出場完成整個功能的核心註冊操作。

至於後面的那段程式碼,我覺得有點問題。

if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

作用是在context上註冊關閉事件,由DeploymentManager在unploy的時候呼叫,對應的核心邏輯在 CloseHooks.run()方法中。但這個這個判斷條件案例只有第2次新增consumer的時候才有效果。或者是上面的程式碼boolean hasContext = context != null;給人的誤導? 以上consumer的流程還被reply方法使用。

 

三. Send/Publish傳送

多個send過載方法最後定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但這個核心方法的最終卻呼叫了一個

名為sendOrPubInternal的方法,不由得在讓人想起寫程式最難的事之一是命名。正如開頭說的這個使用了設計模式中的命令模式,把引數封裝成MessageImpl物件傳送到後面的方法。

sendOrPubInternal做了3個事情,

1.createReplyHandlerRegistration -- 有replyHandler.reply()這步才有意義

2.new SendContextImpl -- 從Context類判斷,SendContextImpl可以繫結執行緒

3.sendContext.next(); -- 在執行方法前,執行攔截器。攔截器極大地豐富開發人員的自定義使用。

本來應該1,2,3順序介紹程式碼,但是訊息流程一般是:

Sender----(    message  )--->customer;

Sender<---(reply message)---customer;

根據這個流程,得先介紹2.new SendContextImpl 和3.sendContext.next();

再回頭介紹 1.createReplyHandlerRegistration

 

 

先說 2.new SendContextImpl

這個類是整個Send相關類的大封裝。

3.sendContext.next();

根據程式碼流程

sendOrPub--》deliverMessageLocally--》deliverMessageLocally

進入到deliverMessageLocally(),這個方法做了2個大事情。

  1. 獲取address所對應的所有handlers
  2. 根據isSend()區分 send (平衡輪詢發一個handler)/publish(遍歷handlers發給所有)

方法的第一句話msg.setBus(this);和reply邏輯有關係。在這個local eventbus下,是重複賦值,沒有作用的。

然後Handlers handlers = handlerMap.get(msg.address());

這句根據以address為k,取出Handlers。sender的messageImpl 終於和consumer的HandlerHold見面

 

Handler.choose()方法實現了輪詢傳送message, 個人認為這個方法叫做 balanceChoose()更好。

程式碼如下:

public HandlerHolder choose() {
  while (true) {
    int size = list.size();
    if (size == 0) {
      return null;
    }
    int p = pos.getAndIncrement();
    if (p >= size - 1) {
      pos.set(0);
    }
    try {
      return list.get(p);
    } catch (IndexOutOfBoundsException e) {
      // Can happen
      pos.set(0);
    }
  }
}

當時我使用Vert.x的時候,就很好奇eventBus的輪詢功能怎麼實現。現在看到其實非常簡單。維護一個 AtomicInteger 的變數,每次呼叫累加一次。如果超過List的長度,則重置為0,方法永遠返回 list.get(p)。巧妙!

最後在deliverToHandler()方法裡,在Context的執行緒控制下,完成message和handler的最終互動。

 

那麼,回到最開始的問題,

Sender----(    message  )--->Customer;

Sender<---(reply message)---Customer;

在上面的流程中,Sender根據address找到Customer從而傳送message,那麼Customer的reply是怎麼找到Sender的呢?

答案是一個臨時的replyAddress。通過以 replyAddress為key,把Sender作為handler註冊到eventBusImpl上,處理後直接登出。replyAddress的規律是從1開始的步長為1的自增數列,所以開發者不應該使用純數字作為自身業務的Address,避免衝突。

 

最後說說1.createReplyHandlerRegistration

如果sender在傳送訊息時使用了

send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。

vertx.eventBus().send("address1", "測試訊息", ar -> {
    if (ar.succeeded()) {
        System.out.println("我是producer1:" + ar.result().body());
    }
});

並且consumer在接受訊息到後,呼叫了 reply();

vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("consumer reply message ");
});

則會進入createReplyHandlerRegistration的處理邏輯。

使用

protected String generateReplyAddress() {
  return Long.toString(replySequence.incrementAndGet());
}

這裡產生從1開始的步長為1的自增數列address。

Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
  new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);

裡面的this是eventBusImpl,並在handler()方法裡把 boolean replyHander的值置為true.

這樣,eventBusImpl的handlerMap變數裡,就多了<replyAddress, replyHander>。

在cuomser處呼叫reply()後,會在eventBusImpl的內部類ReplySendContextImpl<T> extends SendContextImpl 的參與下,走類似send()的流程。區別是最後在deliverToHandler()方法裡,會判斷boolean  replyHander的值,如果是true呼叫完畢就登出.

 

錯誤程式碼測驗:

vertx.eventBus().consumer("1", ar -> {
    System.out.println("我不應該在這裡" + ar.body());
    ar.reply("對不起,其實我是阿杜。");
});
vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("我是高帥富");
});
vertx.eventBus().send("address1", "測試訊息", ar -> {
    if (ar.succeeded()) {
        System.out.println("sender:接收收到的迴應是:"+ar.result().body());
    }else{
        System.out.println("傳送失敗");
    }
});

存在consumer("1", ar -> {})的Console:

consumer:測試訊息

我不應該在這裡我是高帥富

20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096

傳送失敗

可以看到上面的輸出完全不是設想的結果。

 

如果不存在consumer("1", ar -> {})address為1的Console:

consumer:測試訊息
sender:接收收到的迴應是:我是高帥富

最後,再次提醒:使用eventBus時,不要使用純數字作為自身業務的addr