Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
I'm just a rookie! If anything here is wrong, please call it out; I'll take the criticism and fix it!!
I ran into a rather bizarre error that left me pretty speechless, and I found that Chinese-language sites had absolutely nothing on how to solve it, so I'm recording it here!!
1. Environment:
This is a Spark "Task not serializable" problem, so the only thing that matters is the Spark version; mine is Spark 2.2.0!
2. The exact error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
    at com.test.XXX.getXXXX(App.java:473)
    at com.test.BaseTest.main(BaseTest.java:66)
Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
Serialization stack:
The error is actually quite long (there's more after this), but reading a stack trace is about extracting the key part. Here it boils down to one class, or rather an object of it, that cannot be serialized: org.apache.spark.unsafe.types.UTF8String$IntWrapper
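To make the meaning of this exception concrete, here is a minimal plain-Java sketch with no Spark involved (the class names IntWrapper and Task below are stand-ins I made up, not Spark's real code): Java serialization fails as soon as the object graph reaches an instance of a class that does not implement Serializable, which is exactly what happens when Spark tries to serialize a task closure that references UTF8String$IntWrapper.

```java
import java.io.*;

public class NotSerializableDemo {
    // Stands in for UTF8String.IntWrapper: a class that is NOT Serializable.
    static class IntWrapper {
        int value;
    }

    // The task itself IS Serializable, but it drags IntWrapper along with it.
    static class Task implements Serializable {
        IntWrapper wrapper = new IntWrapper();  // non-serializable field
    }

    // Attempts Java serialization and reports how it fails, if it does.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException";
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails for the same reason as the Spark stack trace above.
        System.out.println(trySerialize(new Task()));
    }
}
```

Note that the exception message names the offending class, the same way UTF8String$IntWrapper is named in the trace above.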
3. The cause (see the figure):
Here I used Spark's DSL to express a SQL query: joining table A with table B while also using CASE WHEN, with a SQL window function nested inside the CASE WHEN (the SQL is slightly involved, but not too bad). If you're unsure how to write this, you can refer to my screenshot, though the mosaic censoring is a mess, for which I apologize; you understand why. The error appeared as soon as I tried to show the t1 and t2 columns (without printing those two columns, there is no error)!
The corresponding SQL:
You can read the DSL code and the SQL together; that should make it fairly clear what I'm doing (sorry again that the real names are censored; the placeholders below stand in for them):

SELECT query_col_5, query_col_6, query_col_7, query_col_8
FROM (
    SELECT query_col_1, query_col_2, query_col_3, query_col_4,
           CASE WHEN (MAX(XXX) OVER w) = 1 THEN 1 ELSE 0 END AS t1,
           CASE WHEN (MIN(XXX) OVER w) = 0 THEN 1 ELSE 0 END AS t2
    FROM (
        SELECT Z, ZZ, ZZZ, ZZZZ FROM table_A WHERE filter_condition
    ) info
    INNER JOIN (
        SELECT X, XX, XXX, XXXX FROM table_B WHERE filter_condition
    ) hav
    ON info.join_key = hav.join_key
    WINDOW w AS (PARTITION BY col_A, col_B, col_C
                 ORDER BY col_D
                 RANGE BETWEEN window_interval PRECEDING AND window_interval FOLLOWING)
) db1
WHERE filter_col;
4. Solutions:
Option 1:
Testing showed that if you write the SQL out as a string and run it with spark.sql("selectXXXX"), there is no error at all, which honestly made me want to scream. That approach does work, but maybe I'm a bit of a perfectionist (if your lead asks you to write code and all you deliver is one SQL string executed this way, everyone will think you're phoning it in), so I didn't want to go that route; I wanted to actually fix the error.
Option 2:
After much searching high and low, I finally found a fix!
Searching Google for anything related, I found a pull request that a contributor (GitHub id: debugger87) submitted to Spark!
Link: https://github.com/debugger87/spark/commit/2bd33d819137220c55e8ddf7b6df7b98945046aa
If it's not obvious what that pull request is doing, let me explain:
It makes the two static inner classes (IntWrapper and LongWrapper) that live inside org.apache.spark.unsafe.types.UTF8String implement Serializable!
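Stripped of all Spark context, the effect of that change can be sketched in plain Java (these toy classes are my own illustration, not Spark's actual source): once the inner wrapper class implements Serializable, an object that carries it serializes cleanly.

```java
import java.io.*;

public class SerializableFixDemo {
    // After the fix: the static inner wrapper class implements Serializable.
    static class IntWrapper implements Serializable {
        int value;
    }

    // An enclosing object holding the wrapper can now be serialized.
    static class Task implements Serializable {
        IntWrapper wrapper = new IntWrapper();
    }

    // Returns "ok" when Java serialization succeeds.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new Task()));  // prints "ok"
    }
}
```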
How to apply the change:
Editing the class in place is obviously impossible, because it ships precompiled!
1. Download the Spark source, find this class, add implements Serializable in the right spot, then compile the source and run against your own build (somewhat of a hassle if you don't know how to build Spark... but I've written a build guide, personally verified, that you can consult)
Build guide: https://blog.csdn.net/lsr40/article/details/80116235
2. Copy the source file you need to change into your own project (under an identical package path). At runtime your copy shadows the class in Spark's jar, which effectively applies the modification, as shown in the figure:
If this still isn't clear, leave me a comment, e.g. about how to copy the code out, how to create an identical package, and so on...
Here's one more article, covering the serialization problems people typically hit. (The gist of this class of error: identify which class fails to serialize, then either make it implement the Serializable interface, or stop referencing outer objects/variables inside closure operators such as map — create them inside the map operator instead — or consider broadcast variables and similar approaches.) Forgive me for not being able to find the original author; the version I read was itself a repost with no source given. If I've wronged the original author in any way, my apologies; if you see this, message me and I'll happily add your id!
Serialization issues: https://blog.csdn.net/javastart/article/details/51206715
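The "don't reference outer objects inside map" advice can be demonstrated without Spark, using a serializable Java lambda as a stand-in for a task closure (Helper is a made-up class): capturing a non-serializable outer object poisons the closure, while constructing the object inside the lambda body keeps the closure serializable.

```java
import java.io.*;
import java.util.function.Function;

public class ClosureCaptureDemo {
    // A made-up helper class that is NOT Serializable.
    static class Helper {
        int bump(int x) { return x + 1; }
    }

    // Capturing the outer helper drags it into the serialized closure and
    // fails, just like referencing a driver-side object inside a Spark map().
    static String capturingClosure() {
        Helper helper = new Helper();
        Function<Integer, Integer> f =
            (Function<Integer, Integer> & Serializable) x -> helper.bump(x);
        return trySerialize(f);
    }

    // Creating the helper inside the lambda body keeps the closure clean.
    static String selfContainedClosure() {
        Function<Integer, Integer> f =
            (Function<Integer, Integer> & Serializable) x -> new Helper().bump(x);
        return trySerialize(f);
    }

    // Attempts Java serialization and reports how it fails, if it does.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException";
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(capturingClosure());      // fails
        System.out.println(selfContainedClosure());  // ok
    }
}
```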
I just mentioned closures, so some of you may ask: what is a closure!? (again a reposted article with no attribution, sorry)
Closures: https://blog.csdn.net/wy_blog/article/details/57130702
All right, the usual refrain: I'm just a rookie, so if anything here is wrong, please point it out!
==========================================================================
After real testing in production! It turns out that running locally, the fix described above is enough, but if you run on a cluster, please read the description below carefully:
First, my apologies for not making this clear earlier!
1. In my tests on the servers, whether I built a fat jar or a thin jar, Spark never picked up our modified UTF8String class; it only reads the class from its own local jar (because this class comes from a jar that ships with Spark itself rather than from an external dependency, namely spark-unsafe_2.11-2.2.0.jar). So the modification described above works locally but does not take effect on the cluster!
2. I considered several solutions:
-1. Upgrade or downgrade the Spark version on the servers (but... you can't just swap versions whenever you like, unless you genuinely have that kind of authority)
-2. Replace spark-unsafe_2.11-2.2.0.jar under Spark's lib directory on every machine in the cluster with your patched jar (also unrealistic; the workload would be far too big)
-3. When you use window functions, run them via sparksession.sql("sql") (this is what I went with: just write the SQL directly, and the error is avoided)

Later on I also ran into a data-skew problem; I'll present the solution in my next post (the technique has actually existed for a long time, and the ideas for handling data skew in big data are both important and broadly applicable).