HBase 2.0 協處理器實現 ES 資料同步
標籤:hbase 2.0、elasticsearch、Coprocessor、協處理器
在正式進行講述實現之前,我覺得有必要說一下出發點。團隊期初資料都是基於 HBase+Phoenix
這樣架構進行持久化。隨著業務的複雜性增加,對部分表的查詢效率和查詢條件多樣性,提出了更高的要求。 HBase+Phoenix
就會出現索引濫用。變更索引變的特別的頻繁,同時一些資料客觀的表,變更索引的代價是非常大的。
在海量資料的查詢方面, Elasticsearch 具有出色的效能。如果 HBase+ES
是不是會是更好的解決方法呢?其實,這個時候會有一個思考點,Phoenix 是如何實現二級索引的? HBase 協處理器(Coprocessor) 。
我的實現過程比較曲折,後文中也會提到,以幫助大家避免這些坑。在過程中,還嘗試了另一種實現方案。存放兩份資料,一份 HBase,一份 ES。該方案需要解決的一個問題—— 資料一致性問題 ,但這個問題協處理器可以解決。在此過程中,由於不當操作,把 HBase 服務宕機了,現象是 REGION SERVERS
無法啟動,只有通過硬刪的方式解決。
出於不死心,在經歷重灌 HBase 之後。內心又開始蠢蠢欲動。首先要宣告一下,我們團隊的環境是 HDP 3.0、HBase 2.0
,網上很多教程都是基於 1.X
, 2.X 與 1.X
區別還是挺大的。 RegionObserver
從繼承方式改為了面向介面程式設計。
協處理器
沒有選擇協處理情況下,HBase 實現 RDBMS SQL 方式查詢資料,大量的 Filter 需要在客戶端進行編碼完成,程式碼的臃腫,可維護性大大降低。如果這部分操作在伺服器端完成,是否是更好的選擇呢。協處理就能幫助實現該設想,由於在服務端完成,可以集中式優化查詢,降低請求的頻寬和提高查詢效率。當然,對 HBase 效能產生了一定影響。
型別
- Observer
- Endpoint
Observer
Observer 協處理器類似於 RDBMS 中的觸發器,當事件觸發的時候該類協處理器會被 Server 端呼叫。
Endpoint
Endpoint 協處理器類似傳統資料庫中的儲存過程,完成一些聚合操作。
實現
基礎嘗試
避免 ES 連線操作、程式碼複雜性導致的 Bug,在最初只通過打日誌的方式來驗證協處理方式。
程式碼實現概覽
HbaseDataSyncEsObserver.java
package com.tairanchina.csp.dmp.examples; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Optional; public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor { private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class); public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); } public void start(CoprocessorEnvironment env) throws IOException { LOG.info("====Test Start===="); } public void stop(CoprocessorEnvironment env) throws IOException { LOG.info("====Test End===="); } public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { LOG.info("====Test postPut===="); } public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException { LOG.info("====Test postDelete===="); } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tairanchina.csp.dmp</groupId> <artifactId>hbase-observer-simple-example</artifactId> <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <name>HBase Observer Simple 用例</name> <properties> <hbase.version>2.0.0</hbase.version> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.deploy.skip>true</maven.deploy.skip> <maven.install.skip>true</maven.install.skip> </properties> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <artifactId>jetty-servlet</artifactId> <groupId>org.eclipse.jetty</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <artifactId>javax.servlet.jsp</artifactId> <groupId>org.glassfish.web</groupId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
包處理
打包
mvn clean assembly:assembly -Dmaven.test.skip=true
這裡 package 得到的包必須是將依賴都包含在內的,否則,會報類找不到之類的錯誤。
上傳包的時候,需要上傳到 HDFS 下,同時,要給 hbase 使用者授予許可權,因而,我在測試的過程中,將其上傳到 /apps/hbase
下(HDP 環境)。由於包名太長,這裡對包名進行了重新命名。
裝載協處理器
# 建立測試表 create 'gejx_test','cf' # 停用測試表 disable 'gejx_test' # 表與協處理器建立關係 alter 'gejx_test' , METHOD =>'table_att','coprocessor'=>'hdfs://dev-dmp2.fengdai.org:8020/apps/hbase/hbase-observer-simple-example.jar|com.tairanchina.csp.dmp.examples.HbaseDataSyncEsObserver|1073741823' # 啟用表 enable 'gejx_test' # 查看錶資訊 desc 'gejx_test'
測試
put 'gejx_test', '2','cf:name','gjx1' delete 'gejx_test', '2','cf:name'
檢視日誌要先在 HBase Master UI
介面下,確定資料儲存在哪個節點上,再到相應的節點下面的 /var/log/hbase
下檢視日誌
tail -100f hbase-hbase-regionserver-test.example.org.out
解除安裝協處理器
disable 'gejx_test' alter 'gejx_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1' enable 'gejx_test'
以上,已經完成最基礎的協處理器實現。接下來進行講述 ES 的一種實現方案。
HBase+ES
這裡為了快速論證結果,在編碼方面採用了硬編碼方式,希望理解。
程式碼實現概覽
ElasticSearchBulkOperator.java
package com.tairanchina.csp.dmp.examples; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created on 2019/1/11. * * @author 跡_Jason */ public class ElasticSearchBulkOperator { private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class); private static final int MAX_BULK_COUNT = 10000; private static BulkRequestBuilder bulkRequestBuilder = null; private static final Lock commitLock = new ReentrantLock(); private static ScheduledExecutorService scheduledExecutorService = null; static { // init es bulkRequestBuilder bulkRequestBuilder = ESClient.client.prepareBulk(); bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // init thread pool and set size 1 scheduledExecutorService = Executors.newScheduledThreadPool(1); // create beeper thread( it will be sync data to ES cluster) // use a commitLock to protected bulk es as thread-save final Runnable beeper = () -> { commitLock.lock(); try { bulkRequest(0); } catch (Exception ex) { System.out.println(ex.getMessage()); } finally { commitLock.unlock(); } }; // set time bulk task // set beeper thread(10 second to delay first execution , 30 second period between successive executions) scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS); } public static void shutdownScheduEx() { if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) { scheduledExecutorService.shutdown(); } } private static void bulkRequest(int threshold) { if (bulkRequestBuilder.numberOfActions() > threshold) { BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet(); if (!bulkItemResponse.hasFailures()) { bulkRequestBuilder = ESClient.client.prepareBulk(); } } } /** * add update builder to bulk * use commitLock to protected bulk as thread-save * @param builder */ public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { LOG.error(" update Bulk " + "gejx_test" + " index error : " + ex.getMessage()); } finally { commitLock.unlock(); } } /** * add delete builder to bulk * use commitLock to protected bulk as thread-save * * @param builder */ public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { LOG.error(" delete Bulk " + "gejx_test" + " index error : " + ex.getMessage()); } finally { commitLock.unlock(); } } }
ESClient.java
package com.tairanchina.csp.dmp.examples; /** * Created on 2019/1/10. * * @author 跡_Jason */ import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; import java.net.UnknownHostException; /** * ES Cleint class */ public class ESClient { public static Client client; /** * init ES client */ public static void initEsClient() throws UnknownHostException { System.setProperty("es.set.netty.runtime.available.processors", "false"); Settings esSettings = Settings.builder().put("cluster.name", "elasticsearch").build();//設定ES例項的名稱 client = new PreBuiltTransportClient(esSettings).addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); } /** * Close ES client */ public static void closeEsClient() { client.close(); } }
HbaseDataSyncEsObserver.java
package com.tairanchina.csp.dmp.examples; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.log4j.Logger; import java.io.IOException; import java.util.*; /** * Created on 2019/1/10. * * @author 跡_Jason */ public class HbaseDataSyncEsObserver implements RegionObserver , RegionCoprocessor { private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class); public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); } @Override public void start(CoprocessorEnvironment env) throws IOException { // init ES client ESClient.initEsClient(); LOG.info("****init start*****"); } @Override public void stop(CoprocessorEnvironment env) throws IOException { ESClient.closeEsClient(); // shutdown time task ElasticSearchBulkOperator.shutdownScheduEx(); LOG.info("****end*****"); } @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { String indexId = new String(put.getRow()); try { NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap(); Map<String, Object> infoJson = new HashMap<>(); Map<String, Object> json = new HashMap<>(); for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) { for (Cell cell : entry.getValue()) { String key = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); json.put(key, value); } } // set hbase family to es infoJson.put("info", json); LOG.info(json.toString()); ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate("gejx_test","dmp_ods", indexId).setDocAsUpsert(true).setDoc(json)); LOG.info("**** postPut success*****"); } catch (Exception ex) { LOG.error("observer puta doc, index [ " + "gejx_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage()); } } @Override public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException { String indexId = new String(delete.getRow()); try { ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete("gejx_test", "dmp_ods", indexId)); LOG.info("**** postDelete success*****"); } catch (Exception ex) { LOG.error(ex); LOG.error("observer deletea doc, index [ " + "gejx_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage()); } } }
其他方面的操作與上文操作類似,這裡不再進行綴訴,直接看 Kibana 結果。
講在最後
上文中 HBase+ES
實現方案是在 HBase 和 ES
各自存放一份資料,使用協處理器達到資料一致性。這種方案存在資料冗餘問題,在 ES 這邊需要準備大量的儲存空間。
還有一種方案也是比較流行的。使用 ES 作為二級索引的實現。使用協處理將需要查詢的表查詢欄位與 RowKey 關係儲存到 ES,查詢資料的時候,先根據條件查詢 ES 得到 RowKey,通過得到的 RowKey 查詢 HBase 資料。以提高查詢的效率。
Anyway,這兩種方案都需要解決歷史資料的問題和還有需要注意資料更新操作。
Q&A
- 遇到
None of the configured nodes are available
錯誤資訊?請檢查一下 ES 的
cluster.name
配置是否錯誤。 - 為什麼
Hbase 2.0 Observer
未生效?HBase 2.0
中 observer 介面有變化。你需要實現RegionCoprocessor
的getRegionObserver
介面。 -
發現已經更新包,協處理器還是在執行歷史程式碼?
當更新包的時候,要進行包名的變更,否則,可能會出現類似於快取的現象問題。
待確認
- [ ] 未停用的情況下,更新 jar(已測試未操作表的時候,支援更新)
- [ ] 測試多張表公用同一個 jar
引文
使用Hbase協作器(Coprocessor)同步資料到ElasticSearch
面向高穩定,高效能之-Hbase資料實時同步到ElasticSearch(之二)
更多內容可以關注微信公眾號,或者訪問 AppZone 網站