Spark將資料寫入Hbase以及從Hbase讀取資料
本文將介紹
1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入hbase
2、spark從hbase中讀取資料並轉化為RDD
操作方式為在eclipse本地執行spark連線到遠端的hbase。
java版本:1.7.0
scala版本:2.10.4
zookeeper版本:3.4.5(禁用了hbase自帶zookeeper,選擇自己部署的)
hadoop版本:2.4.1
spark版本:1.6.1
hbase版本:1.2.3
叢集:centos6.5_x64
將RDD寫入hbase
注意點:
依賴:
將lib目錄下的hadoop開頭jar包、hbase開頭jar包新增至classpath
此外還有lib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報錯)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar
$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar
不同的package中可能會有相同名稱的類,不要導錯
連線叢集:
spark應用需要連線到zookeeper叢集,然後藉助zookeeper訪問hbase。一般可以通過兩種方式連線到zookeeper:
第一種是將hbase-site.xml檔案加入classpath
第二種是在HBaseConfiguration例項中設定
如果不設定,預設連線的是localhost:2181會報錯:connection refused
本文使用的是第二種方式。
hbase建立表:
雖然可以在spark應用中建立hbase表,但是不建議這樣做,最好在hbase shell中建立表,spark寫或讀資料
使用saveAsHadoopDataset寫入資料
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.{SparkContext, SparkConf} /** * User:leen * Date:2017/12/20 0020 * Time:16:51 */ object HbaseTest1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HBaseTest1").setMaster("local") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定 conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3") //設定zookeeper連線埠,預設2181 conf.set("hbase.zookeeper.property.clientPort", "2181") val tablename = "account" //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的 val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename) val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16")) val rdd = indataRDD.map(_.split(',')).map{arr=>{ // 一個Put物件就是一行記錄,在構造方法中指定主鍵 // 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換 // Put.add方法接收三個引數:列族,列名,資料 val put = new Put(Bytes.toBytes(arr(0).toInt)) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt)) //轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset (new ImmutableBytesWritable, put) }} rdd.saveAsHadoopDataset(jobConf) sc.stop() } }
使用saveAsNewAPIHadoopDataset寫入資料
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}
/**
* User:leen
* Date:2017/12/20 0020
* Time:17:34
*/
object HbaseTest2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "account"
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
(new ImmutableBytesWritable, put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
sc.stop()
}
}
從hbase讀取資料轉化成RDD
本例基於官方提供的例子
package com.test
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._
object TestHBase2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "account"
val conf = HBaseConfiguration.create()
//設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定
conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
//設定zookeeper連線埠,預設2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
// 如果表不存在則建立表
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tablename)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
admin.createTable(tableDesc)
}
//讀取資料並轉化成rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println(count)
hBaseRDD.foreach{case (_,result) =>{
//獲取行鍵
val key = Bytes.toString(result.getRow)
//通過列族和列名獲取列
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}}
sc.stop()
admin.close()
}
}