1. 程式人生 > >pyspark中combineByKey的兩種理解方法

pyspark中combineByKey的兩種理解方法

Spark 1.6

以前一直模模糊糊的,現在搞一下比較清楚

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7f1ac7340578>)
它是一個泛型函式,主要完成聚合操作,將輸入RDD[(K,V)]轉化為結果RDD[(K,C)]輸出

在資料分析中,處理Key,Value的Pair資料是極為常見的場景,例如我們可以針對這樣的資料進行分組、聚合或者將兩個包含Pair資料的RDD根據key進行join。從函式的抽象層面看,這些操作具有共同的特徵,都是將型別為RDD[(K,V)]的資料處理為RDD[(K,C)]。這裡的V和C可以是相同型別,也可以是不同型別。
作者:LuciferTM
連結:

http://www.jianshu.com/p/f3aea4480f2b
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。

流程圖

這裡寫圖片描述

多謝@蔣瀟億–知乎的回答,我這裡將圖用自己的話再次整理下,方便理解

來一波分析

第一種比較正統的方法,按照原理圖來一步步推倒過程

  1. 如果pair RDD的key第一次出現,那麼就用把該key下的value進行createCombiner操作,這裡第一個pair RDD輸出結果應該是這種形式('coffee',(1,1))這裡需要強調的是,這是對value進行操作的,將其中的value進行轉化。這裡的例子是(1,1)第一個1是value,第二個1是出現了一次

  2. 對於key沒重複的pair RDD才上上述同樣操作,如果碰到同樣key的了,那就轉到第二步,key不變的情況下,將上一次的(1,1)當做引數傳遞進mergeValue,效果就是說,以前的acc[0]=1,即值是1,然後現在新的coffee傳進來的值是2,即value=2,這樣,就會對同key值進行累加acc[0]+value=同key下的累加值,而acc[1]=1,即統計了該key出現的次數,acc[1]+1=coffee這個key共出現的次數

  3. 經過這兩步之後,先不考慮另一個分割槽的情況,如果只有一個分割槽,那麼現在的結果應該是這樣。第一個分割槽的結果[(‘coffee’,(3,2)),(‘panda’,(3,1))],然後第二個分割槽的結果同理(‘coffee’,(9,1)),之後再對同key下的value傳入mergeCombiner進行操作即可,方式同第二步類似,搞清楚誰是傳進去的value

復現一下程式碼


def createCombiner(value):
    return (value,1)

def mergeValue(acc,value):
    return (acc[0]+value,acc[1]+1)

def mergeCombiners(acc1,acc2):
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

data = sc.parallelize([('coffee',1),('coffee',2),('panda',3),('coffee',9)],2)

# data.collect():[('coffee', 1), ('coffee', 2), ('panda', 3), ('coffee', 9)]
result = data.combineByKey(createCombiner,mergeValue,mergeCombiners)

print result.collect()

#------------------------------------------------------
#拓展,計算key所含value的均值,方法一,使用map
print result.map(lambda x:(x[0],float(x[1][0])/x[1][1])).collect()
# 方法二,s使用mapValues
print result.mapValues(lambda x:float(x[0])/x[1]).collect()


#[('coffee', (12, 3)), ('panda', (3, 1))]
#[('coffee', 4.0), ('panda', 3.0)]
#[('coffee', 4.0), ('panda', 3.0)]

第二種方法,使用字典來模擬這個過程

字典形式重構程式碼

# 相當於spark中的兩個分割槽
part1 = [('coffee',1),('coffee',2),('panda',3)]
part2 = [('coffee',9)]

dict_res = {}
for part in [part1,part2]:
    for tup in part: 
        if tup[0] not in dict_res:
            dict_res[tup[0]]= {}  # 在該key下,將value構建dict
            dict_res[tup[0]]['sum'] = 0 
            dict_res[tup[0]]['times'] = 0

        dict_res[tup[0]]['sum'] += tup[1]  # sum疊加
        dict_res[tup[0]]['times'] +=1  # 次數累加


print dict_res

# {'coffee': {'sum': 12, 'times': 3}, 'panda': {'sum': 3, 'times': 1}}
# 其中coffee代表鍵,之後的value我又傳了個dict,裡面key=sum的value代表和,key=times的value代表前面的key如coffee出現的次數

將上面的式子再進化一次,使更像spark的寫法


def createAndMergeValue(part):
    for tup in part: 
        if tup[0] not in dict_res:
            dict_res[tup[0]]= {}
            dict_res[tup[0]]['sum'] = 0
            dict_res[tup[0]]['times'] = 0

        dict_res[tup[0]]['sum'] += tup[1]
        dict_res[tup[0]]['times'] +=1

def  mergeCombiners(partitions):
    for part in partitions:
        createAndMergeValue(part)



dict_res = {}
partitions = [[('coffee',1),('coffee',2),('panda',3)],[('coffee',9)]]  # 為了表現其分割槽的特性,這裡用了list區分分割槽部分
mergeCombiners(partitions)

print dict_res

{'coffee': {'sum': 12, 'times': 3}, 'panda': {'sum': 3, 'times': 1}}

致謝