Three ways to write data to HBase with Spark

Method 1: Use the HBase Table PUT method directly

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Description: Use the Put method of the HBase client to insert data into HBase from Spark.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkPut {

  /**
    * Inserting 100,000 rows took 20762 ms.
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkPut")
    val context = new SparkContext(conf)

    try {
      val rdd = context.makeRDD(1 to 100000, 4)

      // column family
      val family = Bytes.toBytes("cf")
      // column counter --> ctr
      val column = Bytes.toBytes("ctr")

      println("count is :" + rdd.count())
      rdd.take(5).foreach(println)

      // mapPartitions vs foreachPartition:
      // mapPartitions is a lazy transformation; without an action nothing is executed.
      // foreachPartition is an action, so the inserts below run immediately.
      rdd.foreachPartition(list => {
        val table = createTable()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addImmutable(family, column, Bytes.toBytes(value))
          table.put(put)
        })
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  /**
    * Creates a Connection and returns a Table handle for test_table.
    * Note: the Connection itself is never explicitly closed here.
    *
    * @return Table handle for test_table
    */
  def createTable(): Table = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    conn.getTable(TableName.valueOf("test_table"))
  }
}
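
The code above assumes that test_table with column family cf already exists in HBase. The following is a minimal verification sketch, not part of the original example: it reuses the same localhost ZooKeeper settings, creates the table if it is missing, and reads back one row to confirm the write. The object name VerifyPut is illustrative; the table, family and column names come from the example above.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object VerifyPut {
  def main(args: Array[String]): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(hbaseConf)
    try {
      val tableName = TableName.valueOf("test_table")
      val admin = conn.getAdmin
      // create test_table with family cf if it does not exist yet
      if (!admin.tableExists(tableName)) {
        val desc = new HTableDescriptor(tableName)
        desc.addFamily(new HColumnDescriptor("cf"))
        admin.createTable(desc)
      }
      admin.close()

      // read back the row whose key is the integer 1
      val table = conn.getTable(tableName)
      val result = table.get(new Get(Bytes.toBytes(1)))
      val bytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ctr"))
      println("ctr for row 1: " + (if (bytes == null) "not found" else Bytes.toInt(bytes)))
      table.close()
    } finally {
      conn.close()
    }
  }
}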

Method 2: Batched writes with put(List) and BufferedMutator


import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions

/**
  * Description: Batch inserts into HBase (put list and BufferedMutator) from a Spark context.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkPutList {

  /**
    * BufferedMutator insert of 100,000 rows (mutator.mutate(Put)) cost: 22369 ms
    * Put-list insert of 100,000 rows cost: 25571 ms
    * Put-list built via map, 100,000 rows, cost: 21299 ms
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {
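    // only one of the three variants is run here; switch the call to benchmark the others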
    //    putByList()
    putByMap()
  }

  def putByMap(): Unit = {
    val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName())
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.map(value => {
        val put = new Put(Bytes.toBytes(value))
        put.addImmutable(family, column, Bytes.toBytes(value))
      }).foreachPartition(
        itr => {
          val hbaseConf = HBaseConfiguration.create()
          val conn = ConnectionFactory.createConnection(hbaseConf)
          val table = conn.getTable(TableName.valueOf("test_table"))
          table.put(JavaConversions.seqAsJavaList(itr.toSeq))
          table.close()
        })
    } finally {
      context.stop()
    }
  }

  def putByList(): Unit = {
    val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName())
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf("test_table"))
        val putList = new java.util.LinkedList[Put]()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addImmutable(family, column, Bytes.toBytes(value))
          putList.add(put)
        })
        table.put(putList)
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  def putByMutator(): Unit = {
    val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName())
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val mutator = conn.getBufferedMutator(TableName.valueOf("test_table"))
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addImmutable(family, column, Bytes.toBytes(value))
          mutator.mutate(put)
        })
        mutator.close()
      })
    } finally {
      context.stop()
    }
  }
}
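
In the putByMutator() variant above, the BufferedMutator uses the default client write buffer. The sketch below (not part of the original post) shows how the buffer size could be set explicitly through BufferedMutatorParams; the object name and the 8 MB value are illustrative assumptions, while the table, family and column names match the examples above.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutatorParams, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object MutatorBufferSketch {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      // size the write buffer explicitly; 8 MB is an assumed value, tune for your cluster
      val params = new BufferedMutatorParams(TableName.valueOf("test_table"))
        .writeBufferSize(8 * 1024 * 1024)
      val mutator = conn.getBufferedMutator(params)

      (1 to 100).foreach { value =>
        val put = new Put(Bytes.toBytes(value))
        put.addImmutable(Bytes.toBytes("cf"), Bytes.toBytes("ctr"), Bytes.toBytes(value))
        mutator.mutate(put)
      }

      mutator.flush() // push any remaining buffered puts before closing
      mutator.close()
    } finally {
      conn.close()
    }
  }
}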

Method 3: Write to HBase with a MapReduce job (TableOutputFormat)


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Description: Put data into Hbase by map reduce Job.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkMapJob {

    /**
      * Inserting 100,000 rows took 21035 ms.
      *
      * @param args
      */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("SparkPutByMap")
      val context = new SparkContext(conf)

      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table")
      // IMPORTANT: this must be set, otherwise the job fails with
      // "Can not create a Path from a null string"
      hbaseConf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

      val job = Job.getInstance(hbaseConf)
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put])

      try {
        val rdd = context.makeRDD(1 to 100000)

        // column family
        val family = Bytes.toBytes("cf")
        // column counter --> ctr
        val column = Bytes.toBytes("ctr")

        rdd.map(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addImmutable(family, column, Bytes.toBytes(value))
          (new ImmutableBytesWritable(), put)
        })
          .saveAsNewAPIHadoopDataset(job.getConfiguration)
      } finally {
        context.stop()
      }
    }

}
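
For completeness, the rows written by the job above can be read back into an RDD through TableInputFormat, the read-side counterpart of TableOutputFormat. The following is a minimal sketch, not part of the original post; it assumes hbase-site.xml (or the ZooKeeper settings from Method 1) is available on the classpath, and the object name SparkScanCount is illustrative.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SparkScanCount {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setAppName("SparkScanCount"))
    try {
      val readConf = HBaseConfiguration.create()
      readConf.set(TableInputFormat.INPUT_TABLE, "test_table")

      // each element is a (row key, full row Result) pair
      val hbaseRDD = context.newAPIHadoopRDD(
        readConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      println("rows in test_table: " + hbaseRDD.count())
    } finally {
      context.stop()
    }
  }
}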

Test environment (Gradle dependencies):

ext{
    sparkVersion = '2.3.0'
}

dependencies {

    compile 'org.slf4j:slf4j-api:1.7.25'
    compile 'org.apache.logging.log4j:log4j-api:2.11.0'
    compile 'org.apache.logging.log4j:log4j-core:2.11.0'

    // spark related
    compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-sql_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}"

    // hbase related
    compile 'org.apache.hbase:hbase-client:1.3.1'
    compile 'org.apache.hbase:hbase-server:1.3.1'

    // redis related
    compile 'redis.clients:jedis:2.9.0'

    // mysql connector
    compile 'mysql:mysql-connector-java:5.1.46'

    // hive jdbc
    compile 'org.apache.hive:hive-jdbc:2.3.3'

    compile 'org.apache.logging.log4j:log4j:2.11.0'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-tools:1.8.2'

    testCompile 'junit:junit:4.12'
}