1. 程式人生 > >spark2.2.0:記錄一次資料傾斜的解決(擴容join)!

spark2.2.0:記錄一次資料傾斜的解決(擴容join)!

前言:

資料傾斜,一個在大資料處理中很常見的名詞,經由前人總結,現已有不少資料傾斜的解決方案(而且會發現大資料的不同框架的資料傾斜解決思想是一致的,只是實現方法不同),本文重點記錄這次遇到spark處理資料中的傾斜問題。

老話:

菜雞一隻,本人會對文中的結論負責,如果有說錯的,還請各位批評指出!

起因:

事情是這樣的:有一批資料在hive的表中(我們稱它為表A,表A中有不同網站的域名),要對這批資料進行處理。有一張非常小的表(表B,這張表中有我們想關注的域名,而且域名並不唯一,意思就是:同一個域名可能會有多次操作),我相信說到這,大家已經有點蒙了,畫個圖舉個例子!


經過:

以下是我的心路歷程:

提一句為了增加shuffle的並行,我設定了1000的並行度.config("spark.sql.shuffle.partitions","1000")

1、直接join!不做其他的任何處理

結果:效果非常美!我沒截圖,大概情況是1000個task中有900多個賊快,就剩下20--30個左右,十分的慢,然後

ExecutorLostFailure (executor 65 exited caused by one of the running tasks)
 Reason: Container marked as failed:
 container_e61_1520995248034_27835_01_000066 on host: XXX(這裡是主機名,我使用XXX代替). Exit status: 137. 
 Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

還有這種:

java.lang.OutOfMemoryError: Java heap space

and:

ExecutorLostFailure (executor 147 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_e61_1520995248034_26149_01_000181 on host: XXX. Exit status: 1. 
Diagnostics: Exception from container-launch.
Container id: container_e61_1520995248034_26149_01_000181
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)
	at org.apache.hadoop.util.Shell.run(Shell.java:504)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 1

還有:

Container exited with a non-zero exit code 1
Current usage: 8.1 GB of 10 GB physical memory used; 
9.9 GB of 10 GB virtual memory used. Killing container。
---嗯,大概類似這種,具體數字是多少我記不清了,反正就是超出yarn對於container的記憶體控制,然後被yarnkill了

就上面這個錯,我要吐槽下,網上很多人說:

<property>  
    <name>yarn.nodemanager.vmem-check-enabled</name>  
    <value>false</value>  

</property>  

關閉yarn的記憶體檢查,這。。。顯然是逃避問題,這樣就有可能導致異常的container一直吃記憶體,吃到yarn上沒有資源,如果是自己搭個玩玩的叢集,修改這個引數,還行,但是如果是伺服器,有點異想天開了。

總之就是一大堆亂七八糟的記憶體報錯,程式根本執行不完,然後掛了!

解決記憶體問題:

-1.以上的問題,幾乎都是記憶體不夠,因此我首先考慮的是記憶體!

增加申請到的executor的記憶體

--executor-memory 10G 

--conf spark.yarn.executor.memoryOverHead=2048 

然後我發現,完全不可能解決這個問題,該報錯還是報錯

-2.調整executor的記憶體分配

因為我沒有任何的cache,所以完全沒必要保留記憶體空間給cache資料,應該讓大部分記憶體都來支援shuffle操作

.config("spark.memory.storageFraction","0.2")

.config("spark.memory.fraction","0.8")


以上兩個引數的含義我就不在這裡贅言了,大家自行百度腦補


-3.總結:

在我各種設定記憶體大小和佔比之後,程式碼確實是從不能執行到能夠執行,但是事實上我發現有的分割槽才幾M的資料,有的分割槽居然有50G+(該階段總的流入資料是160G左右,1000個task)的資料流入並處理,該task記憶體中存不下,還溢寫到磁碟9G+,然而大部分task在2分鐘內都能執行完,唯獨有幾個task需要10分鐘,甚至更多,因此當資料傾斜嚴重的時候,一味的調整記憶體,最多你只能讓你的程式碼執行起來,但是效能還不如直接使用hive跑來得快。


去掉shuffle過程:

這個也很好理解,因為資料傾斜發生在shuffle階段(對應到該問題就是join階段),如果不呼叫join這個api那就直接避免了資料傾斜的問題。

-1.之前遇到過的場景是表A是域名,表B中是想要看到的域名(而且域名都是唯一的),因此我直接讀取表B,將表B做成廣播變數,然後在map表A的時候,保留我想看到的域名

//載入表B成為廣播變數
List<String> domains =  spark.table("hive庫名.hive表B").map(new MapFunction<Row, String>() {
           public String call(Row value) throws Exception {
            //value.getString(1)是唯一的字串型別的id,value.getString(0)這個是域名也是唯一的
            return value.getString(1)+"_"+value.getString(0);
        }
    },Encoders.STRING()).collectAsList();
HashMap<String,List<String>> map = new HashMap<String,List<String>>();
	//將剛剛的list做成map,key是id,value是一個list,list裡面放了id相同的域名
    for (String domain:domains ) {
        String[] str = domain.split("_");
        if(map.get(str[0])==null){
            List list = new ArrayList();
            list.add(str[1]);
            map.put(str[0],list);
        }else{
            map.get(str[0]).add(str[1]);
        }
    }
	
//這一步是將HashMap這個每個executor都要使用到的物件,做成廣播變數,減少記憶體的使用
ClassTag<HashMap<String,List<String>>> tag = (ClassTag) scala.reflect.ClassTag$.MODULE$.apply(HashMap.class);
final Broadcast<HashMap<String,List<String>>>  bsmap =  spark.sparkContext().broadcast(map,tag);
//遍歷表A,保留有用的資料
Dataset<Row> 表A =  spark.table("hive庫名.hive表A")
               .map(new MapFunction<Row, 表Adomain>() {
                    public 表Adomain call(Row value) throws Exception {
                        表Adomain A =  new 表Adomain();
						A.setXXX(value.getString(0));
                        A.setXXX(value.getString(1));
						A.setXXX(value.getString(2));
						...
                        return A;
                    }
                },Encoders.bean(表Adomain.class)).filter(new FilterFunction<表Adomain>() {
                    public boolean call(表Adomain value) throws Exception {
						//在這裡判斷該條資料的域名是否存在於廣播變量表B的map中
						//如果存在就保留該資料,否則就去掉	
                        boolean flag = false;
                        String 域名 = value.getXXX();
                        String id = value.getXXX();
                        List<String> list = bsmap.value().get(id);
                        if(list == null){
                            return flag;
                        }else{
                            for(String str:list){
                                if(域名.equals(str)){
                                    flag=true;
                                }
                        }
                        }
                        return flag;
                    }

如上程式碼,其實也實現瞭如下這樣的sql,並且沒有觸發shuffle階段,親測效果very gooooood!

select 欄位X,欄位XX... 
from 表A inner join 表B 
on 表A.id=表B.id and 表A.域名=表B.域名;

缺點:這種方法在我現在的場景中用不了,如圖1所示,我的表B域名並不唯一,有可能導致表A的一條資料變成多條,所以並不是簡單的過濾(我覺得就是一定要join,把資料笛卡爾積然後再通過on欄位去掉不符合的資料,這樣才能增加表A的資料)!所以這種方法雖然好用,但是在當前的場景下用不了

資料雜湊:

-1.其實就是相同域名的資料分到了同一個task中,然後有部分的域名資料量大的變態


-2.處理方式(總體來說就是讓相同域名的資料到不同的task上處理):

第一種:如果資料傾斜的域名較少,可以接著把這些域名filter出來,然後將正常的資料join,將異常的資料的域名加上一個隨機值,例如:將表A的域名A---->域名A_1,域名A_2,將表B的資料擴容(就是一條資料擴容成10條甚至100條,確保表A的域名能和B表join)這樣這兩個域名就會在不同的task中聚合,然後聚合完在去掉拼接上的隨機值即可

第二種:如果不確定資料傾斜的域名是哪些,可以先抽樣count,然後order by count再top5或者top10等,這樣就知道異常的是哪些了(但是這樣的程式碼寫起來還是比較麻煩的。。。)

第三種:比較暴力的方式,不管哪些正常哪些異常,直接表A的域名直接拼接上隨機值,然後表B資料擴容(注意:是擴容,就是比如原來有10條資料,擴容10倍,就要變成100條資料),然後join,隨機值可以先從10開始嘗試慢慢增大(因為我表B的資料非常小,如果我擴容100倍,30K資料最多也就幾M,而且我傾斜比較嚴重,所以我採用100倍)

上面的處理方式,思想上是一樣的,只是處理的步驟有細微的區別使用哪種方法具體還要測試效能,但是第三種是比較簡單通用的方法(程式碼量比較少,效果也差不多,通用,我就採用這種方式)

畫圖演示下:


這樣就可以確保表A的每一條被打上隨機值的資料都能在表B中找到對應的域名join上,就不會資料丟失了。

程式碼如下:

 //把這張表的資料擴容10倍,等待接下來的join
Dataset<表Bdomain> tb2 = spark.sql("我是一條查詢表B具體資料的sql")
//使用map打隨機值,使用flatmap才能擴容,所以這裡用flatmap
.flatMap(new FlatMapFunction<Row, 表Bdomain>() {
    public Iterator<表Bdomain> call(Row row) throws Exception {
        List<表Bdomain> list = new ArrayList<表Bdomain>();
		//獲得域名等引數
        String 域名 = row.getString(0);
		//...等引數
        for (int i =1;i<=100;i++){
            表Bdomain dominfo =  new 表Bdomain(dm1+"_"+i,引數1,引數2);
            list.add(dominfo);
        }
        return list.iterator();
    }
},Encoders.bean(表Bdomain.class));

Dataset<表Adomain> tb1 = spark.sql("我是一條查詢表A的sql")
.map(new MapFunction<Row, 表Adomain>() {
     public 表Adomain call(Row value) throws Exception {
         Random r = new Random();
		//隨機出1--100的數字
         int num = r.nextInt(100)+1;
         String 域名 = value.getString(0)+"_"+num;
		//...設定相關引數
         表Adomain esdmb = new 表Adomain(域名,引數1,引數2,引數3);
         return esdmb;
     }
 },Encoders.bean(表Adomain.class));
 Dataset<join結果domain> tmp = tb1.join(tb2,tb1.col("域名").equalTo(tb2.col("域名")))
		//這裡多做一次select,規範字段順序,方便後面查詢
         .select("這裡是欄位1","欄位2","欄位3",.....)
         .map(new MapFunction<Row, join結果domain>() {
             public join結果domain call(Row value) throws Exception {
                 String 引數1, = value.getString(0);
                 String 引數2 = value.getString(1);
				//去掉域名後面的隨機數
                 String 域名 = value.getString(2).split("_")[0];
				//.....
                 join結果domain sdmba = new join結果domain(引數1,引數2,引數3,...);
                 return sdmba;
             }
         },Encoders.bean(join結果domain.class));
//可以註冊成一張臨時檢視(老版本叫臨時表,其實一個意思)
tmp.createOrReplaceTempView("tmp");


結論:

最後測試結果,除錯記憶體後join過程在15分鐘左右,採用雜湊過後情況得到非常大的改善,見下圖:


只需要2.9min,因此遇到資料傾斜首先要知道是哪個階段出了問題,不能一味的盯著某一個引數調優(比如單獨調整記憶體),應該嘗試從更多的角度全域性的去思考問題。

順帶一提:

在解決此次問題中看了不少文章,在此記錄下:

1、(岑玉海)https://www.cnblogs.com/cenyuhai/

看了他的(這篇也是其他人轉載的)部落格後,深表敬佩:https://blog.csdn.net/heyc861221/article/details/80122943

2、(ICE-Martina寫的spark資料傾斜,非常的精闢和完整)https://blog.csdn.net/lw_ghy/article/details/51419877

結束語:本人菜雞一隻,有什麼錯誤的地方,希望大家可以指出批評,或者大家對於我的問題,有更好的處理方式,也可以給我留言賜教!