1. 程式人生 > >Spark緩衝、容錯機制

Spark緩衝、容錯機制

一.緩衝

	檔案太大的時候,不會全部放到記憶體中,實際檔案大小30M,放到記憶體中達到90M:因為寫入的檔案當中存放的是二進位制,而讀取到記憶體中以後,使用Java物件序列化方式
	這種序列化會佔用更大的空間,所以比實際大小要大
	
	實際上不會將記憶體全部佔用,要給程式執行留下足夠的記憶體
	
	注意:
	cache可以提高程式執行速度,但是如果使用一次就沒必要cache,常用於反覆的使用
	cache既不是transformation也不是action,因為沒有生成新的RDD, 也沒有立即執行
	
	cache不建議直接將hdfs的資料直接cache
		   建議將hdfs的資料過濾後快取
		   
	使用完畢後清空快取:
	unpersist()

用法: RDD.cache 二.容錯機制 相關知識:checkpoint是建立檢查點,類似於快照,例如在spark計算裡面,計算流程DAG非常長,伺服器需要將整個DAG計算完成得到結果,但是如果在這很長的計算流程中突然中間算出的 資料丟失了,spark又會根據RDD的依賴關係從頭到尾計算一遍,這樣很費效能,當然我們可以將中間計算的結果通過cache或者persist方法記憶體或者磁碟中,但是這樣也不能保證資料完全不能丟失 儲存的這個記憶體出問題或者磁碟壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用是將DAG中比較重要的中間資料做一個檢查點將結果 放在一個高可用的地方(通常這個地方是HDFS裡面)

	(*)checkpoint到底是什麼和需要用checkpoint解決什麼問題?
		1)spark在生產環境下經常面臨transformation的RDD非常多,(例如一個Job中包含一萬個RDD),或者是具體的transformation產生的RDD本身計算特別複雜和耗時(例如計算時長超過1個小時)
			可能業務比較複雜,此時我們必須要考慮對計算結果的持久化
		
		2)spark是擅長多步驟迭代計算,同時擅長基於Job的複用,這個時候如果曾經可以對計算結果的過程進行復用,就可以極大地提升效率,因為有時候有共同的步驟,可以避免重複計算
		3)如果採用cache將資料存放到記憶體的話,雖然最快但是也是最不可靠,即使放到磁碟也不可靠,都會壞掉
		4)checkpoint的產生就是為了相對而言更加可靠的持久化資料,在checkpoint可以指定資料存放到本地(HDFS)並且多個副本,這就天然的藉助HDFS高可靠的特徵
		5)checkpoint是針對整個RDD計算鏈條中特別需要資料持久化的環節(後面反覆使用的RDD)
	(*)缺點:
		通過檢查點checkpoint來實現,缺點:產生i/o
	(*)複習:HDFS的檢查點:由SeconderyNameNode進行日誌合併
			Oracle中,資料也是由檢查點的,如果產生檢查點,會以最高優先順序喚醒資料庫寫程序,將記憶體中的髒資料寫到資料檔案中(持久化)
	
	(*)檢查點可以將中間結果儲存起來
		兩種方式
		(*)本地目錄(測試環境)
		(*)HDFS的目錄(生產環境)
			注意:這種模式,需要將spark-shell執行在叢集上

2.使用checkpoint //先建立一個檔案 scala> sc.setCheckpointDir(“hdfs://bigdata02:9000/checkpoint0927”) //建立一個RDD scala> val rdd1 = sc.parallelize(1 to 1000) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24 //啟動容錯機制 scala> rdd1.checkpoint //觸發容錯 scala> rdd1.collect

執行的時候相當於走了兩次流程,sum的時候前面計算一遍,然後checkpoint又會計算一遍,所以我們一般先進行cache然後做checkpoint就會只走一次流程了吧 checkpoint的時候就會從剛cache到記憶體中取資料寫入到hdfs中

			 其中作者也說明了,在checkpoint的時候強烈建議先進行cache,並且當你checkpoint執行成功後,那麼前面所有的RDD依賴都會被銷燬
			 /**
			   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
			   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
			   * RDDs will be removed. This function must be called before any job has been
			   * executed on this RDD. It is strongly recommended that this RDD is persisted in
			   * memory, otherwise saving it on a file will require recomputation.
			   */