1. 程式人生 > >Spark調優----資料本地化

Spark調優----資料本地化

Spark資料本地化-->如何達到效能調優的目的

1.Spark資料的本地化:移動計算,而不是移動資料

2.Spark中的資料本地化級別:

TaskSetManager 的 Locality Levels 分為以下五個級別:

PROCESS_LOCAL

 NODE_LOCAL

NO_PREF

   RACK_LOCAL

ANY

PROCESS_LOCAL   程序本地化:task要計算的資料在同一個Executor

https://images2015.cnblogs.com/blog/1008304/201703/1008304-20170310232213061-565676755.png

    NODE_LOCAL    節點本地化:速度比

PROCESS_LOCAL 稍慢,因為資料需要在不同程序之間傳遞或從檔案中讀取

                                        情況一:task要計算的資料是在同一個Worker的不同Executor程序中

                                        情況二:task要計算的資料是在同一個Worker的磁碟上,或在 HDFS 上,恰好有 block 在同一個節點上

    Spark計算資料來源於HDFS,那麼最好的資料本地化級別就是NODE_LOCAL

https://images2015.cnblogs.com/blog/1008304/201703/1008304-20170310232213498-2042397906.jpg

    NODE_PREF    沒有最佳位置這一說,資料從哪裡訪問都一樣快,不需要位置優先。比如說

SparkSQL讀取MySql中的資料

    RACK_LOCAL 機架本地化,資料在同一機架的不同節點上。需要通過網路傳輸資料及檔案 IO,比 NODE_LOCAL

                                         情況一:task計算的資料在Worker2Executor

                                         情況二:task計算的資料在Worker2的磁碟上

https://images2015.cnblogs.com/blog/1008304/201703/1008304-20170310232213967-209773010.png
    ANY   跨機架,資料在非同一機架的網路上,速度最慢
3.Spark中的資料本地化由誰負責?
     DAGScheduler,TaskScheduler
      val rdd1 = rdd1.cache
      rdd1.map.filter.count()
      Driver(TaskScheduler)在傳送task之前,首先應該拿到RDD1快取在哪一些節點上(node1,node2)-->這一步就是由DAGScheduler通過cacheManager物件呼叫getPreferredLocations()來拿到RDD1快取在哪些節點上,TaskScheduler根據這些節點來發送task。
      val rdd1 = sc.textFile("hdfs://...")    //rdd1中封裝了是這個檔案所對應的block的位置,getPreferredLocation()-->TaskScheduler呼叫拿到partition所對應的資料的位置
     rdd1.map.filter.count()
     Driver(TaskScheduler)在傳送task之前,首先應該拿到rdd1資料所在的位置(node1,node2)-->RDD1封裝了這個檔案所對應的block的位置,TaskScheduler通過呼叫getPreferredLocations()拿到partition所對應的資料的位置,TaskScheduler根據這些位置來發送相應的task
總的來說:
     Spark中的資料本地化由DAGScheduler和TaskScheduler共同負責。
     DAGScheduler切割Job,劃分Stage, 通過呼叫submitStage來提交一個Stage對應的tasks,submitStage會呼叫submitMissingTasks,submitMissingTasks 確定每個需要計算的 task 的preferredLocations,通過呼叫getPreferrdeLocations()得到partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對於要提交到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一致。
     TaskScheduler接收到了TaskSet後,TaskSchedulerImpl 會為每個 TaskSet 建立一個 TaskSetManager 物件,該物件包含taskSet 所有 tasks,並管理這些 tasks 的執行,其中就包括計算 TaskSetManager 中的 tasks 都有哪些locality levels,以便在排程和延遲排程 tasks 時發揮作用。
4.Spark中的資料本地化流程圖
        即某個 task 計算節點與其輸入資料的位置關係,下面將要挖掘Spark 的排程系統如何產生這個結果,這一過程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了這一過程也就基本搞懂了 Spark 的 PreferredLocations(位置優先策略)

https://images2015.cnblogs.com/blog/1008304/201703/1008304-20170310232214311-1753946219.jpg


       第一步:PROCESS_LOCAL-->TaskScheduler首先根據資料所在的節點發送task,如果task在Worker1的Executor1中等待了3s(這個3s是spark的預設等待時間,通過spark.locality.wait來設定,可以在SparkConf()中修改),重試了5次還是無法執行,TaskScheduler會降低資料本地化的級別,從PROCESS_LOCAL降到NODE_LOCAL
     第二步:NODE_LOCAL-->TaskScheduler重新發送task到Worker1中的Executor2中執行,如果task在Worker1的Executor2中等待了3s,重試了5次還是無法執行,TaskScheduler會降低資料本地化的級別,從NODE_LOCAL降到RACK_LOCAL 
     第三步:RACK_LOCAL -->TaskScheduler重新發送task到Worker2中的Executor1中執行。
     第四步:當task分配完成之後,task會通過所在Worker的Executor中的BlockManager來獲取資料,如果BlockManager發現自己沒有資料,那麼它會呼叫getRemote()方法,通過ConnectionManager與原task所在節點的BlockManager中的ConnectionManager先建立連線,然後通過TransferService(網路傳輸元件)獲取資料,通過網路傳輸回task所在節點(這時候效能大幅下降,大量的網路IO佔用資源),計算後的結果返回給Driver。
總結:
      TaskScheduler在傳送task的時候,會根據資料所在的節點發送task,這時候的資料本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發射了5次還是依然無法執行,那麼TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級資料本地化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那麼繼續降低資料本地化的級別...
      現在想讓每一個task都能拿到最好的資料本地化級別,那麼調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個task都拿到了最好的資料本地化級別,但是我們job執行的時間也會隨之延長
        1.spark.locality.wait 3s//相當於是全域性的,下面預設以3s為準,手動設定了,以手動的為準
        2.spark.locality.wait.process
        3.spark.locality.wait.node
        4.spark.locality.wait.rack
        5.newSparkConf.set("spark.locality.wait","100")