
Spark 2.0: Migrating from the RDD API to the Dataset API

Migrating from RDD to Dataset

The Dataset API combines the strengths of RDDs and DataFrames. Many Dataset methods mirror their RDD counterparts, even though the implementations underneath are very different. As a result, most programs written against the RDD API can be migrated to the Dataset API with little effort. Below are a few short snippets showing how to migrate RDD-based code to Datasets.

1. Loading a file

RDD

val rdd = sparkContext.textFile("src/main/resources/data.txt")

Dataset

val ds = sparkSession.read.text("src/main/resources/data.txt")

2. Counting elements

RDD

rdd.count()

Dataset

ds.count()

3. WordCount example

RDD

val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPair = wordsRDD.map(word => (word,1))
val wordCount = wordsPair.reduceByKey(_+_)

Dataset

import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCountDs = wordsPairDs.count()
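The two pipelines differ in shape: the RDD version builds (word, 1) pairs and reduces by key, while the Dataset version groups by the value itself and counts each group. As an illustrative sketch, the same semantics can be mimicked with plain Scala collections (no Spark required):

```scala
// Plain-Scala analogy of the two word-count pipelines above (no Spark required).
val lines = List("hello world", "hello spark")
val words = lines.flatMap(_.split("\\s+"))

// RDD style: map to (word, 1) pairs, then sum the counts per key.
val rddStyle = words.map(w => (w, 1))
  .groupBy(_._1)
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

// Dataset style: group by the word itself, then take each group's size.
val dsStyle = words.groupBy(identity)
  .map { case (w, ws) => (w, ws.size.toLong) }

println(rddStyle)
println(dsStyle)
```

Both produce the same per-word counts; only the intermediate representation differs.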

4. Caching

RDD

rdd.cache()

Dataset

ds.cache()

5. Filtering

RDD

val filteredRDD = wordsRDD.filter(value => value == "hello")

Dataset

val filteredDS = wordsDs.filter(value => value == "hello")

6. mapPartitions

RDD

val mapPartitionsRDD = rdd.mapPartitions(iterator =>
List(iterator.count(value => true)).iterator)

Dataset

val mapPartitionsDs = ds.mapPartitions(iterator =>
List(iterator.count(value => true)).iterator)
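In both APIs, mapPartitions hands the function one Iterator per partition. The per-partition counting trick above can be illustrated with plain Scala iterators (no Spark required; the two-partition input is an assumption for the example):

```scala
// Each "partition" is modeled as a plain Iterator; for each one we emit a
// single-element iterator holding that partition's element count.
def countPartition[T](iterator: Iterator[T]): Iterator[Int] =
  List(iterator.count(_ => true)).iterator

val partitions = List(List("a", "b", "c").iterator, List("d", "e").iterator)
val counts = partitions.flatMap(countPartition)
println(counts)
```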

7. reduceByKey

RDD

val reduceCountByRDD = wordsPair.reduceByKey(_+_)

Dataset

val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))
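mapGroups receives each key together with an iterator over that key's values; the snippet above turns every group into a (key, count) pair. The same shape with plain Scala collections (no Spark required):

```scala
// Plain-Scala analogy of mapGroups: group values by key, then map each
// group to a (key, count) pair.
val words = List("hello", "world", "hello")
val counted = words.groupBy(identity)
  .map { case (key, values) => (key, values.length) }

println(counted)
```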

8. Converting between RDDs and Datasets

RDD

val dsToRDD = ds.rdd

Dataset
Converting an RDD to a DataFrame takes a little more work, such as specifying an explicit schema. The following shows how to turn an RDD[String] into a Dataset[String]:

val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
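As a side note: for common element types, sparkSession.implicits._ also provides an implicit conversion that adds a toDS() method to RDDs, so the schema boilerplate above can often be skipped. A minimal sketch, assuming the same live sparkSession and rdd: RDD[String] as above:

```scala
// Shorter alternative for supported element types (here RDD[String]):
// sparkSession.implicits._ supplies the encoder and a .toDS() extension.
import sparkSession.implicits._
val rddToDs = rdd.toDS() // Dataset[String], no explicit schema needed
```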

9. Double-based operations

RDD

val doubleRDD = sparkContext.makeRDD(List(1.0,5.0,8.9,9.0))
val rddSum = doubleRDD.sum()
val rddMean = doubleRDD.mean()

Dataset

val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
val schema = StructType(Array(StructField("value",DoubleType)))
val doubleDS = sparkSession.createDataFrame(rowRDD,schema)
import org.apache.spark.sql.functions._
doubleDS.agg(sum("value"))
doubleDS.agg(mean("value"))

10. Reduce API

RDD

val rddReduce = doubleRDD.reduce((a, b) => a + b)

Dataset

val dsReduce = doubleDS.reduce((row1, row2) => Row(row1.getDouble(0) + row2.getDouble(0)))
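The two reduce calls share the same contract: a binary function (which Spark additionally requires to be commutative and associative, so partitions can be combined in any order) folded over all elements. On the untyped Dataset side the function operates on Rows, hence the getDouble(0) unwrapping. The underlying fold in plain Scala (no Spark required):

```scala
// reduce folds a binary function over all elements of the collection.
val values = List(1.0, 5.0, 8.9, 9.0)
val total = values.reduce((a, b) => a + b)
println(total)
```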

The snippets above show how to convert programs written with the RDD API into their Dataset API equivalents. They do not cover every RDD API, but they should give you enough of a pattern to migrate the rest yourself.

Complete code

package com.iteblog.spark

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * RDD API to Dataset API
  * http://www.iteblog.com
  */
object RDDToDataSet {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    val sparkContext = sparkSession.sparkContext

    //read data from text file
    val rdd = sparkContext.textFile("src/main/resources/data.txt")
    val ds = sparkSession.read.text("src/main/resources/data.txt")

    // do count
    println("count ")
    println(rdd.count())
    println(ds.count())

    // wordcount
    println(" wordcount ")

    val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
    val wordsPair = wordsRDD.map(word => (word,1))
    val wordCount = wordsPair.reduceByKey(_+_)
    println(wordCount.collect.toList)

    import sparkSession.implicits._
    val wordsDs = ds.flatMap(value => value.split("\\s+"))
    val wordsPairDs = wordsDs.groupByKey(value => value)
    val wordCountDs = wordsPairDs.count
    wordCountDs.show()

    //cache
    rdd.cache()
    ds.cache()

    //filter

    val filteredRDD = wordsRDD.filter(value => value =="hello")
    println(filteredRDD.collect().toList)

    val filteredDS = wordsDs.filter(value => value =="hello")
    filteredDS.show()


    //map partitions

    val mapPartitionsRDD = rdd.mapPartitions(iterator => 
    List(iterator.count(value => true)).iterator)
    println(s" the count each partition is ${mapPartitionsRDD.collect().toList}")

    val mapPartitionsDs = ds.mapPartitions(iterator => 
    List(iterator.count(value => true)).iterator)
    mapPartitionsDs.show()

    //converting to each other
    val dsToRDD = ds.rdd
    println(dsToRDD.collect().toList)

    val rddStringToRowRDD = rdd.map(value => Row(value))
    val dfschema = StructType(Array(StructField("value",StringType)))
    val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
    val rDDToDataSet = rddToDF.as[String]
    rDDToDataSet.show()

    // double based operation

    val doubleRDD = sparkContext.makeRDD(List(1.0,5.0,8.9,9.0))
    val rddSum = doubleRDD.sum()
    val rddMean = doubleRDD.mean()

    println(s"sum is $rddSum")
    println(s"mean is $rddMean")

    val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
    val schema = StructType(Array(StructField("value",DoubleType)))
    val doubleDS = sparkSession.createDataFrame(rowRDD,schema)

    import org.apache.spark.sql.functions._
    doubleDS.agg(sum("value")).show()
    doubleDS.agg(mean("value")).show()

    //reduceByKey API
    val reduceCountByRDD = wordsPair.reduceByKey(_+_)
    val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))

    println(reduceCountByRDD.collect().toList)
    println(reduceCountByDs.collect().toList)

    //reduce function
    val rddReduce = doubleRDD.reduce((a,b) => a +b)
    val dsReduce = doubleDS.reduce((row1,row2) =>
    Row(row1.getDouble(0) + row2.getDouble(0)))

    println("rdd reduce is " + rddReduce + " dataset reduce " + dsReduce)

  }

}