1. 程式人生 > >Spark troubleshooting 1運算元返回null錯誤 2錯誤持久化以及checkpoint

Spark troubleshooting 1運算元返回null錯誤 2錯誤持久化以及checkpoint

一、運算元返回為null

問題
在有些運算元函式裡,我們都需要有返回值。但是,有些可能不需要返回值,但是這時候不能直接返回null,返回null將會導致錯誤

Scala.Math(NULL)  //異常

解決方法

  1. 如果不想有返回值,可以在返回的時候,返回一些特殊的值,比如“-999”
  2. 獲取到rdd之後,對rdd進行filter操作,如果資料是-999的,可以返回false,進行過濾掉
  3. filter之後,使用coalesce運算元壓縮rdd的partition數量,讓各個partition資料比較緊湊。提升效能。
		return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {

			private static final long serialVersionUID = 1L;
			
			@Override
		public Tuple2<String, Row> call(Row row) throws Exception {
			return new Tuple2<String, Row>("-999", RowFactory.createRow("-999"));  
			}
			
	});

二、持久化使用方式

有時候希望重複使用一個rdd,不用反覆計算rdd,可以直接使用通過各個節點上的executor 的BlockManager管理記憶體、磁碟資料
但是使用rdd持久化應該是 像下面這樣

sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

如果直接像下面

sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

會報錯,file not found錯誤

三、checkpoint

持久化,一般情況可以正常工作,但是可能出現意外,快取在記憶體中的資料莫名其妙丟失,或者儲存在磁碟檔案中的資料,莫名其妙被刪除。
但是有些rdd計算可能非常耗時,rdd之前有大量的父rdd,如果重新計算一個partition,可能就需要重新計算之前所有的父rdd對應的partition,這種情況就可以對rdd進行checkpoint,以防萬一。進行checkpoint,就是說,會將rdd的資料,持久化一份到容錯檔案系統上(比如hdfs)。在對rdd進行計算的時候,如果發現快取資料不見了,就會去checkpoint目錄查詢資料,如果有的話,就直接使用避免重新計算。
這麼理解,checkpoint其實算是 cache的一個備用。如果cache失效了,checkpoint就可以派上用場。
好處是

:提高了spark作業的可靠性,發生問題,不用重新計算大量的rdd;
壞處是:進行checkpoint操作的時候,將rdd資料寫入hdfs中的時候,很耗費效能。

checkpoint原理

  1. 在程式碼中,用sparkContext,設定一個checkpoint目錄,比如hdfs目錄
		JavaSparkContext sc = new JavaSparkContext(conf);
//		sc.checkpointFile("hdfs://");
  1. 在程式碼中,對需要進行checkpoint的rdd,執行rdd.checkpoint()
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
//		sessionid2actionRDD.checkpoint();
  1. RDDCheckpointData(spark 內部的api) 接管你的RDD,會標記為marked for checkpoint,準備進行checkpoint
  2. job執行完之後,會呼叫一個finalRDD.doCheckpoint()方法,順著rdd lineage,回溯掃描,發現有標記為checkpoint的rdd,就會進行二次標記,inProgressCheckpoint,正在接受checkpoint操作
  3. job執行完後,就會啟動一個內部新的rdd,去將標記為inProgressCheckpoint的rdd的資料,都寫入hdfs檔案中。(備註,如果rdd之前cache過,會直接從快取中獲取資料,寫入hdfs中;如果沒有cache過,那麼就會重新計算一遍這個rdd,再checkpoint)
  4. 將checkpoint過的rdd之前的依賴rdd,改成一個CheckpointRDD*,強制改變你的rdd的lineage。後面如果rdd的cache資料獲取失敗,直接會通過它的上游CheckpointRDD,去容錯的檔案系統,比如hdfs,中,獲取checkpoint的資料。

在這裡插入圖片描述