1. 程式人生 > >org.apache.spark.SparkException: Task not serializable問題分析

org.apache.spark.SparkException: Task not serializable問題分析

問題描述及原因分析

在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情形下確實在所難免,比如在filter運算元根據外部指定的條件進行過濾,map根據相應的配置進行變換等。為了解決上述Task未序列化問題,這裡對其進行了研究和總結。

出現“org.apache.spark.SparkException: Task not serializable”這個錯誤,一般是因為在map、filter等的引數使用了外部的變數,但是這個變數不能序列化( 不是說不可以引用外部變數,只是要做好序列化工作 ,具體後面詳述)。其中最普遍的情形是:當引用了某個類(經常是當前類)的成員函式或變數時,會導致這個類的所有成員(整個類)都需要支援序列化。雖然許多情形下,當前類使用了“extends Serializable”宣告支援序列化,但是由於某些欄位不支援序列化,仍然會導致整個類序列化時出現問題,最終導致出現Task未序列化問題。

引用成員變數的例項分析

如上所述, 由於Spark程式中的map、filter等運算元內部引用了類成員函式或變數導致需要該類所有成員都需要支援序列化,又由於該類某些成員變數不支援序列化,最終引發Task無法序列化問題 。為了驗證上述原因,我們編寫了一個例項程式,如下所示。該類的功能是從域名列表中(rdd)過濾得到特定頂級域名(rootDomain,如.com,.cn,.org)的域名列表,而該特定頂級域名需要函式呼叫時指定。

class MyTest1(conf:String) extends Serializable{
  val list = List("a.com", "www.b.com"
, "a.cn", "a.com.cn", "a.org"); private val sparkConf = new SparkConf().setAppName("AppName"); private val sc = new SparkContext(sparkConf); val rdd = sc.parallelize(list); private val rootDomain = conf def getResult(): Array[(String)] = { val result = rdd.filter(item => item.contains(rootDomain)) result.take(result.count().toInt) } }

依據上述分析的原因,由於依賴了當前類的成員變數,所以導致當前類全部需要序列化,由於當前類某些欄位未做好序列化,導致出錯。實際情況與分析的原因一致,執行過程中出現錯誤,如下所示。分析下面的錯誤報告得到錯誤是由於sc(SparkContext)引起的。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(**SparkContext**.scala:1435) 
……
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    - field (class "com.ntci.test.MyTest1", name: "sc", type: "class org.apache.spark.SparkContext")
    - object (class "com.ntci.test.MyTest1", com.ntci.test.MyTest1@63700353)
    - field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class com

為了驗證上述結論,將不需要序列化的的成員變數使用關鍵字“@transent”標註,表示不序列化當前類中的這兩個成員變數,再次執行函式,同樣報錯。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
……
 Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
    - field (class "com.ntci.test.MyTest1", name: "sparkConf", type: "class org.apache.spark.**SparkConf**")
    - object (class "com.ntci.test.MyTest1", com.ntci.test.MyTest1@6107799e)

雖然錯誤原因相同,但是這次導致錯誤的欄位是sparkConf(SparkConf)。使用同樣的“@transent”標註方式,將sc(SparkContext)和sparkConf(SparkConf)都標註為不需序列化,再次執行時,程式正常執行。

class MyTest1(conf:String) extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  @transient
  private val sparkConf = new SparkConf().setAppName("AppName");
  @transient
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  private val rootDomain = conf

  def getResult(): Array[(String)] = {

    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

所以,通過上面的例子我們可以得到結論:由於Spark程式中的map、filter等運算元內部引用了類成員函式或變數導致該類所有成員都需要支援序列化,又由於該類某些成員變數不支援序列化,最終引發Task無法序列化問題。相反地,對類中那些不支援序列化問題的成員變數標註後,使得整個類能夠正常序列化,最終消除Task未序列化問題。

引用成員函式的例項分析

成員變數與成員函式的對序列化的影響相同,即引用了某類的成員函式,會導致該類所有成員都支援序列化。為了驗證這個假設,我們在map中使用了當前類的一個成員函式,作用是如果當前域名沒有以“www.”開頭,那麼就在域名頭新增“www.”字首(注:由於rootDomain是在getResult函式內部定義的,就不存在引用類成員變數的問題,也就不存在和排除了上一個例子所討論和引發的問題,因此這個例子主要討論成員函式引用的影響;此外,不直接引用類成員變數也是解決這類問題的一個手段,如本例中為了消除成員變數的影響而在函式內部定義變數的這種做法,這類問題具體的規避做法此處略提,在下一節作詳細闡述)。下面的程式碼同樣會報錯,同上面的例子一樣,由於當前類中的sc(SparkContext)和sparkConf(SparkConf)兩個成員變數沒有做好序列化處理,導致當前類的序列化出現問題。

class MyTest1(conf:String)  extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  private val sparkConf = new SparkConf().setAppName("AppName");
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    .map(item => addWWW(item))
    result.take(result.count().toInt)
  }
  def addWWW(str:String):String = {
    if(str.startsWith("www."))
      str
    else
      "www."+str
  }
}

如同前面的做法,將sc(SparkContext)和sparkConf(SparkConf)兩個成員變數使用“@transent”標註後,使當前類不序列化這兩個變數,則程式可以正常執行。此外,與成員變數稍有不同的是,由於該成員函式不依賴特定的成員變數,因此可以定義在scala的object中(類似於Java中的static函式),這樣也取消了對特定類的依賴。如下面例子所示,將addWWW放到一個object物件(UtilTool)中去,filter操作中直接呼叫,這樣處理以後,程式能夠正常執行。

def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    .map(item => UtilTool.addWWW(item))
    result.take(result.count().toInt)
  }
object UtilTool {
  def addWWW(str:String):String = {
    if(str.startsWith("www."))
      str
    else
      "www."+str
  }
}

對全類序列化要求的驗證

如上所述,引用了某類的成員函式,會導致該類及所有成員都需要支援序列化。因此,對於使用了某類成員變數或函式的情形,首先該類需要序列化(extends Serializable),同時需要對某些不需要序列化的成員變數標記以避免為序列化造成影響。對於上面兩個例子,由於引用了該類的成員變數或函式,導致該類以及所有成員支援序列化,為了消除某些成員變數對序列化的影響,使用“@transent”進行標註。

為了進一步驗證關於整個類需要序列化的假設,這裡在上面例子使用“@transent”標註後並且能正常執行的程式碼基礎上,將類序列化的相關程式碼刪除(去掉extends Serializable),這樣程式執行會報該類為序列化的錯誤,如下所示。所以通過這個例項說明了上面的假設。

Caused by: java.io.NotSerializableException: com.ntci.test.MyTest1
    - field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class c

所以通過以上例子可以說明:map等運算元內部可以引用外部變數和某類的成員變數,但是要做好該類的序列化處理。首先是該類需要繼承Serializable類,此外,對於類中某些序列化會出錯的成員變數做好處理,這也是Task未序列化問題的主要原因。對於出現這類問題,首先檢視未能序列化的成員變數是哪個,對於可以不需要序列化的成員變數可使用“@transent”標註。

此外,也不是map操作所在的類必須序列化不可(繼承Serializable類),對於不需要引用某類成員變數或函式的情形,就不會要求相應的類必須實現序列化,如下面的例子所示,filter操作內部沒有引用任何類的成員變數或函式,因此當前類不用序列化,程式可正常執行。

class MyTest1(conf:String) {
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  private val sparkConf = new SparkConf().setAppName("AppName");
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

解決辦法與程式設計建議

承上所述,這個問題主要是引用了某類的成員變數或函式,並且相應的類沒有做好序列化處理導致的。因此解決這個問題無非以下兩種方法:
1. 不在(或不直接在)map等閉包內部直接引用某類(通常是當前類)的成員函式或成員變數
2. 如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

(一)不在(或不直接在)map等閉包內部直接引用某類成員函式或成員變數

(1)對於依賴某類成員變數的情形
- 如果程式依賴的值相對固定,可取固定的值,或定義在map、filter等操作內部,或定義在scala
object物件中(類似於Java中的static變數)
- 如果依賴值需要程式呼叫時動態指定(以函式引數形式),則在map、filter等操作時,可不直接引用該成員變數,而是在類似上面例子的getResult函式中根據成員變數的值重新定義一個區域性變數,這樣map等運算元就無需引用類的成員變數。

(2)對於依賴某類成員函式的情形
- 如果函式功能獨立,可定義在scala object物件中(類似於Java中的static方法),這樣就無需一來特定的類。

(二)如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

對於這種情況,則需對該類做好序列化處理,首先該類繼承序列化類,然後對於不能序列化的成員變數使用“@transent”標註,告訴編譯器不需要序列化。
此外如果可以,可將依賴的變數獨立放到一個小的class中,讓這個class支援序列化,這樣做可以減少網路傳輸量,提高效率。

相關推薦

org.apache.spark.SparkException: Task not serializable問題分析

問題描述及原因分析 在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情形下確實在所難免,比如在filter運算元根據外部指定的條件進行過濾,ma

org.apache.spark.SparkException: Exception thrown in awaitResult (Spark報錯)

WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master node1:7077 org.apache.spark.SparkException: Exception thrown i

IDEA中如果報org.apache.spark.sparkException: A master URL must be set in your configuration

local 本地單執行緒local[K] 本地多執行緒(指定K個核心)local[*] 本地多執行緒(指定所有可用核心)spark://HOST:PORT 連線到指定的 Spark stand

Spark: org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow

wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------

spark2.1註冊內部函數spark.udf.register("xx", xxx _),運行時拋出異常:Task not serializable

ext path run scope rim function dex exe xtend 函數代碼: class MySparkJob{ def entry(spark:SparkSession):Unit={ def getInne

SparkSpark執行報錯Task not serializable

文章目錄 異常資訊 出現場景 解決方案 分析 異常資訊 org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerial

java spark報錯:Task not serializable

在spark記憶體計算 JavaPairRDD<String, Integer> results = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Ove

spark-core_04: org.apache.spark.deploy.SparkSubmit原始碼分析

SparkSubmitArgumentsParser的父類就SparkSubmitOptionParser,在launcher.Main方法執行時用到OptionParser 它的父類也是SparkSubmitOptionParser。並且這個父類有一個方法parser。作用

解決value toDF is not a member of org.apache.spark.rdd.RDD[People]

編譯如下程式碼時 val rdd : RDD[People]= sparkSession.sparkContext.textFile(hdfsFile,2).map(line => line.split(",")).map(arr => Peo

scala學習-Description Resource Path Location Type value toDF is not a member of org.apache.spark.rdd.R

編譯如下程式碼時,出現value toDF is not a member of org.apache.Spark.rdd.RDD[People] 錯誤 val rdd : RDD[People]= sparkSession.sparkContext.tex

Provider org.apache.xerces.jaxp.DocumentBuilderFactoryImpl not found 問題排查

缺少 pcl fall api 自帶 new load delet caused 自定義的classLoader啟動spring容器,遇到 Provider org.apache.xerces.jaxp.DocumentBuilderFactoryImpl not subt

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedul

depend trace pan ssi ram rac .org driver 過大 在寫Spark程序是遇到問題 Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.orgapachea

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class o...

names apach true 行數 cin name else color isnull 運行spark程序一直報錯: 1 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) nu

關於在使用sparksql寫程序是報錯以及解決方案:org.apache.spark.sql.AnalysisException: Duplicate column(s): "name" found, cannot save to file.

文件加載 mod 但是 路徑 win 錯誤 寫入 技術分享 over 說明:   spark --version : 2.2.0   我有兩個json文件,分別是emp和dept: emp內容如下: {"name": "zhangsan", "age": 26, "dep

Beginning Data Exploration and Analysis with Apache Spark 使用Apache Spark開始資料探索和分析 中文字幕

使用Apache Spark開始資料探索和分析 中文字幕 Beginning Data Exploration and Analysis with Apache Spark 無論您是想要探索資料還是開發複雜的機器學習模型,資料準備都是任何資料專業人士的主要任務 Spark是一種引擎,它

org.apache.spark.examples.SparkPi

Warning: Local jar /usr/local/spark/ does not exist, skipping. java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi at java.net.U

Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper

菜雞一隻!如果有什麼說錯的還請大家指出批評,堅決改正!! 遇到了一個挺詭異的報錯把,反正比較無語,發現國內網站完全搜不到這個報錯的解決方法,因此在這裡記錄下!! 1、環境: 這是一個spark的Task not serializable問題,因此只需要關注spark的版本就好了,我的版本是

idae執行spark程式碼報錯ERROR MetricsSystem: Sink class org.apache.spark.metrics.sink.MetricsServlet cannot b

1.問題描述 在idea中編寫,streaming處理伺服器socket傳遞的資料,結果報錯: "C:\Program Files\Java\jdk1.8.0_91\bin\java" -Didea.launcher.port=7534 "-Didea.launcher.bin.path=C

使Apache Spark和Mysql作資料分析

使用用spart-shell讀取MySQL表中的資料 步驟1: 執行spark-shell命令,進入spark-shell命令列,執行命令如下: [email protected]:~/run/spark/bin$ ./spark-shell --maste

pyspark連hbase報org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter

ERROR python.Converter: Failed to load converter: org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter Trace