1. 程式人生 > >Hbase實戰--HBASE的API操作(增刪改查)

Hbase實戰--HBASE的API操作(增刪改查)

連線HBase的正確姿勢

Connection是什麼

在眾多HBase使用者中,常見的使用Connection的錯誤方法有:

(1)自己實現一個Connection物件的資源池,每次使用都從資源池中取出一個Connection物件;
(2)每個執行緒一個Connection物件。
(3)每次訪問HBase的時候臨時建立一個Connection物件,使用完之後呼叫close關閉連線。
從這些做法來看,這些使用者顯然是把Connection物件當成了單機資料庫裡面的連線物件來用了。然而,作為一個分散式資料庫,HBase客戶端需要和多個伺服器中的不同服務角色建立連線,所以HBase客戶端中的Connection物件並不是簡單對應一個socket連線。HBase的API文件當中對Connection的定義是:
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.
我們知道,HBase訪問一條資料的過程中,需要連線三個不同的服務角色:

(1)Zookeeper
(2)HBase Master
(3)HBase RegionServer

而HBase客戶端的Connection包含了對以上三種socket連線的封裝。Connection物件和實際的socket連線之間的對應關係如下圖:


在HBase客戶端程式碼中,真正對應socket連線的是RpcConnection物件。HBase使用PoolMap這種資料結構來儲存客戶端到HBase伺服器之間的連線。PoolMap封裝了ConcurrentHashMap>的結構,key是ConnectionId(封裝了伺服器地址和使用者ticket),value是一個RpcConnection物件的資源池。當HBase需要連線一個伺服器時,首先會根據ConnectionId找到對應的連線池,然後從連線池中取出一個連線物件。HBase中提供了三種資源池的實現,分別是Reusable,RoundRobin和ThreadLocal。

具體實現可以通過hbase.client.ipc.pool.type配置項指定,預設為Reusable。連線池的大小也可以通過hbase.client.ipc.pool.size配置項指定,預設為1

連線HBase的正確方式

在HBase中Connection類已經實現了對連線的管理功能,所以我們不需要自己在Connection之上再做額外的管理。另外,Connection是執行緒安全的,而Table和Admin則不是執行緒安全的,因此正確的做法是一個程序共用一個Connection物件,而在不同的執行緒中使用單獨的Table和Admin物件。

所有程序共用一個connection物件

connection = ConnectionFactory.createConnection(config);

每個執行緒使用單獨的table物件          

 Table table = connection.getTable(TableName.valueOf("test"));
           try {
               ...
           } finally {
               table.close();
           }

HBase客戶端預設的是連線池大小是1,也就是每個RegionServer 1個連線。如果應用需要使用更大的連線池或指定其他的資源池型別,也可以通過修改配置實現:

config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);

con...​​​建立一個連線

Connection conn = ConnectionFactory.createConnection(conf);

       拿到一個DDL操作器:表管理器admin

Admin admin = conn.getAdmin();

      用表管理器的api去建表、刪表、修改表定義

admin.createTable(HTableDescriptor descriptor);
public class HbaseApiDemo {
@Test
public void testCreateTable() throws Exception {
建立hbase的配置物件
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
建立hbase的連線物件
Connection conn = ConnectionFactory.createConnection(conf);
 DDL操作工具
Admin admin = conn.getAdmin();
建立一個表定義描述物件
HTableDescriptor tUser = new HTableDescriptor(TableName.valueOf("t_user"));
構造一個列族描述物件
HColumnDescriptor f1 = new HColumnDescriptor("f1");
HColumnDescriptor f2 = new HColumnDescriptor("f2");
在表描述物件中加入列族描述
tUser.addFamily(f1);
tUser.addFamily(f2);
呼叫admin的建表方法來建表
admin.createTable(tUser);
// 關閉連線
admin.close();
conn.close();
}

修改表定義

public void testAlterTable() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
DDL操作工具
Admin admin = conn.getAdmin();
HTableDescriptor user = admin.getTableDescriptor(TableName.valueOf("t_user"));
HColumnDescriptor f3 = new HColumnDescriptor("f3");
f3.setMaxVersions(3);
user.addFamily(f3);
admin.modifyTable(TableName.valueOf("t_user"), user);
admin.close();
conn.close();
}



刪除表

public void testDropTable() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
// DDL操作工具
Admin admin = conn.getAdmin();
// 先禁用
admin.disableTable(TableName.valueOf("t_user"));
// 再刪除
admin.deleteTable(TableName.valueOf("t_user"));
admin.close();
conn.close();
}

插入|更新

public void testPut() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
//獲得表物件
Table table = conn.getTable(TableName.valueOf("t_user"));
//建立put物件 ,一個put操作一行資料,並設定rowkey名稱
Put put1 = new Put("001".getBytes());
//新增一個列,要制定列族,和列名稱
put1.addColumn("f1".getBytes(), "name".getBytes(), "張三".getBytes());
put1.addColumn("f1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes(28));
Put put2 = new Put("002".getBytes());
// 新增一個列
put2.addColumn("f1".getBytes(), "name".getBytes(), "李四".getBytes());
put2.addColumn("f1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes(38));
ArrayList<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
table.put(puts);
table.close();
conn.close();

hbase寫入資料的幾種方式

  接下來我們總結一下hbase幾種寫入常見的方式,以及涉及的應用場景,儘量覆蓋日常業務中的使用場景,另外再總結一下其中涉及到的一些原理知識。hbase一般的插入過程都使用HTable物件,將資料封裝在Put物件中,Put在new建立的時候需要傳入rowkey,並將列族,列名,列值add進去。然後HTable呼叫put方法,通過rpc請求提交到Regionserver端。寫入的方式可以分為以下幾種

  • 單條put
  • 批量put
  • bluckload

Htable介紹

  要向hbase中寫入就免不了要和HTable打交道,HTable負責向一張hbase表中讀或者寫資料,HTable物件是非執行緒安全的。多執行緒使用時需要注意,建立HTable物件時需要指定表名引數,HTable內部有一個LinkedList<Row>的佇列writeAsyncBuffer ,負責對寫入到hbase的資料在客戶端快取,開啟快取使用引數  table.setAutoFlushTo(false);  預設情況不開啟每次put一條資料時,htable物件就會呼叫flushCommits方法向regserver中提交,開啟快取則會比較佇列的大小,如果大於某個值則呼叫flushCommits,這個值預設是2m,可以通過在hbase-site.xml中設定引數 "hbase.client.write.buffer"來調整,預設是2097152, 在關閉htable連線時,會隱式的呼叫flushCommits方法,保證資料完全提交。提交時會根據rowkey定位該put應該提交到哪個reginserver,然後每個regionserver一組action傳送出去。

注意:有了BufferedMutator之後,BufferedMutator替換了HTable的setAutoFlush(false)的作用。可以從連線的例項中獲取BufferedMutator的例項。在使用完成後需要呼叫的close()方法關閉連線。對BufferedMutator進行配置需要通過BufferedMutatorParams完成。BufferedMutatorParams要比Htable更搞效,所以心在我們在向hbase插入資料時儘量使用BufferedMutatorParams

單條put

  最簡單基礎的寫入hbase,一般應用場景是線上業務執行時,記錄單條插入,如報文記錄,處理記錄,寫入後htable物件即釋放。每次提交就是一次rpc請求。

table.setAutoFlushTo(true);
/**
   * 插入一條記錄
   * rowkey 為rk001 列族為f1
   * 插入兩列  c1列   值為001
   *          c2列   值為002
   *
   */
  public void insertPut(){
      //Configuration 載入hbase的配置資訊,HBaseConfiguration.create()是先new Configuration然後呼叫addResource方法將
      //hbase-site.xml配置檔案載入進來
      Configuration conf = HBaseConfiguration.create();
      try {
          table = new HTable(conf,tableName);
          table.setAutoFlushTo(true);//不顯示設定則預設是true																														
          String rowkey  = "rk001";
          Put  put = new Put(rowkey.getBytes());
          put.add(cf.getBytes(),"c1".getBytes(),"001".getBytes());
          put.add(cf.getBytes(),"c2".getBytes(),"002".getBytes());
          table.put(put);
          table.close();//關閉hbase連線
 } catch (IOException e) {
          e.printStackTrace();
      }
  }


多條寫入  

有了單條的put自然就想到這種方式其實是低效的,每次只能提交一條記錄,有沒有上面方法可以一次提交多條記錄呢?減少請求次數, 最簡單的方式使用List<Put>,這種方式操作時和單條put沒有區別,將put物件add到list中,然後呼叫put(List<Put>)方法,過程和單條put基本一致,應用場景一般在資料量稍多的環境下,通過批量提交減少請求次數

  /**
  * 批量請求,一次提交兩條
  */

 public void insertPuts() {
     Configuration conf = HBaseConfiguration.create();
     try {
         table = new HTable(conf, tableName);
         table.setAutoFlushTo(true);
         List<Put> lists = new ArrayList<Put>();
         String rowkey1 = "rk001";
         Put put1 = new Put(rowkey1.getBytes());
         put1.add(cf.getBytes(), "c1".getBytes(), "001".getBytes());
         put1.add(cf.getBytes(), "c2".getBytes(), "002".getBytes());
         lists.add(put1);
         String rowkey2 = "rk002";
         Put put2 = new Put(rowkey2.getBytes());
         put2.add(cf.getBytes(), "c1".getBytes(), "v2001".getBytes());
         put2.add(cf.getBytes(), "c2".getBytes(), "v2002".getBytes());
         lists.add(put2);
         table.put(lists);
         table.close();
     } catch (IOException e) {
         e.printStackTrace();
     }
 }

BufferedMutatorParams的使用

org.apache.hadoop.hbase.client.HTable歸根結底持有的就是BufferedMutatorImpl型別的屬性mutator,所有後續的操作都是基於mutator操作, 那麼其實我們操作hbase客戶端,完全可以摒棄HTable物件,直接構建BufferedMutator,然後操作hbase,BufferedMutatorParams主要是收集構造BufferedMutator物件的引數資訊,這些引數包括hbase資料表名、hbase客戶端緩衝區、hbase rowkey最大所佔空間、執行緒池和監聽hbase操作的回撥監聽器(比如監聽hbase寫入失敗)。

使用方式:

正如BufferedMutatorParams需要引數一樣,我們需要提供表名,設定好快取的大小,初始化mutator例項然後提價put對應,向hbase插入資料

案例:

//一個Put物件就是一行記錄,在構造方法中指定主鍵
      val put = new Put(Bytes.toBytes(MD5Util.getMD5(userId + userName)))
      put.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("id"), Bytes.toBytes(userId)).addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("name"), Bytes.toBytes(userName)) .addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("money"), Bytes.toBytes(userMoney))
      putList.add(put)
//設定快取1m,當達到1m時資料會自動刷到hbase
      val params = new BufferedMutatorParams(TableName.valueOf("test6"))
      params.writeBufferSize(1024 * 1024) //設定快取的大小
      val mutator = connection.getBufferedMutator(params)
      mutator.mutate(putList)
      mutator.flush()
      putList.clear()

sparkStreaming向hbase寫資料

SparkStreaming怎麼向Hbase中寫資料。首先,需要說一下,下面的這個方法。
foreachRDD(func)
最通用的輸出操作,把func作用於從stream生成的每一個RDD。
注意:這個函式是在 執行streaming程式的driver程序 中執行的。
下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料
向外部寫資料 常見的錯誤:
向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)。
開發者可能 在driver中建立連線,而在spark worker 中儲存資料
例如:

 dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 這個會在driver中執行
  rdd.foreach { record =>
    connection.send(record) //這個會在 worker中執行
  }
}

上面這種寫法是錯誤的!上面的寫法,需要connection 物件被序列化,然後從driver傳送到worker。
這樣的connection是很少在機器之間傳輸的。知道這個問題後,我們可以寫出以下的,修改後的程式碼:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

這種寫法也是不對的。這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)
下面的方法會好一些:

 dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

上面的方法,使用 rdd.foreachPartition 建立一個connection 物件, 一個RDD分割槽中的所有資料,都使用這一個connection。
更優的方法,在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一個靜態的,延遲初始化的連線池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便別人使用  }
}

連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)

實戰開發規範操作

在專案實際開發中我們操作hbase 一般需要單獨建立一個hbase工具類,方便之後的操作

hbase工具類案例

package com.util.hadoop
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable
object HbaseUtil {
  var conf: Configuration = _
  //執行緒池
  lazy val connection: Connection = ConnectionFactory.createConnection(conf)
  lazy val admin: Admin = connection.getAdmin
  /**
    * hbase conf
    * @param quorum hbase的zk地址
    * @param port   zk埠2181
    * @return
    */
  def setConf(quorum: String, port: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    this.conf = conf
  }
  /**
    * 如果不存在就建立表
    * @param tableName 名稱空間:表名
    * @param columnFamily 列族
    */
  def createTable(tableName: String, columnFamily: String): Unit = {
    val tbName = TableName.valueOf(tableName)
    if (!admin.tableExists(tbName)) {
      val htableDescriptor = new HTableDescriptor(tbName)
      val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
      htableDescriptor.addFamily(hcolumnDescriptor)
      admin.createTable(htableDescriptor)
    }
  }
  def hbaseScan(tableName: String): ResultScanner = {
    val scan = new Scan()
    val table = connection.getTable(TableName.valueOf(tableName))
    table.getScanner(scan)
//    val scanner: CellScanner = rs.next().cellScanner()
  }
  /**
    * 獲取hbase單元格內容
    * @param tableName 名稱空間:表名
    * @param rowKey rowkey
    * @return 返回單元格組成的List
    */
  def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
    val get = new Get(Bytes.toBytes(rowKey))
    /*if (qualifier == "") {
      get.addFamily(family.getBytes())
    } else {
      get.addColumn(family.getBytes(), qualifier.getBytes())
    }*/
    val table = connection.getTable(TableName.valueOf(tableName))
    val result: Result = table.get(get)
    import scala.collection.JavaConverters._
    result.listCells().asScala
    /*.foreach(cell=>{
    val rowKey=Bytes.toString(CellUtil.cloneRow(cell))
    val timestamp = cell.getTimestamp;  //取到時間戳
    val family = Bytes.toString(CellUtil.cloneFamily(cell))  //取到族列
    val qualifier  = Bytes.toString(CellUtil.cloneQualifier(cell))  //取到修飾名
    val value = Bytes.toString(CellUtil.cloneValue(cell))
    println(rowKey,timestamp,family,qualifier,value)
  })*/
  }
  /**
    * 單條插入
    * @param tableName 名稱空間:表名
    * @param rowKey rowkey
    * @param family 列族
    * @param qualifier column列
    * @param value 列值
    */
  def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
    //向表中插入資料//向表中插入資料
    //a.單個插入
    val put: Put = new Put(Bytes.toBytes(rowKey)) //引數是行健row01
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    //獲得表物件
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    table.put(put)
    table.close()
  }
  /**
    * 刪除資料
    * @param tbName 表名
    * @param row rowkey
    */
  def deleteByRow(tbName:String,row:String): Unit ={
    val delete = new Delete(Bytes.toBytes(row))
//    delete.addColumn(Bytes.toBytes("fm2"), Bytes.toBytes("col2"))
    val table = connection.getTable(TableName.valueOf(tbName))
    table.delete(delete)
  }
  def close(): Unit = {
    admin.close()
    connection.close()
  }
  def main(args: Array[String]): Unit = {
    setConf("ip", "2181")
    /*singlePut("kafka_offset:topic_offset_range", "gid_topic_name", "info", "partition0", "200")
    singlePut("kafka_offset:topic_offset_range", "gid_topic_name", "info", "partition1", "300")
    singlePut("kafka_offset:to·pic_offset_range", "gid_topic_name", "info", "partition2", "100")*/
    val cells = getCell("kafka_offset:grampus_double_groupid", "grampus_erp")
    cells.foreach(cell => {
      val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
      val timestamp = cell.getTimestamp; //取到時間戳
      val family = Bytes.toString(CellUtil.cloneFamily(cell)) //取到族列
      val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)) //取到修飾名
      val value = Bytes.toString(CellUtil.cloneValue(cell))
      println(rowKey, timestamp, family, qualifier, value)
    })
    /*val topics=List("")
    val resultScanner: ResultScanner = hbaseScan("kafka_offset:topic_offset_range")
    resultScanner.asScala.foreach(rs=>{
      val cells = rs.listCells()
      cells.asScala.foreach(cell => {
        val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)) //取到修飾名
        val value = Bytes.toString(CellUtil.cloneValue(cell))
      })
    })*/
//    deleteByRow("bi_odi:redis_ip","900150983cd24fb0d6963f7d28e17f72")
    this.close()
  }
}

hive資料匯入hbase

裡面所有的hbase操作將會呼叫上面的hbase工具類,使用BufferedMutatorParams()方式將資料匯入hbase

object Hive2Hbase {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .appName("Hive2Hbase")
      .enableHiveSupport()
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    // 執行查詢
    print("====================   任務開始       ========================")
    val hiveData: DataFrame = session.sql("select * from t_order")
    HbaseUtil.setConf("ip", "2181")
    val connection = HbaseUtil.connection
    HbaseUtil.createTable("test6", "hiveData")
    val hiveRdd: RDD[Row] = hiveData.rdd
    hiveRdd.foreachRDD { rdd =>
     rdd.foreachPartition { x =>
      val putList = new util.ArrayList[Put]()
      HbaseUtil.setConf("ip", "2181")
      val connection = HbaseUtil.connection
      //獲取使用者資訊
      val userId = x.getAs[String]("id")
      val userName= x.getAs[String]("inamed")
      val userMoney= x.getAs[Double]("money")
      //一個Put物件就是一行記錄,在構造方法中指定主鍵
      val put = new Put(Bytes.toBytes(MD5Util.getMD5(userId + userName)))
      put.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("id"), Bytes.toBytes(userId))
        .addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("name"), Bytes.toBytes(userName))
        .addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("money"), Bytes.toBytes(userMoney))
      putList.add(put)
      //設定快取1m,當達到1m時資料會自動刷到hbase
      val params = new BufferedMutatorParams(TableName.valueOf("test6"))
      params.writeBufferSize(1024 * 1024) //設定快取的大小
      val mutator = connection.getBufferedMutator(params)
      mutator.mutate(putList)
      mutator.flush()
      putList.clear() 
    }
    })
    session.stop()
    HbaseUtil.close()
    println("======================  任務結束  ============")
  }

}