1. 程式人生 > >Spark運算元[20]:saveAsHadoopDataset、saveAsNewAPIHadoopDataset 例項詳解

Spark運算元[20]:saveAsHadoopDataset、saveAsNewAPIHadoopDataset 例項詳解

概要

saveAsHadoopDataset:
使用舊的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,為該儲存系統使用Hadoop JobConf 物件。
JobConf設定一個OutputFormat和任何需要的輸出路徑(如要寫入的表名),就像為Hadoop MapReduce作業配置的那樣。

saveAsNewAPIHadoopDataset:

使用新的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,為該儲存系統使用Hadoop Configuration物件。
Conf設定一個OutputFormat和任何需要的輸出路徑(如要寫入的表名),就像為Hadoop MapReduce作業配置的那樣。

saveAsHadoopDataset

saveAsHadoopDataset(conf: JobConf): Unit 

案例:將RDD寫入hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkContext, SparkConf}

/**
 * User:leen
 * Date:2017/12/20 0020
 * Time:16:51
 */
object HbaseTest1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest1").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定
    conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    //設定zookeeper連線埠,預設2181
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val tablename = "account"

    //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))

    val rdd = indataRDD.map(_.split(',')).map{arr=>{
    
      // 一個Put物件就是一行記錄,在構造方法中指定主鍵
      // 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換
      // Put.add方法接收三個引數:列族,列名,資料
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      //轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset
      (new ImmutableBytesWritable, put)
    }}

    rdd.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}

saveAsNewAPIHadoopDataset

 saveAsNewAPIHadoopDataset(conf: Configuration): Unit 

案例:將RDD寫入HBASE

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}

/**
 * User:leen
 * Date:2017/12/20 0020
 * Time:17:34
 */
object HbaseTest2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tablename = "account"

    sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
    
    val rdd = indataRDD.map(_.split(',')).map{arr=>{
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }}

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
    
    sc.stop()
  }
}