Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper

I'm just a rookie! If anything here is wrong, please point it out and I'll gladly fix it!

I ran into a rather odd error, frustrating really, and found that Chinese-language sites turned up nothing at all on how to fix it, so I'm writing it down here!

1. Environment:

This is a Spark "Task not serializable" problem, so the only thing that really matters is the Spark version: I am on Spark 2.2.0.

2. The full error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
	at com.test.XXX.getXXXX(App.java:473)
	at com.test.BaseTest.main(BaseTest.java:66)
Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
Serialization stack:

The stack trace is actually quite long (there is more after this), but the trick is to pull out the key part: a certain class, or rather an instance of it, cannot be serialized: org.apache.spark.unsafe.types.UTF8String$IntWrapper.

3. The cause (originally shown with a screenshot):


Here I was using the Spark DSL (the DataFrame API) to express the query: join table A with table B, and at the same time use CASE WHEN with a SQL window function nested inside it (the SQL is a little involved, but not too bad). The screenshot of my DSL code was so heavily mosaicked that it is not worth reproducing here; sorry about that, you understand why. The error appears as soon as I try to show() the columns t1 and t2; if those two columns are not printed, there is no error!

The corresponding SQL (fields anonymized):

SELECT field_5, field_6, field_7, field_8
FROM (
    SELECT
        field_1,
        field_2,
        field_3,
        field_4,
        CASE WHEN (MAX(xxx) OVER w) = 1 THEN 1 ELSE 0 END AS t1,
        CASE WHEN (MIN(xxx) OVER w) = 0 THEN 1 ELSE 0 END AS t2
    FROM (
        SELECT z, zz, zzz, zzzz
        FROM table_a
        WHERE filter_condition
    ) info
    INNER JOIN (
        SELECT x, xx, xxx, xxxx
        FROM table_b
        WHERE filter_condition
    ) hav
    ON info.join_key = hav.join_key
    WINDOW w AS (PARTITION BY col_a, col_b, col_c ORDER BY col_d
                 RANGE BETWEEN window_interval PRECEDING AND window_interval FOLLOWING)
) db1
WHERE filter_condition;
If you read the DSL code and the SQL side by side, it should be fairly clear what I'm doing (again, apologies for the anonymized fields). Since the original screenshot is unusable, a rough sketch of the DSL version follows below.
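
This is a minimal, hypothetical sketch (Java, Spark 2.2 DataFrame/DSL API) of the kind of query involved. It is not my actual code: every table name, column name, filter and window range here is a placeholder.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

public class WindowJoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("window-join-sketch").getOrCreate();

        // Window over a numeric ordering column, +/- some range (placeholder value).
        long windowRange = 3600L;
        WindowSpec w = Window.partitionBy("col_a", "col_b", "col_c")
                             .orderBy(col("col_d"))
                             .rangeBetween(-windowRange, windowRange);

        // Two filtered inputs, joined on a key (all names are placeholders).
        Dataset<Row> info = spark.table("table_a").where("some_filter = 1");
        Dataset<Row> hav  = spark.table("table_b").where("some_filter = 1");
        Dataset<Row> joined = info.join(hav, info.col("join_key").equalTo(hav.col("join_key")));

        // CASE WHEN built with when()/otherwise() on top of windowed MAX/MIN.
        Dataset<Row> result = joined
            .withColumn("t1", when(max(col("xxx")).over(w).equalTo(1), 1).otherwise(0))
            .withColumn("t2", when(min(col("xxx")).over(w).equalTo(0), 1).otherwise(0));

        // show() on t1/t2 is where the Task not serializable error showed up for me.
        result.select("t1", "t2").show();
    }
}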


4. Solutions:

Approach 1:

Through testing I found that if you write the SQL as a plain string and run it with spark.sql("SELECT ..."), the error does not occur. Honestly, infuriating. This does work, but I'm a bit of a perfectionist about it (if your lead asks you to write code and all you hand in is one SQL string executed this way, everyone will think you're phoning it in), so I didn't want to settle for this approach; I wanted to actually fix the error.
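
For what it's worth, a minimal sketch of this workaround (the names and the SQL itself are placeholders, not my real query):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SqlStringWorkaroundSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("sql-string-workaround").getOrCreate();

        // Register the input as a temp view so the whole query can be one SQL string.
        spark.table("table_a").createOrReplaceTempView("info_src");

        Dataset<Row> result = spark.sql(
            "SELECT col_a, " +
            "       CASE WHEN (MAX(xxx) OVER w) = 1 THEN 1 ELSE 0 END AS t1, " +
            "       CASE WHEN (MIN(xxx) OVER w) = 0 THEN 1 ELSE 0 END AS t2 " +
            "FROM info_src " +
            "WINDOW w AS (PARTITION BY col_a, col_b, col_c ORDER BY col_d " +
            "             RANGE BETWEEN 3600 PRECEDING AND 3600 FOLLOWING)");

        // Expressed as a single SQL string, show() did not hit the error for me.
        result.show();
    }
}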

Approach 2:

After much searching, I finally found a fix!

Searching on Google, I found a pull request (PR) that someone (GitHub id: debugger87) had submitted against Spark!

Link: https://github.com/debugger87/spark/commit/2bd33d819137220c55e8ddf7b6df7b98945046aa

If you can't quite tell what that pull request is doing, here is the gist:


The two static inner classes involved (IntWrapper, the one named in the exception, and its sibling LongWrapper) live inside the class org.apache.spark.unsafe.types.UTF8String!
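
In other words (a sketch of mine, not the literal diff), the change boils down to adding implements Serializable to those two inner classes. In the real Spark source they sit inside org.apache.spark.unsafe.types.UTF8String, which has many other members; the standalone class below only illustrates the one-line change and shows that the wrappers then survive Java serialization:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class Utf8StringWrappersSketch {

    // Before the fix these wrappers implemented nothing, so any closure that
    // captured one of them failed with NotSerializableException.
    public static class IntWrapper implements Serializable {
        public int value = 0;
    }

    public static class LongWrapper implements Serializable {
        public long value = 0;
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(new IntWrapper()); // would throw without "implements Serializable"
        out.close();
        System.out.println("IntWrapper serialized fine");
    }
}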

How to apply the change:


You obviously can't just edit it in place, because what ships is the compiled class!

1. Download the Spark source, find this class, add implements Serializable in the right place, then build Spark from source and run against your own build (somewhat painful if you have never compiled Spark... but I have also written a post on how to compile it, tested and working, which you can refer to).

Post on compiling Spark: https://blog.csdn.net/lsr40/article/details/80116235

2. Copy the source file of the class you need to change straight into your own project, keeping exactly the same package path. At run time your copy is picked up instead of the one inside the Spark jar, which effectively applies the change.
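
If you want to confirm which copy of the class actually wins at run time, a quick check like this helps (the class name here is mine, purely illustrative):

import java.io.Serializable;
import org.apache.spark.unsafe.types.UTF8String;

public class CheckUtf8StringPatch {
    public static void main(String[] args) {
        // Prints where UTF8String was loaded from: your own target/classes
        // directory, or the bundled spark-unsafe_2.11-2.2.0.jar.
        System.out.println(UTF8String.class.getProtectionDomain().getCodeSource().getLocation());

        // Prints true only if the patched, serializable IntWrapper is the one on the classpath.
        System.out.println(Serializable.class.isAssignableFrom(UTF8String.IntWrapper.class));
    }
}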



If this still isn't clear, just leave me a comment, for example with questions like how to copy the source out, how to create an identical package, and so on...


I'll also leave an article here on the serialization problems most people run into. (The gist of this class of errors: work out which class failed to serialize, then either make it implement the Serializable interface, or avoid referencing external objects/variables inside closure operators such as map; create the object inside the operator instead, or consider broadcast variables. There is a small sketch after the link below.) Forgive me for not being able to find the original author; the version I read was itself a repost with no source given. No offence intended; if the original author sees this, message me and I will gladly add your id!

On serialization problems: https://blog.csdn.net/javastart/article/details/51206715
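
To make that concrete, here is a small self-contained sketch of my own (not taken from the linked article) of the usual pattern: don't let a map closure capture a non-serializable object created on the driver; build it inside the closure instead (or make it Serializable, or broadcast shared read-only data):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ClosureSerializationSketch {

    // A helper that does NOT implement Serializable, standing in for things
    // like connections, parsers, clients, etc.
    static class Tagger {
        String tag(int x) { return "value-" + x; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("closure-sketch").master("local[*]").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3));

        // BAD: capturing an instance created on the driver drags the
        // non-serializable Tagger into the task closure -> "Task not serializable".
        // Tagger driverSide = new Tagger();
        // nums.map(x -> driverSide.tag(x)).collect();

        // GOOD: create the helper inside the closure, so nothing
        // non-serializable is captured from the driver side.
        JavaRDD<String> tagged = nums.map(x -> new Tagger().tag(x));

        tagged.collect().forEach(System.out::println);
    }
}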

I just mentioned closures; some of you may be wondering what a closure is!? (Again an article that was reposted without its source stated, sorry.)

On closures: https://blog.csdn.net/wy_blog/article/details/57130702

That's it. As always, I'm just a rookie, so if anything here is wrong, please point it out!

==========================================================================

Update, after testing on a real cluster! Running locally, fixing things exactly as described above is enough, but if you run on a cluster, please read the following carefully:

First of all, apologies for not making this clear earlier!

1. In my tests on the cluster, no matter whether I build a fat jar or a thin jar, Spark does not pick up the UTF8String class we modified; it only loads the copy from its own local jar (this class ships with Spark itself rather than as an external dependency, in the jar named spark-unsafe_2.11-2.2.0.jar). So the modification described above works locally but does not take effect on the cluster!

2. I considered several ways around this:

-1. Upgrade or downgrade the Spark version on the cluster (but... you can't just swap versions whenever you feel like it, unless you really have that kind of authority).

-2. Replace spark-unsafe_2.11-2.2.0.jar under Spark's lib directory on every machine in the cluster with your patched version (also unrealistic; the amount of work is far too large).

-3. When you need window functions, run the query with sparksession.sql("sql") (this is what I ended up doing: write the SQL directly as a string, and the error goes away).

3. Later on I also ran into a data-skew problem; I will describe the solution in the next post (the technique itself has been around for a long time, but the ideas behind handling data skew in big data are important and fairly general).