1. 程式人生 > >HBase協處理器同步二級索引到Solr

HBase協處理器同步二級索引到Solr

一、 已知的問題和不足

    在上一個版本中,實現了使用HBase的協處理器將HBase的二級索引同步到Solr中,但是仍舊有幾個缺陷:

  1. 寫入Solr的Collection是寫死在程式碼裡面,且是唯一的。如果我們有一張表的資料希望將不同的欄位同步到Solr中該如何做呢?
  2. 目前所有配置相關資訊都是寫死到了程式碼中的,是否可以新增外部配置檔案。
  3. 原來的方法是每次都需要編譯新的Jar檔案單獨執行,能否將所有的同步使用一段通用的程式碼完成?

二、解決思路

針對上面的三個主要問題,我們一一解決

  1. 通常一張表會對應多個SolrCollection以及不同的Column。我們可以使用Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]
    這樣的型別,根據表名獲取所有的Collection和Column。
  2. 通過Typesafe Config讀取外部配置檔案,達到所有資訊可配的目的。
  3. 所有的資料都只有Put和Delete,只要我們攔截到具體的訊息之後判斷當前的表名,然後根據問題一中的Collection和Column即可寫入對應的SolrServer。在協處理器中獲取表名的是e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext

三、程式碼

3.1 讀取config檔案內容

使用typesafe的config元件讀取morphlines.conf檔案,將內容轉換為 Map<String,List<HBaseIndexerMappin>>

。具體程式碼如下

  1. publicclassConfigManager{
  2. privatestaticSourceConfig sourceConfig =newSourceConfig();
  3. publicstaticConfig config;
  4. static{
  5. sourceConfig.setConfigFiles("morphlines.conf");
  6. config = sourceConfig.getConfig();
  7. }
  8. publicstaticMap<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){
  9. Map<String,List<HBaseIndexerMappin>> mappin =newHashMap<String,List<HBaseIndexerMappin>>();
  10. Config mappinConf = config.getConfig("Mappin");
  11. List<String> tables = mappinConf.getStringList("HBaseTables");
  12. for(String table :tables){
  13. List<Config> confList =(List<Config>) mappinConf.getConfigList(table);
  14. List<HBaseIndexerMappin> maps =newLinkedList<HBaseIndexerMappin>();
  15. for(Config tmp :confList){
  16. HBaseIndexerMappin map =newHBaseIndexerMappin();
  17. map.solrConnetion = tmp.getString("SolrCollection");
  18. map.columns = tmp.getStringList("Columns");
  19. maps.add(map);
  20. }
  21. mappin.put(table,maps);
  22. }
  23. return mappin;
  24. }
  25. }

3.2 封裝SolrServer的獲取方式

因為目前我使用的環境是Solr和HBase公用的同一套Zookeeper,因此我們完全可以藉助HBase的Zookeeper資訊。HBase的協處理器是執行在HBase的環境中的,自然可以通過HBase的Configuration獲取當前的Zookeeper節點和埠,然後輕鬆的獲取到Solr的地址。

  1. publicclassSolrServerManagerimplementsLogManager{
  2. staticConfiguration conf =HBaseConfiguration.create();
  3. publicstaticStringZKHost= conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");
  4. publicstaticStringZKPort= conf.get("hbase.zookeeper.property.clientPort","2181");
  5. publicstaticStringSolrUrl=ZKHost+":"+ZKPort+"/"+"solr";
  6. publicstaticint zkClientTimeout =1800000;// 心跳
  7. publicstaticint zkConnectTimeout =1800000;// 連線時間
  8. publicstaticCloudSolrServer create(String defaultCollection){
  9. log.info("Create SolrCloudeServer .This collection is "+ defaultCollection);
  10. CloudSolrServer solrServer =newCloudSolrServer(SolrUrl);
  11. solrServer.setDefaultCollection(defaultCollection);
  12. solrServer.setZkClientTimeout(zkClientTimeout);
  13. solrServer.setZkConnectTimeout(zkConnectTimeout);
  14. return solrServer;
  15. }
  16. }

3.3 編寫提交資料到Solr的程式碼

理想狀態下,我們時時刻刻都需要提交資料到Solr中,但是事實上我們資料寫入的時間是比較分散的,可能集中再每一天的某幾個時間點。因此我們必須保證在高併發下能達到一定資料量自動提交,在低併發的情況下能隔一段時間寫入一次。只有兩種機制並存的情況下才能保證資料能即時寫入。

  1. publicclassSolrCommitTimerextendsTimerTaskimplementsLogManager{
  2. publicMap<String,List<SolrInputDocument>> putCache =newHashMap<String,List<SolrInputDocument>>();//Collection名字->更新(插入)操作快取
  3. publicMap<String,List<String>> deleteCache =newHashMap<String,List<String>>();//Collection名字->刪除操作快取
  4. Map<String,CloudSolrServer> solrServers =newHashMap<String,CloudSolrServer>();//Collection名字->SolrServers
  5. int maxCache =ConfigManager.config.getInt("MaxCommitSize");
  6. // 任何時候,保證只能有一個執行緒在提交索引,並清空集合
  7. finalstaticSemaphore semp =newSemaphore(1);
  8. //新增Collection和SolrServer
  9. publicvoid addCollecttion(String collection,CloudSolrServer server){
  10. this.solrServers.put(collection,server);
  11. }
  12. //往Solr新增(更新)資料
  13. publicUpdateResponse put(CloudSolrServer server,SolrInputDocument doc)throwsIOException,SolrServerException{
  14. server.add(doc);
  15. return server.commit(false,false);
  16. }
  17. //往Solr新增(更新)資料
  18. publicUpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs)throwsIOException,SolrServerException{
  19. server.add(docs);
  20. return server.commit(false,false);
  21. }
  22. //根據ID刪除Solr資料
  23. publicUpdateResponsedelete(CloudSolrServer server,String rowkey)throwsIOException,SolrServerException{
  24. server.deleteById(rowkey);
  25. return server.commit(false,false);
  26. }
  27. //根據ID刪除Solr資料
  28. publicUpdateResponsedelete(CloudSolrServer server,List<String> rowkeys)throwsIOException,SolrServerException{
  29. server.deleteById(rowkeys);
  30. return server.commit(false,false);
  31. }
  32. //將doc新增到快取
  33. publicvoid addPutDocToCache(String collection,SolrInputDocument doc)throwsIOException,SolrServerException,InterruptedException{
  34. semp.acquire();
  35. log.debug("addPutDocToCache:"+"collection="+ collection +"data="+ doc.toString());
  36. if(!putCache.containsKey(collection)){