1. 程式人生 > >Spark將資料寫入Hbase以及從Hbase讀取資料

Spark將資料寫入Hbase以及從Hbase讀取資料

本文將介紹

1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入hbase

2、spark從hbase中讀取資料並轉化為RDD

操作方式為在eclipse本地執行spark連線到遠端的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自帶zookeeper,選擇自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

叢集:centos6.5_x64

將RDD寫入hbase

注意點:

依賴:

將lib目錄下的hadoop開頭jar包、hbase開頭jar包新增至classpath

此外還有lib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報錯)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能會有相同名稱的類,不要導錯

連線叢集:

spark應用需要連線到zookeeper叢集,然後藉助zookeeper訪問hbase。一般可以通過兩種方式連線到zookeeper:

第一種是將hbase-site.xml檔案加入classpath

第二種是在HBaseConfiguration例項中設定

如果不設定,預設連線的是localhost:2181會報錯:connection refused 

本文使用的是第二種方式。

hbase建立表:

雖然可以在spark應用中建立hbase表,但是不建議這樣做,最好在hbase shell中建立表,spark寫或讀資料

使用saveAsHadoopDataset寫入資料


    import org.apache.hadoop.hbase.HBaseConfiguration  
    import org.apache.hadoop.hbase.client.Put  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
    import org.apache.hadoop.hbase.mapred.TableOutputFormat  
    import org.apache.hadoop.hbase.util.Bytes  
    import org.apache.hadoop.mapred.JobConf  
    import org.apache.spark.{SparkContext, SparkConf}  
      
    /** 
     * User:leen 
     * Date:2017/12/20 0020 
     * Time:16:51 
     */  
    object HbaseTest1 {  
      def main(args: Array[String]): Unit = {  
        val sparkConf = new SparkConf().setAppName("HBaseTest1").setMaster("local")  
        val sc = new SparkContext(sparkConf)  
      
        val conf = HBaseConfiguration.create()  
        //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定  
        conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
        //設定zookeeper連線埠,預設2181  
        conf.set("hbase.zookeeper.property.clientPort", "2181")  
      
        val tablename = "account"  
      
        //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的  
        val jobConf = new JobConf(conf)  
        jobConf.setOutputFormat(classOf[TableOutputFormat])  
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
      
        val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
      
        val rdd = indataRDD.map(_.split(',')).map{arr=>{  
          
          // 一個Put物件就是一行記錄,在構造方法中指定主鍵  
          // 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換  
          // Put.add方法接收三個引數:列族,列名,資料  
          val put = new Put(Bytes.toBytes(arr(0).toInt))  
          put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
          put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
          //轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset  
          (new ImmutableBytesWritable, put)  
        }}  
      
        rdd.saveAsHadoopDataset(jobConf)  
      
        sc.stop()  
      }  
    }  

使用saveAsNewAPIHadoopDataset寫入資料
    import org.apache.hadoop.hbase.client.Put  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
    import org.apache.hadoop.hbase.client.Result  
    import org.apache.hadoop.hbase.util.Bytes  
    import org.apache.hadoop.mapreduce.Job  
    import org.apache.spark.{SparkContext, SparkConf}  
      
    /** 
     * User:leen 
     * Date:2017/12/20 0020 
     * Time:17:34 
     */  
    object HbaseTest2 {  
      
      def main(args: Array[String]): Unit = {  
        val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
        val sc = new SparkContext(sparkConf)  
      
        val tablename = "account"  
      
        sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
        sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
      
        val job = Job.getInstance(sc.hadoopConfiguration)  
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
        job.setOutputValueClass(classOf[Result])  
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])  
      
        val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
          
        val rdd = indataRDD.map(_.split(',')).map{arr=>{  
          val put = new Put(Bytes.toBytes(arr(0)))  
          put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
          put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
          (new ImmutableBytesWritable, put)  
        }}  
      
        rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
          
        sc.stop()  
      }  
    }  

從hbase讀取資料轉化成RDD

本例基於官方提供的例子

    package com.test  
      
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
    import org.apache.hadoop.hbase.client.HBaseAdmin  
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
    import org.apache.spark._  
    import org.apache.hadoop.hbase.client.HTable  
    import org.apache.hadoop.hbase.client.Put  
    import org.apache.hadoop.hbase.util.Bytes  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
    import org.apache.hadoop.mapred.JobConf  
    import org.apache.hadoop.io._  
      
    object TestHBase2 {  
      
      def main(args: Array[String]): Unit = {  
        val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
        val sc = new SparkContext(sparkConf)  
          
        val tablename = "account"  
        val conf = HBaseConfiguration.create()  
        //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定  
        conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
        //設定zookeeper連線埠,預設2181  
        conf.set("hbase.zookeeper.property.clientPort", "2181")  
        conf.set(TableInputFormat.INPUT_TABLE, tablename)  
      
        // 如果表不存在則建立表  
        val admin = new HBaseAdmin(conf)  
        if (!admin.isTableAvailable(tablename)) {  
          val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))  
          admin.createTable(tableDesc)  
        }  
      
        //讀取資料並轉化成rdd  
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
          classOf[org.apache.hadoop.hbase.client.Result])  
      
        val count = hBaseRDD.count()  
        println(count)  
        hBaseRDD.foreach{case (_,result) =>{  
          //獲取行鍵  
          val key = Bytes.toString(result.getRow)  
          //通過列族和列名獲取列  
          val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))  
          val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))  
          println("Row key:"+key+" Name:"+name+" Age:"+age)  
        }}  
      
        sc.stop()  
        admin.close()  
      }  
    }