1. 程式人生 > >大資料之Spark(四)--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值

大資料之Spark(四)--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值

一、Dependency:依賴:RDD分割槽之間的依存關係
---------------------------------------------------------
    1.NarrowDependency: 子RDD的每個分割槽依賴於父RDD的少量分割槽。
         |
        / \
        ---
         |---- OneToOneDependency      //父子RDD之間的分割槽存在一對一關係。
         |---- RangeDependency         //父RDD的一個分割槽範圍和子RDD存在一對一關係。
         |---- PruneDependency         //刪減依賴--在PartitionPruningRDD和其父RDD之間的依賴,子RDD包含了父RDD的分割槽子集。。

    2.ShuffleDependency                 //混洗依賴,在shuffle階段輸出時的一種依賴。

二、SparkContext建立排程器的過程
------------------------------------------------------------------
   [SparkContext.scala:501行]
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)


三、Spark啟動模式
--------------------------------------------------
    1.[本地模式,通過執行緒模擬]
        本地後臺排程器
        spark local
        spark local[3]                   //3執行緒,模擬cluster叢集
        spark local[*]                   //匹配cpu個數,
        spark local[3,2]                  //3:3個執行緒,2表示每個分割槽重試1次[0和1等價,不重試]

        conf.seetMaster[3]
        conf.seetMaster[*]
        conf.seetMaster[3,2]

    2.[相當於偽分散式]
        StandaloneSchedulerBackend
        spark local-cluster[N, cores, memory]  //模擬spark叢集。

    3.[完全分散式]
        StandaloneSchedulerBackend
        spark spark://s201:7077                //連線到spark叢集上.

    4.程式碼演示
        
import org.apache.spark.{SparkConf, SparkContext}

        object Test8 {

            def main(args: Array[String]): Unit = {

                val sparkConf: SparkConf = new SparkConf()
                sparkConf.setAppName("Test-1")
                sparkConf.setMaster("local[2]")
                sparkConf.setMaster("local[*]")
                sparkConf.setMaster("local[2,3]")
                sparkConf.setMaster("local[*]")
                val sc: SparkContext = new SparkContext(sparkConf)

                val data = List(1,2,3,4,5,6,7,8,9,10);
                val rdd1 = sc.parallelize(data)
                val rdd2 = rdd1.map(
                    e => {
                        val s = Thread.currentThread().getName();
                        println(s);
                        e
                    }
                )

                val rdd3 = rdd1.repartition(4);
                val rdd4 = rdd3.map(
                    e => {
                        val s = Thread.currentThread().getName();
                        println(s);
                        e
                    }
                )
                rdd4.collect()
            }
        }


四、shuffle操作
--------------------------------------------------
    1.跨分割槽再分發資料的一種機制

    2.觸發shuffle的操作:groupByKey reduceByKey...  reparatition coalesce...  join cogroup

    3.shuffle對效能的影響
        a.shuffle涉及到磁碟或者網路io,所以成本很高
        b.shuffle為了資料重用,在磁碟上生成大量的中間檔案
        c.垃圾回收如果不及時,長時間的shuffle會佔用大量的磁碟空間


五、RDD持久化
--------------------------------------------------
    1.資料在記憶體中持久化,可以跨操作

    2.持久化RDD時,節點上的每個分割槽的資料都會儲存在記憶體中,以備在其他操作中進行重用

    3.這種快取技術是迭代式計算和互動式查詢的重要手段。

    4.可以使用persist()和cache(),來進行rdd的持久化,cache()是persist()一種,使用記憶體方式進行快取
        a.首次Action計算時,會發生persist(),會將計算的資料儲存在節點記憶體中
        b.spark的這種快取機制,是容錯的,如果rdd的任何一個分割槽丟失了,都可以通過最初建立rdd的進行重新計算出直到Action的RDD結果
        c.每個RDD都可以使用不同的儲存級別進行持久化,可以通過storelevel進行設定
            MEMORY_ONLY          //只在記憶體
            MEMORY_AND_DISK
            MEMORY_ONLY_SER       //記憶體儲存(序列化)
            MEMORY_AND_DISK_SER
            DISK_ONLY        //硬碟
            MEMORY_ONLY_2     //帶有副本
            MEMORY_AND_DISK_2  //快速容錯。
            OFF_HEAP

        d.級別選取建議
            如果想快速容錯,就帶副本:MEMORY_ONLY_2
            如果記憶體足夠,選取MEMORY_ONLY
            如果記憶體不是很足夠,選取MEMORY_ONLY_SER
            儘量不要快取到磁碟,除非記憶體非常不夠

    5.刪除持久化資料
        rdd2.unpersist(isBlocking = true)

    6.程式碼演示
        
import org.apache.spark.storage.StorageLevel
        import org.apache.spark.{SparkConf, SparkContext}

        object TestPersist {

            def main(args: Array[String]): Unit = {
                val sparkConf: SparkConf = new SparkConf()
                sparkConf.setAppName("Test-1")
                sparkConf.setMaster("local")
                val sc: SparkContext = new SparkContext(sparkConf)

                val rdd1 = sc.parallelize(1 to 20)

                val rdd2 = rdd1.map(
                    e => {
                        println(e)
                        e * 2
                    }
                )
                //更改快取地
                rdd2.persist(StorageLevel.DISK_ONLY)
                //rdd2.persist(StorageLevel.MEMORY_ONLY)
                //將rdd2的結果快取到記憶體中
                //rdd2.cache()
                println(rdd2.reduce(_ + _))
                //移除快取
                rdd2.unpersist(true)
                println(rdd2.reduce(_ + _))
            }
        }


六、變數傳遞過程中的序列化
----------------------------------------------------------
    因為是叢集計算,所以map過程的程式碼和過程中的變數,會將變數的值拷貝,然後通過網路,分發到各個節點,[區別於共享變數:每次計算都要分發的]過程中需要進行序列和反序列,所以就要求過程中的變數要能序列化
        scala> class Dog
        scala> val d = new Dog
        scala> sc.parallelize(1 to 20).map( e=> {println(d);e}).reduce(_ + _)
        //會報錯因為dog不能序列化
        //需要定義Dog的時候繼承Serializable介面
        scala> case class Dog (name:String,age:Int)
        scala> val d = Dog("tom",1)
        scala> sc.parallelize(1 to 20).map( e=> {println(d);e}).reduce(_ + _)


七、共享變數
---------------------------------------------------
    1.在所有的節點間共享一個變數。

    2.Spark通過廣播變數和累加器實現變數的共享

    3.廣播變數
        a.一個只讀的變數,在建立的時候廣播一次[也必須繼承了序列化介面],然後在每個節點進行快取。而不是跟隨任務進行網路間傳遞

        b.使用方式
            scala> val broadcastVar = sc.broadcast(Array(1,2,3))
            broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(3)

            scala> broadcastVar.value
            res7: Array[Int] = Array(1, 2, 3)

    4.累加器
        a.一個只能增加的變數,高效並行。通常用於MR的Counter和Sum。UI可見

        b.使用方式
            scala> val ac1 = sc.longAccumulator("ac1")
            ac1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 288, name: Some(ac1), value: 0)

            scala> ac1.value
            res8: Long = 0

            scala> sc.parallelize(1 to 20).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)
            res9: Int = 420

            scala> ac1.value
            res10: Long = 20

        c.自定義累加器
            class MyAccumulatorV2 extends AccumulatorV2[V,T]{
                private val v:V = ...

                def reset():
                def add():
                def sum():
                def count():
                def value():
                ...
            }

八、Spark分散式計算PI的值
----------------------------------------------------------
    scala> sc.parallelize(1 to 999999999).map(e=>{
            val a = 1f / (2 * e - 1) ;
            val b = if (e % 2 == 0) -1 else 1 ;
            a * b * 4
        }
    ).reduce(_+_)

    res19: Float = 3.1415954