Hbase實戰--HBASE的API操作(增刪改查)
連線HBase的正確姿勢
Connection是什麼
在眾多HBase使用者中,常見的使用Connection的錯誤方法有:
(1)自己實現一個Connection物件的資源池,每次使用都從資源池中取出一個Connection物件;
(2)每個執行緒一個Connection物件。
(3)每次訪問HBase的時候臨時建立一個Connection物件,使用完之後呼叫close關閉連線。
從這些做法來看,這些使用者顯然是把Connection物件當成了單機資料庫裡面的連線物件來用了。然而,作為一個分散式資料庫,HBase客戶端需要和多個伺服器中的不同服務角色建立連線,所以HBase客戶端中的Connection物件並不是簡單對應一個socket連線。HBase的API文件當中對Connection的定義是:
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.
我們知道,HBase訪問一條資料的過程中,需要連線三個不同的服務角色:
(1)Zookeeper
(2)HBase Master
(3)HBase RegionServer
而HBase客戶端的Connection包含了對以上三種socket連線的封裝。Connection物件和實際的socket連線之間的對應關係如下圖:
在HBase客戶端程式碼中,真正對應socket連線的是RpcConnection物件。HBase使用PoolMap這種資料結構來儲存客戶端到HBase伺服器之間的連線。PoolMap封裝了ConcurrentHashMap>的結構,key是ConnectionId(封裝了伺服器地址和使用者ticket),value是一個RpcConnection物件的資源池。當HBase需要連線一個伺服器時,首先會根據ConnectionId找到對應的連線池,然後從連線池中取出一個連線物件。HBase中提供了三種資源池的實現,分別是Reusable,RoundRobin和ThreadLocal。
具體實現可以通過hbase.client.ipc.pool.type配置項指定,預設為Reusable。連線池的大小也可以通過hbase.client.ipc.pool.size配置項指定,預設為1。
連線HBase的正確方式
在HBase中Connection類已經實現了對連線的管理功能,所以我們不需要自己在Connection之上再做額外的管理。另外,Connection是執行緒安全的,而Table和Admin則不是執行緒安全的,因此正確的做法是一個程序共用一個Connection物件,而在不同的執行緒中使用單獨的Table和Admin物件。
所有程序共用一個connection物件
connection = ConnectionFactory.createConnection(config);
每個執行緒使用單獨的table物件
Table table = connection.getTable(TableName.valueOf("test"));
try {
...
} finally {
table.close();
}
HBase客戶端預設的是連線池大小是1,也就是每個RegionServer 1個連線。如果應用需要使用更大的連線池或指定其他的資源池型別,也可以通過修改配置實現:
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);
con...建立一個連線
Connection conn = ConnectionFactory.createConnection(conf);
拿到一個DDL操作器:表管理器admin
Admin admin = conn.getAdmin();
用表管理器的api去建表、刪表、修改表定義
admin.createTable(HTableDescriptor descriptor);
public class HbaseApiDemo {
@Test
public void testCreateTable() throws Exception {
建立hbase的配置物件
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
建立hbase的連線物件
Connection conn = ConnectionFactory.createConnection(conf);
DDL操作工具
Admin admin = conn.getAdmin();
建立一個表定義描述物件
HTableDescriptor tUser = new HTableDescriptor(TableName.valueOf("t_user"));
構造一個列族描述物件
HColumnDescriptor f1 = new HColumnDescriptor("f1");
HColumnDescriptor f2 = new HColumnDescriptor("f2");
在表描述物件中加入列族描述
tUser.addFamily(f1);
tUser.addFamily(f2);
呼叫admin的建表方法來建表
admin.createTable(tUser);
// 關閉連線
admin.close();
conn.close();
}
修改表定義
public void testAlterTable() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
DDL操作工具
Admin admin = conn.getAdmin();
HTableDescriptor user = admin.getTableDescriptor(TableName.valueOf("t_user"));
HColumnDescriptor f3 = new HColumnDescriptor("f3");
f3.setMaxVersions(3);
user.addFamily(f3);
admin.modifyTable(TableName.valueOf("t_user"), user);
admin.close();
conn.close();
}
刪除表
public void testDropTable() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
// DDL操作工具
Admin admin = conn.getAdmin();
// 先禁用
admin.disableTable(TableName.valueOf("t_user"));
// 再刪除
admin.deleteTable(TableName.valueOf("t_user"));
admin.close();
conn.close();
}
插入|更新
public void testPut() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "cts02:2181,cts03:2181,cts04:2181");
Connection conn = ConnectionFactory.createConnection(conf);
//獲得表物件
Table table = conn.getTable(TableName.valueOf("t_user"));
//建立put物件 ,一個put操作一行資料,並設定rowkey名稱
Put put1 = new Put("001".getBytes());
//新增一個列,要制定列族,和列名稱
put1.addColumn("f1".getBytes(), "name".getBytes(), "張三".getBytes());
put1.addColumn("f1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes(28));
Put put2 = new Put("002".getBytes());
// 新增一個列
put2.addColumn("f1".getBytes(), "name".getBytes(), "李四".getBytes());
put2.addColumn("f1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes(38));
ArrayList<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
table.put(puts);
table.close();
conn.close();
hbase寫入資料的幾種方式
接下來我們總結一下hbase幾種寫入常見的方式,以及涉及的應用場景,儘量覆蓋日常業務中的使用場景,另外再總結一下其中涉及到的一些原理知識。hbase一般的插入過程都使用HTable物件,將資料封裝在Put物件中,Put在new建立的時候需要傳入rowkey,並將列族,列名,列值add進去。然後HTable呼叫put方法,通過rpc請求提交到Regionserver端。寫入的方式可以分為以下幾種
- 單條put
- 批量put
- bluckload
Htable介紹
要向hbase中寫入就免不了要和HTable打交道,HTable負責向一張hbase表中讀或者寫資料,HTable物件是非執行緒安全的。多執行緒使用時需要注意,建立HTable物件時需要指定表名引數,HTable內部有一個LinkedList<Row>的佇列writeAsyncBuffer ,負責對寫入到hbase的資料在客戶端快取,開啟快取使用引數 table.setAutoFlushTo(false); 預設情況不開啟每次put一條資料時,htable物件就會呼叫flushCommits方法向regserver中提交,開啟快取則會比較佇列的大小,如果大於某個值則呼叫flushCommits,這個值預設是2m,可以通過在hbase-site.xml中設定引數 "hbase.client.write.buffer"來調整,預設是2097152, 在關閉htable連線時,會隱式的呼叫flushCommits方法,保證資料完全提交。提交時會根據rowkey定位該put應該提交到哪個reginserver,然後每個regionserver一組action傳送出去。
注意:有了BufferedMutator之後,BufferedMutator替換了HTable的setAutoFlush(false)的作用。可以從連線的例項中獲取BufferedMutator的例項。在使用完成後需要呼叫的close()方法關閉連線。對BufferedMutator進行配置需要通過BufferedMutatorParams完成。BufferedMutatorParams要比Htable更搞效,所以心在我們在向hbase插入資料時儘量使用BufferedMutatorParams
單條put
最簡單基礎的寫入hbase,一般應用場景是線上業務執行時,記錄單條插入,如報文記錄,處理記錄,寫入後htable物件即釋放。每次提交就是一次rpc請求。
table.setAutoFlushTo(true);
/**
* 插入一條記錄
* rowkey 為rk001 列族為f1
* 插入兩列 c1列 值為001
* c2列 值為002
*
*/
public void insertPut(){
//Configuration 載入hbase的配置資訊,HBaseConfiguration.create()是先new Configuration然後呼叫addResource方法將
//hbase-site.xml配置檔案載入進來
Configuration conf = HBaseConfiguration.create();
try {
table = new HTable(conf,tableName);
table.setAutoFlushTo(true);//不顯示設定則預設是true
String rowkey = "rk001";
Put put = new Put(rowkey.getBytes());
put.add(cf.getBytes(),"c1".getBytes(),"001".getBytes());
put.add(cf.getBytes(),"c2".getBytes(),"002".getBytes());
table.put(put);
table.close();//關閉hbase連線
} catch (IOException e) {
e.printStackTrace();
}
}
多條寫入
有了單條的put自然就想到這種方式其實是低效的,每次只能提交一條記錄,有沒有上面方法可以一次提交多條記錄呢?減少請求次數, 最簡單的方式使用List<Put>,這種方式操作時和單條put沒有區別,將put物件add到list中,然後呼叫put(List<Put>)方法,過程和單條put基本一致,應用場景一般在資料量稍多的環境下,通過批量提交減少請求次數
/**
* 批量請求,一次提交兩條
*/
public void insertPuts() {
Configuration conf = HBaseConfiguration.create();
try {
table = new HTable(conf, tableName);
table.setAutoFlushTo(true);
List<Put> lists = new ArrayList<Put>();
String rowkey1 = "rk001";
Put put1 = new Put(rowkey1.getBytes());
put1.add(cf.getBytes(), "c1".getBytes(), "001".getBytes());
put1.add(cf.getBytes(), "c2".getBytes(), "002".getBytes());
lists.add(put1);
String rowkey2 = "rk002";
Put put2 = new Put(rowkey2.getBytes());
put2.add(cf.getBytes(), "c1".getBytes(), "v2001".getBytes());
put2.add(cf.getBytes(), "c2".getBytes(), "v2002".getBytes());
lists.add(put2);
table.put(lists);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
BufferedMutatorParams的使用
org.apache.hadoop.hbase.client.HTable歸根結底持有的就是BufferedMutatorImpl型別的屬性mutator,所有後續的操作都是基於mutator操作, 那麼其實我們操作hbase客戶端,完全可以摒棄HTable物件,直接構建BufferedMutator,然後操作hbase,BufferedMutatorParams主要是收集構造BufferedMutator物件的引數資訊,這些引數包括hbase資料表名、hbase客戶端緩衝區、hbase rowkey最大所佔空間、執行緒池和監聽hbase操作的回撥監聽器(比如監聽hbase寫入失敗)。
使用方式:
正如BufferedMutatorParams需要引數一樣,我們需要提供表名,設定好快取的大小,初始化mutator例項然後提價put對應,向hbase插入資料
案例:
//一個Put物件就是一行記錄,在構造方法中指定主鍵
val put = new Put(Bytes.toBytes(MD5Util.getMD5(userId + userName)))
put.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("id"), Bytes.toBytes(userId)).addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("name"), Bytes.toBytes(userName)) .addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("money"), Bytes.toBytes(userMoney))
putList.add(put)
//設定快取1m,當達到1m時資料會自動刷到hbase
val params = new BufferedMutatorParams(TableName.valueOf("test6"))
params.writeBufferSize(1024 * 1024) //設定快取的大小
val mutator = connection.getBufferedMutator(params)
mutator.mutate(putList)
mutator.flush()
putList.clear()
sparkStreaming向hbase寫資料
SparkStreaming怎麼向Hbase中寫資料。首先,需要說一下,下面的這個方法。
foreachRDD(func)
最通用的輸出操作,把func作用於從stream生成的每一個RDD。
注意:這個函式是在 執行streaming程式的driver程序 中執行的。
下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料
向外部寫資料 常見的錯誤:
向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)。
開發者可能 在driver中建立連線,而在spark worker 中儲存資料
例如:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 這個會在driver中執行
rdd.foreach { record =>
connection.send(record) //這個會在 worker中執行
}
}
上面這種寫法是錯誤的!上面的寫法,需要connection 物件被序列化,然後從driver傳送到worker。
這樣的connection是很少在機器之間傳輸的。知道這個問題後,我們可以寫出以下的,修改後的程式碼:
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
這種寫法也是不對的。這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)。
下面的方法會好一些:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
上面的方法,使用 rdd.foreachPartition 建立一個connection 物件, 一個RDD分割槽中的所有資料,都使用這一個connection。
更優的方法,在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool是一個靜態的,延遲初始化的連線池
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 返回到池中 以便別人使用 }
}
連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)
實戰開發規範操作
在專案實際開發中我們操作hbase 一般需要單獨建立一個hbase工具類,方便之後的操作
hbase工具類案例
package com.util.hadoop
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable
object HbaseUtil {
var conf: Configuration = _
//執行緒池
lazy val connection: Connection = ConnectionFactory.createConnection(conf)
lazy val admin: Admin = connection.getAdmin
/**
* hbase conf
* @param quorum hbase的zk地址
* @param port zk埠2181
* @return
*/
def setConf(quorum: String, port: String): Unit = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", quorum)
conf.set("hbase.zookeeper.property.clientPort", port)
this.conf = conf
}
/**
* 如果不存在就建立表
* @param tableName 名稱空間:表名
* @param columnFamily 列族
*/
def createTable(tableName: String, columnFamily: String): Unit = {
val tbName = TableName.valueOf(tableName)
if (!admin.tableExists(tbName)) {
val htableDescriptor = new HTableDescriptor(tbName)
val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
htableDescriptor.addFamily(hcolumnDescriptor)
admin.createTable(htableDescriptor)
}
}
def hbaseScan(tableName: String): ResultScanner = {
val scan = new Scan()
val table = connection.getTable(TableName.valueOf(tableName))
table.getScanner(scan)
// val scanner: CellScanner = rs.next().cellScanner()
}
/**
* 獲取hbase單元格內容
* @param tableName 名稱空間:表名
* @param rowKey rowkey
* @return 返回單元格組成的List
*/
def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
val get = new Get(Bytes.toBytes(rowKey))
/*if (qualifier == "") {
get.addFamily(family.getBytes())
} else {
get.addColumn(family.getBytes(), qualifier.getBytes())
}*/
val table = connection.getTable(TableName.valueOf(tableName))
val result: Result = table.get(get)
import scala.collection.JavaConverters._
result.listCells().asScala
/*.foreach(cell=>{
val rowKey=Bytes.toString(CellUtil.cloneRow(cell))
val timestamp = cell.getTimestamp; //取到時間戳
val family = Bytes.toString(CellUtil.cloneFamily(cell)) //取到族列
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)) //取到修飾名
val value = Bytes.toString(CellUtil.cloneValue(cell))
println(rowKey,timestamp,family,qualifier,value)
})*/
}
/**
* 單條插入
* @param tableName 名稱空間:表名
* @param rowKey rowkey
* @param family 列族
* @param qualifier column列
* @param value 列值
*/
def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
//向表中插入資料//向表中插入資料
//a.單個插入
val put: Put = new Put(Bytes.toBytes(rowKey)) //引數是行健row01
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
//獲得表物件
val table: Table = connection.getTable(TableName.valueOf(tableName))
table.put(put)
table.close()
}
/**
* 刪除資料
* @param tbName 表名
* @param row rowkey
*/
def deleteByRow(tbName:String,row:String): Unit ={
val delete = new Delete(Bytes.toBytes(row))
// delete.addColumn(Bytes.toBytes("fm2"), Bytes.toBytes("col2"))
val table = connection.getTable(TableName.valueOf(tbName))
table.delete(delete)
}
def close(): Unit = {
admin.close()
connection.close()
}
def main(args: Array[String]): Unit = {
setConf("ip", "2181")
/*singlePut("kafka_offset:topic_offset_range", "gid_topic_name", "info", "partition0", "200")
singlePut("kafka_offset:topic_offset_range", "gid_topic_name", "info", "partition1", "300")
singlePut("kafka_offset:to·pic_offset_range", "gid_topic_name", "info", "partition2", "100")*/
val cells = getCell("kafka_offset:grampus_double_groupid", "grampus_erp")
cells.foreach(cell => {
val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
val timestamp = cell.getTimestamp; //取到時間戳
val family = Bytes.toString(CellUtil.cloneFamily(cell)) //取到族列
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)) //取到修飾名
val value = Bytes.toString(CellUtil.cloneValue(cell))
println(rowKey, timestamp, family, qualifier, value)
})
/*val topics=List("")
val resultScanner: ResultScanner = hbaseScan("kafka_offset:topic_offset_range")
resultScanner.asScala.foreach(rs=>{
val cells = rs.listCells()
cells.asScala.foreach(cell => {
val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)) //取到修飾名
val value = Bytes.toString(CellUtil.cloneValue(cell))
})
})*/
// deleteByRow("bi_odi:redis_ip","900150983cd24fb0d6963f7d28e17f72")
this.close()
}
}
hive資料匯入hbase
裡面所有的hbase操作將會呼叫上面的hbase工具類,使用BufferedMutatorParams()方式將資料匯入hbase
object Hive2Hbase {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder()
.appName("Hive2Hbase")
.enableHiveSupport()
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
// 執行查詢
print("==================== 任務開始 ========================")
val hiveData: DataFrame = session.sql("select * from t_order")
HbaseUtil.setConf("ip", "2181")
val connection = HbaseUtil.connection
HbaseUtil.createTable("test6", "hiveData")
val hiveRdd: RDD[Row] = hiveData.rdd
hiveRdd.foreachRDD { rdd =>
rdd.foreachPartition { x =>
val putList = new util.ArrayList[Put]()
HbaseUtil.setConf("ip", "2181")
val connection = HbaseUtil.connection
//獲取使用者資訊
val userId = x.getAs[String]("id")
val userName= x.getAs[String]("inamed")
val userMoney= x.getAs[Double]("money")
//一個Put物件就是一行記錄,在構造方法中指定主鍵
val put = new Put(Bytes.toBytes(MD5Util.getMD5(userId + userName)))
put.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("id"), Bytes.toBytes(userId))
.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("name"), Bytes.toBytes(userName))
.addColumn(Bytes.toBytes("hiveData"), Bytes.toBytes("money"), Bytes.toBytes(userMoney))
putList.add(put)
//設定快取1m,當達到1m時資料會自動刷到hbase
val params = new BufferedMutatorParams(TableName.valueOf("test6"))
params.writeBufferSize(1024 * 1024) //設定快取的大小
val mutator = connection.getBufferedMutator(params)
mutator.mutate(putList)
mutator.flush()
putList.clear()
}
})
session.stop()
HbaseUtil.close()
println("====================== 任務結束 ============")
}
}