1. 程式人生 > >Spark重點難點知識總結(一)

Spark重點難點知識總結(一)

本部落格是個人在學習Spark過程中的一些總結,方便個人日後查閱,同時裡面出現的一些關鍵字也可以作為後來一些讀者學習的材料。若有問題,歡迎評論,一定知無不言。 1.Tuple:Tuple就是用來把幾個資料放在一起的比較方便的方式,注意是“幾個資料”,因此沒有Tuple1這一說。
val scores=Array(Tuple2(1,100),Tuple2(2,90),Tuple2(3,100),Tuple2(2,90),Tuple2(3,100))  
val content=sc.parallelize(scores)  
val data=content.countByKey()
執行結果
data: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 2 -> 2)
需要注意的是Tuple是Scala的特有包,因此如果用Java需要導包
2.saveAsText:檔案輸出的方法,可以將檔案輸出到HDFS,也可以輸出到本地
sc.textFile(“text1.txt”).flatMap(_.split(“”)).map(word=>(word,1)).reduceByKey(_+_,1).saveAsTextFile(text2.txt) 
輸入的文字檔案text1
I ask her why not go
輸出的結果text2為
(not,1)(ask,1)(I,1)(why,1)(go?,1)(her,1)


3.job:為了響應Spark的action,包含很多task的平行計算,可以認為是Spark RDD 裡面的action,每個action的計算會生成一個job。使用者提交的Job會提交給DAGScheduler,job會被分解成Stage和Task。每個Job是一個計算序列的最終結果,而這個序列中能夠產生中間結果的計算就是一個stage。為了理解這一概念,我們舉例說明。 1)將一個包含人名和地址的檔案載入到RDD1中 
2)將一個包含人名和電話的檔案載入到RDD2中
3)通過name來Join RDD1和RDD2,生成RDD3 
4)在RDD3上做Map,給每個人生成一個HTML展示卡作為RDD4 
5)將RDD4儲存到檔案 
6)在RDD1上做Map,從每個地址中提取郵編,結果生成RDD5 
7)在RDD5上做聚合,計算出每個郵編地區中生活的人數,結果生成RDD6 
8)   Collect RDD6,並且將這些統計結果輸出到stdout

步驟(1、2、3、4、6、7)被Spark組織成stages,每個job則是一些stage序列的結果。對於一些簡單的場景,一個job可以只有一個stage。但是對於資料重分割槽的需求(比如第三步中的join),或者任何破壞資料局域性的事件,通常會導致更多的stage。
4.job和stage的區別:通常action對應job,transformation對應stage。怎麼樣才算是一個stage呢?劃分stage的依據是資料是否需要進行重組。action是一種操作級別,會生成      job,用通俗的話講就是把RDD變成了非RDD(資料聚合的過程),RDD是隻讀的,換句話說我們想要列印(println)必須要經過action級別的操作。transformation也是一種操作級別,會生成stage,用通俗的話講就是把一種形式的RDD變成另外一種形式的RDD,經過transformation級別的操作資料會進行重組。   常見的stage有:
map,filter,flatMap,mapPartitions,mapPartitionsWithIndex,sample,union,intersection
distinct,groupByKey,reduceByKey,aggregateByKey,sortByKey,join,cogroup,cartesian
pipe,coalesce,repartition,repartitionAndSortWithinPartitions
常見的action有:
reduce,collect,count,first,take,takeSample,takeOrdered,saveAsTextFile,saveAsSequenceFile,saveAsObjectFile,countByKey,foreach

5.task:被送到executor上的工作單元,task 是執行job 的邏輯單元 ,task和job的區別在於:job是關於整個輸入資料和麵向整個叢集(還沒有分機器)的概念,task一般是處理輸入資料的子集,並且和叢集中的具體一臺機器相聯絡。在task 會在每個executor 中的cpu core 中執行。每個Stage裡面Task的數量是由該Stage中最後一個RDD的Partition的數量所決定的。RDD在計算的時候,每個分割槽都會起一個task,所以rdd的分割槽數目決定了總的的task數目。申請的計算節點(Executor)數目和每個計算節點核數,決定了你同一時刻可以並行執行的task。比如的RDD有100個分割槽,那麼計算的時候就會生成100個task,你的資源配置為10個計算節點,每個兩2個核,同一時刻可以並行的task數目為20,計算這個RDD就需要5個輪次。如果計算資源不變,你有101個task的話,就需要6個輪次,在最後一輪中,只有一個task在執行,其餘核都在空轉。如果資源不變,你的RDD只有2個分割槽,那麼同一時刻只有2個task執行,其餘18個核空轉,造成資源浪費。這就是在spark調優中,增大RDD分割槽數目,增大任務並行度的做法。Spark上分為2類task:shuffleMapTask和resultTask。 6.Driver:在Driver中,RDD首先交給DAGSchedule進行Stage的劃分,然後底層的排程器TaskScheduler就與Executor進行互動,Driver和下圖中4個Worker節點的Executor發指令,讓它們在各自的執行緒池中執行Job,執行時Driver能獲得Executor發指令,讓它們在各自的執行緒池中執行Job,執行時Driver能獲得Executor的具體執行資源,這樣Driver與Executor之間進行通訊,通過網路的方式,Driver把劃分好的Task傳送給Executor,Task就是我們的Spark程式的業務邏輯程式碼。 7.下劃線_:Scala語言中下劃線最常用的作用是在集合中使用
val newArry= (1 to 10).map(_*2)
列印結果
2 4 6 8 10


8.reduce:它是這樣一個過程:每次迭代,將上一次的迭代結果與下一個元素一同執行一個二元的func函式。可以用這樣一個形象化的式子來說明:
reduce(func, [1,2,3] ) = func( func(1, 2), 3)
var list=List(1,2,3,4,5,6,7)
list.reduce(_-_)
輸出結果-26,1-2=-1,-1-3=-4,-4-4=-8,-8-5=-13,-13-6=-19,-19-7=-26 9.閉包:閉包是一個函式,返回值依賴於宣告在函式外部的一個或多個變數。閉包通常來講可以簡單的認為是可以訪問一個函式裡面區域性變數的另外一個函式。
    /*1.more是一個自由變數,其值及型別是在執行的時候得以確定的  
      2.x是型別確定的,其值是在函式呼叫的時候被賦值的    
      def add(more:Int) = (x:Int) => x+ more  
      val add1 = add(1) 
      println(add1(100)) 執行結果:101
這樣的函式稱之為閉包:從開放到封閉的過程。已知一個函式f(x)=x+i,讓你求f(3)= 3+i。分析:要得到最終的函式值,你必須知道i的值。 i稱作開放項(“開”著的,對應閉包的“閉”),若上文中定義了“ int i = 1”,則可以得到f(3)= 3+1 =4,即函式值若想被建立必須捕獲i的值,這一過程可以被理解為做對函式執行“關閉”操作,所以叫閉包。總之閉包就是(編譯器建立的)執行一段程式碼所需要的上下文。

10.split:將一個字串分割為子字串,然後將結果作為字串陣列返回。
var words="123456123"   words.split("12")   res0: Array[String] = Array("", 3456, 3)

var words="123456123"   words.split("123")  res0: Array[String] = Array("", 456)

var words="123456123"   words.split("")   res0: Array[String] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3)

var words="123456123"   words.split("10")  res0: Array[String] = Array(123456123)


11.filter:使用filter方法,你可以篩選出集合中你需要的元素,形成一個新的集合。
val x = List.range(1, 10)   
val evens = x.filterNot(_ % 2 == 0)
列印結果:evens: List[Int] = List(1, 3, 5, 7, 9)

  12.collect:將RDD轉成Scala陣列,並返回。 13.Integer.parseInt:將整數的字串,轉化為整數
val b="123"
  
val a=Integer.parseInt(b)
  
println(a)//列印結果123

14.flatMap,Map和foreach:(1)Map:對rdd之中的元素進行逐一進行函式操作對映為另外一個rdd,map函式會對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件
(2)flatMap函式則是兩個操作的集合——正是“先對映後扁平化”,分為兩階段:  操作1:同map函式一樣:對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件。操作2:最後將所有物件合併為一個物件(3)foreach無返回值(準確說返回void) 15.RDD:Resilient Distributed Datasets,彈性分散式資料集。舉例說明:如果你有一箱香蕉,讓三個人拿回家吃完,這時候要把箱子開啟,倒出來香蕉,分別拿三個小箱子重新裝起來,然後,各自抱回家去啃。Spark和很多其他分散式計算系統都借用了這種思想來實現並行:把一個超大的資料集,切分成N個小堆,找M個執行器(M < N),各自拿一塊或多塊資料慢慢玩,玩出結果了再收集在一起,這就算執行完了。那麼Spark做了一項工作就是:凡是能夠被我算的,都是要符合我的要求的,所以spark無論處理什麼資料先整成一個擁有多個分塊的資料集再說,這個資料集就叫RDD。