1. 程式人生 > >零基礎入門大資料之spark中的幾種key-value操作

零基礎入門大資料之spark中的幾種key-value操作

今天記錄一下spark裡面的一些key-value對的相關運算元。

key-value對可以簡單理解為是一種認為構造的資料結構方式,比如一個字串"hello",單看"hello"的話,它是一個字串型別,現在假設我想把它在一個文字中出現的次數n作為一個值和"hello"一起操作,那麼可以構造一種鍵值對(key-value)的結構來表示,也就是(hello,n)這樣的結構,那麼可能會問為什麼要這麼構造,答案是操作方便,便於計算。這種結構上和map基本是一個意思。

那麼spark裡面對key-value對操作的相關運算元有哪些呢?列舉幾個重要的:

  • reduceByKey
  • groupByKey
  • combineByKey
  • sortByKey
reduceByKey

首當其衝也是用的比較多的就是reduceByKey。reduce我們知道就是map-reduce的reduce,聚合操作,那麼reduceByKey就是按照key進行聚合操作,這個時候就是key不變,對具有相同key的所有key-value對中的value進行操作。這麼一看,reduceByKey的操作過程就是:

(1)按key進行分組,把所有具有相同key的資料對劃分為不同的大組;
(2)對每個大組,對該組中的所有元素的value進行某個函式處理;
(3)reduceByKey的處理輸出也是一個key-value對,注意只是一個key,而不是一系列相同的key。至於value是什麼,value是一個函式輸出值:function(value1, value2, value3,…)

reduceByKey的經典例子就是統計文字中不同單詞的個數,這個例子我在前面用非spark的reduceByKey方法舉例過,那裡比較複雜,這裡用reduceByKey就簡單多了。簡單點,假設我的test.txt文字如下:

a b c d s a s d f v s a c b w a d f v s a v s c d

命令列下的程式碼如下:

scala>     val data = "a b c d s a s d f v s a c b w a d f v s a v s c d"
scala>     val s0 = data.split(" ")
scala>     val s1 = sc.parallelize(s0)
scala>     val s2 = s1.map(x => (x,1))
scala>     val s3 = s2.reduceByKey((x,y) => x+y)
scala>     s3.collect.foreach(println)

(a,5)
(b,2)
(s,5)
(c,3)
(d,4)
(f,2)
(v,3)
(w,1)

解釋一下,首先我們需要構造key-value對的資料,也就是s2,可以看到,就是簡單將每一個元素當key,value都是1,表示數量1。然後就是reduceByKey操作了,注意的是reduceByKey因為是聚合操作,操作的是兩個key-value對生成一個key-value對,然後再重複這個過程,直到對相同的key只生成一個value,所以可以看到,輸入是兩個,輸出是x+y,其實x+y就是操作函式,表示對value進行相加,這樣才能達到統計數量的目的,當然這個函式是靈活的,不止於此,比如我把相同key的所有value串起來,如下:

scala>     val data = "a b c d s a s d f v s a c b w a d f v s a v s c d"
scala>     val s0 = data.split(" ")
scala>     val s1 = sc.parallelize(s0)
scala>     val s2 = s1.map(x => (x,"1"))
scala>     val s3 = s2.reduceByKey((x,y) => x+":"+y)
scala>     s3.collect.foreach(println)

(a,1:1:1:1:1)
(b,1:1)
(s,1:1:1:1:1)
(c,1:1:1)
(d,1:1:1:1)
(f,1:1)
(v,1:1:1)
(w,1)

可以看到,reduceByKey裡面對key執行操作的函式是非常靈活的,總結就是對相同key進行任意的可聚合的操作。

groupByKey

groupByKey其實和reduceByKey非常像,功能都是一樣的,效能甚至沒有reduceByKey好。看官網解釋: groupByKey是對每個key進行合併操作,但只生成一個sequence,groupByKey本身不能自定義操作函式。所以該函式比較簡單了,就是一個簡單的按照key進行搬運的操作,本省都不能自定義處理函式,所以用起來也無需輸入什麼,直接.groupByKey()生成一個key相同的序列。

注意:當採用groupByKey時,由於它不接收函式,spark只能先將所有的鍵值對都移動,這樣的後果是叢集節點之間的開銷很大,導致傳輸延時。

combineByKey

根據字面意思可以知道,combineByKey就是按照key進行聯合起來。乍一看和reduceByKey有什麼區別?其實combineByKey是更底層的聯合方式,很多函式包括reduceByKey都是由combineByKey實現的。

找了一下官網對combineByKey的定義:

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

關鍵點來了,combineByKey裡面包括三個變換函式,不同情況下會出發不同的函式,這點需要好好理解。解釋就是:

(1)createCombiner: V => C ,這個函式是初始化階段,也就是資料輸入的原始value格式,想初始化成某種形式都可以,比如給值加個東西,a => (a,1)。

(2)mergeValue: (C, V) => C,該函式把新的元素V合併到之前的元素C(createCombiner)上 (這個操作在每個分割槽內進行),也就是第一步的結果上。

(3)mergeCombiners: (C, C) => C,該函式把2個元素C合併 (這個操作在不同分割槽間進行)。

不理解不要緊,看個求平均數的例子:

scala> val initialScores = Array(("zhangsan", 88.0), ("zhangsan", 95.0), ("lisi", 91.0), ("lisi", 93.0), ("zhangsan", 95.0), ("lisi", 98.0))
scala> val d1 = sc.parallelize(initialScores)
scala> type MVType = (Int, Double) //定義一個元組型別(科目計數器,分數)
scala> val d2 = d1.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, socre)) => (name, socre / num) }.collect

scala> d2.foreach(println)

(lisi,94.0)
(zhangsan,92.66666666666667)

解釋一下,輸入的原始資料是(姓名,成績)的key-value對,那麼對這個資料進行combineByKey的一系列操作後,首先,這個函式會對key相同的進行分組。那麼函式裡面處理的就只是value了。如果是原始資料的value,可以看到,它會執行第一個操作,也就是score => (1, score),因為只有這個操作可以符合輸入輸出。那麼原始資料處理過的第一步先變成了 (1, score),我們知道,聚合還沒有完成,假設一個經過第一步的資料和原始資料相遇進行聚合了怎麼辦?這個時候可以看到,資料輸入符合第二步的格式,從而會執行第二個操作,也將數目加1,分數相加。那麼也可能碰到兩個都進行第一步操作的兩個數進行聚合,這個時候可以看到,符合步驟三的輸入格式,執行步驟三。有人說,存不存在兩個步驟二出來的資料進行聚合呢?當然存在,但是我們看一下,兩個步驟2出來的資料格式是什麼?很顯然,就是步驟1出來的資料格式吧,所以兩個步驟2的資料聚合也是進行步驟三的操作。最最後map一下求一下平均即可得到平均數。

總結一下可以看到,combineByKey更復雜,複雜的東西當然也就更靈活了。

sortByKey

最後再說一個重要的按key進行排序的函式。但凡對key-value對處理完又需要排序的,基本都會碰到這個函式。而且靈活運用這個函式可是實現很多功能。比如我以前就用到了將key-value對進行處理,然後調換key-value的順序變成(value,key)組成新的key-value,然後排序,再調換位置處理,再排序,實現複雜的功能。

這裡簡化一下,假設我們像對reduceByKey裡面那個例子進行輸出字母統計進行降序排列輸出,怎麼做呢?

scala>    al data = "a b c d s a s d f v s a c b w a d f v s a v s c d"
    val s0 = data.split(" ")
    val s1 = sc.parallelize(s0)
    val s2 = s1.map(x => (x,1))
    val s3 = s2.reduceByKey((x,y) => x+y)
    s3.collect.foreach(println)
    val s4 = s3.map{case(key,value) => (value,key)}
    val s5 = s4.sortByKey(false).map{case(key,value) => (value,key)}.collect
    
scala> s5.foreach(println)
(a,5)
(s,5)
(d,4)
(c,3)
(v,3)
(b,2)
(f,2)
(w,1)

sortByKey函式不需要引數或者接受一個bool引數,指定是降序還是升序,預設true升序。這裡我們給了false,可以看到輸出變成了降序。這就是將(key,value)變成(value,key)進行排序再變成(key,value)的典型應用。


關注公號【AInewworld】,第一時間獲取精彩內容
在這裡插入圖片描述