BookKeeper Cluster Setup and Usage
With Apache Pulsar becoming a top-level Apache open-source project, its storage-layer solution, Apache BookKeeper, has again drawn wide attention in the industry. BookKeeper had several successful applications before Pulsar as well, for example the HA mechanism of the HDFS NameNode (although most companies probably still use the Quorum Journal Manager approach) and Twitter's open-source DistributedLog system (see Github-DistributedLog, Twitter's open-source distributed high-performance log replication service). BookKeeper is "a scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads"; in effect it turns the underlying storage layer into a service (BookKeeper is a lower-level storage service, roughly analogous to Kafka's storage layer). Distributed storage systems built on BookKeeper, including distributed message queues, can therefore focus on their application and feature layers, because the hard storage-layer problems such as consistency and fault tolerance are already solved by BookKeeper. In that sense BookKeeper does solve real problems in the industry, and its Ledger-based design (a Ledger is roughly equivalent to a Kafka segment) is naturally suited to cloud deployment, so it still has great potential. I recently did some research on BookKeeper and wrote up my notes; this article introduces Apache BookKeeper mainly from the perspective of cluster deployment and usage, and a follow-up article will dig into its architecture and implementation.
BookKeeper Overview
Let us first introduce BookKeeper's basic concepts. The diagram below shows the BookKeeper architecture (image from Introduction to Apache BookKeeper):
In BookKeeper a server node is called a Bookie (analogous to a Broker in Kafka or a DataNode in HDFS, except that BookKeeper has no master node: all Bookies are peers). Data is stored on Bookies in the form of Ledgers (analogous to a Segment in Kafka or a Block in HDFS). The basic BookKeeper concepts are:
- Cluster: all Bookies together form a cluster (the set of Bookies connected to the same ZooKeeper address);
- Bookie: a BookKeeper storage node, i.e. a server node;
- Ledger: the abstraction of a log file; it is essentially a sequence of Entries (each similar to a Kafka message), and clients write data to BookKeeper by writing to a Ledger;
- Entry: essentially a single record, identified by an entry id;
- Journal: the write-ahead log; data is first written to the journal. This is part of BookKeeper's read/write separation mechanism, analyzed in detail later;
- Ensemble: the set of Bookies across which a ledger is striped; when a Ledger is created, you specify how many Bookies it may be stored on;
- Write Quorum Size: the number of replicas to write;
- Ack Quorum Size: the number of responses needed before the client's write is considered successful. For example, with a write quorum of 3 and an ack quorum of 2, the client writes to all three replicas and treats the write as successful once two of them respond;
- LAC: Last Add Confirmed, the entry id of the most recently confirmed entry in the Ledger.
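To make Ensemble, Write Quorum, and Ack Quorum concrete: with ensemble size E and write quorum W, entry e is placed (under BookKeeper's default round-robin striping) on the W bookies starting at ensemble index e % E. The sketch below is purely illustrative; the class and method names are hypothetical and not part of the BookKeeper API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: mirrors BookKeeper's default round-robin striping,
// where entry e is written to writeQuorum bookies starting at (e % ensembleSize).
class StripingSketch {
    static List<Integer> bookiesForEntry(long entryId, int ensembleSize, int writeQuorum) {
        List<Integer> bookies = new ArrayList<>();
        for (int i = 0; i < writeQuorum; i++) {
            // wrap around the ensemble when the quorum crosses the end
            bookies.add((int) ((entryId + i) % ensembleSize));
        }
        return bookies;
    }
}
```

With ensemble 3 and write quorum 2, consecutive entries rotate over the three bookies, so reads and writes spread evenly across the ensemble.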
BookKeeper Cluster Setup
For setting up a BookKeeper cluster, see the Apache BookKeeper Manual deployment documentation.
Preparation Before Setup
Setting up a BookKeeper cluster requires a running ZooKeeper cluster (BookKeeper stores its metadata in ZooKeeper) and a Java runtime on every node.
First, a look at BookKeeper's directory layout, which is similar to other distributed systems: the commands live in bin, the configuration files in conf, and the jars it depends on in lib, as shown below:
[matt@XXX2 bookkeeper]$ ll
total 64
drwxr-xr-x 2 matt matt  4096 Sep 20 18:35 bin
drwxr-xr-x 2 matt matt  4096 Sep 20 18:35 conf
drwxrwxr-x 9 matt matt  4096 Oct  9 21:41 deps
drwxrwxr-x 2 matt matt 12288 Oct  9 21:41 lib
-rw-r--r-- 1 matt matt 24184 Sep 20 18:35 LICENSE
-rw-r--r-- 1 matt matt  5114 Sep 20 18:35 NOTICE
-rw-r--r-- 1 matt matt  4267 Sep 20 18:35 README.md
The bin directory provides BookKeeper's commands; the one used here is mainly bin/bookkeeper (bookkeeper-daemon.sh can run the Bookie process as a background daemon). Common settings such as JAVA_HOME can be configured in bin/common.sh. For usage of the bookkeeper command, see the bookkeeper cli documentation.
[matt@XXX2 bookkeeper]$ ll bin/
total 56
-rwxr-xr-x 1 matt matt 2319 Sep 20 18:35 bkctl
-rwxr-xr-x 1 matt matt 5874 Sep 20 18:35 bookkeeper
-rwxr-xr-x 1 matt matt 2869 Sep 20 18:35 bookkeeper-cluster.sh
-rwxr-xr-x 1 matt matt 4590 Sep 20 18:35 bookkeeper-daemon.sh
-rwxr-xr-x 1 matt matt 7785 Sep 20 18:35 common.sh
-rwxr-xr-x 1 matt matt 4575 Sep 20 18:35 dlog
-rwxr-xr-x 1 matt matt 1738 Sep 20 18:35 standalone
-rwxr-xr-x 1 matt matt 5128 Sep 20 18:35 standalone.docker-compose
-rwxr-xr-x 1 matt matt 1854 Sep 20 18:35 standalone.process
The bookkeeper command in turn provides shell subcommands, and the set of available commands is very rich; see BookKeeper Shell. The full list is shown below:
[matt@XXX2 bookkeeper]$ bin/bookkeeper shell
Usage: bookkeeper shell [-localbookie [<host:port>]] [-ledgeridformat <hex/long/uuid>] [-entryformat <hex/string>] [-conf configuration] <command>
where command is one of:
    autorecovery [-enable|-disable]
    bookieformat [-nonInteractive] [-force] [-deleteCookie]
    bookieinfo
    bookiesanity [-entries N] [-timeout N]
    convert-to-db-storage
    convert-to-interleaved-storage
    decommissionbookie [-bookieid <bookieaddress>]
    deleteledger -ledgerid <ledgerid> [-force]
    help [COMMAND]
    initbookie
    initnewcluster
    lastmark
    ledger [-m] <ledger_id>
    ledgermetadata -ledgerid <ledgerid>
    listbookies [-readwrite|-readonly] [-hostnames]
    listfilesondisc [-journal|-entrylog|-index]
    listledgers [-meta] [-bookieid <bookieaddress>]
    listunderreplicated [[-missingreplica <bookieaddress>] [-excludingmissingreplica <bookieaddress>]] [-printmissingreplica] [-printreplicationworkerid]
    lostbookierecoverydelay [-get|-set <value>]
    metaformat [-nonInteractive] [-force]
    nukeexistingcluster -zkledgersrootpath <zkledgersrootpath> [-instanceid <instanceid> | -force]
    readjournal [-dir] [-msg] <journal_id | journal_file_name>
    readledger [-bookie <address:port>] [-msg] -ledgerid <ledgerid> [-firstentryid <firstentryid> [-lastentryid <lastentryid>]] [-force-recovery]
    readlog [-msg] <entry_log_id | entry_log_file_name> [-ledgerid <ledgerid> [-entryid <entryid>]] [-startpos <startEntryLogBytePos> [-endpos <endEntryLogBytePos>]]
    readlogmetadata <entry_log_id | entry_log_file_name>
    rebuild-db-ledger-locations-index
    recover [-deleteCookie] <bookieSrc[:bookieSrc]>
    simpletest [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N]
    triggeraudit
    updatecookie [-bookieId <hostname|ip>] [-expandstorage] [-list] [-delete <force>]
    updateledgers -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] [-printprogress N]
    whatisinstanceid
    whoisauditor
The conf directory holds BookKeeper's configuration, shown below. The main configuration file is bk_server.conf; it exposes a very large number of settings, which are documented in BookKeeper Config.
[matt@XXX2 bookkeeper]$ ll conf/
total 84
-rw-r--r-- 1 matt matt  1804 Sep 20 18:35 bk_cli_env.sh
-rw-r--r-- 1 matt matt  2448 Sep 20 18:35 bkenv.sh
-rwxr-xr-x 1 matt matt 42269 Sep 20 18:35 bk_server.conf
-rw-r--r-- 1 matt matt  1211 Sep 20 18:35 jaas_example.conf
-rw-r--r-- 1 matt matt  2311 Sep 20 18:35 log4j.cli.properties
-rw-r--r-- 1 matt matt  2881 Sep 20 18:35 log4j.properties
-rw-r--r-- 1 matt matt  1810 Sep 20 18:35 log4j.shell.properties
-rw-r--r-- 1 matt matt  1117 Sep 20 18:35 nettyenv.sh
-rwxr-xr-x 1 matt matt  1300 Sep 20 18:35 standalone.conf
-rw-r--r-- 1 matt matt  3275 Sep 20 18:35 zookeeper.conf
-rw-r--r-- 1 matt matt   843 Sep 20 18:35 zookeeper.conf.dynamic
Setting Up the Cluster
Download the latest BookKeeper release tarball from Apache BookKeeper Releases (this article uses bookkeeper-server-4.8.0-bin.tar.gz as the example).
After unpacking the tarball into the target directory, starting the cluster takes the following steps:
- Modify the relevant configuration (zkServers, bookiePort, journalDir, ledgerDir, etc.);
- Start the Bookie process on each machine (use ./bin/bookkeeper-daemon.sh start bookie);
- Once all Bookies are up, pick any one of them and initialize the cluster metadata (run bookkeeper-server/bin/bookkeeper shell metaformat; this only needs to be done once for the whole cluster).
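The first step usually only touches a handful of keys in conf/bk_server.conf. A minimal sketch is below; the hostnames and paths are placeholders, and the exact key names (for example journalDirectories vs. journalDirectory in older releases) can vary between BookKeeper versions, so check the config reference for your release:

```properties
# ZooKeeper ensemble that the bookies register with
zkServers=zk01:2181,zk02:2181,zk03:2181
# Port the bookie server listens on
bookiePort=3181
# Directory for the write-ahead log (journal)
journalDirectories=/data/bookkeeper/journal
# Directory (or comma-separated directories) for ledger storage
ledgerDirectories=/data/bookkeeper/ledgers
```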
If startup succeeded (note that even if the Bookie process exists, it may not have started correctly when there are exceptions in the log), you should see a line like the following in the log:
2018-10-15 11:24:49,549 - INFO  [main:ComponentStarter@81] - Started component bookie-server.
Admin REST API
The BookKeeper service also exposes a REST API for administrators; see BookKeeper Admin REST API. To use it, set the httpServerEnabled option in bk_server.conf to true on each Bookie; the related options are documented under Http server settings.
Pitfalls Hit During Installation
Setting up the BookKeeper cluster was not as smooth as expected; a few small problems came up, recorded below:
Problem 1: restart fails after a configuration change
After stopping the Bookie process with ./bin/bookkeeper-daemon.sh stop bookie and then trying to start it again, the Bookie failed to come up, reporting the following error:
2018-10-13 21:05:40,674 - ERROR [main:Main@221] - Failed to build bookie server
org.apache.bookkeeper.bookie.BookieException$InvalidCookieException: instanceId 406a08e5-911e-4ab6-b97b-40e4a56279a8 is not matching with null
        at org.apache.bookkeeper.bookie.Cookie.verifyInternal(Cookie.java:142)
        at org.apache.bookkeeper.bookie.Cookie.verify(Cookie.java:147)
        at org.apache.bookkeeper.bookie.Bookie.verifyAndGetMissingDirs(Bookie.java:381)
        at org.apache.bookkeeper.bookie.Bookie.checkEnvironmentWithStorageExpansion(Bookie.java:444)
        at org.apache.bookkeeper.bookie.Bookie.checkEnvironment(Bookie.java:262)
        at org.apache.bookkeeper.bookie.Bookie.<init>(Bookie.java:646)
        at org.apache.bookkeeper.proto.BookieServer.newBookie(BookieServer.java:133)
        at org.apache.bookkeeper.proto.BookieServer.<init>(BookieServer.java:102)
        at org.apache.bookkeeper.server.service.BookieService.<init>(BookieService.java:43)
        at org.apache.bookkeeper.server.Main.buildBookieServer(Main.java:299)
        at org.apache.bookkeeper.server.Main.doMain(Main.java:219)
        at org.apache.bookkeeper.server.Main.main(Main.java:201)
Roughly, this says that the instanceId currently stored in ZooKeeper is 406a08e5-911e-4ab6-b97b-40e4a56279a8 while the expected instanceId is null, so the cookie verification failed and the process could not start. The instanceId is created in step 3 of the cluster setup (where the cluster metadata is initialized). If we run the test client at this point, it throws the exception below: only 2 Bookies are available at the moment, while ensSize defaults to 3, writeQuorumSize to 2 and ackQuorumSize to 2. When the client test program creates a new Ledger, the 2 available Bookies cannot satisfy these requirements, hence the exception:
org.apache.bookkeeper.client.BKException$BKNotEnoughBookiesException: Not enough non-faulty bookies available
        at org.apache.bookkeeper.client.SyncCallbackUtils.finish(SyncCallbackUtils.java:83)
        at org.apache.bookkeeper.client.SyncCallbackUtils$SyncCreateCallback.createComplete(SyncCallbackUtils.java:106)
        at org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:238)
        at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:142)
        at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:891)
        at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:975)
        at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:930)
        at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:911)
        at com.matt.test.bookkeeper.ledger.LedgerTest.createLedgerSync(LedgerTest.java:110)
        at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:25)
Exception in thread "main" java.lang.NullPointerException
        at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:26)
Googling this BookieException$InvalidCookieException turned up no solution, so I went straight to the code. The exception is thrown here:
private void verifyInternal(Cookie c, boolean checkIfSuperSet) throws BookieException.InvalidCookieException {
    String errMsg;
    if (c.layoutVersion < 3 && c.layoutVersion != layoutVersion) {
        errMsg = "Cookie is of too old version " + c.layoutVersion;
        LOG.error(errMsg);
        throw new BookieException.InvalidCookieException(errMsg);
    } else if (!(c.layoutVersion >= 3 && c.bookieHost.equals(bookieHost)
            && c.journalDirs.equals(journalDirs)
            && verifyLedgerDirs(c, checkIfSuperSet))) {
        errMsg = "Cookie [" + this + "] is not matching with [" + c + "]";
        throw new BookieException.InvalidCookieException(errMsg);
    } else if ((instanceId == null && c.instanceId != null)
            || (instanceId != null && !instanceId.equals(c.instanceId))) {
        // instanceId should be same in both cookies
        errMsg = "instanceId " + instanceId + " is not matching with " + c.instanceId;
        // the instanceIds do not match, so the exception is thrown here
        throw new BookieException.InvalidCookieException(errMsg);
    }
}
Here the instanceId fetched from ZooKeeper is 406a08e5-911e-4ab6-b97b-40e4a56279a8, while the instanceId in the Cookie instance c is null. So how is that Cookie initialized? Tracing the code upward shows that when a Bookie is initialized it checks its runtime environment, and at that point it builds the Cookie object from the current/VERSION files under journalDirectories and ledgerDirectories. Since this machine had been started before, those files already existed, with the following content:
[matt@XXX2 bookkeeper]$ cat /tmp/bk-data/current/VERSION
4
bookieHost: "XXX:3181"
journalDir: "/tmp/bk-txn"
ledgerDirs: "1\t/tmp/bk-data"
[matt@XXX2 bookkeeper]$ cat /tmp/bk-txn/current/VERSION
4
bookieHost: "XXX:3181"
journalDir: "/tmp/bk-txn"
ledgerDirs: "1\t/tmp/bk-data"
The implementation that loads the cookie data and initializes the Cookie object is as follows:
/**
 * Read cookie from registration manager for a given bookie <i>address</i>.
 *
 * @param rm registration manager
 * @param address bookie address
 * @return versioned cookie object
 * @throws BookieException when fail to read cookie
 */
public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm, BookieSocketAddress address)
        throws BookieException {
    Versioned<byte[]> cookieData = rm.readCookie(address.toString());
    try {
        try (BufferedReader reader = new BufferedReader(
                new StringReader(new String(cookieData.getValue(), UTF_8)))) {
            Builder builder = parse(reader);
            Cookie cookie = builder.build();
            return new Versioned<Cookie>(cookie, cookieData.getVersion());
        }
    } catch (IOException ioe) {
        throw new InvalidCookieException(ioe);
    }
}

private static Builder parse(BufferedReader reader) throws IOException {
    Builder cBuilder = Cookie.newBuilder();
    int layoutVersion = 0;
    String line = reader.readLine();
    if (null == line) {
        throw new EOFException("Exception in parsing cookie");
    }
    try {
        layoutVersion = Integer.parseInt(line.trim());
        cBuilder.setLayoutVersion(layoutVersion);
    } catch (NumberFormatException e) {
        throw new IOException("Invalid string '" + line.trim() + "', cannot parse cookie.");
    }
    if (layoutVersion == 3) {
        cBuilder.setBookieHost(reader.readLine());
        cBuilder.setJournalDirs(reader.readLine());
        cBuilder.setLedgerDirs(reader.readLine());
    } else if (layoutVersion >= 4) { // the layout version here is 4 by default
        CookieFormat.Builder cfBuilder = CookieFormat.newBuilder();
        TextFormat.merge(reader, cfBuilder);
        CookieFormat data = cfBuilder.build();
        cBuilder.setBookieHost(data.getBookieHost());
        cBuilder.setJournalDirs(data.getJournalDir());
        cBuilder.setLedgerDirs(data.getLedgerDirs());
        // Since InstanceId is optional
        if (null != data.getInstanceId() && !data.getInstanceId().isEmpty()) {
            // if the VERSION file has no instanceId field, it is never set on the Cookie
            cBuilder.setInstanceId(data.getInstanceId());
        }
    }
    return cBuilder;
}
The fix is simple: after adding an instanceId field to the current/VERSION files, the Bookie starts successfully. But two questions are still worth considering:
- What is instanceId for here? It is set when the cluster is initialized. My guess is that it serves as a simple sanity check for nodes joining the cluster: to add a new Bookie you need to know the current instanceId, and only then can the node join this cluster;
- What does the Bookie startup flow look like? Answering this requires reading the implementation and tracing the Bookie startup path.
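For reference, the fix described above amounts to appending an instanceId line (with the value reported in the error) to each current/VERSION file. A sketch of the resulting file, assuming the cookie's protobuf text format accepts the field in this form:

```
4
bookieHost: "XXX:3181"
journalDir: "/tmp/bk-txn"
ledgerDirs: "1\t/tmp/bk-data"
instanceId: "406a08e5-911e-4ab6-b97b-40e4a56279a8"
```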
Using the BookKeeper API
BookKeeper provides three APIs in total:
- The Ledger API is a lower-level API that enables you to interact with ledgers directly; see The Ledger API;
- The Ledger Advanced API is an advanced extension of the Ledger API that gives applications more flexibility; see The Advanced Ledger API;
- The DistributedLog API is a higher-level API that provides convenient abstractions; see DistributedLog.
This section mainly covers the first one and briefly touches on the second; the third is not covered here.
The Ledger API
The basic Ledger API operations are:
- Create a Ledger;
- Write data (Entries) to a Ledger;
- Close a Ledger; once closed, no more data can be written and the Ledger's contents become immutable;
- Read data from a Ledger;
- Delete a Ledger.
All of these require a BookKeeper client to be initialized first, so let us start there.
Initializing the BookKeeper Client
Initializing the BookKeeper client requires a ZooKeeper address; the client connects to the BookKeeper cluster through ZooKeeper. There are two ways to do it:
// First way to initialize a BookKeeper client
try {
    String connectionString = zkAddr; // For a single-node, local ZooKeeper cluster
    BookKeeper bkClient = new BookKeeper(connectionString);
    logger.info("BookKeeper client init success.");
} catch (InterruptedException | IOException | BKException e) {
    e.printStackTrace();
    throw new RuntimeException(
            "There is an exception throw while creating the BookKeeper client.");
}

// Second way to initialize a BookKeeper client, via a ClientConfiguration
try {
    ClientConfiguration config = new ClientConfiguration();
    config.setZkServers(zkAddr);
    config.setAddEntryTimeout(2000);
    BookKeeper bkClient = new BookKeeper(config);
    logger.info("BookKeeper client init success.");
} catch (InterruptedException | IOException | BKException e) {
    e.printStackTrace();
    throw new RuntimeException(
            "There is an exception throw while creating the BookKeeper client.");
}
Creating a Ledger
A Ledger can be created either synchronously or asynchronously (a password must be supplied at creation time); the two variants are implemented as follows:
/**
 * create the ledger, default ensemble size is 3, write quorum size is 2, ack quorum size is 2
 *
 * @param pw password
 * @return LedgerHandle
 */
public LedgerHandle createLedgerSync(String pw) {
    byte[] password = pw.getBytes();
    try {
        LedgerHandle handle = bkClient.createLedger(BookKeeper.DigestType.MAC, password);
        return handle;
    } catch (BKException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * create the ledger
 *
 * @param pw password
 */
public void createLedgerAsync(String pw) {
    class LedgerCreationCallback implements AsyncCallback.CreateCallback {
        public void createComplete(int returnCode, LedgerHandle handle, Object ctx) {
            System.out.println("Ledger successfully created");
            logger.info("Ledger successfully created async.");
        }
    }

    bkClient.asyncCreateLedger(
            3, // ensSize
            2, // writeQuorumSize and ackQuorumSize
            BookKeeper.DigestType.MAC,
            pw.getBytes(),
            new LedgerCreationCallback(),
            "some context");
}
Creating a Ledger returns a LedgerHandle instance, and all operations on that Ledger go through this instance. You can also obtain the Ledger's id via LedgerHandle.getId(); the id maps to a specific Ledger, so when you later need to read the data you simply initialize a LedgerHandle instance from that ledger id.
Writing Data to a Ledger
Given the LedgerHandle instance for a Ledger, data can be written directly with the addEntry() method, as shown below:
public long addEntry(LedgerHandle ledgerHandle, String msg) {
    try {
        return ledgerHandle.addEntry(msg.getBytes());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return -1;
}
Reading Data from a Ledger
Reading from a Ledger also goes through LedgerHandle methods; three variants are provided:
- Read a specified range of entry ids;
- Read from some entry id up to the LAC (LastAddConfirmed, the most recently confirmed entry in the Ledger);
- Read from some entry id up to lastEntryIdExpectedToRead, which may be beyond the LAC, provided entries actually exist at those ids.
The implementations are as follows:
/**
 * read entry from startId to endId
 *
 * @param ledgerHandle the ledger
 * @param startId start entry id
 * @param endId end entry id
 * @return the entries, if occur exception, return null
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle, int startId, int endId) {
    try {
        return ledgerHandle.readEntries(startId, endId);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * read entry from 0 to the LAC
 *
 * @param ledgerHandle the ledger
 * @return the entries, if occur exception, return null
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle) {
    try {
        return ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * read entry from 0 to lastEntryIdExpectedToRead which can be larger than the LastAddConfirmed range
 *
 * @param ledgerHandle the handle
 * @param lastEntryIdExpectedToRead the last entry id
 * @return the entries, if occur exception, return null
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle, long lastEntryIdExpectedToRead) {
    try {
        return ledgerHandle.readUnconfirmedEntries(0, lastEntryIdExpectedToRead);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return null;
}
Deleting a Ledger
Deleting a Ledger is also very simple, as shown below:
/**
 * delete the ledger
 *
 * @param ledgerId the ledger id
 * @return if occur exception, return false
 */
public boolean deleteLedger(long ledgerId) {
    try {
        bkClient.deleteLedger(ledgerId);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}
The Ledger Advanced API
The Ledger Advanced API is used much like the API above, but it gives applications more flexibility: for example, the application can choose the LedgerId when creating a Ledger, and the EntryId when writing an Entry.
Creating a Ledger
With the Advanced API, a Ledger can be created with a caller-specified LedgerId, as in the third variant of the example below.
Suppose the cluster's LedgerId counter has reached 5. If the next Ledger is created without specifying an id, it gets id 6; if the application specifies 7 instead, the new Ledger's id is 7, and id 6 is kept to be used later.
/**
 * create the ledger
 *
 * @param password pw
 * @return LedgerHandleAdv
 */
public LedgerHandleAdv createLedger(String password) {
    byte[] passwd = password.getBytes();
    try {
        LedgerHandleAdv handle = (LedgerHandleAdv) bkClient.createLedgerAdv(
                3, 3, 2, // replica settings
                BookKeeper.DigestType.CRC32,
                passwd);
        return handle;
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * create the ledger async
 *
 * @param password
 */
public void createLedgerAsync(String password) {
    class LedgerCreationCallback implements AsyncCallback.CreateCallback {
        public void createComplete(int returnCode, LedgerHandle handle, Object ctx) {
            System.out.println("Ledger successfully created");
        }
    }

    bkClient.asyncCreateLedgerAdv(
            3, // ensemble size
            3, // write quorum size
            2, // ack quorum size
            BookKeeper.DigestType.CRC32,
            password.getBytes(),
            new LedgerCreationCallback(),
            "some context",
            null);
}

/**
 * create the ledger on special ledgerId
 *
 * @param password pw
 * @param ledgerId the ledger id, if the ledger id exist, it will return BKLedgerExistException
 * @return LedgerHandleAdv
 */
public LedgerHandleAdv createLedger(String password, long ledgerId) {
    byte[] passwd = password.getBytes();
    try {
        LedgerHandleAdv handle = (LedgerHandleAdv) bkClient.createLedgerAdv(
                ledgerId,
                3, 3, 2, // replica settings
                BookKeeper.DigestType.CRC32,
                passwd,
                null);
        return handle;
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
Adding Entries to a Ledger
The most interesting part of the Advanced add-entry API, to me, is that writes can specify the EntryId (those familiar with Kafka know that a producer can choose the partition but not the offset; if the offset could be specified on write, cross-cluster disaster recovery could keep topics exactly in sync, and downstream consumers could switch data sources at any time based on their committed offsets). An example follows (note that the Advanced API requires an entryId on every write):
/**
 * add the msg to the ledger on the special entryId
 *
 * @param ledgerHandleAdv ledgerHandleAdv
 * @param entryId the entry id
 * @param msg msg
 */
public void addEntry(LedgerHandleAdv ledgerHandleAdv, long entryId, String msg) {
    try {
        ledgerHandleAdv.addEntry(entryId, msg.getBytes());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
}
The official community documentation describes this API as follows:
- The entry id has to be non-negative.
- Clients are okay to add entries out of order.
- However, the entries are only acknowledged in a monotonic order starting from 0.
My reading of the above: the entry id must be non-negative; clients may add entries out of order; but entries are only acknowledged in a monotonic order starting from 0. At first I assumed any monotonically increasing entry id would work, so I ran a test case that wrote the first entry with id 0 and the second with id 2, and the program simply hung, with the following in the log:
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:662 ] - [ DEBUG ] Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Submit callback (lid:8, eid: 0). rc:0
2018-10-19 16:58:34 [ main:663 ] - [ DEBUG ] Adding entry [50, 32, 109, 97, 116, 116, 32, 116, 101, 115, 116]
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:37 [ main-SendThread(zk01:2181):3702 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:40 [ main-SendThread(zk01:2181):7039 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:43 [ main-SendThread(zk01:2181):10374 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:47 [ main-SendThread(zk01:2181):13710 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:50 [ main-SendThread(zk01:2181):17043 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
Note the repeated line Head of the queue entryId: 2 is not the expected value: 1: the expected entry id was 1 but 2 arrived, i.e. out of order, which caused the program to hang (presumably because that entry was never acked). The message comes from the following code:
void sendAddSuccessCallbacks() {
    // Start from the head of the queue and proceed while there are
    // entries that have had all their responses come back
    PendingAddOp pendingAddOp;
    while ((pendingAddOp = pendingAddOps.peek()) != null
            && blockAddCompletions.get() == 0) {
        if (!pendingAddOp.completed) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("pending add not completed: {}", pendingAddOp);
            }
            return;
        }
        // Check if it is the next entry in the sequence.
        if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Head of the queue entryId: {} is not the expected value: {}",
                        pendingAddOp.entryId, pendingAddsSequenceHead + 1);
            }
            return;
        }

        pendingAddOps.remove();
        explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
        pendingAddsSequenceHead = pendingAddOp.entryId;
        if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
            this.lastAddConfirmed = pendingAddsSequenceHead;
        }
        pendingAddOp.submitCallback(BKException.Code.OK);
    }
}
So if entry ids arrive out of order, the add operation is never completed normally. But if entry ids are in fact required to start at 0 and stay strictly in order, how does this API differ from the previous one? I have not figured that out yet; I sent a question to the community mailing list and am still waiting for a reply.
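The ack-gating behaviour in sendAddSuccessCallbacks above can be reduced to a minimal, self-contained sketch (a hypothetical simplified model, not the real LedgerHandle code): completed adds sit in a queue and are only acknowledged once every lower entry id has also completed, which is exactly why the id-0 then id-2 test hangs until id 1 arrives.

```java
import java.util.PriorityQueue;

// Hypothetical simplified model (not the real LedgerHandle code) of how
// completed adds are acknowledged only in monotonic entry-id order.
class AckOrderSim {
    private final PriorityQueue<Long> completed = new PriorityQueue<>();
    private long sequenceHead = -1; // last entry id acked so far (the LAC)

    // An add for entryId has received enough bookie responses (ack quorum met).
    void complete(long entryId) {
        completed.add(entryId);
    }

    // Ack pending adds while the smallest completed id is the next expected one;
    // returns the LAC. A gap (e.g. id 2 completed but id 1 missing) stalls all
    // further acks, which is why the out-of-order test program hung.
    long drainAcks() {
        while (!completed.isEmpty() && completed.peek() == sequenceHead + 1) {
            sequenceHead = completed.poll();
        }
        return sequenceHead;
    }
}
```

In this model, completing ids 0 and 2 acks only entry 0; the LAC stays at 0 until entry 1 completes, at which point it jumps to 2.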