瞭解Vert.x:事件匯流排
Vert.x基於輕量級actor,名為Verticles。
Verticle是一個獨立的工作單元,可以獨立擴充套件。
通常,actor模型要求actor具有稱為“傳入郵箱”的概念,該概念通常是佇列(更具體地說是阻塞緩衝佇列)。
因此,如果一個actor想要由另一個actor完成一些工作,它會將一條訊息推送到其郵箱中。
這個Event Bus到底是什麼?
我們不是讓actor彼此意識到對方存在,而是介紹一個知道所有actor的中間人broker,所有actor都可以與broker溝通。此元件稱為訊息匯流排,或Vert.x中的事件匯流排.
這是Observer設計模式的實現,或者更確切地說,是PubSub風格的實現。
如果您通過事件匯流排傳送訊息,並且沒有訂閱者,則該訊息將永久丟失,因為事件匯流排不會以任何方式保留訊息。
出於同樣的原因,也無法向新訂閱者重複舊訊息。
編碼器
關於Vert.x中事件匯流排的一個 ofollow,noindex" target="_blank">常見誤解 是,使用它時會產生開銷,即使您的Verticle與通常一樣駐留在同一個JVM中也是如此。
讓我們把這個說法進行測試。
首先,我們將建立一個非常大的物件(大約36MB):
class BigSerializedObject{ private String message; @Override public String toString() { return message; } public BigSerializedObject() { StringBuilder sb = new StringBuilder(UUID.randomUUID().toString()); for (int i = 0; i < 20; i++) { sb.append(sb); } this.message = sb.toString(); } }
一個Verticle將例項化並通過Event Bus傳送此物件:
class SenderVerticle extends AbstractVerticle { @Override public void start() { vertx.eventBus().send("address", new BigSerializedObject()); } }
而另一個會收到它:
class ReceiverVerticle extends AbstractVerticle { @Override public void start() { vertx.eventBus().consumer("address", (message) -> { System.out.println(message.body().toString().length()); }); } }
我們試著執行:
Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new ReceiverVerticle()); vertx.deployVerticle(new SenderVerticle());
出錯了:
java.lang.IllegalArgumentException: No message codec for type: class eventbus.BigSerializedObject
Vert.x中事件匯流排的工作方式是,它可以將訊息傳遞到在不同JVM上執行並以不同語言編寫的Verticle,只要它們都是同一Vert.x叢集的一部分。
出於這個原因,它需要通過為它們指定編解碼器來指定如何通過線路對物件進行編碼和解碼。
String 和JsonObject 有編解碼器開箱即用。
這裡我們建立自己的編解碼器:
class BigSerializedObjectCodec implements MessageCodec<BigSerializedObject, BigSerializedObject> { @Override public void encodeToWire(Buffer buffer, BigSerializedObject o) { System.out.println("encodeToWire"); } @Override public BigSerializedObject decodeFromWire(int pos, Buffer buffer) { System.out.println("decodeFromWire"); return new BigSerializedObject(); } @Override public BigSerializedObject transform(BigSerializedObject o) { System.out.println("transform"); return o; } @Override public String name() { return "BrokenSerializedObjectCodec"; } @Override public byte systemCodecID() { return -1; } }
註冊:
vertx.eventBus().registerDefaultCodec(BigSerializedObject.class, new BigSerializedObjectCodec());
現在我們可以看到結果:
To create message: 45ms transform To get message: 6ms
如您所見,在同一JVM內部進行通訊時,物件將作為Verticle之間的記憶體引用傳遞。因此,對於這種情況,實際上沒有開銷。
結論
以下是幾個要點:
- Vert.x使用輕量級actor模型
- Vert.x中的Actor稱為Verticles
- Verticle使用Event Bus進行通訊
- 事件匯流排中的訊息不會保留
- 使用事件匯流排傳遞具有相同JVM的訊息沒有任何開銷