reduceByKey和groupByKey區別與用法
轉自:https://blog.csdn.net/zongzhiyuan/article/details/49965021
在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典型別,便於針對key進行一些處理。
針對pair RDD這樣的特殊形式,spark中定義了許多方便的操作,今天主要介紹一下reduceByKey和groupByKey,因為在接下來講解《在spark中如何實現SQL中的group_concat功能?》時會用到這兩個operations。
首先,看一看spark官網[1]是怎麼解釋的:
reduceByKey(func, numPartitions=None)
Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.
也就是,reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義。
groupByKey(numPartitions=None)Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey
為了更好的理解上面這段話,下面我們使用兩種不同的方式去計算單詞的個數[2]:
- val words = Array("one", "two", "two", "three", "three", "three")
- val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
- val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
- val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
(1)當採用reduceByKeyt時,Spark可以在每個分割槽移動資料之前將待輸出資料與一個共用的key結合。藉助下圖可以理解在reduceByKey裡究竟發生了什麼。 注意在資料對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函式)。然後lamdba函式在每個區上被再次呼叫來將所有值reduce成一個最終結果。整個過程如下:
(2)當採用groupByKey時,由於它不接收函式,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的後果是叢集節點之間的開銷很大,導致傳輸延時。整個過程如下:
因此,在對大資料進行復雜計算時,reduceByKey優於groupByKey。
另外,如果僅僅是group處理,那麼以下函式應該優先於 groupByKey :
(1)、combineByKey 組合資料,但是組合之後的資料型別與輸入時值的型別不一樣。
(2)、foldByKey合併每一個 key 的所有值,在級聯函式和“零值”中使用。
最後,對reduceByKey中的func做一些介紹:
如果是用Python寫的spark,那麼有一個庫非常實用:operator[3],其中可以用的函式包括:大小比較函式,邏輯操作函式,數學運算函式,序列操作函式等等。這些函式可以直接通過“from operator import *”進行呼叫,直接把函式名作為引數傳遞給reduceByKey即可。如下:
- <span style="font-size:14px;">from operator import add
- rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- sorted(rdd.reduceByKey(add).collect())
- [('a', 2), ('b', 1)]</span>
下面是附加原始碼更加詳細的解釋
轉自:https://blog.csdn.net/ZMC921/article/details/75098903
一、首先他們都是要經過shuffle的,groupByKey在方法shuffle之間不會合並原樣進行shuffle,。reduceByKey進行shuffle之前會先做合併,這樣就減少了shuffle的io傳送,所以效率高一點。
案例:- object GroupyKeyAndReduceByKeyDemo {
- def main(args: Array[String]): Unit = {
- Logger.getLogger("org").setLevel(Level.WARN)
- val config = new SparkConf().setAppName("GroupyKeyAndReduceByKeyDemo").setMaster("local")
- val sc = new SparkContext(config)
- val arr = Array("val config", "val arr")
- val socketDS = sc.parallelize(arr).flatMap(_.split(" ")).map((_, 1))
- //groupByKey 和reduceByKey 的區別:
- //他們都是要經過shuffle的,groupByKey在方法shuffle之間不會合並原樣進行shuffle,
- //reduceByKey進行shuffle之前會先做合併,這樣就減少了shuffle的io傳送,所以效率高一點
- socketDS.groupByKey().map(tuple => (tuple._1, tuple._2.sum)).foreach(x => {
- println(x._1 + " " + x._2)
- })
- println("----------------------")
- socketDS.reduceByKey(_ + _).foreach(x => {
- println(x._1 + " " + x._2)
- })
- sc.stop()
- }
- }
二 、首先groupByKey有三種
檢視原始碼groupByKey()實現了 groupByKey(defaultPartitioner(self))
- /**
- * Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
- * within each group is not guaranteed, and may even differ each time the resulting RDD is
- * evaluated.
- *
- * @note This operation may be very expensive. If you are grouping in order to perform an
- * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
- * or `PairRDDFunctions.reduceByKey` will provide much better performance.
- */
- def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
- groupByKey(defaultPartitioner(self))
- }
檢視原始碼 groupByKey(numPartitions: Int) 實現了 groupByKey(new HashPartitioner(numPartitions))
- /**
- * Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with into `numPartitions` partitions. The ordering of elements within
- * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
- *
- * @note This operation may be very expensive. If you are grouping in order to perform an
- * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
- * or `PairRDDFunctions.reduceByKey` will provide much better performance.
- *
- * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
- */
- def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
- groupByKey(new HashPartitioner(numPartitions))
- }
其實上面兩個都是實現了groupByKey(partitioner: Partitioner)
- /**
- * Group the values for each key in the RDD into a single sequence. Allows controlling the
- * partitioning of the resulting key-value pair RDD by passing a Partitioner.
- * The ordering of elements within each group is not guaranteed, and may even differ
- * each time the resulting RDD is evaluated.
- *
- * @note This operation may be very expensive. If you are grouping in order to perform an
- * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
- * or `PairRDDFunctions.reduceByKey` will provide much better performance.
- *
- * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
- */
- def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
- // groupByKey shouldn't use map side combine because map side combine does not
- // reduce the amount of data shuffled and requires all map side data be inserted
- // into a hash table, leading to more objects in the old gen.
- val createCombiner = (v: V) => CompactBuffer(v)
- val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
- val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
- val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
- createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
- bufs.asInstanceOf[RDD[(K, Iterable[V])]]
- }
三、再檢視reduceByKey也有三種方式
- /**
- * Merge the values for each key using an associative and commutative reduce function. This will
- * also perform the merging locally on each mapper before sending results to a reducer, similarly
- * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
- * parallelism level.
- */
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
- reduceByKey(defaultPartitioner(self), func)
- }
- /**
- * Merge the values for each key using an associative and commutative reduce function. This will
- * also perform the merging locally on each mapper before sending results to a reducer, similarly
- * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
- */
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
- reduceByKey(new HashPartitioner(numPartitions), func)
- }
- /**
- * Merge the values for each key using an associative and commutative reduce function. This will
- * also perform the merging locally on each mapper before sending results to a reducer, similarly
- * to a "combiner" in MapReduce.
-
相關推薦
【Spark系列2】reduceByKey和groupByKey區別與用法
在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典型別,便於針對key進行一些處理。 針對pair RDD這樣的
reduceByKey和groupByKey區別與用法
轉自:https://blog.csdn.net/zongzhiyuan/article/details/49965021在spark中,我們知道一切的操作都是基於RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(ke
[Spark原始碼學習] reduceByKey和groupByKey實現與combineByKey的關係
reduceByKey原始碼: def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): """ Merge the val
Mybatis中的mapper.xml裡面${} 和 #{}區別與用法
Mybatis 的Mapper.xml語句中parameterType向SQL語句傳參有兩種方式:#{}和${} #{}方式能夠很大程度防止sql注入。 $方式無法防止Sql注入。 $方式一般用於傳入資料庫物件,例如傳入表名. 一般能用#的就別用$. #{}表示一個佔
NSMutableDictionary 和 NSDictionary的區別與用法大全
NSDictionary 初始化新字典,新字典包含otherDic NSDictionary *dic = [NSDictionary dictionaryWithDictionary:otherDic]; 以檔案內容初始化字典 NSDictionary *dic
List集合ListIterator和Iterator迭代器區別與用法
ListIterator是Iterator的子介面,是List集合特有的迭代輸出介面,它可以實現在迭代過程中對元素的增刪改查。 在使用Iterator迭代過程中,不要使用集合操作元素,容易出現異常
Mybatis中的 ${} 和 #{}區別與用法
Mybatis 的Mapper.xml語句中parameterType向SQL語句傳參有兩種方式:#{}和${} 我們經常使用的是#{},一般解說是因為這種方式可以防止SQL注入,簡單的說#{}這種方
Synchronize 和 Lock 的區別與用法
一、synchronized和lock的用法區別 (1)synchronized(隱式鎖):在需要同步的物件中加入此控制,synchronized可以加在方法上,也可以加在特定程式碼塊中,括號中表示
深入研究 Java Synchronize 和 Lock 的區別與用法
在分散式開發中,鎖是執行緒控制的重要途徑。Java為此也提供了2種鎖機制,synchronized和lock。做為Java愛好者,自然少不了對比一下這2種機制,也能從中學到些分散式開發需要注意的地方。 我們先從最簡單的入手,逐步分析這2種的區別。 一、synchronized和lock的用法區別 synchr
ReduceByKey 和 groupByKey 的區別
浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>
export ,export default 和 import 區別 以及用法
彈出 但是 clas 能夠 引用 port 模塊 返回值 兩個 首先要知道export,import ,export default是什麽 ES6模塊主要有兩個功能:export和import export用於對外輸出本模塊(一個文件可以理解為一個模塊)變量的接口 imp
PHP數據庫連接mysql與mysqli的區別與用法
close ace ase 二次 銷毀 數據庫 table name .cn 一、mysql與mysqli的概念相關: 1、mysql與mysqli都是php方面的函數集,與mysql數據庫關聯不大。 2、在php5版本之前,一般是用php的mysql函數去驅動mysq
ARRAYLIST VECTOR LINKEDLIST 區別與用法
用法 size 插入數據 區別 插入元素 lin 需要 cto linked ArrayList 和Vector是采用數組方式存儲數據,此數組元素數大於實際存儲的數據以便增加和插入元素,都允許直接序號索引元素,但是插入數據要設計到數組元素移動等內存操作,所以索引數據快插入數
轉 [ORACLE]詳解not in與not exists的區別與用法(not in的性能並不差!)
values 我們 and ons 一點 pla 出現 開始 min 在網上搜了下關於oracle中not exists和not in性能的比較,發現沒有描述的太全面的,可能是問題太簡單了,達人們都不屑於解釋吧。於是自己花了點時間,試圖把這個問題簡單描述清楚,其實歸根結底一
PHP中VC6、VC9、TS、NTS版本的區別與用法詳解
進行 系統資源 stc 詳解 ron 線程安全 info 啟動 win 1. VC6與VC9的區別: VC6版本是使用Visual Studio 6編譯器編譯的,如果你的PHP是用Apache來架設的,那你就選擇VC6版本。 VC9版本是使用Visual Studio 20
json_encode與json_decode的區別與用法
php json json_encode json_decode //json_encode顧名思義json編碼,就是將數組或對象,編碼成json字符串的函數$arr['a'] = 1;$arr['b'] = 2;var_dump(json_encode($ar
scala的==、equals、eq、ne區別與用法
根據官方API的定義: final def ==(arg0: Any): Boolean The expression x == that is equivalent to if (x eq null) that eq null else x.equals(that) final de
js中!和!!的區別及用法簡介
js中!的用法是比較靈活的,它除了做邏輯運算常常會用!做型別判斷,可以用!與上物件來求得一個布林值, 1、!可將變數轉換成boolean型別,null、undefined和空字串取反都為false,其餘都為true。 複製程式碼 1 !null=true 2 3
PHP資料庫連線mysql與mysqli的區別與用法
一、mysql與mysqli的概念相關: 1、mysql與mysqli都是php方面的函式集,與mysql資料庫關聯不大。 2、在php5版本之前,一般是用php的mysql函式去驅動mysql資料庫的,比如mysql_query()的函式,屬於面向過程3、在p
c++ list, vector, map, set 區別與用法
List封裝了連結串列,Vector封裝了陣列, list和vector得最主要的區別在於vector使用連續記憶體儲存的,他支援[]運算子,而list是以連結串列形式實現的,不支援[]。 Vector對於隨機訪問的速度很快,但是對於插入尤其是在頭部插入元素速度很慢,在