1. 程式人生 > >Spark實戰----(1)使用Scala開發本地測試的Spark WordCount程式

Spark實戰----(1)使用Scala開發本地測試的Spark WordCount程式

第一步:JDk的安裝

第二步:Scala的安裝   不會的可以看這裡   Scala環境安裝

鑑於以上兩步較為簡單,不再詳細贅述

第三步:去Spark官方網站下載Spark包 我下載的檔名是spark-1.6.2-bin-hadoop2.6



         點選DownLoad就可以下載了,下載完並解壓

第四步:IDE選擇

我用的是 intellij IDEA ,不過我學習的時候用的是Scala for Eclipse,用法嘛大同小異的,個人推薦IDEA

第五步:建立工程

在eclipse中點選File->New->Scala Project ,填上Project  name 然後點選finish

第六步:

更改Scala Library container的版本



第七步:匯入Spark 的jar包

在專案上右鍵,找到Build Path -> Configure Build Path


彈出這個視窗後,點選 Add External JARs ,找到Spark 包的位置,點選lib檔案,找到spark-assembly-1.6.2-hadoop2.6.0.jar


點選ok可以看到專案裡面多了一個Referenced Libraries,點開發現下面就是我們剛剛新增的包

第八步:建立包和scala檔案

在src下右鍵 點選Package,新建一個包,我命名的是cn.limbo.spark,至此專案的結構如下所示


其中WordCount是我們需要編寫的檔案,(上面那個別管 = =)

第九步:編寫WordCount.scala,程式碼如下

package cn.limbo.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * 使用Scala開發本地測試的Spark WordCount程式
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 第一步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊
     * 例如說通過setMaster來設定程式要連線的Spark叢集的Master的URL
     * 如果設定為local,則代表Spark程式在本地執行,特別適合於配置條件的較差的人
     * 
     */
    
    val conf = new SparkConf()
    conf.setAppName("MyFirstSparkApplication")  //設定應用程式的名稱,在程式執行的監控介面可以看到名稱
    conf.setMaster("local")   //此時程式在本地執行,無需安裝Spark的任何叢集
    
    /**
     * 第二步:建立SparkContext物件
     * SparkContext是Spark程式所有功能的唯一入口,無論是採用Scala,Java,Python等都必須有一個SparkContext
     * SparkContext核心作用:初始化Spark應用程式執行所需要的核心元件,包括DAGScheduler,TaskScheduler,Scheduler
     * 同時還會負責Spark程式往Master註冊程式等
     * SparkContext是整個Spark應用程式中最為至關重要的一個物件。
     */
    
    val sc = new SparkContext(conf)     //建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊
    
    /**
     * 第三步:根據具體的資料來源(HDFS,HBase,Local FS(本地檔案系統) ,DB,S3(雲上)等)通過SparkContext來建立RDD
     * RDD的建立基本有三種方式,根據外部的資料來源(例如HDFS),根據Scala集合,由其他的RDD操作產生
     * 資料會被RDD劃分成為一系列的Partitions,分配到每個Partition的資料屬於一個Task的處理範疇
     */
    
     //檔案的路徑,最小並行度(根據機器數量來決定)
    //val lines:RDD[String]= sc.textFile("F://spark//spark-1.6.2-bin-hadoop2.6//README.md", 1)    //讀取本地檔案,並設定Partition = 1
    val lines= sc.textFile("F://spark//spark-1.6.2-bin-hadoop2.6//README.md", 1)    //讀取本地檔案,並設定Partition = 1   //型別推導得出lines為RDD
    /**
     * 第四步:對初始的RDD進行Transformation級別的處理,例如map,filter等高階函式等的程式設計,來進行具體的資料計算
     *    4.1:將每一行的字串拆分成單個的單詞
     *    4.2:在單詞拆分的基礎上對每個單詞的例項計數為1,也就是word =>(word,1)
     *    4.3:在每個單詞例項計數為1基礎之上統計每個單詞在檔案出現的總次數
     */
    
    //對每一行的字串進行單詞的拆分並把所有行的拆分結果通過flat合併成為一個大的單詞集合
    val words = lines.flatMap { line => line.split(" ") }    //words同樣是RDD型別  
    val pairs = words.map { word => (word,1) }
    val wordCounts = pairs.reduceByKey(_+_)       //對相同的key,進行value的累加(包括Local和Reducer級別同時Reduce)
    
    
    wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
    
    sc.stop()    //注意一定要將SparkContext的物件停止,因為SparkContext執行時會建立很多的物件
    
    
    /*這個程式執行之後一定會有一個錯誤,因為 沒有hadoop環境,這個不是程式錯誤,也不影響任何功能*/
    
  }
}
之後就可以看到控制檯的列印結果了~

至此,Spark本地的部署就結束了
下一章介紹叢集部署