
Generating HFiles with Spark and bulk loading them into HBase

Original article: http://www.cnblogs.com/luckuan/p/5142203.html
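The idea is to bypass HBase's normal write path (WAL and MemStore): Spark writes the data directly as HFiles via HFileOutputFormat2, and LoadIncrementalHFiles then hands those files over to the region servers (a bulk load). The listing below walks through the whole flow: build an RDD of (ImmutableBytesWritable, KeyValue) pairs, save it as HFiles to a staging directory on HDFS, and bulk load that directory into an existing table.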

import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{HTable, Table, _}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}


object App7 {
  def main(args: Array[String]) {
    // Create the SparkContext with the default configuration
    val sc = new SparkContext(new SparkConf())
    // HBase column family
    val columnFamily1 = "f1"
    // Default HBase configuration
    val conf = HBaseConfiguration.create()
    // Current timestamp, used for every KeyValue below
    val date = new Date().getTime
    // Build the source RDD with sc.parallelize
    val sourceRDD = sc.parallelize(Array(
      (Bytes.toBytes("41"), // "41" is the rowkey
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), // family, column qualifier and value
      (Bytes.toBytes("41"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
      (Bytes.toBytes("42"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
      (Bytes.toBytes("42"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
      (Bytes.toBytes("43"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
      (Bytes.toBytes("44"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
      (Bytes.toBytes("44"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
      (Bytes.toBytes("45"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
      (Bytes.toBytes("45"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("d"), Bytes.toBytes("bar.2")))))

    val rdd = sourceRDD.map(x => {
      // Convert the RDD into the format HFile generation expects: the key must be an
      // ImmutableBytesWritable wrapping the rowkey, and the value a KeyValue instance
      val rowKey = x._1
      val family = x._2._1
      val column = x._2._2
      val value = x._2._3
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, column, date, value))
    })
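    // Note: HFileOutputFormat2 expects its input sorted by row key (and by column within a row).
    // The sample rowkeys above are already in order; with real data you would normally sort
    // first, e.g. rdd.sortBy(_._1) (ImmutableBytesWritable is Comparable), otherwise the
    // HFile writer rejects out-of-order keys.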

    // Temporary HDFS path where the generated HFiles are staged
    val stagingFolder = "/user/hbase/spark/"
    // Write the RDD to the staging directory as HFiles
    rdd.saveAsNewAPIHadoopFile(stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    // After this step completes, the generated HFiles are available under stagingFolder
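    // The staging path must not exist before saveAsNewAPIHadoopFile runs, and the HFiles
    // are written into one subdirectory per column family (here /user/hbase/spark/f1/).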


    // Now bulk load the HFiles into HBase; everything below is plain HBase client API
    val load = new LoadIncrementalHFiles(conf)
    // Target HBase table name
    val tableName = "output_table"
    // Create the HBase connection from the default configuration, which is where the cluster (ZooKeeper quorum) address comes from
    val conn = ConnectionFactory.createConnection(conf)
    // Get the table handle by name
    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      // Get the region distribution of the table
      val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
      // Create a Hadoop MapReduce Job to carry the output configuration
      val job = Job.getInstance(conf)
      // Set the job name
      job.setJobName("DumpFile")
      // Important: since we are generating HFiles, the map output key class must be ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      // ...and the map output value class must be KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      // Configure HFileOutputFormat2 for an incremental load into this table
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
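      // configureIncrementalLoad wires the output format to the table's region boundaries and
      // per-family settings (compression, bloom filters, block size). Note that in this listing
      // the HFiles were already written above with the plain conf; a common variant is to call
      // this first and pass job.getConfiguration to saveAsNewAPIHadoopFile so those settings
      // are actually applied when the files are written.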

      // Start the bulk load
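      // Depending on the HBase version there is also an overload
      // doBulkLoad(Path, Admin, Table, RegionLocator) that avoids the cast to HTable.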
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
    } finally {
      table.close()
      conn.close()
    }
  }
}
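
To sanity-check the result, the bulk-loaded rows can be read back with the plain HBase client API. A minimal sketch (object name illustrative), assuming the same output_table, column family f1, and an hbase-site.xml on the classpath:

import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

object VerifyBulkLoad {
  def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("output_table"))
    try {
      // Full scan: print rowkey, column qualifier and value of every cell
      val scanner = table.getScanner(new Scan())
      try {
        for (result <- scanner.asScala; cell <- result.rawCells()) {
          val row = Bytes.toString(CellUtil.cloneRow(cell))
          val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          println(s"$row f1:$qualifier = $value")
        }
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
      conn.close()
    }
  }
}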