Spark運算元[20]:saveAsHadoopDataset、saveAsNewAPIHadoopDataset 例項詳解
阿新 • • 發佈:2019-01-30
概要
saveAsHadoopDataset:使用舊的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,為該儲存系統使用Hadoop JobConf 物件。
JobConf設定一個OutputFormat和任何需要的輸出路徑(如要寫入的表名),就像為Hadoop MapReduce作業配置的那樣。
saveAsNewAPIHadoopDataset:
使用新的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,為該儲存系統使用Hadoop Configuration物件。
Conf設定一個OutputFormat和任何需要的輸出路徑(如要寫入的表名),就像為Hadoop MapReduce作業配置的那樣。
saveAsHadoopDataset
saveAsHadoopDataset(conf: JobConf): Unit
案例:將RDD寫入hbase
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.{SparkContext, SparkConf} /** * User:leen * Date:2017/12/20 0020 * Time:16:51 */ object HbaseTest1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HBaseTest1").setMaster("local") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定 conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3") //設定zookeeper連線埠,預設2181 conf.set("hbase.zookeeper.property.clientPort", "2181") val tablename = "account" //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的 val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename) val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16")) val rdd = indataRDD.map(_.split(',')).map{arr=>{ // 一個Put物件就是一行記錄,在構造方法中指定主鍵 // 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換 // Put.add方法接收三個引數:列族,列名,資料 val put = new Put(Bytes.toBytes(arr(0).toInt)) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt)) //轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset (new ImmutableBytesWritable, put) }} rdd.saveAsHadoopDataset(jobConf) sc.stop() } }
saveAsNewAPIHadoopDataset
saveAsNewAPIHadoopDataset(conf: Configuration): Unit
案例:將RDD寫入HBASE
import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkContext, SparkConf} /** * User:leen * Date:2017/12/20 0020 * Time:17:34 */ object HbaseTest2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "account" sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3") sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181") sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename) val job = Job.getInstance(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16")) val rdd = indataRDD.map(_.split(',')).map{arr=>{ val put = new Put(Bytes.toBytes(arr(0))) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt)) (new ImmutableBytesWritable, put) }} rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) sc.stop() } }