1. 程式人生 > >reduceByKey和groupByKey區別與用法

reduceByKey和groupByKey區別與用法

轉自:https://blog.csdn.net/zongzhiyuan/article/details/49965021

在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典型別,便於針對key進行一些處理。

針對pair RDD這樣的特殊形式,spark中定義了許多方便的操作,今天主要介紹一下reduceByKey和groupByKey,因為在接下來講解《在spark中如何實現SQL中的group_concat功能?》時會用到這兩個operations。

首先,看一看spark官網[1]是怎麼解釋的:

reduceByKey(func, numPartitions=None)

Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

也就是,reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義。

groupByKey(numPartitions=None)

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey

 or aggregateByKey will provide much better performance.

也就是,groupByKey也是對每個key進行操作,但只生成一個sequence。需要特別注意“Note”中的話,它告訴我們:如果需要對sequence進行aggregation操作(注意,groupByKey本身不能自定義操作函式),那麼,選擇reduceByKey/aggregateByKey更好。這是因為groupByKey不能自定義函式,我們需要先用groupByKey生成RDD,然後才能對此RDD通過map進行自定義函式操作。

為了更好的理解上面這段話,下面我們使用兩種不同的方式去計算單詞的個數[2]:

  1. val words = Array("one""two""two""three""three""three")  
  2. val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))  
  3. val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)  
  4. val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))  
上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一樣的,但是,它們的內部運算過程是不同的。

(1)當採用reduceByKeyt時,Spark可以在每個分割槽移動資料之前將待輸出資料與一個共用的key結合。藉助下圖可以理解在reduceByKey裡究竟發生了什麼。 注意在資料對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函式)。然後lamdba函式在每個區上被再次呼叫來將所有值reduce成一個最終結果。整個過程如下:


(2)當採用groupByKey時,由於它不接收函式,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的後果是叢集節點之間的開銷很大,導致傳輸延時。整個過程如下:


因此,在對大資料進行復雜計算時,reduceByKey優於groupByKey

另外,如果僅僅是group處理,那麼以下函式應該優先於 groupByKey :
  (1)、combineByKey 組合資料,但是組合之後的資料型別與輸入時值的型別不一樣。
  (2)、foldByKey合併每一個 key 的所有值,在級聯函式和“零值”中使用。

最後,對reduceByKey中的func做一些介紹:

如果是用Python寫的spark,那麼有一個庫非常實用:operator[3],其中可以用的函式包括:大小比較函式,邏輯操作函式,數學運算函式,序列操作函式等等。這些函式可以直接通過“from operator import *”進行呼叫,直接把函式名作為引數傳遞給reduceByKey即可。如下:

  1. <span style="font-size:14px;">from operator import add  
  2. rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])  
  3. sorted(rdd.reduceByKey(add).collect())  
  4. [('a'2), ('b'1)]</span>  

下面是附加原始碼更加詳細的解釋

轉自:https://blog.csdn.net/ZMC921/article/details/75098903

一、首先他們都是要經過shuffle的,groupByKey在方法shuffle之間不會合並原樣進行shuffle,。reduceByKey進行shuffle之前會先做合併,這樣就減少了shuffle的io傳送,所以效率高一點。
案例:
  1. object GroupyKeyAndReduceByKeyDemo {  
  2.   def main(args: Array[String]): Unit = {  
  3.     Logger.getLogger("org").setLevel(Level.WARN)  
  4.     val config = new SparkConf().setAppName("GroupyKeyAndReduceByKeyDemo").setMaster("local")  
  5.     val sc = new SparkContext(config)  
  6.     val arr = Array("val config", "val arr")  
  7.     val socketDS = sc.parallelize(arr).flatMap(_.split(" ")).map((_, 1))  
  8.     //groupByKey 和reduceByKey 的區別:  
  9.     //他們都是要經過shuffle的,groupByKey在方法shuffle之間不會合並原樣進行shuffle,  
  10.     //reduceByKey進行shuffle之前會先做合併,這樣就減少了shuffle的io傳送,所以效率高一點  
  11.     socketDS.groupByKey().map(tuple => (tuple._1, tuple._2.sum)).foreach(x => {  
  12.       println(x._1 + " " + x._2)  
  13.     })  
  14.     println("----------------------")  
  15.     socketDS.reduceByKey(_ + _).foreach(x => {  
  16.       println(x._1 + " " + x._2)  
  17.     })  
  18.     sc.stop()  
  19.   }  
  20. }  
二 、首先groupByKey有三種

檢視原始碼groupByKey()實現了 groupByKey(defaultPartitioner(self))
  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 
  3.    * resulting RDD with the existing partitioner/parallelism level. The ordering of elements 
  4.    * within each group is not guaranteed, and may even differ each time the resulting RDD is 
  5.    * evaluated. 
  6.    * 
  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  10.    */
  11.   def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {  
  12.     groupByKey(defaultPartitioner(self))  
  13.   }  

檢視原始碼 groupByKey(numPartitions: Int) 實現了 groupByKey(new HashPartitioner(numPartitions))

  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 
  3.    * resulting RDD with into `numPartitions` partitions. The ordering of elements within 
  4.    * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. 
  5.    * 
  6.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  7.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  8.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  9.    * 
  10.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 
  11.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 
  12.    */
  13.   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {  
  14.     groupByKey(new HashPartitioner(numPartitions))  
  15.   }  

其實上面兩個都是實現了groupByKey(partitioner: Partitioner)

  1. /** 
  2.    * Group the values for each key in the RDD into a single sequence. Allows controlling the 
  3.    * partitioning of the resulting key-value pair RDD by passing a Partitioner. 
  4.    * The ordering of elements within each group is not guaranteed, and may even differ 
  5.    * each time the resulting RDD is evaluated. 
  6.    * 
  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 
  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 
  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 
  10.    * 
  11.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 
  12.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 
  13.    */
  14.   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {  
  15.     // groupByKey shouldn't use map side combine because map side combine does not
  16.     // reduce the amount of data shuffled and requires all map side data be inserted
  17.     // into a hash table, leading to more objects in the old gen.
  18.     val createCombiner = (v: V) => CompactBuffer(v)  
  19.     val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v  
  20.     val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2  
  21.     val bufs = combineByKeyWithClassTag[CompactBuffer[V]](  
  22.       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)  
  23.     bufs.asInstanceOf[RDD[(K, Iterable[V])]]  
  24.   }  
而groupByKey(partitioner: Partitioner)有實現了combineByKeyWithClassTag,所以可以說groupByKey其實底層都是combineByKeyWithClassTag的實現,只是實現的方式不同。
三、再檢視reduceByKey也有三種方式


  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ 
  5.    * parallelism level. 
  6.    */
  7.   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {  
  8.     reduceByKey(defaultPartitioner(self), func)  
  9.   }  
  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. 
  5.    */
  6.   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {  
  7.     reduceByKey(new HashPartitioner(numPartitions), func)  
  8.   }  
  1. /** 
  2.    * Merge the values for each key using an associative and commutative reduce function. This will 
  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 
  4.    * to a "combiner" in MapReduce. 
  5. 相關推薦

    【Spark系列2】reduceByKeygroupByKey區別用法

    在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典型別,便於針對key進行一些處理。 針對pair RDD這樣的

    reduceByKeygroupByKey區別用法

    轉自:https://blog.csdn.net/zongzhiyuan/article/details/49965021在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(ke

    [Spark原始碼學習] reduceByKeygroupByKey實現combineByKey的關係

    reduceByKey原始碼: def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): """ Merge the val

    Mybatis中的mapper.xml裡面${} #{}區別用法

    Mybatis 的Mapper.xml語句中parameterType向SQL語句傳參有兩種方式:#{}和${} #{}方式能夠很大程度防止sql注入。 $方式無法防止Sql注入。 $方式一般用於傳入資料庫物件,例如傳入表名. 一般能用#的就別用$. #{}表示一個佔

    NSMutableDictionary NSDictionary的區別用法大全

    NSDictionary 初始化新字典,新字典包含otherDic NSDictionary *dic = [NSDictionary dictionaryWithDictionary:otherDic]; 以檔案內容初始化字典 NSDictionary *dic

    List集合ListIteratorIterator迭代器區別用法

    ListIterator是Iterator的子介面,是List集合特有的迭代輸出介面,它可以實現在迭代過程中對元素的增刪改查。 在使用Iterator迭代過程中,不要使用集合操作元素,容易出現異常

    Mybatis中的 ${} #{}區別用法

    Mybatis 的Mapper.xml語句中parameterType向SQL語句傳參有兩種方式:#{}和${} 我們經常使用的是#{},一般解說是因為這種方式可以防止SQL注入,簡單的說#{}這種方

    Synchronize Lock 的區別用法

    一、synchronized和lock的用法區別  (1)synchronized(隱式鎖):在需要同步的物件中加入此控制,synchronized可以加在方法上,也可以加在特定程式碼塊中,括號中表示

    深入研究 Java Synchronize Lock 的區別用法

    在分散式開發中,鎖是執行緒控制的重要途徑。Java為此也提供了2種鎖機制,synchronized和lock。做為Java愛好者,自然少不了對比一下這2種機制,也能從中學到些分散式開發需要注意的地方。 我們先從最簡單的入手,逐步分析這2種的區別。 一、synchronized和lock的用法區別 synchr

    ReduceByKey groupByKey區別

    浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>   

    export ,export default import 區別 以及用法

    彈出 但是 clas 能夠 引用 port 模塊 返回值 兩個 首先要知道export,import ,export default是什麽 ES6模塊主要有兩個功能:export和import export用於對外輸出本模塊(一個文件可以理解為一個模塊)變量的接口 imp

    PHP數據庫連接mysqlmysqli的區別用法

    close ace ase 二次 銷毀 數據庫 table name .cn 一、mysql與mysqli的概念相關: 1、mysql與mysqli都是php方面的函數集,與mysql數據庫關聯不大。 2、在php5版本之前,一般是用php的mysql函數去驅動mysq

    ARRAYLIST VECTOR LINKEDLIST 區別用法

    用法 size 插入數據 區別 插入元素 lin 需要 cto linked ArrayList 和Vector是采用數組方式存儲數據,此數組元素數大於實際存儲的數據以便增加和插入元素,都允許直接序號索引元素,但是插入數據要設計到數組元素移動等內存操作,所以索引數據快插入數

    轉 [ORACLE]詳解not innot exists的區別用法(not in的性能並不差!)

    values 我們 and ons 一點 pla 出現 開始 min 在網上搜了下關於oracle中not exists和not in性能的比較,發現沒有描述的太全面的,可能是問題太簡單了,達人們都不屑於解釋吧。於是自己花了點時間,試圖把這個問題簡單描述清楚,其實歸根結底一

    PHP中VC6、VC9、TS、NTS版本的區別用法詳解

    進行 系統資源 stc 詳解 ron 線程安全 info 啟動 win 1. VC6與VC9的區別: VC6版本是使用Visual Studio 6編譯器編譯的,如果你的PHP是用Apache來架設的,那你就選擇VC6版本。 VC9版本是使用Visual Studio 20

    json_encodejson_decode的區別用法

    php json json_encode json_decode //json_encode顧名思義json編碼,就是將數組或對象,編碼成json字符串的函數$arr['a'] = 1;$arr['b'] = 2;var_dump(json_encode($ar

    scala的==、equals、eq、ne區別用法

    根據官方API的定義: final def ==(arg0: Any): Boolean The expression x == that is equivalent to if (x eq null) that eq null else x.equals(that) final de

    js中!!!的區別用法簡介

    js中!的用法是比較靈活的,它除了做邏輯運算常常會用!做型別判斷,可以用!與上物件來求得一個布林值, 1、!可將變數轉換成boolean型別,null、undefined和空字串取反都為false,其餘都為true。 複製程式碼 1 !null=true 2 3

    PHP資料庫連線mysqlmysqli的區別用法

    一、mysql與mysqli的概念相關: 1、mysql與mysqli都是php方面的函式集,與mysql資料庫關聯不大。 2、在php5版本之前,一般是用php的mysql函式去驅動mysql資料庫的,比如mysql_query()的函式,屬於面向過程3、在p

    c++ list, vector, map, set 區別用法

    List封裝了連結串列,Vector封裝了陣列, list和vector得最主要的區別在於vector使用連續記憶體儲存的,他支援[]運算子,而list是以連結串列形式實現的,不支援[]。 Vector對於隨機訪問的速度很快,但是對於插入尤其是在頭部插入元素速度很慢,在