1. 程式人生 > >子雨大資料之Spark入門教程---Spark2.1.0入門:第一個Spark應用程式:WordCount 2.2

子雨大資料之Spark入門教程---Spark2.1.0入門:第一個Spark應用程式:WordCount 2.2

前面已經學習了Spark安裝,完成了實驗環境的搭建,並且學習了Spark執行架構和RDD設計原理,同時,我們還學習了Scala程式設計的基本語法,有了這些基礎知識作為鋪墊,現在我們可以沒有障礙地開始編寫一個簡單的Spark應用程式了——詞頻統計。

任務要求

任務:編寫一個Spark應用程式,對某個檔案中的單詞進行詞頻統計。
準備工作:請進入Linux系統,開啟“終端”,進入Shell命令提示符狀態,然後,執行如下命令新建目錄:

  1. cd /usr/local/spark
  2. mkdir mycode
  3. cd mycode
  4. mkdir wordcount
  5. cd wordcount

Shell 命令

然後,在“/usr/local/spark/mycode/wordcount”目錄下新建一個包含了一些語句的文字檔案word.txt,命令如下:

  1. vim word.txt

Shell 命令

你可以在文字檔案中隨意輸入一些單詞,用空格隔開,我們會編寫Spark程式對該檔案進行單詞詞頻統計。然後,按鍵盤Esc鍵退出vim編輯狀態,輸入“:wq”儲存檔案並退出vim編輯器。

在spark-shell中執行詞頻統計

啟動spark-shell

首先,請登入Linux系統(要注意記住登入採用的使用者名稱,本教程統一採用hadoop使用者名稱進行登入),開啟“終端”(可以在Linux系統中使用Ctrl+Alt+T組合鍵開啟終端),進入shell命令提示符狀態,然後執行以下命令進入spark-shell:

  1. cd /usr/local/spark
  2. ./bin/spark-shell
  3. ....//這裡省略啟動過程顯示的一大堆資訊
  4. scala>

Shell 命令

啟動進入spark-shell需要一點時間,在進入spark-shell後,我們可能還需要到Linux檔案系統中對相關目錄下的檔案進行編輯和操作(比如要檢視spark程式執行過程生成的檔案),這個無法在park-shell中完成,因此,這裡再開啟第二個終端,用來在Linux系統的Shell命令提示符下操作。

載入本地檔案

在開始具體詞頻統計程式碼之前,需要解決一個問題,就是如何載入檔案?

要注意,檔案可能位於本地檔案系統中,也有可能存放在分散式檔案系統HDFS中

,所以,下面我們分別介紹如何載入本地檔案,以及如何載入HDFS中的檔案。
首先,請在第二個終端視窗下操作,用下面命令到達“/usr/local/spark/mycode/wordcount”目錄,檢視一下上面已經建好的word.txt的內容:

  1. cd /usr/local/spark/mycode/wordcount
  2. cat word.txt

Shell 命令

cat命令會把word.txt檔案的內容全部顯示到螢幕上。

現有讓我們切換回到第一個終端,也就是spark-shell,然後輸入下面命令:

  1. scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

scala

上面程式碼中,val後面的是變數textFile,而sc.textFile()中的這個textFile是sc的一個方法名稱,這個方法用來載入檔案資料。這兩個textFile不是一個東西,不要混淆。實際上,val後面的是變數textFile,你完全可以換個變數名稱,比如,val lines = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")。這裡使用相同名稱,就是有意強調二者的區別。
注意,要載入本地檔案,必須採用“file:///”開頭的這種格式。執行上上面這條命令以後,並不會馬上顯示結果,因為,Spark採用惰性機制,只有遇到“行動”型別的操作,才會從頭到尾執行所有操作。所以,下面我們執行一條“行動”型別的語句,就可以看到結果:

  1. scala> textFile.first()

scala

first()是一個“行動”(Action)型別的操作,會啟動真正的計算過程,從檔案中載入資料到變數textFile中,並取出第一行文字。螢幕上會顯示很多反饋資訊,這裡不再給出,你可以從這些結果資訊中,找到word.txt檔案中的第一行的內容。

正因為Spark採用了惰性機制,在執行轉換操作的時候,即使我們輸入了錯誤的語句,spark-shell也不會馬上報錯,而是等到執行“行動”型別的語句時啟動真正的計算,那個時候“轉換”操作語句中的錯誤就會顯示出來,比如:

  1. val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")

scala

上面我們使用了一個根本就不存在的word123.txt,執行上面語句時,spark-shell根本不會報錯,因為,沒有遇到“行動”型別的first()操作之前,這個載入操作時不會真正執行的。然後,我們執行一個“行動”型別的操作first(),如下:

  1. scala> textFile.first()

scala

執行上面語句後,你會發現,會返回錯誤資訊,其中有四個醒目的中文文字“拒絕連線”,因為,這個word123.txt檔案根本就不存在。好了,現在我們可以練習一下如何把textFile變數中的內容再次寫回到另外一個文字檔案wordback.txt中:

  1. val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
  2. textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

scala

上面的saveAsTextFile()括號裡面的引數是儲存檔案的路徑,不是檔名。saveAsTextFile()是一個“行動”(Action)型別的操作,所以,馬上會執行真正的計算過程,從word.txt中載入資料到變數textFile中,然後,又把textFile中的資料寫回到本地檔案目錄“/usr/local/spark/mycode/wordcount/writeback/”下面,現在讓我們切換到Linux Shell命令提示符視窗中,執行下面命令:

  1. cd /usr/local/spark/mycode/wordcount/writeback/
  2. ls

Shell 命令

執行結果類似下面:

part-00000 _SUCCESS

也就是說,該目錄下包含兩個檔案,我們可以使用cat命令檢視一下part-00000檔案(注意:part-後面是五個零):

  1. cat part-00000

Shell 命令

顯示結果,是和上面word.txt中的內容一樣的。

載入HDFS中的檔案

為了能夠讀取HDFS中的檔案,請首先啟動Hadoop中的HDFS元件。注意,之前我們在“Spark安裝”這章內容已經介紹瞭如何安裝Hadoop和Spark,所以,這裡我們可以使用以下命令直接啟動Hadoop中的HDFS元件(由於用不到MapReduce元件,所以,不需要啟動MapReduce或者YARN)。請到第二個終端視窗,使用Linux Shell命令提示符狀態,然後輸入下面命令:

  1. cd /usr/local/hadoop
  2. ./sbin/start-dfs.sh

Shell 命令

啟動結束後,HDFS開始進入可用狀態。如果你在HDFS檔案系統中,還沒有為當前Linux登入使用者建立目錄(本教程統一使用使用者名稱hadoop登入Linux系統),請使用下面命令建立:

  1. ./bin/hdfs dfs -mkdir -p /user/hadoop

Shell 命令

也就是說,HDFS檔案系統為Linux登入使用者開闢的預設目錄是“/user/使用者名稱”(注意:是user,不是usr),本教程統一使用使用者名稱hadoop登入Linux系統,所以,上面建立了“/user/hadoop”目錄,再次強調,這個目錄是在HDFS檔案系統中,不在本地檔案系統中。建立好以後,下面我們使用命令檢視一下HDFS檔案系統中的目錄和檔案:

./bin/hdfs dfs -ls .

上面命令中,最後一個點號“.”,表示要檢視Linux當前登入使用者hadoop在HDFS檔案系統中與hadoop對應的目錄下的檔案,也就是檢視HDFS檔案系統中“/user/hadoop/”目錄下的檔案,所以,下面兩條命令是等價的:

./bin/hdfs dfs -ls .
./bin/hdfs dfs -ls /user/hadoop

如果要檢視HDFS檔案系統根目錄下的內容,需要使用下面命令:

./bin/hdfs dfs -ls /

下面,我們把本地檔案系統中的“/usr/local/spark/mycode/wordcount/word.txt”上傳到分散式檔案系統HDFS中(放到hadoop使用者目錄下):

  1. ./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt .

Shell 命令

然後,用命令檢視一下HDFS的hadoop使用者目錄下是否多了word.txt檔案,可以使用下面命令列出hadoop目錄下的內容:

./bin/hdfs dfs -ls .

可以看到,確實多了一個word.txt檔案,我們使用cat命令檢視一個HDFS中的word.txt檔案的內容,命令如下:

./bin/hdfs dfs -cat ./word.txt

上面命令執行後,就會看到HDFS中word.txt的內容了。

現在,讓我們切換回到spark-shell視窗,編寫語句從HDFS中載入word.txt檔案,並顯示第一行文字內容:

  1. scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
  2. scala> textFile.first()

scala

執行上面語句後,就可以看到HDFS檔案系統中(不是本地檔案系統)的word.txt的第一行內容了。

需要注意的是,sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")中,“hdfs://localhost:9000/”是前面介紹Hadoop安裝內容時確定下來的埠地址9000實際上,也可以省略不寫,如下三條語句都是等價的

val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val textFile = sc.textFile("/user/hadoop/word.txt")
val textFile = sc.textFile("word.txt")

下面,我們再把textFile的內容寫回到HDFS檔案系統中(寫到hadoop使用者目錄下):

  1. scala> val textFile = sc.textFile("word.txt")
  2. scala> textFile.saveAsTextFile("writeback")

scala

執行上面命令後,文字內容會被寫入到HDFS檔案系統的“/user/hadoop/writeback”目錄下,我們可以切換到Linux Shell命令提示符視窗檢視一下:

  1. ./bin/hdfs dfs -ls .
  2. ``
  3. 執行上述命令後,在執行結果中,可以看到有個writeback目錄,下面我們檢視該目錄下有什麼檔案:
  4. ```bash
  5. ./bin/hdfs dfs -ls ./writeback

Shell 命令

執行結果中,可以看到存在兩個檔案:part-00000和_SUCCESS。我們使用下面命令輸出part-00000檔案的內容(注意:part-00000裡面有五個零):

  1. ./bin/hdfs dfs -cat ./writeback/part-00000

Shell 命令

執行結果中,就可以看到和word.txt檔案中一樣的文字內容。

詞頻統計

有了前面的鋪墊性介紹,下面我們就可以開始第一個Spark應用程式:WordCount
請切換到spark-shell視窗:

  1. scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
  2. scala> val wordCount = textFile.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a, b)=> a + b)
  3. scala> wordCount.collect()

scala

上面只給出了程式碼,省略了執行過程中返回的結果資訊,因為返回資訊很多。
下面簡單解釋一下上面的語句。
textFile包含了多行文字內容,textFile.flatMap(line => line.split(" "))會遍歷textFile中的每行文字內容,當遍歷到其中一行文字內容時,會把文字內容賦值給變數line,並執行Lamda表示式line => line.split(" ")。line => line.split(" ")是一個Lamda表示式,左邊表示輸入引數,右邊表示函式裡面執行的處理邏輯,這裡執行line.split(" "),也就是針對line中的一行文字內容,採用空格作為分隔符進行單詞切分,從一行文字切分得到很多個單詞構成的單詞集合。這樣,對於textFile中的每行文字,都會使用Lamda表示式得到一個單詞集合,最終,多行文字,就得到多個單詞集合。textFile.flatMap()操作就把這多個單詞集合“拍扁”得到一個大的單詞集合。然後,針對這個大的單詞集合,執行map()操作,也就是map(word => (word, 1)),這個map操作會遍歷這個集合中的每個單詞,當遍歷到其中一個單詞時,就把當前這個單詞賦值給變數word,並執行Lamda表示式word => (word, 1),這個Lamda表示式的含義是,word作為函式的輸入引數,然後,執行函式處理邏輯,這裡會執行(word, 1),也就是針對輸入的word,構建得到一個對映(Map,是一種資料結構),這個對映的key是word,value是1(表示該單詞出現1次)。
程式執行到這裡,已經得到一個對映(Map),這個對映中包含了很多個(key,value)。最後,針對這個對映,執行reduceByKey((a, b) => a + b)操作,這個操作會把對映中的所有(key,value)按照key進行分組,然後使用給定的函式(這裡就是Lamda表示式:(a, b) => a + b),對具有相同的key的多個value進行聚合操作,返回聚合後的(key,value),比如("hadoop",1)和("hadoop",1),具有相同的key,進行聚合以後就得到("hadoop",2),這樣就計算得到了這個單詞的詞頻。

編寫獨立應用程式執行詞頻統計

下面我們編寫一個Scala應用程式來實現詞頻統計。
請登入Linux系統(本教程統一採用使用者名稱hadoop進行登入),進入Shell命令提示符狀態,然後,執行下面命令:

  1. cd /usr/local/spark/mycode/wordcount/
  2. mkdir -p src/main/scala //這裡加入-p選項,可以一起建立src目錄及其子目錄

Shell 命令

請在“/usr/local/spark/mycode/wordcount/src/main/scala”目錄下新建一個test.scala檔案,裡面包含如下程式碼:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
    def main(args: Array[String]) {
        val inputFile =  "file:///usr/local/spark/mycode/wordcount/word.txt"
        val conf = new SparkConf().setAppName("WordCount")
        val sc = new SparkContext(conf)
                val textFile = sc.textFile(inputFile)
                val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
                wordCount.foreach(println)       
    }
}

如果test.scala沒有呼叫SparkAPI,那麼,只要使用scalac命令編譯後執行即可。但是,這個test.scala程式依賴 Spark API,因此我們需要通過 sbt 進行編譯打包(前面的“Spark的安裝和使用”這個章節已經介紹過如何使用sbt進行編譯打包)。下面我們再演示一次。

請執行如下命令:

  1. cd /usr/local/spark/mycode/wordcount/
  2. vim simple.sbt

Shell 命令

通過上面程式碼,新建一個simple.sbt檔案,請在該檔案中輸入下面程式碼:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

注意, "org.apache.spark"後面是兩個百分號,千萬不要少些一個百分號%,如果少了,編譯時候會報錯。
下面我們使用 sbt 打包 Scala 程式。為保證 sbt 能正常執行,先執行如下命令檢查整個應用程式的檔案結構:

  1. cd /usr/local/spark/mycode/wordcount/
  2. find .

Shell 命令

應該是類似下面的檔案結構:

.
./src
./src/main
./src/main/scala
./src/main/scala/test.scala
./simple.sbt
./word.txt

接著,我們就可以通過如下程式碼將整個應用程式打包成 JAR(首次運行同樣需要下載依賴包 ):

  1. cd /usr/local/spark/mycode/wordcount/ //請一定把這目錄設定為當前目錄
  2. /usr/local/sbt/sbt package

Shell 命令

上面執行過程需要消耗幾分鐘時間,螢幕上會返回一下資訊:

[email protected]:/usr/local/spark/mycode/wordcount$ /usr/local/sbt/sbt package
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/usr/local/spark/mycode/wordcount/)
[info] Updating {file:/usr/local/spark/mycode/wordcount/}wordcount...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Packaging /usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging. 
[success] Total time: 34 s, completed 2017-2-20 10:13:13
#螢幕上返回上述資訊表明打包成功

生成的 jar 包的位置為 /usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar。
最後,通過 spark-submit 執行程式。我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:

  1. /usr/local/spark/bin/spark-submit --class "WordCount" /usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar

Shell 命令

下面是筆者的word.txt進行詞頻統計後的結果(你的結果應該和這個類似):

(Spark,1)
(is,1)
(than,1)
(fast,1)
(love,2)
(i,1)
(I,1)
(hadoop,2)
(Spark,1)

相關推薦

資料Spark入門教程---Spark2.1.0入門一個Spark應用程式WordCount 2.2

前面已經學習了Spark安裝,完成了實驗環境的搭建,並且學習了Spark執行架構和RDD設計原理,同時,我們還學習了Scala程式設計的基本語法,有了這些基礎知識作為鋪墊,現在我們可以沒有障礙地開始編寫一個簡單的Spark應用程式了——詞頻統計。 任務要求 任務:

資料Spark入門教程

【版權宣告】部落格內容由廈門大學資料庫實驗室擁有版權,未經允許,請勿轉載!版權所有,侵權必究! Spark最初誕生於美國加州大學伯克利分校(UC Berkeley)的AMP實驗室,是一個可應用於大規模資料處理的快速、通用引擎。2013年,Spark加入Apache孵化器專案後,開始獲得迅猛的發展,如今已

資料(6)hbase2.1.1版本全分散式安裝及使用

一、Hadoop安裝 具體請參見 https://blog.csdn.net/u011095110/article/details/83791734 二、Zookeeper分散式叢集安裝 1.Zookeeper下載 #進入hadoop主目錄 cd /hadoop #下載z

Spark——Hadoop2.7.3+Spark2.1.0 完全分散式環境 搭建全過程

一、修改hosts檔案在主節點,就是第一臺主機的命令列下;vim /etc/hosts我的是三臺雲主機:在原檔案的基礎上加上;ip1 master worker0 namenode ip2 worker1 datanode1 ip3 worker2 datanode2其中的i

SpringBoot 入門教程例項詳解(一) 開發一個SpringBoot應用程式例項

構建你的第一個Spring Boot應用程式 更多精彩請閱讀 東陸之滇的csdn部落格:http://blog.csdn.net/zixiao217 此教程提供一個入門應用程式例子,來展示Spring Boot是如何幫助快速、敏捷開發新一代應用的。你還可以通

WPF入門教程系列(一) 建立你的一個WPF專案

WPF基礎知識 快速學習絕不是從零學起的,良好的基礎是快速入手的關鍵,下面先為大家摞列以下自己總結的學習WPF的幾點基礎知識: 1) C#基礎語法知識(或者其他.NET支援的語言):這個是當然的了,雖然WPF是XAML配置的,但是總還是要寫程式碼的,相信各

Net Core 學習入門(三)---------一個web應用程式

 使用vs2017,新增一個新專案-asp.net core web應用程式。          結構如圖,        wwwroot放了網站的靜態資源如css、js、image檔案;        appsetting.json是應用程式的配置檔案。        bu

Objective-C語法一個iPhone應用程式的那些事兒(十)

#import "HelloWorldViewController.h" @implementation HelloWorldViewController - (void)didReceiveMemoryWarning { // Releases the view if it doesn't ha

VS2010 教程建立一個 WPF 應用程式 (第一節)

來自:https://msdn.microsoft.com/zh-cn/library/ff629048.aspx [原文發表時間] Friday, May 22, 2009 8:00 AM 這篇文章裡,我將使用VS2010 Beta 1建立一個WPF 應用程式。

零基礎入門資料spark中rdd部分運算元詳解

先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉

零基礎入門資料spark中的幾種key-value操作

今天記錄一下spark裡面的一些key-value對的相關運算元。 key-value對可以簡單理解為是一種認為構造的資料結構方式,比如一個字串"hello",單看"hello"的話,它是一個字串型別,現在假設我想把它在一個文字中出現的次數n作為一個值和"hello"一起操作,那麼可

[資料Spark]——快速入門

      本篇文件是介紹如何快速使用spark,首先將會介紹下spark在shell中的互動api,然後展示下如何使用java,scala,python等語言編寫應用。   為了良好的閱讀下面的文件,最好是結合實際的練習。首先需要下載spark,然後安裝hd

資料Spark(五)--- Spark的SQL模組,Spark的JDBC實現,SparkSQL整合MySQL,SparkSQL整合Hive和Beeline

一、Spqrk的SQL模組 ---------------------------------------------------------- 1.該模組能在Spack上執行Sql語句 2.可以處理廣泛的資料來源 3.DataFrame --- RDD --- tabl

資料Spark(四)--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值

一、Dependency:依賴:RDD分割槽之間的依存關係 --------------------------------------------------------- 1.NarrowDependency: 子RDD的每個分割槽依賴於父RDD的少量分割槽。 |

資料Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析

一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop

資料Spark(二)--- RDD,RDD變換,RDD的Action,解決spark資料傾斜問題,spark整合hadoop的HA

一、Spark叢集執行 ------------------------------------------------------- 1.local //本地模式 2.standalone //獨立模式 3.yarn //yarn模式

資料Spark(一)--- Spark簡介,模組,安裝,使用,一句話實現WorldCount,API,scala程式設計,提交作業到spark叢集,指令碼分析

一、Spark簡介 ---------------------------------------------------------- 1.快如閃電的叢集計算 2.大規模快速通用的計算引擎 3.速度: 比hadoop 100x,磁碟計算快10x 4.使用: java

資料Spark(七)--- Spark機器學習,樸素貝葉斯,酒水評估和分類案例學習,垃圾郵件過濾學習案例,電商商品推薦,電影推薦學習案例

一、Saprk機器學習介紹 ------------------------------------------------------------------ 1.監督學習 a.有訓練資料集,符合規範的資料 b.根據資料集,產生一個推斷函式

資料Spark(六)--- Spark Streaming介紹,DStream,Receiver,Streamin整合Kafka,Windows,容錯的實現

一、Spark Streaming介紹 ----------------------------------------------------------- 1.介紹 是spark core的擴充套件,針對實時資料的實時流處理技術 具有可擴充套件、高吞吐量、

資料Spark(八)--- Spark閉包處理,部署模式和叢集模式,SparkOnYarn模式,高可用,Spark整合Hive訪問hbase類載入等異常解決,使用spark下的thriftserv

一、Spark閉包處理 ------------------------------------------------------------ RDD,resilient distributed dataset,彈性(容錯)分散式資料集。 分割槽列表,function,dep Op