1. 程式人生 > >Spark優化(六):使用高效能的運算元

Spark優化(六):使用高效能的運算元

除了shuffle相關的運算元有優化原則之外,其它的運算元也都有著相應的優化原則:

 

  • 使用reduceByKey/aggregateByKey替代groupByKey。

    詳情見“Spark優化(五):使用map-side預聚合的shuffle操作”。

  • 使用mapPartitions替代普通map。

    mapPartitions類的運算元,一次函式呼叫會處理一個partition所有的資料,而不是一次函式呼叫處理一條,效能相對來說會高一些。但是有的時候,使用mapPartitions會出現OOM(記憶體溢位)的問題。因為單次函式呼叫就要處理掉一個partition所有的資料,如果記憶體不夠,垃圾回收時是無法回收掉太多物件的,很可能出現OOM異常。所以使用這類操作時要慎重!

  • 使用foreachPartitions替代foreach。

    原理類似於“使用mapPartitions替代map”,也是一次函式呼叫處理一個partition的所有資料,而不是一次函式呼叫處理一條資料。在實踐中發現,foreachPartitions類的運算元,對效能的提升還是很有幫助的。比如在foreach函式中,將RDD中所有資料寫MySQL,那麼如果是普通的foreach運算元,就會一條資料一條資料地寫,每次函式呼叫可能就會建立一個數據庫連線,此時就勢必會頻繁地建立和銷燬資料庫連線,效能是非常低下;但是如果用foreachPartitions運算元一次性處理一個partition的資料,那麼對於每個partition,只要建立一個數據庫連線即可,然後執行批量插入操作,此時效能是比較高的。實踐中發現,對於1萬條左右的資料量寫MySQL,效能可以提升30%以上。

  • 使用filter之後進行coalesce操作。

    通常對一個RDD執行filter運算元過濾掉RDD中較多資料後(比如30%以上的資料),建議使用coalesce運算元,手動減少RDD的partition數量,將RDD中的資料壓縮到更少的partition中去。因為filter之後,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行後續的計算,其實每個task處理的partition中的資料量並不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數量,將RDD中的資料壓縮到更少的partition之後,只要使用更少的task即可處理完所有的partition。在某些場景下,對於效能的提升會有一定的幫助。

  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作。

    repartitionAndSortWithinPartitions是Spark官網推薦的一個運算元。官方建議,如果是需要在repartition重分割槽之後還要進行排序,就可以直接使用repartitionAndSortWithinPartitions運算元。因為該運算元可以一邊進行重分割槽的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,效能可能是要高的。

附:sortByKey、repartitionAndSortWithinPartitions 二者的使用

sortByKey:

原始碼:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope{ val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) } 

sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的資料依賴很簡單,先使用 shuffle 將 records 聚集在一起(放到對應的 partition 裡面),然後將 partition 內的所有 records 按 key 排序,最後得到的 MapPartitionsRDD 中的 records 就有序了。目前 sortByKey() 先使用 Array 來儲存 partition 中所有的 records,再排序。

例項:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random(100);
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {    
    @Override      
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {        
      return new Tuple2<Integer, Integer>(integer,random.nextInt(10));    
  }
});

JavaPairRDD<Integer,Integer> sortByKeyRDD = javaPairRDD.sortByKey();
System.out.println(sortByKeyRDD.collect());

repartitionAndSortWithinPartitions

原始碼分析:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope { new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering) } 

從原始碼中可以看出,該方法依據partitioner對RDD進行分割槽,並且在每個結果分割槽中按key進行排序;通過對比sortByKey發現,這種方式比先分割槽,然後在每個分割槽中進行排序效率高,這是因為它可以將排序融入到shuffle階段。

例項:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random();JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {    
    @Override    
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {        
      return new Tuple2<Integer, Integer>(integer,random.nextInt(10));    
  }
});

JavaPairRDD<Integer,Integer> RepartitionAndSortWithPartitionsRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {    
    @Override    
    public int numPartitions() {   return 2;    }    
    @Override    
    public int getPartition(Object key) { return key.toString().hashCode() % numPartitions();    
  }
});
System.out.println(RepartitionAndSortWithPartitionsRDD.collect());