spark--當分組遇到排序的解決思路
場景
現在有如下資料格式
圖書分類,圖書名,數量
現在想統計全部分類中數量最多的書名以及數量
場景解析
如果不基於spark,我們來思考這個問題,資料量大記憶體是放不下,分類也不確定有多少類,圖書名可能有重複,還需要合併計算。這種情況只能是分治,首先分類,把檔案首先按照分類拆分成多個檔案,每個檔案中的資料都是圖書名數量,然後根據圖書名對數量進行合併,最後進行排序。
spark思維轉化
上面的思路單獨寫這個程式沒問題,但是如果基於spark就有點問題了,首先是分割槽的事情,想把資料準確落在不同的分割槽,且不重複,必須要先知道到底有多少分割槽。所以首先要統計分類種類,幫助以後分割槽。
分割槽器
//data是已經讀取進來的圖書分類的集合data.distinct().collect()
有了資料就要應用分割槽器
classMyPartionerextendsPartitioner{
private Map<String, Integer> part = new HashMap<>();
publicMyPartioner(List<String> data){
int count = 0;
for (String s : data) {
part.put(s, count++);
}
}
@Override publicintnumPartitions(){
return part.size();
}
@Override publicintgetPartition(Object o){
Keys info = (Keys) o;
return part.get(((Keys) o).type);
}
}
直接根據已經生成好的資料來進行分割槽。保證1個分類1個分割槽,這樣就可以以後的部分就只關注排序即可。
資料合併
分割槽,分割槽器都準備好了,按照以前的思路,是不是應該把資料分散在不同的分割槽了。想法挺好,但是在分散式儲存中,資料移動的成本很高,所以都是先對本地資料進行處理合並,減小資料量然後才進行資料的shuffle等分割槽操作,所以這裡我們要做的其實是合併同類資料。
textFile.mapToPair(lines ->newTuple2(name,count)) .reduceByKey((x, y)->x + y);
這裡是一個典型的單詞計數的案例。
接下來就是想著分割槽,然後排序,如果你查查api的話,你會發現並沒用按照value排序的運算元。如果要排序的話,一定是key。這裡發生了一個衝突點,就是你是按照type分割槽,次數的type就是key,接下來排序,其實就是按照type來排。 發現了我們要依賴key完成兩件事,一個是分割槽,一個是排序。分割槽靠type,排序靠count。這裡的解決方案就是用物件。計數之後,得到的結果會是一個<bookname,totalcount>的tuple。這個明顯無法繼續下去了,你連分割槽的條件都沒了。 bookname和type是一一對應的,所以這裡合併統計的是bookname+type的結構體。這樣就滿足了分割槽的條件了。 為了把排序的因子給加上,我們做個map操作,把type和count組織成一個物件。
classKeysimplementsSerializable{ String type; Integer count;publicKeys(String type, Integer count){this.type = type;this.count = count; }}
這樣就給了我們很大的空間,在分割槽器裡,取出key來進行操作,在排序的時候,寫一個比較器,按照count來進行排序。
.repartitionAndSortWithinPartitions(newMyPartioner(collect),newKeyCompare());
直接使用分割槽並且排序的運算元幫我解決這個問題。
小結
在大資料環境下,資料的shuffle操作的代價很大,所以優先考慮合併資料,然後再進行分割槽等等。spark的運算元大部分都是對key進行生效的,例如排序等等,對value的操作大部分是合併和迭代,並沒有單獨的排序出來。所以要合理利用java物件來組合key值,完成功能。
大家喜歡多多關注,你的關注是我最大的動力。
大家喜歡的可以關注我的微信公號號:首席資料師 你的關注是我最大的動力