1. 程式人生 > >Spark -- RDD簡單操作【統計文字中單行最大單詞數】

Spark -- RDD簡單操作【統計文字中單行最大單詞數】

一 、什麼是RDD ?

         RDD在Spark【Scala語言】中,是一種資料結構【基於記憶體,可持久化】,就好比Java的ArrayList一樣,可以進行各種的Action操作,比如Java中的List集合,可以進行get【獲取元素】、add【增加元素】、remove【移除元素】等操作;

         當然,Scala語言底層實現是基於JVM的,即Scala相容Java語言【但高效於Java】,因此,Java的List集合可以直接拿來在Scala中使用;

         對於Spark的RDD,一樣有其對應的Action【行動】操作,主要操作有以下幾個

collect:返回RDD所有元素【注:如果spark為叢集模式,則從各個work節點上抓取RDD資料】

count  :  統計RDD資料集中的元素個數

reduce:並行整合RDD資料集中的元素,如(a,b)=> a+b【累加求和】, (a,b) = > if(a>b) a else b【漸進比大小,求最大元素】

foreach(func) :遍歷RDD資料集中的元素,並進行func【函式】操作。如:println【列印函式】

....etc

       RDD不單單是一個數據集【資料結構】,它的全稱可是:彈性分散式資料集【Resilient Distributed Datasets

       如此高逼格的名稱可不是隨便叫的,為什麼呢?

(1)為什麼稱是彈性的 【個人理解】

        彈簧我們知道,可長可短,即可伸縮;

        在整個Spark計算的過程中,都是圍繞著RDD資料集來的,即,計算一開始會產生N個RDD資料集,隨著計算的推進,N個RDD會被轉來轉去,但是最終會得到一個RDD資料集,也就是我們想要的計算結果。

        而且整個過程是流水線【並行】的,每個流水線上【work節點】執行過程得到的RDD不用等其他流水線上的RDD,區別於MapReduce【reduce任務需要等所有的map任務完成後,才能進行】,而執行過程中得到的RDD都是基於記憶體的,因此Spark的執行效率要遠高於Hadoop的MapReduce,因為MapReduce的每一個map和reduce任務都要讀寫磁碟【IO開銷很大】

(2)分散式

       這個不做過多解釋,只要涉及大資料,必提分散式

       字眼如:多臺機器、並行、分割槽、任務排程...etc【叢集】

(3)資料集

        不要見到RDD,就如同看到了三個陌生的字母一樣,它是一種資料結構,準確說是一種基於記憶體的資料結構,在準確點說,就是scala語言對資料集在記憶體中的一種封裝【包裝】,至於為什麼這樣做,我說不下去了....,

 二、怎麼得到一個RDD呢?

       注意這裡我用的是得到,而不是建立【建立讓人有種即將寫程式碼的緊迫感,有沒有】

       在我沒有開始寫demo來突出本篇博文的主題時,我們還是來想想,既然上面說了什麼是RDD,那麼,這個抽象的東西究竟怎麼獲得呢?

       上面我們說過,RDD是有行動【Action】操作的,也就是RDD常用到的幾個函式【count、reduce...etc】,所謂的行動操作是基於RDD資料集的,注意,得先有RDD,才能進行下一步函式的呼叫

        在Spark中,我們把得到【建立】RDD的過程叫做RDD的轉換【Transformation】過程,它是一種延遲操作,為什麼這樣說呢?

      [  我去,越扯越多,本來三五行demo就能搞定的博文,我居然....... ]

       這就要提到Spark的惰性機制了,RDD在轉換的過程中,看似得到了一個RDD,其實這個RDD是個虛的,並沒有立馬在記憶體中建立,只有我們在執行行動操作的時候,這個RDD才從頭開始執行並在記憶體中建立,話又說回來,RDD的轉換過程有哪些常用的函式呢【也就是RDD的建立函式】

       [  太抽象? 別急,欠的demo示例,一會一起補上 ]

textFile:從本地或者HDFS檔案系統中的指定檔案中獲取RDD資料集

map:  對資料集中的元素按照某種規則進行轉換,得到新的RDD

filter:對資料集中的元素進行某種規則的過濾轉換,得到新的RDD

.... etc

三、RDD簡單操作

         該說的也說的差不多了,下面就把上面欠的demo補上

功能:統計word.txt檔案中,單行單詞數最大的,並輸出結果

步驟:

input:  本地word.txt

轉換操作:RDD1  ---> RDD2 --> RDD3

執行操作:RDD3.reduce

output:輸出結果

(1)建立input  【隨便找個目錄,word.txt 如下】

a b d d e f
a a d c c e  h j k
o i k l m n b v
q w e r t y u i are v x a
q w e

(2)將檔案轉換成RDD 【使用轉換函式 -- textFile】  --- RDD1

【注:本地模式非叢集模式,且demo演示主要在spark-shell中進行演示】

  var lines = sc.textFile("word01.txt")

注意,上面是一個完整的計算過程,如果單有RDD的轉換過程是無法真正在記憶體中建立一個RDD的,如下面這種:

  看似正常,感覺我們好像建立了一個RDD,但是,當我們真正使用它的時候,卻發現報異常了

這就是Spark的惰性【延遲】機制,如果RDD在一開始轉換的時候就在記憶體中建立資料集的話,那麼一開始就會報檔案不存在的異常,而不是在我們呼叫RDD的執行函式時才發現

正常點,我們列印下正確的RDD的資料集的內容:

lines.foreach(println)

(3)將上一步得到的lines進一步轉換,生成新的RDD  --- RDD2

利用map函式配合lambda表示式,重新轉換lines中的元素,將每一行文字按照空格拆分成一個stirng單詞陣列

var lineArray = lines.map(line => line.split(" "))   

(4)將上一步得到的lineArray再次轉換,得到新的RDD --- RDD3

利用map函式配合lambda表示式,再次將上一步得到的lineArray中的元素進行轉換,得到新的RDD資料集lineWordSize

(5) 對上一步得到的RDD進行Action操作,得到最終結果 --  RDD3 --> output

利用reduce函式,對RDD3資料集進行整合,找出最大的單詞數 

(5) 核對計算結果

注:第二行文字,多了一個空格,因此統計出來的是10個單詞【不要有疑問哈】

四、RDD簡單操作   -- 一氣呵成

 【三】步驟有點太繁瑣了,scala本身就是一個高效、簡潔的語言(同Python),因此,我們用最簡潔的方式在跑一遍demo,拿到我們最終要的計算結果:12

  【看好了,不要眨眼...... 激動人心的時刻到了,哈哈】

var result = sc.textFile("word01.txt").map(line => line.split(" ").size).reduce((a,b) => if(a>b) a else b)

一行程式碼搞定!!!!    ----   流水線操作