1. 程式人生 > >大資料開發-Spark-拷問靈魂的5個問題

大資料開發-Spark-拷問靈魂的5個問題

# 1.Spark計算依賴記憶體,如果目前只有10g記憶體,但是需要將500G的檔案排序並輸出,需要如何操作? ①、把磁碟上的500G資料分割為100塊(chunks),每份5GB。(注意,要留一些系統空間!) ②、順序將每份5GB資料讀入記憶體,使用quick sort演算法排序。  ③、把排序好的資料(也是5GB)存放回磁碟。  ④、迴圈100次,現在,所有的100個塊都已經各自排序了。(剩下的工作就是如何把它們合併排序!)  ⑤、從100個塊中分別讀取5G/100=0.05 G入記憶體(100input buffers)。  ⑥、執行100路合併,並將合併結果臨時儲存於5g基於記憶體的輸出緩衝區中。當緩衝區寫滿5GB時,寫入硬碟上最終檔案,並清空輸出緩衝區;當100個輸入緩衝區中任何一個處理完畢時,寫入該緩衝區所對應的塊中的下一個0.05 GB,直到全部處理完成。 # 2.countByValue和countByKey的區別 首先從原始碼角度來看: ```Scala // PairRDDFunctions.scala def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap } // RDD.scala def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope { map(value => (value, null)).countByKey() } ``` `countByValue(RDD.scala)` - 作用在普通的`RDD`上 - 其實現過程呼叫了 `countByKey` `countByKey(PairRDDFunctions.scala)` - 作用在 PairRDD 上 - 對 key 進行計數 - 資料要收到Driver端,結果集大時,不適用 問題: - `countByKey `可以作用在 普通的`RDD`上嗎 - `countByValue `可以作用在 `PairRDD `上嗎 ```Scala val rdd1: RDD[Int] = sc.makeRDD(1 to 10) val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex) val result1 = rdd1.countByValue() //可以 val result2 = rdd1.countByKey() //語法錯誤 val result3 = rdd2.countByValue() //可以 val result4 = rdd2.countByKey() //可以 ``` # 3.兩個rdd join 什麼時候有shuffle什麼時候沒有shuffle 其中join操作是考驗所有資料庫效能的一項重要指標,對於Spark來說,考驗join的效能就是Shuffle,Shuffle 需要經過磁碟和網路傳輸,Shuffle資料越少效能越好,有時候可以儘量避免程式進行Shuffle ,那麼什麼情況下有Shuffle ,什麼情況下沒有Shuffle 呢 ## 3.1 Broadcast join broadcast join 比較好理解,除了自己實現外,`Spark SQL` 已經幫我們預設來實現了,其實就是小表分發到所有`Executors`,控制引數是:`spark.sql.autoBroadcastJoinThreshold` 預設大小是10m, 即小於這個閾值即自動使用`broadcast join`. # 3.2 Bucket join 其實rdd方式和table類似,不同的是後者要寫入Bucket表,這裡主要講rdd的方式,原理就是,當兩個rdd根據相同分割槽方式,預先做好分割槽,分割槽結果是一致的,這樣就可以進行Bucket join, 另外這種join沒有預先的運算元,需要在寫程式時候自己來開發,對於表的這種join可以看一下 [位元組跳動在Spark SQL上的核心優化實踐](https://juejin.cn/post/6844903989557854216) 。可以看下下面的例子 rdd1、rdd2都是Pair RDD rdd1、rdd2的資料完全相同 一定有shuffle rdd1 => 5個分割槽 rdd2 => 6個分割槽 rdd1 => 5個分割槽 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0) rdd2 => 5個分割槽 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0) 一定沒有shuffle rdd1 => 5個分割槽 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空 rdd2 => 5個分割槽 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空 這樣所有`Shuffle`的運算元,如果資料提前做好了分割槽(`partitionBy`),很多情況下沒有`Shuffle`. 除上面兩種方式外,一般就是有`Shuffle`的`join`, 關於spark的join原理可以檢視:大資料開發-Spark Join原理詳解 # 4..transform 是不是一定不觸發action 有個運算元例外,那就是sortByKey,其底層有個抽樣演算法,水塘抽樣,最後需要根據抽樣的結果,進行RangePartition的,所以從job角度來說會看到兩個job,除了觸發action的本身運算元之外,記住下面的 sortByKey → 水塘抽樣→ collect # 5.廣播變數是怎麼設計的 我們都知道,廣播變數是把資料放到每個excutor上,也都知道廣播變數的資料一定是從driver開始出去的,什麼意思呢,如果廣播表放在hive表中,那麼它的儲存就是在各個block塊上,也對應多個excutor (不一樣的叫法),首先將資料拉到driver上,然後再進行廣播,廣播時候不是全部廣播,是根據excutor預先用到資料的,首先拿資料,然後通過bt協議進行傳輸,什麼是bt協議呢,就是資料在分散式點對點網路上,根據網路距離來去拉對應的資料,下載者也是上傳者,這樣就不同每個task (excutor)都從driver上來拉資料,這樣就減少了壓力,另外在spark1.幾的時候還是task級別,現在是共同的一個鎖,整個excutor上的task共享這份資料。 # 參考 [https://juejin.cn/post/6844903989557854216](https://juejin.cn/post/6844903989557854216) [https://www.jianshu.com/p/6bf887bf52b2](https://www.jianshu.com/p/6bf887bf52b2) 吳邪,小三爺,混跡於後臺,大資料,人工智慧領域的小菜鳥。 更多請關注 ![file](https://img2020.cnblogs.com/other/669466/202101/669466-20210131173926356-549562