1. 程式人生 > >Spark-Core應用詳解之高階篇

Spark-Core應用詳解之高階篇

三、RDD高階應用

1.RDD的分片數量

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

numSlices: 分片數,一個分片就是一個任務,所以defaultParallelisms是分片數也是相當於並行度。

2.RDD的函式傳遞問題

傳遞RDD函式的時候,要繼承java的Serializable介面,也就是序列化。

序列化

:將物件的狀態資訊轉換為可以儲存或傳輸的形式的過程。因為我們宣告一個物件的時候是以位元組碼,位元組陣列的形式呈現給jvm實現跨平臺操作。
而產生序列化的原因是因為需要分散式讀取,在兩臺節點相互合作的時候,就需要把命令轉換成為二進位制碼,令另一臺機器的jvm接收轉換為可以使用的物件,這也就是反序列化。

所以在我們打成jar包的時候就會產生序列化問題,當我們在spark上執行jar的時候,需要把它傳給很多worker,也就是我們要運用Serializable的原因。

而這個過程也就叫做RDD的傳遞操作。

import org.apache.spark.rdd

class SearchFunctions
(val query: String) extends java.io.Serializable{ def isMatch(s: String): Boolean = { s.contains(query) } def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // 問題:"isMatch"表示"this.isMatch",因此我們要傳遞整個"this" rdd.filter(isMatch)
} def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // 問題:"query"表示"this.query",因此我們要傳遞整個"this" rdd.filter(x => x.contains(query)) } def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // 安全:只把我們需要的欄位拿出來放入區域性變數中 val query_ = this.query rdd.filter(x => x.contains(query_)) } }

解析一下這個SearchFunction類,首先傳入一個String型別的s,去匹配是否含有query,返回一個boolean值。
getMatchesFunctionReference方法主要用來過濾傳入進來的rdd,因為他在filter內寫進了isMatch方法做過濾的詳細指標。而isMatch又可以表示為this.isMatch,因為rdd.filter這個操作是要分散式執行到很多機器上去,所以這也就是我們要是用序列化操作的原因。
getMatchesFieldReference方法的query也可以表示為this.query,像上所述的一樣,也是要分散式執行到很多臺機器上去。
但是有一個方法,可以不採用序列化操作,那就是產生一個新的引數被this.所賦值,就可以不用序列化了。

小結:

<1>如果RDD的轉換操作中使用到了class中的方法或者變數,那麼該class需要支援例項化。
<2>如果通過區域性變數的方式將class中的變數賦值為區域性變數,那麼就不需要傳遞物件。

3.RDD的執行方式

(1)RDD的依賴關係

窄依賴:父類的RDD的Partition最多被子RDD的一個Partition使用。
在這裡插入圖片描述
寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition會引起shuffle。
只要是xxbyKey基本都是存在shuffle過程的,因為存在混洗。
在這裡插入圖片描述

(2)DAG 有向無環圖

當程序互相矛盾,資源排程出現先後順序問題的時候,需要使用oozie進行資源排程。
在這裡插入圖片描述

(3)RDD的任務劃分

在這裡插入圖片描述
Application:一個執行的jar就是一個應用。
Job:一個Action操作就是一個Job,也就是Hadoop的MR
Stage:按照看窄依賴劃分,下面會詳講。
Task:一個程序就是一個Task。

Stage

在這裡插入圖片描述
以wordCount核心演算法為例:

  val file = sc.textFile("hdfs://Master:8020/person.txt")
  val words = file.flatMap(_.split(" "))
  val word2count = words.map((_,1))
  val result = word2count.reduceByKey(_+_)
  result.saveAsTextFile("path")

執行的時候是從上往下執行的,但是劃分stage的時候,是從下往上去劃分,如圖
因為最後的saveAsTextFile是一個Action操作,所以被劃分在最外面,也就是藍色的背景部分。
往上倒,之後是reduceByKey,也就是一個寬依賴,混洗操作,所以劃分在Stage2。
在往上看,上面的textFile,flatMap,map操作都是一個窄依賴,所以可以被共同劃分在Stage1。

Stage的結構就像棧結構一樣,先進後出,stage2先被壓入棧底,然後再壓stage1。

4.RDD的持久化

  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

RDD的持久化也就是RDD的快取操作,其中,RDD有兩個快取運算元,一個是cache,一個是persist,這兩個的關係就像makeRDD和 parallelize一樣,可以直接呼叫cache,這樣預設的persist引數為void,直接將StorageLevel的儲存級別設定為記憶體儲存(最好的一種儲存),而呼叫persist,填了引數的話,如圖介紹:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

2的意思也就是儲存兩份的意思
在這裡插入圖片描述

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26

scala> val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26

scala> cache.cache
res1: cache.type = MapPartitionsRDD[2] at map at <console>:26

scala> nocache.collect
res2: Array[String] = Array(1[1546694698266], 2[1546694698266], 3[1546694698266], 4[1546694698266], 5[1546694698266], 6[1546694698273], 7[1546694698273], 8[1546694698273], 9[1546694698273], 10[1546694698273])

scala> nocache.collect
res3: Array[String] = Array(1[1546694699746], 2[1546694699746], 3[1546694699746], 4[1546694699746], 5[1546694699746], 6[1546694699747], 7[1546694699747], 8[1546694699747], 9[1546694699747], 10[1546694699747])

scala> cache.collect
res4: Array[String] = Array(1[1546694705677], 2[1546694705677], 3[1546694705677], 4[1546694705677], 5[1546694705678], 6[1546694705678], 7[1546694705678], 8[1546694705678], 9[1546694705679], 10[1546694705679])

scala> cache.collect
res5: Array[String] = Array(1[1546694705677], 2[1546694705677], 3[1546694705677], 4[1546694705677], 5[1546694705678], 6[1546694705678], 7[1546694705678], 8[1546694705678], 9[1546694705679], 10[1546694705679])

從程式中,我們可以看出,使用了cache運算元進行快取的,時間不會改變,因為collect輸出的是快取的時間,是不經過計算的,而沒有經過cache進行快取的,所collect的時間是隨時都會變化的。

5.RDD的checkpoint機制

checkpoint和cache都是給RDD做快取作用的,但是他們還是有著顯著區別的,最明顯的區別就是cache把快取寫在了memory中,而checkpoint寫在了hdfs中。
我個人感覺,如果要是小專案的話,還可以,但是要是大專案的話,會導致記憶體超載,如果使用cache進行快取,當某個節點的executor宕機,RDD就會丟失,資料也會沒,而這時候,cache一種自帶的容錯機制,也就是依賴鏈就會起作用,重新把記憶體還原繼續計算,這倒是也可以,但是很浪費資源,浪費記憶體,相反,checkpoint一開始就把快取寫在了hdfs中,也就沒有依賴鏈一說,保證了高容錯性。在這裡插入圖片描述

scala> sc.setCheckpointDir("hdfs://linux01:8020/checkpoint")

scala> val ch1=sc.parallelize(1 to 2)
ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:26

scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at <console>:26

scala> ch3.checkpoint

scala> ch2.collect
res10: Array[String] = Array(1[1546695226909], 2[1546695226909])

scala> ch2.collect
res11: Array[String] = Array(1[1546695231735], 2[1546695231736])

scala> ch3.collect
res12: Array[String] = Array(1[1546695237730], 2[1546695237728])                

scala> ch3.collect
res13: Array[String] = Array(1[1546695237805], 2[1546695237800])                

scala> ch3.collect
res14: Array[String] = Array(1[1546695237805], 2[1546695237800])

scala> ch3.collect
res15: Array[String] = Array(1[1546695237805], 2[1546695237800])

scala> 

在這裡插入圖片描述
被checkpoint的RDD第一次collect的時候我們發現時間還是變了,但是第二次就開始執行快取機制了,因為他內部有一個觸發器,並且根據hdfs的儲存目錄可知,最後快取的資料的確被存入了hdfs中。

6.鍵值對RDD資料分割槽

Spark目前可以使用HashPartition和RangePartition進行分割槽,使用者也可以自定義分割槽方法,Hash分割槽為當前的預設分割槽,Spark中分割槽器直接決定了RDD中分割槽的個數、RDD中的每條資料經過shuffle過程屬於哪個分割槽和Reduce的個數。
但是在這裡,HashPartition有一個弊端,就是會導致資料傾斜,因為Hash的本質是除留取餘法進行儲存,所以就會產生這種偶然性,導致大量偶然的資料進來之後會讓其中一個執行緒被擠爆,而其他執行緒佔用的很少。
所以,我們更傾向使用RangePartition,這種分割槽方法採用了水塘抽樣隨機演算法進行資料的儲存,可以讓資料平均的儲存到每一個分割槽中。
注意:
1.只有K-V型別的RDD才有分割槽,非K-V型別的RDD分割槽的值就是None
2.每個RDD 分割槽ID範圍:0~numPartitions-1,決定這個值是屬於哪個分割槽的。
3.當我們自己想製造一個分割槽方法的時候,只需要繼承Partitioner這個抽象類就可以了
在這裡插入圖片描述
具體程式碼:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class CustomerPartitioner(numPartition:Int) extends Partitioner{
//  返回分割槽的總數
  override def numPartitions: Int = {
    numPartition
  }
//  根據傳入的key返回分割槽的索引
  override def getPartition(key: Any): Int = {
    key.toString.toInt % numPartition
  }
}
object CustomerPartitioner{
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setAppName("Partition").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(0 to 10,1).zipWithIndex()//把下標拉到一起

    print(rdd.mapPartitionsWithIndex((index,items)=>Iterator(index+":"+items.mkString(","))).collect())

    val rdd2 = rdd.partitionBy(new CustomerPartitioner(5))

    print(rdd2.mapPartitionsWithIndex((index,items)=>Iterator(index+":"+items.mkString(","))).collect())

    sc.stop()
  }
}

7.RDD的累加器和廣播變數

(1)RDD 的累加器

Spark內部提供了一個累加器,但是隻能用於求和
使用方法:

scala> val blank = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32

scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0

scala> val tmp = blank.flatMap(line => {
     |    if (line == "") {
     |       blanklines += 1
     |    }
     |    line.split(" ")
     | })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36

scala> tmp.count()
res1: Long = 3213

scala> blanklines.value
res2: Int = 171

累加器也是懶執行,所以需要Action操作觸發出來

自定義累加器

程式碼:

package Mapreduce



import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

class CustomerAcc extends AccumulatorV2[String,mutable.HashMap[String,Int]] {

  private val _hash = new mutable.HashMap[String,Int]()

//  檢測是否為空
  override def isZero: Boolean = {
    _hash.isEmpty
  }
//  拷貝一個新的累加器
  override def copy(): AccumulatorV2[String,mutable.HashMap[String,Int]] = {
    val copyHash = new CustomerAcc()
//    創造一個copy的累加器,然後用synchronized方法設定同步操作
    _hash.synchronized{
      copyHash._hash++=_hash
    }
    copyHash
  }
//  重置累加器
  override def reset(): Unit = {
    _hash.clear()
  }
//  每一個分割槽中用於新增資料的方法 小Sum
  override def add(v: String) ={

    _hash.get(v) match {
      case None=>_hash+=((v,1))
      case Some(x)=>_hash+=((v,x+1))
    }

  }
//  合併每一個分割槽的輸出 總Sum
  override def merge(other: AccumulatorV2[String,mutable.HashMap[String,Int]]) = {
    other match{
      case o:AccumulatorV2[String,mutable.HashMap[String,Int]]=>{
        for ((k,v)<- o.value){
        _hash.get(k) match {
          case None=>_hash+=((k,v))
          case Some(x)=>_hash+=((k,v+x))
        }
    }
    }
    }
  }
//  輸出值
  override def value(): mutable.HashMap[String,Int] = {
    _hash
  }
}

object CustomerAcc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Partition1").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

    val hash = new CustomerAcc()
    sc.register(hash)

    val rdd = sc.makeRDD(Array("a","b","c","a","b","c","d"))

    rdd.foreach(hash.add(_))

    for((k,v)<-hash.value){
      println("["+k+":"+v+"]")
    }

    sc.stop()
  }
}

在這裡插入圖片描述
總結:
1.建立一個累加器的例項
2.通過sc.register()註冊一個累加器
3.通過累加器例項名.add新增資料
4.通過累加器例項名.value來獲取累加器的值
注:1.不要在轉換中訪問累加器,要在行動中訪問。
2.轉換或者行動中不能訪問累加器的值,只能.add

(2)廣播變數

1.當在定義的方法中定義了一個本地變數,想要和RDD中變數結合傳送給其他節點,那麼這個本地變數會在每一個分割槽中產生一個拷貝
2.但是在使用了廣播變數的情況下,每一個Executor中會有該變數的次copy,[大大節約在分割槽中佔有的快取]
使用方法

scala> val broadcaster = sc.broadcast(Array(1,2,3))
broadcaster: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)

scala> broadcaster.value
res2: Array[Int] = Array(1, 2, 3)

適用於高效分發較大的資料物件。