1. 程式人生 > >Pulsar官方文件翻譯-概念和架構-Pulsar客戶端(Pulsar Clients)

Pulsar官方文件翻譯-概念和架構-Pulsar客戶端(Pulsar Clients)

本篇翻譯轉自Zongyang在crowdin apache pulsar專案中的翻譯,由於已經完成翻譯,我直接轉載,不再翻譯。

------------------------------------------

Pulsar 客戶端

Pulsar向用戶暴露原生的JavaC++的客戶端API。

Puslar客戶端封裝並優化了客戶端-伺服器的通訊協議,並暴露出一套簡單直觀的API,提供給應用程式使用。

底層實現上,目前官方版的Pulsar客戶端支援對使用者透明的連線重連、故障切換、未ack訊息的緩衝、訊息重傳。

自定義客戶端庫

如果您想建立自己的客戶端庫, 我們建議參考Pulsar的自定義 

二進位制協議 的文件。

客戶端使用步驟

當應用程式要建立生產者/消費者時, Pulsar客戶端庫執行按以下兩個步驟的工作:

  1. 客戶端將嘗試通過向伺服器(Broker)傳送 HTTP 查詢請求,來確定主題(Topic)所在的伺服器(Broker)。客戶端通過查詢Zookeeper中(快取)的元資料,來確定這條訊息的topic在哪個broker上,如果該topic不在任何一個broker上,則把這個topic分配在負載最少的broker上。

  2. 當客戶端獲取了broker的地址之後,將會建立一個TCP連線(或複用連線池中的連線)並且進行鑑權。客戶端和broker通過該連線交換基於自定義協議的二進位制命令。同時,客戶端會向broker傳送一條命令用以在broker上建立生產者/消費者,該命令將會在驗證授權策略後生效。

每當 TCP 連線中斷時, 客戶端將立即重新啟動此安裝階段, 並將繼續嘗試使用指數退避重新建立生產者或使用者, 直到操作成功為止。

Reader 介面

在Pulsar中, "標準" 消費者介面 涉及使用消費者監聽 主題, 處理傳入訊息, 並在處理完這些訊息後最終確認它們。 每當使用者連線到某個主題時, 它就會自動開始從最早的沒被確認(unacked)的訊息處讀取, 因為該主題的遊標是由Pulsar自動管理的。

使用Pulsar的 讀取器介面, 應用程式可以手動管理遊標。 當使用讀取器連線到一個主題而非消費者時,在讀取器連線到主題的時候就需要指定讀取器從哪個位置開始讀訊息。

當連線到某個主題時, 讀取器從以下位置開始讀訊息:

  • 主題中的 最早的 可用訊息

  • 主題中的 最新 可用訊息

  • 除最早的和最新的之外的可用訊息位點。如果你使用該選項,你需要顯式的提供一個訊息ID。你的應用程式需要自己管理訊息ID,例如從持久化的資料儲存或快取中獲取

Pulsar的讀取器介面在流計算場景下,對提供effective-once的語義很有幫助。 Pulsar能夠將主題的訊息進行重放,並從重放後的位置開始讀取訊息,是滿足流處理的場景的重要基礎。讀取器介面為Pulsar客戶端在主體內提供了一種底層抽象“手動管理的位點”

未分割槽主題

Pulsar的讀取器目前無法在已分割槽的主題上使用。

下面是一個Java語言實現的從主題上最早可用訊息的位置開始消費的例子:

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

// Create a reader on a topic and for a specific message (and onward)
Reader<byte[]> reader = pulsarClient.newReader()
    .topic("reader-api-test")
    .startMessageId(MessageId.earliest)
    .create();

while (true) {
    Message message = reader.readNext();

    // Process the message
}

建立一個從最新可用訊息處開始讀取訊息的讀取器 

Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(MessageId.latest)
    .create();

建立一個從其他位置(非最早可用且非最新可用訊息處)讀取訊息的讀取器

byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(id)
    .create();