1. 程式人生 > >HBase Coprocessor的實現與應用

HBase Coprocessor的實現與應用

本文來自於中國HBase技術社群武漢站HBase MeetUp線下交流會的烽火大資料平臺研發負責人葉鏗(雲端浪子)。

HBase Coprocessor的實現與應用PPT下載:http://hbase.group/slides/159


ef883f0bd0cc5164a20c21ec7e5ec28d78723bee

本次分享的內容主要分為以下五點:

  1. Coprocessor簡介

  2. Endpoint服務端實現

  3. Endpoint客戶端實現

  4. Observer實現二級索引

  5. Coprocessor應用場景


1.Coprocessor簡介

HBase協處理器的靈感來自於Jeff Dean 09年的演講,根據該演講實現類似於Bigtable的協處理器,包括以下特性:每個表伺服器的任意子表都可以執行程式碼客戶端的高層呼叫介面(客戶端能夠直接訪問資料表的行地址,多行讀寫會自動分片成多個並行的RPC呼叫),提供一個非常靈活的、可用於建立分散式服務的資料模型,能夠自動化擴充套件、負載均衡、應用請求路由。HBase的協處理器靈感來

自Bigtable,但是實現細節不盡相同。HBase建立框架為使用者提供類庫和執行時環境,使得程式碼能夠在HBase Region Server和Master上面進行處理。


a5e6efe56123420dc5710262c59dccd84c6fbb1b



(1)實現目的

  • HBase無法輕易建立“二級索引”;

  • 執行求和、計數、排序等操作比較困難,必須通過MapReduce/Spark實現,對於簡單的統計或聚合計算時,可能會因為網路與IO開銷大而帶來效能問題。


(2)靈感來源

靈感來源於Bigtable的協處理器,包含如下特性:

  • 每個表伺服器的任意子表都可以執行程式碼;

  • 客戶端能夠直接訪問資料表的行,多行讀寫會自動分片成多個並行的RPC呼叫。


(3)提供介面

  • RegionObserver:提供客戶端的資料操縱事件鉤子:Get、Put、Delete、Scan等;

  • WALObserver:提供WAL相關操作鉤子;

  • MasterObserver:提供DDL-型別的操作鉤子。如建立、刪除、修改資料表等;

  • Endpoint:終端是動態RPC外掛的介面,它的實現程式碼被安裝在伺服器端,能夠通過HBase RPC呼叫喚醒。


(4)應用範圍

  • 通過使用RegionObserver介面可以實現二級索引的建立和維護;

  • 通過使用Endpoint介面,在對資料進行簡單排序和sum,count等統計操作時,能夠極大提高效能。

本文將通過具體例項來演示兩種協處理器的開發方法的詳細實現過程。


2.Endpoint服務端實現

在傳統關係型資料庫裡面,可以隨時的對某列進行求和sum,但是目前HBase目前所提供的介面,直接求和是比較困難的,所以先編寫好服務端程式碼,並載入到對應的Table上,載入協處理器有幾種方法,可以通過HTableDescriptor的addCoprocessor方法直接載入,同理也可以通過removeCoprocessor方法解除安裝協處理器。

Endpoint協處理器類似傳統資料庫的儲存過程,客戶端呼叫Endpoint協處理器執行一段Server端程式碼,並將Server端程式碼的結果返回給Client進一步處理,最常見的用法就是進行聚合操作。舉個例子說明:如果沒有協處理器,當用戶需要找出一張表中的最大資料即max聚合操作,必須進行全表掃描,客戶端程式碼遍歷掃描結果並執行求max操作,這樣的方法無法利用底層叢集的併發能力,而將所有計算都集中到Client端統一執行, 效率非常低。但是使用Coprocessor,使用者將求max的程式碼部署到HBase Server端,HBase將利用底層Cluster的多個節點並行執行求max的操作即在每個Region範圍內執行求最大值邏輯,將每個Region的最大值在Region Server端計算出,僅僅將該max值返回給客戶端。客戶端進一步將多個Region的max進一步處理而找到其中的max,這樣整體執行效率提高很多。但是一定要注意的是Coprocessor一定要寫正確,否則導致RegionServer宕機。

91b197c99d399fc4d4269ca5ee856d04f91d4367


Protobuf定義

如前所述,客戶端和服務端之間需要進行RPC通訊,所以兩者間需要確定介面,當前版本的HBase的協處理器是通過Google Protobuf協議來實現資料交換的,所以需要通過Protobuf來定義介面。

如下所示:

67f79f94a008a7f5c90f2ed6d0f80a22322527d3


可以看到這裡定義7個聚合服務RPC,名字分別叫做GetMax、GetMin、GetSum等,本文通過GetSum進行舉例,其他的聚合RPC也是類似的內部實現。RPC有一個入口引數,用訊息AggregateRequest表示;RPC的返回值用訊息AggregateResponse表示。Service是一個抽象概念,RPC的Server端可以看作一個用來提供服務的Service。在HBase Coprocessor中Service就是Server端需要提供的Endpoint Coprocessor服務,主要用來給HBase的Client提供服務。AggregateService.java是由Protobuf軟體通過終端命令“protoc filename.proto--java_out=OUT_DIR”自動生成的,其作用是將.proto檔案定義的訊息結構以及服務轉換成對應介面的RPC實現,其中包括如何構建request訊息和response響應以及訊息包含的內容的處理方式,並且將AggregateService包裝成一個抽象類,具體的服務以類的方法的形式提供。AggregateService.java定義Client端與Server端通訊的協議,程式碼中包含請求資訊結構AggregateRequest、響應資訊結構AggregateResponse、提供的服務種類AggregateService,其中AggregateRequest中的interpreter_class_name指的是column interpreter的類名,此類的作用在於將資料格式從儲存型別解析成所需型別。AggregateService.java由於程式碼太長,在這裡就不貼出來了。

下面我們來講一下服務端的架構,

首先,Endpoint Coprocessor是一個Protobuf Service的實現,因此需要它必須繼承某個ProtobufService。我們在前面已經通過proto檔案定義Service,命名為AggregateService,因此Server端程式碼需要過載該類,其次作為HBase的協處理器,Endpoint 還必須實現HBase定義的協處理器協議,用Java的介面來定義。具體來說就是CoprocessorService和Coprocessor,這些HBase介面負責將協處理器和HBase 的RegionServer等例項聯絡起來以便協同工作。Coprocessor介面定義兩個介面函式:start和stop。

載入Coprocessor之後Region開啟的時候被RegionServer自動載入,並會呼叫器start 介面完成初始化工作。一般情況該介面函式僅僅需要將協處理器的執行上下文環境變數CoprocessorEnvironment儲存到本地即可。

CoprocessorEnvironment儲存協處理器的執行環境,每個協處理器都是在一個RegionServer程序內執行並隸屬於某個Region。通過該變數獲取Region的例項等 HBase執行時環境物件。

Coprocessor介面還定義stop()介面函式,該函式在Region被關閉時呼叫,用來進行協處理器的清理工作。本文裡我們沒有進行任何清理工作,因此該函式什麼也不幹。

我們的協處理器還需要實現CoprocessorService介面。該介面僅僅定義一個介面函式 getService()。我們僅需要將本例項返回即可。HBase的Region Server在接收到客戶端的呼叫請求時,將呼叫該介面獲取實現RPCService的例項,因此本函式一般情況下就是返回自身例項即可。

完成以上三個介面函式之後,Endpoint的框架程式碼就已完成。每個Endpoint協處理器都必須實現這些框架程式碼而且寫法雷同。


a1a8f66fa598582fc7653b4188b0563e0d49b5e5


Server端的程式碼就是一個Protobuf RPC的Service實現,即通過Protobuf提供的某種服務。其開發內容主要包括:

  • 實現Coprocessor的基本框架程式碼

  • 實現服務的RPC具體程式碼


Endpoint 協處理的基本框架

Endpoint 是一個Server端Service的具體實現,其實現有一些框架程式碼,這些框架程式碼與具體的業務需求邏輯無關。僅僅是為了和HBase執行時環境協同工作而必須遵循和完成的一些粘合程式碼。因此多數情況下僅僅需要從一個例子程式拷貝過來並進行命名修改即可。不過我們還是完整地對這些粘合程式碼進行粗略的講解以便更好地理解程式碼。


412b21600c67e58fefe0d56f94e292c1882dcf41


Endpoint協處理器真正的業務程式碼都在每一個RPC函式的具體實現中。

在本文中,我們的Endpoint協處理器僅提供一個RPC函式即getSUM。我將分別介紹編寫該函式的幾個主要工作:瞭解函式的定義,引數列表;處理入口引數;實現業務邏輯;設定返回引數。


bed4bc7c6e6393c3b7466e77d0afd06507d703fe

f06576600f94bb6fef0cc98027063e3b9287e45d


Endpoint類比於資料庫的儲存過程,其觸發服務端的基於Region的同步執行再將各個結果在客戶端蒐集後歸併計算。特點類似於傳統的MapReduce框架,服務端Map客戶端Reduce。


3.Endpoint客戶端實現

HBase提供客戶端Java包org.apache.hadoop.hbase.client.HTable,提供以下三種方法來呼叫協處理器提供的服務:

  • coprocessorService(byte[])

  • coprocessorService(Class, byte[], byte[],Batch.Call),

  • coprocessorService(Class, byte[], byte[],Batch.Call, Batch.Callback)


f347b544f946ce72931c6018985da7a77b10e892

該方法採用 rowkey 指定Region。這是因為HBase客戶端很少會直接操作Region,一般不需要知道Region的名字;況且在HBase中Region名會隨時改變,所以用 rowkey 來指定Region是最合理的方式。使用 rowkey 可以指定唯一的一個Region,如果給定的 Rowkey 並不存在,只要在某個Region的 rowkey 範圍內依然用來指定該Region。比如Region 1處理[row1, row100]這個區間內的資料,則rowkey=row1就由Region 1來負責處理,換句話說我們可以用row1來指定Region 1,無論 rowkey 等於”row1”的記錄是否存在。CoprocessorService方法返回型別為CoprocessorRpcChannel的物件,該 RPC通道連線到由rowkey指定的Region上面,通過此通道可以呼叫該Region上面部署的協處理器RPC。

ad408a7e1a48f75d00f8cbc8212472d2653efd36

有時候客戶端需要呼叫多個 Region上的同一個協處理器,比如需要統計整個Table的sum,在這種情況下,需要所有的Region都參與進來,分別統計自身Region內部的sum並返回客戶端,最終客戶端將所有Region的返回結果彙總,就可以得到整張表的sum。

這意味著該客戶端同時和多個Region進行批處理互動。一個可行的方法是,收集每個 Region的startkey,然後迴圈呼叫第一種coprocessorService方法:用每一個Region的startkey 作為入口引數,獲得RPC通道建立stub物件,進而逐一呼叫每個Region上的協處理器RPC。這種做法需要寫很多的程式碼,為此HBase提供兩種更加簡單的 coprocessorService方法來處理多個Region的協處理器呼叫。先來看第一種方法 coprocessorService(Class, byte[],byte[],Batch.Call),

該方法有 4 個入口引數。第一個引數是實現RPC的Service 類,即前文中的AggregateService類。通過它,HBase就可以找到相應的部署在Region上的協處理器,一個Region上可以部署多個協處理器,客戶端必須通過指定Service類來區分究竟需要呼叫哪個協處理器提供的服務。

要呼叫哪些Region上的服務則由startkey和endkey來確定,通過 rowkey範圍即可確定多個 Region。為此,coprocessorService方法的第二個和第三個引數分別是 startkey和endkey,凡是落在[startkey,endkey]區間內的Region都會參與本次呼叫。

第四個引數是介面類Batch.Call。它定義瞭如何呼叫協處理器,使用者通過過載該介面的call()方法來實現客戶端的邏輯。在call()方法內,可以呼叫RPC,並對返回值進行任意處理。即前文程式碼清單1中所做的事情。coprocessorService將負責對每個 Region呼叫這個call()方法。

coprocessorService方法的返回值是一個Map型別的集合。該集合的key是Region名字,value是Batch.Call.call方法的返回值。該集合可以看作是所有Region的協處理器 RPC 返回的結果集。客戶端程式碼可以遍歷該集合對所有的結果進行彙總處理。

這種coprocessorService方法的大體工作流程如下。首先它分析startkey和 endkey,找到該區間內的所有Region,假設存放在regionList中。然後,遍歷regionList,為每一個Region呼叫Batch.Call,在該介面內,使用者定義具體的RPC呼叫邏輯。最後coprocessorService將所有Batch.Call.call()的返回值加入結果集合並返回。


8db09529d1eb770ca420ff83ec89c6670a94d2f2


coprocessorService的第三種方法比第二個方法多了一個引數callback。coprocessorService第二個方法內部使用HBase自帶的預設callback,該預設 callback將每個Region的返回結果都新增到一個Map型別的結果集中,並將該集合作為coprocessorService方法的返回值。

HBase 提供第三種coprocessorService方法允許使用者定義callback行為,coprocessorService 會為每一個RPC返回結果呼叫該callback,使用者可以在callback 中執行需要的邏輯,比如執行sum累加。用第二種方法的情況下,每個Region協處理器RPC的返回結果先放入一個列表,所有的 Region 都返回後,使用者程式碼再從該列表中取出每一個結果進行累加;用第三種方法,直接在callback中進行累加,省掉了建立結果集合和遍歷該集合的開銷,效率會更高一些。

因此我們只需要額外定義一個callback即可,callback是一個Batch.Callback介面類,使用者需要過載其update方法。


eb6eaaa95801368378cbd71d0e76e8f0a0e82935


4.Observer實現二級索引

Observer類似於傳統資料庫中的觸發器,當發生某些事件的時候這類協處理器會被 Server 端呼叫。Observer Coprocessor是一些散佈在HBase Server端程式碼的 hook鉤子, 在固定的事件發生時被呼叫。比如:put操作之前有鉤子函式prePut,該函式在pu 操作執 行前會被Region Server呼叫;在put操作之後則有postPut 鉤子函式。


069fdb2b8ebeb12ca7eeade13339dae4bd5e0a21


RegionObserver工作原理

RegionObserver提供客戶端的資料操縱事件鉤子,Get、Put、Delete、Scan,使用此功能能夠解決主表以及多個索引表之間資料一致性的問題


c33d3f2355d3fc3237be1a3895640234c44909ff


  1. 客戶端發出put請求;

  2. 該請求被分派給合適的RegionServer和Region;

  3. coprocessorHost攔截該請求,然後在該表上登記的每個 RegionObserver 上呼叫prePut();

  4. 如果沒有被preGet()攔截,該請求繼續送到 region,然後進行處理;

  5. Region產生的結果再次被CoprocessorHost攔截,呼叫postGet();

  6. 假如沒有postGet()攔截該響應,最終結果被返回給客戶端;


ebac9203abe59529cbdd446e8af1b01428d329c0

如上圖所示,HBase可以根據 rowkey 很快的檢索到資料,但是如果根據column檢索資料,首先要根據 rowkey 減小範圍,再通過列過濾器去過濾出資料,如果使用二級索引,可以先查基於column的索引表,獲取到 rowkey 後再快速的檢索到資料。

c2f225fa9fc89a6cd97d8d5c5a188857d0346b92

如圖所示首先繼承BaseRegionObserver類,重寫postPut,postDelete方法,在postPut方法體內中寫Put索引表資料的程式碼,在postDelete方法裡面寫Delete索引表資料,這樣可以保持資料的一致性。
在Scan表的時候首先判斷是否先查索引表,如果不查索引直接scan主表,如果走索引表通過索引表獲取主表的rowkey再去查主表。
使用Elastic Search建立二級索引也是一樣。
我們在同一個主機叢集上同時建立了HBase叢集和Elastic Search叢集,儲存到HBase的資料必須實時地同步到Elastic Search。而恰好HBase和Elastic Search都沒有更新的概念,我們的需求可以簡化為兩步:
當一個新的Put操作產生時,將Put資料轉化為json,索引到ElasticSearch,並把RowKey作為新文件的ID;
當一個新的Delete操作產生時獲取Delete資料的rowkey刪除Elastic Search中對應的ID。


5.協處理的主要應用場景
Observer允許叢集在正常的客戶端操作過程中可以有不同的行為表現;
Endpoint允許擴充套件叢集的能力,對客戶端應用開放新的運算命令;
Observer類似於RDBMS的觸發器,主要在服務端工作;
Endpoint類似於RDBMS的儲存過程,主要在服務端工作;
Observer可以實現許可權管理、優先順序設定、監控、ddl控制、二級索引等功能;
Endpoint可以實現min、max、avg、sum、distinct、group by等功能。
例如HBase原始碼org.apache.hadoop.hbase.security.access.AccessController利用Observer實現對HBase進行了許可權控制,有興趣的讀者可以看看相關程式碼。
d3f2f0da5b6761a64c7049db7719525a2c492a0c

大家工作學習遇到HBase技術問題,把問題釋出到HBase技術社群論壇http://hbase.group,歡迎大家論壇上面提問留言討論。想了解更多HBase技術關注HBase技術社群公眾號(微訊號:hbasegroup),非常歡迎大家積極投稿。

096973d69f34b1380151180fd0a8ff2cade5bced

烽火大資料平臺研發負責人葉鏗,雲端浪子的部落格,主要分享Hadoop、HBase、TensorFlow技術乾貨


15dc5058a1e178995d574b18e2e4604b10adb98f
HBase技術交流社群 - 阿里官方“HBase生態+Spark社群大群”點選加入: https://dwz.cn/Fvqv066s