1. 程式人生 > >大資料篇:Spark

大資料篇:Spark

大資料篇:Spark

  • Spark是什麼

Spark是一個快速(基於記憶體),通用,可擴充套件的計算引擎,採用Scala語言編寫。2009年誕生於UC Berkeley(加州大學伯克利分校,CAL的AMP實驗室),2010年開源,2013年6月進入Apach孵化器,2014年成為Apach頂級專案,目前有1000+個活躍者。就是說用Spark就對了。

Spark支援Scala,Java,R,Python語言,並提供了幾十種(目前80+種)高效能的演算法,這些如果讓我們自己來做,幾乎不可能。

Spark得到眾多公司支援,如:阿里、騰訊、京東、攜程、百度、優酷、土豆、IBM、Cloudera、Hortonworks等。

  • 如果沒有Spark

解決MapReduce慢的問題而誕生,官網解釋比同樣的MapReduce任務快100倍!

spark.apache.org

1 內建模組

機器學習(MLlib),圖計算(GraphicX),實時處理(SparkStreaming),SQL解析(SparkSql)

1.1 叢集資源管理

Spark設計為可以高效的在一個計算節點到數千個計算節點之間伸縮計算,為了實現這樣的要求,同時獲得最大靈活性,Spark支援在各種叢集資源管理器上執行,目前支援的3種如下:(上圖中下三個)

  1. Hadoop YARN(國內幾乎都用)
  2. Apach Mesos(國外使用較多)
  3. Standalone(Spark自帶的資源排程器,需要在叢集中的每臺節點上配置Spark)

1.2 Spark Core

實現了Spark的基本功能,包含任務排程、記憶體管理、錯誤恢復、與儲存系統互動等模組。其中還包含了對彈性分散式資料集(RDD:Resilient Distributed DataSet)的API定義

1.3 Spark SQL

是Spark用來操作結構化資料的程式包,通過Spark SQL 我們可以使用SQL或者HQL來查詢資料。且支援多種資料來源:Hive、Parquet、Json等

1.4 Spark Streaming

是Spark提供的對實時資料進行流式計算的元件

1.5 Spark MLlib

提供常見的機器學習功能和程式庫,包括分類、迴歸、聚類、協同過濾等。還提供了模型評估、資料匯入等額外的支援功能。

2 執行模式

2.1 核心概念介紹

  • Master

    • Spark特有的資源排程系統Leader,掌控整個叢集資源資訊,類似於Yarn框架中的ResourceManager
    • 監聽Worker,看Worker是否正常工作
    • Master對Worker、Application等的管理(接收Worker的註冊並管理所有的Worker,接收Client提交的Application,排程等待Application並向Worker提交)
  • Worker

    • Spark特有的資源排程Slave,有多個,每個Slave掌管著所有節點的資源資訊,類似Yarn框架中的NodeManager
    • 通過RegisterWorker註冊到Master
    • 定時傳送心跳給Master
    • 根據Master傳送的Application配置程序環境,並啟動ExecutorBackend(執行Task所需的程序)
  • Driver

    • Spark的驅動器,是執行開發程式中的main方法的執行緒
    • 負責開發人員編寫SparkContext、RDD,以及進行RDD操作的程式碼執行,如果使用Spark Shell,那麼啟動時後臺自啟動了一個Spark驅動器,預載入一個叫做sc的SparkContext物件,如果驅動器終止,那麼Spark應用也就結束了。
    • 4大主要職責:
      • 將使用者程式轉化為作業(Job)
      • 在Executor之間排程任務(Task)
      • 跟蹤Executor的執行情況
      • 通過UI展示查詢執行情況
  • Excutor

    • Spark Executor是一個工作節點,負責在Spark作業中執行任務,任務間相互獨立。Spark應用啟動時,Executor節點被同時啟動,並且始終伴隨著整個Spark應用的生命週期而存在,如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會將出錯節點上的任務排程到其他Executor節點上繼續執行
    • 兩個核心功能:
      • 負責執行組成Spark應用的任務,並將結果返回給驅動器(Driver)
      • 它通過自身塊管理器(BlockManager)為使用者程式中要求快取的RDD提供記憶體式儲存。RDD是直接存在Executor程序內的,因此任務可以在執行時充分利用快取資料加速運算。
  • RDDs

    • Resilient Distributed DataSet:彈性分散式資料集
    • 一旦擁有SparkContext物件,就可以用它來建立RDD
  • 通用流程圖

2.2 WordCount案例

  • Spark Shell方式
#建立word.txt檔案
vim word.txt
#--->
hadoop hello spark
spark word
hello hadoop spark
#---<
#上傳HDFS叢集
hadoop dfs -put word.txt /
#連結客戶端
spark-shell

sc.textFile("/word.txt").flatMap(line => line.split(' ')).map((_,1)).reduceByKey(_ + _).collect

每個Spark應用程式都包含一個驅動程式,驅動程式負責把並行操作釋出到叢集上,驅動程式包含Spark應用中的主函式,定義了分散式資料集以應用在叢集中,在前面的wordcount案例中,spark-shell就是我們的驅動程式,所以我們鍵入我們任何想要的操作,然後由它負責釋出,驅動程式通過SparkContext物件來訪問Spark,SparkContext物件相當於一個到Spark叢集的連結

2.3 Job劃分和排程

  • Application應用
    • 一個SparkContext就是一個Application
  • Job作業:
    • 一個行動運算元(Action)就是一個Job
  • Stage階段:
    • 一次寬依賴(一次shuffle)就是一個Stage,劃分是從後往前劃分
  • Task任務:
    • 一個核心就是一個Task,體現任務的並行度,常常根據核心數的1.5倍進行設定

  • 使用WordCount案例分析

一個行動運算元collect(),一個job

一次寬依賴shuffle運算元reduceByKey(),切分成2個Stage階段

Stage階段,預設檔案被切分成2份,所以有2個task

Stage階段0

Stage階段1

2.4 Shuffle洗牌

2.4.1 ShuffleMapStage And ResultStage

  • 在劃分stage時,最後一個stage稱為FinalStage,本質上是一個ResultStage物件,前面所有的stage被稱為ShuffleMapStage

  • ShuffleMapStage 的結束伴隨著shuffle檔案寫磁碟

  • ResultStage對應程式碼中的action運算元,即將一個函式應用在RDD的各個Partition(分割槽)的資料集上,意味著一個Job執行結束

2.4.2 HashShuffle

  • 未優化HashShuffle流程圖:目前已經沒有了

如上圖,最終結果會有12個小檔案

  • 優化後HashShuffle流程圖

如上圖,最終結果會有6個小檔案,比未優化前少了一半

2.4.3 SortShuffle

該模式下,資料會先寫入一個數據結果,reduceByKey寫入Map,一邊通過Map區域性聚合,一邊寫入記憶體,

Join運算元寫入ArrayList直接寫入記憶體中,然後需要判斷是否達到閥值,如果達到就會將記憶體資料寫入磁碟,釋放記憶體資源

2.4.4 Bypass SortShuffle

  • Bypass SortShuffle執行機制觸發條件
    • shuffle map task 數量小於 spark.shuffle.sort.bypassMargeThreshold引數的值,預設為200
    • 不是聚合類的shuffle運算元

2.5 Submit語法

spark-submit \
--class <main-calss> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
...  #其他 options
<application-jar> \
[application-arguments]
  • --class:應用啟動類全類名(如:org.apache.spark.examples.SparkPi)
  • --master:指定master地址,預設本機Local(本地一般使用Local[*],叢集一般使用yarn)
  • --deploy-mode:是否釋出到驅動worker節點(引數:cluster),或者作為一個本地客戶端(引數:client),預設本地client
  • --conf:任意Spark配置屬性,格式key=value,如包含空格,可以加引號"key=value"
  • application-jar:打包好的應用程式jar,包含依賴,這個URL在叢集中全域性課件,如HDFS上的jar->hdfs://path;如linux上的jar->file://path 且所有節點路徑都需要包含這個jar
  • application-arguments:給main()方法傳引數
  • --executor-memory 1G:指定每個executor可用記憶體為1G
  • --total-executor-cores 6:指定所有executor使用的cpu核數為6個
  • --executor-cores 2:表示每個executor使用的cpu的核數2個

2.6 Local模式

Local模式就是在一臺計算機上執行Spark,通常用於開發中。(單機)

  • Submit提交方式
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[*] \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100

2.7 Standalone模式

構建一個由 Master + Slave 構成的Spark叢集,Spark執行在叢集中,只依賴Spark,不依賴別的元件(如:Yarn)。(獨立的Spark叢集)

#連結客戶端
spark-shell --master spark://cdh01.cm:7337

參考wordCount案例

  • Standalone-Client流程圖

  • Standalone-Cluster流程圖

2.8 Yarn模式

Spark客戶端可以直接連線Yarn,不需要構建Spark叢集。

有yarn-client和yarn-cluster兩種模式,主要區別在:Driver程式的執行節點不同。

yarn-client:Driver程式執行在客戶端,適用於互動、除錯,希望立即看見APP輸出

yarn-cluster:Driver程式執行在由ResourceManager啟動的ApplicationMaster上,適用於生產環境

  • Yarn-Client流程圖

  • Yarn-Cluster流程圖

  • 客戶端模式:Driver是在Client端,日誌結果可以直接在後臺看見
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100
  • 叢集模式:Driver是在NodeManager端,日誌結果需要通過監控日誌檢視
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100    

3 使用IDEA開發Spark

  • pom.xml
    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark On Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Hbase On Spark-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.1.0-cdh6.2.0</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark Streaming Kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-tools</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-examples</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!--mysql依賴的jar包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- 在maven專案中既有java又有scala程式碼時配置 maven-scala-plugin 外掛打包時可以將兩類程式碼一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- MAVEN 編譯使用的JDK版本 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase><!--繫結到package生命週期階段-->
                        <goals>
                            <goal>single</goal><!--只執行一次-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--<finalName></finalName>&lt;!&ndash;主類入口&ndash;&gt;-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • WorkCount案例

    1. 在resources資料夾下,新建word.csv檔案
    hello,spark
    hello,scala,hadoop
    hello,hdfs
    hello,spark,hadoop
    hello
    
    1. WorkCount.scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WorkCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val tuples: Array[(String, Int)] = sc.textFile(ClassLoader.getSystemResource("word.csv").getPath)
          .flatMap(_.split(","))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect()
        tuples.foreach(println)
      }
    }
    

    結果:

    (scala,1)
    (hello,5)
    (spark,2)
    (hadoop,2)
    (hdfs,1)

4 Spark Core

4.1 什麼是RDD

Resilient Distributed DataSet:彈性分散式資料集,是Spark中最基本資料抽象,可以理解為資料集合。

在程式碼中是一個抽象類,它代表一個彈性的、不可變的、可分割槽,裡面的元素可平行計算的集合。

4.2 RDD的五個主要特性

  1. 分割槽性
    • 多個分割槽,分割槽可以看成是資料集的基本組成單位
    • 對於RDD來說,每個分割槽都會被一個計算任務處理,並決定了平行計算的粒度。
    • 使用者可以在建立RDD時,指定RDD的分割槽數,如果沒有指定,那麼採用預設值(程式所分配到的CPU Coure的數目)
    • 每個分配的儲存是由BlockManager實現的,每個分割槽都會被邏輯對映成BlockManager的一個Block,而這個Block會被一個Task負責計算。
  2. 計算每個分割槽的函式
    • Spark中RDD的計算是以分割槽為單位的,每個RDD都會實現compute函式以達到這個目的
  3. 依賴性
    • RDD的每次轉換都會生成一個新的RDD,所以RDD之間會形成類似於流水線一樣的前後依賴關係,在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
  4. 對儲存鍵值對的RDD,還有一個可選的分割槽器
    • 只有對key-value的RDD,才會有Partitioner,非key-value的RDD的Rartitioner的值是None
    • Partitioner不但決定了RDD的分割槽數量,也決定了parent RDD Shuffle輸出時的分割槽數量
    • 預設是HashPartitioner,還有RangePartition,自定義分割槽
  5. 儲存每個分割槽優先位置的列表(本地計算性)
    • 比如對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在檔案快的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。

4.3 Transformation和Action運算元

在Spark中,Transformation運算元(也稱轉換運算元),在沒有Action運算元(也稱行動運算元)去觸發的時候,是不會執行的,可以理解為懶運算元,而Action運算元可以理解為觸發運算元,常用Action運算元如下:

  • redece:通過函式聚集RDD的所有元素,先聚合分割槽內的資料,在聚合分割槽間的資料(預聚合)
  • collect:以陣列的形式返回RDD中的所有元素,所有資料都會被拉到Driver端,記憶體開銷很大,所以慎用
  • count:返回RDD中元素個數
  • take:返回RDD中前N個元素組成的陣列
  • first:返回RDD中的第一個元素,類似於tack(1)
  • takeOrdered:返回排序後的前N個元素,預設升序,資料也會拉到Driver端
  • aggregate:分割槽內聚合後,在分割槽間聚合
  • fold:aggregate簡化操作,如果分割槽內和分割槽間演算法一樣,則可以使用
  • saveAsTextFile:將資料集的元素以textFile的形式儲存到HDFS檔案系統或者其他檔案系統,對每個元素,Spark都會呼叫toString方法轉換為文字
  • saveAsSequenceFile:將資料集的元素以Hadoop SquenceFile的形式儲存到指定目錄下,可以是HDFS或者其他檔案系統
  • saveAsObjectFile:將RDD中的元素序列化成物件,儲存到檔案中
  • countByKey:針對k-v型別RDD,返回一個Map(Key,count),可以用來檢視資料是否傾斜
  • foreach:針對RDD中的每一個元素都執行一次函式,每個函式實在Executor上執行的

常用Transformation運算元如下:

  • map:輸入變換函式應用於RDD中所有元素,轉換其型別
  • mapPartitions:輸入變換函式應用於每個分割槽中所有元素
  • mapPartitionsWithIndex:輸入變換函式應用於每個分割槽中所有元素,帶有分割槽號
  • filter:過濾運算元
  • flatMap:扁平化運算元
  • sample:抽樣運算元
  • union:並集運算元
  • intersection:交集運算元
  • distinct:去重運算元
  • groupByKey:根據Key分組運算元
  • reduceByKey:根據Key聚合運算元
  • aggregateByKey:根據Key聚合運算元
  • sortByKey:根據Key排序運算元
  • join:連結運算元
  • coalesce:壓縮分割槽運算元
  • repartition:重分割槽運算元

4.4 RDD的建立

4.4.1 從集合中建立

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * 通過parallelize方法傳入序列得到RDD
      * 傳入分割槽數為1,結果為1	2	3	4	5	6	7	8	9	10
      * 傳入分割槽數大於1,結果順序不定,因為資料被打散在2個分割槽裡
      * */
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    rdd.foreach(x => print(x + "\t"))
  }
}

4.4.2 從外部儲存建立RDD

  • 讀取textFile

WordCount案例介紹了此種用法

  • 讀取Json檔案

在idea中,resources目錄下建立word.json檔案

{"name": "zhangsa"}
{"name": "lisi", "age": 30}
{"name": "wangwu"}
["aa","bb"]
object Demo0 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("json").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[String] = sc.textFile(this.getClass().getClassLoader.getResource("word.json").getPath)
    val rdd2: RDD[Option[Any]] = rdd1.map(JSON.parseFull(_))
    rdd2.foreach(println)
    /**
      * Some(Map(name -> zhangsa))
      * Some(Map(name -> wangwu))
      * Some(List(aa, bb))
      * Some(Map(name -> lisi, age -> 30.0))
      * */
  }
}
  • 讀取Object物件檔案
object Demo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("object").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
//    rdd1.saveAsObjectFile("hdfs://cdh01.cm/test")

    val rdd2: RDD[Nothing] = sc.objectFile("hdfs://cdh01.cm/test")
    rdd2.foreach(println)

    /**
      * 2
      * 5
      * 1
      * 4
      * 3
      * */
  }
}

4.4.3 從其他RDD轉換得到新的RDD

  • 根據RDD的資料型別的不同,整體分為2種RDD:Value型別,Key-Value型別(二維元組)

map()返回一個新的RDD,該RDD是由原RDD的每個元素經過函式轉換後的值組成,主要作用就是轉換結構。(不存在shuffle)

  • 案例一:
object Demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * map運算元,一共有多少元素就會執行多少次,和分割槽數無關
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    val mapRdd: RDD[Int] = rdd.map(x => {
      println("執行") //一共被執行10次
      x * 2
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例二:
object demo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitions").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitions運算元,一個分割槽內處理,幾個分割槽就執行幾次,優於map函式
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val mapRdd: RDD[Int] = rdd.mapPartitions(it => {
      println("執行") //分割槽2次,共列印2次
      it.map(x => x * 2)
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例三:
object Demo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitionsWithIndex運算元,一個分割槽內處理,幾個分割槽就執行幾次,返回帶有分割槽號的結果集
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, it) => it.map((index, _)))
    val result: Array[(Int, Int)] = value.collect()
    result.foreach(x => print(x + "\t")) //(0,1)	(0,2)	(0,3)	(0,4)	(0,5)	(1,6)	(1,7)	(1,8)	(1,9)	(1,10)
  }
}

4.5 flatMap

扁平化(不存在shuffle)

object Demo5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
    val map_result: RDD[String] = rdd.map(ele => ele._1 + ele._2)
    val flatMap_result: RDD[Char] = rdd.flatMap(ele => ele._1 + ele._2)
    
    /**
      * C3
      * A1
      * B2
      **/
    map_result.foreach(println)

    /**
      * B
      * A
      * C
      * 1
      * 2
      * 3
      **/
    flatMap_result.foreach(println)
  }
}

4.6 glom

將每一個分割槽的元素合併成一個數組,形成新的RDD型別:RDD[Array[T]] (不存在shuffle)

object Demo6 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("glom").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10), 3)
    val result: RDD[Array[Int]] = rdd.glom()

    /**
      * 1,2,3
      * 7,8,9,10
      * 4,5,6
      * */
    result.foreach(x=>{
      println(x.toList.mkString(","))
    })

  }
}

4.7 groupBy

根據條件函式分組(存在shuffle)

object Demo7 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result1: RDD[(Int, Iterable[Int])] = rdd.groupBy(x => x % 2)
    val result2: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(x => x % 2 == 0)

    /**
      * (0,CompactBuffer(2, 4, 6, 8, 10))
      * (1,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result1.foreach(println)

    /**
      * (true,CompactBuffer(2, 4, 6, 8, 10))
      * (false,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result2.foreach(println)
  }
}

4.8 filter

過濾(不存在shuffle)

object Demo8 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result: RDD[Int] = rdd.filter(x => x % 2 == 0)
    result.foreach(x => print(x + "\t"))  //6	10	8	4	2	
  }
}

4.9 sample

sample(withReplacement,fraction,seed)抽樣,常用在解決定位大key問題

  • 以指定的隨機種子隨機抽樣出比例為fraction的資料(抽取到的數量是size*fraction),注意:得到的結果並不能保證準確的比例,也就是說fraction只決定了這個數被選中的比率,並不是從資料中抽出多少百分比的資料,決定的不是個數,而是比率。
  • withReplacement表示抽出的資料是否放回,true為有放回抽樣,flase為無放回抽樣,放回表示資料有可能會被重複抽取到,false則不可能重複抽取到,如果為false則fraction必須在[0,1]內,是true則大於0即可。
  • seed用於指定隨機數生成器種子,一般預設的,或者傳入當前的時間戳,(如果傳入定值,每次取出結果一樣)
object Demo9 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))

    /**
      * 不放回抽樣
      * 從結果中可以看出,抽出結果沒有重複
      * */
    val result1: RDD[Int] = rdd.sample(false,0.5)
    result1.foreach(println)
    /**
      * 放回抽樣
      * 從結果中可以看出,抽出結果有重複
      * */
    val result2: RDD[Int] = rdd.sample(true,2)
    result2.foreach(println)
  }
}

4.10 distinct

distinct([numTasks])去重,引數表示任務數量,預設值和分割槽數保持一致(不存在shuffle)

object Demo10 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4,2,3,4,3,4,5))
    val result: RDD[Int] = rdd.distinct(2)
    result.foreach(println)
  }
}

4.11 coalesce

coalesce(numPatitions)縮減,縮減分割槽到指定數量,用於大資料集過濾後,提高小資料集的執行效率,只能減不能加。(不存在shuffle)

object Demo11 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),5)
    println(rdd.partitions.length)  //5
    val result: RDD[Int] = rdd.coalesce(2)
    println(result.partitions.length)  //2
  }
}

4.12 repartition

repartition(numPatitions)更改分割槽,更改分割槽到指定數量,可加可減,但是減少還是使用coalesce,將這個理解為增加。(存在shuffle)

object Demo12 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),2)
    println(rdd.partitions.length)  //2
    val result: RDD[Int] = rdd.repartition(5)
    println(result.partitions.length)  //5
  }
}

4.13 sortBy

排序(存在shuffle)

object Demo13 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(4, 2, 3, 1, 5), 1)
    val result1: RDD[Int] = rdd.sortBy(x => x, false)
    result1.foreach(x => print(x + "\t"))  //5	4	3	2	1
    val result2: RDD[Int] = rdd.sortBy(x => x, true)
    result2.foreach(x => print(x + "\t"))  //1	2	3	4	5
  }
}

4.14 RDD與RDD互交

  • 並集:union
  • 差集:subtract
  • 交集:intersection
  • 笛卡爾積:cartesian
  • 拉鍊:zip
object Demo14 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD AND RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[Int] = sc.parallelize(1.to(5))
    val rdd2: RDD[Int] = sc.parallelize(3.to(8))

    //並集
    rdd1.union(rdd2).collect().foreach(x => print(x + "\t"))  //1	2	3	4	5	3	4	5	6	7	8
    //差集
    rdd1.subtract(rdd2).collect().foreach(x => print(x + "\t")) //1	2
    //交集
    rdd1.intersection(rdd2).collect().foreach(x => print(x + "\t")) //3	4	5\
    //笛卡爾積
    /*(1,3)	(1,4)	(1,5)	(1,6)	(1,7)	(1,8)
      (2,3)	(2,4)	(2,5)	(2,6)	(2,7)	(2,8)
      (3,3)	(3,4)	(3,5) (3,6) (3,7)	(3,8)
      (4,3)	(4,4)	(4,5)	(4,6)	(4,7)	(4,8)
      (5,3)	(5,4)	(5,5)	(5,6)	(5,7)	(5,8)*/
    rdd1.cartesian(rdd2).collect().foreach(x => print(x + "\t"))
    //拉鍊:必須保證RDD分割槽元素數量相同
    val rdd3: RDD[Int] = sc.parallelize(1.to(5))
    val rdd4: RDD[Int] = sc.parallelize(2.to(6))
    rdd3.zip(rdd4).collect().foreach(x => print(x + "\t"))  //(1,2)	(2,3)	(3,4)	(4,5)	(5,6)
  }
}

4.15 k-v型別 partitionBy

大多數Spark運算元都可以用在任意型別的RDD上,但是有一些比較特殊的操作只能用在key-value型別的RDD上

使用HashPartitioner分割槽器

object Demo15 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None
    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new HashPartitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(org.apache.spark.HashPartitioner@2)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (1,(spark,1))
      * (0,(hello,1))
      * (0,(hadooop,1))
      * (0,(hello,1))
      **/
  }
}

自定義分割槽器

object Demo16 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None

    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new MyPatitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(com.test.sparkcore.MyPatitioner@769a58e5)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (0,(hadooop,1))
      * (1,(hello,1))
      * (0,(spark,1))
      * (1,(hello,1))
      **/
  }
}

class MyPatitioner(num: Int) extends Partitioner {
  override def numPartitions: Int = num

  override def getPartition(key: Any): Int = {
    System.identityHashCode(key) % num.abs
  }
}

4.16 k-v型別 reduceByKey

reduceByKey(V , V)=>V 根據key進行聚合,在shuffle之前會有combine(預聚合)操作

object Demo17 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.17 k-v型別 groupByKey

根據key進行分組,直接shuffle

object Demo18 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Iterable[Int])] = rdd2.groupByKey()
    result.foreach(x => print(x + "\t"))  //(spark,CompactBuffer(1))	(hadooop,CompactBuffer(1))	(hello,CompactBuffer(1, 1))
    result.map(x=>(x._1,x._2.size)).foreach(x => print(x + "\t")) 	//(spark,1)	(hadooop,1)	(hello,2)      
  }
}

4.18 k-v型別 aggrateByKey

aggrateByKey(zero : U)(( U , V )=>U , (U , U)=>U)

基於Key分組然後去聚合的操作,耗費資源太多,這時可以使用reduceByKey或aggrateByKey運算元去提高效能

aggrateByKey分割槽內聚合,後在進行shuffle聚合。

object Demo19 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.aggregateByKey(0)(_ + _, _ + _)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.19 k-v型別 foldByKey

foldByKey(zero : V)((V , V)=>V) 摺疊計算,沒有aggrateByKey靈活,如果分割槽內和分割槽外聚合計算不一樣,則不行

object Demo20 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foldByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.foldByKey(0)(_+_)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.20 k-v型別 combineByKey

combineByKey(V=>U,(U , V)=>U , (U , U)=>U) 根據Key組合計算

object Demo21 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.combineByKey(v => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.21 k-v型別 sortByKey

根據Key排序

object Demo22 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("ahello", "bhadooop", "chello", "dspark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.sortByKey(false).foreach(x => print(x + "\t")) //(dspark,1)	(chello,1)	(bhadooop,1)	(ahello,1)
  }
}

4.22 k-v型別 mapValues

只對value操作的map轉換操作

object Demo23 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapValues").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.mapValues(x => x + 1).foreach(x => print(x + "\t")) //(hello,2)	(hadooop,2)	(hello,2)	(spark,2)
  }
}

4.23 k-v型別 join

object Demo24 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))
    //內連線 (a,(10,30))	(b,(10,20))	(a,(20,30))
    rdd1.join(rdd2).foreach(x => print(x + "\t"))

    //左連結(b,(10,Some(20)))	(d,(10,None))	(a,(10,Some(30)))	(a,(20,Some(30)))
    rdd1.leftOuterJoin(rdd2).foreach(x => print(x + "\t"))

    //右連結(c,(None,10))	(a,(Some(10),30))	(b,(Some(10),20))	(a,(Some(20),30))
    rdd1.rightOuterJoin(rdd2).foreach(x => print(x + "\t"))
    
    //全連結(b,(Some(10),Some(20)))	(c,(None,Some(10)))	(d,(Some(10),None))	(a,(Some(10),Some(30)))	(a,(Some(20),Some(30)))
    rdd1.fullOuterJoin(rdd2).foreach(x => print(x + "\t"))
  }
}

4.24 k-v型別 cogroup

根據Key聚合RDD

object Demo25 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cogroup").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))

    /**
      * (c,(CompactBuffer(),CompactBuffer(10)))
      * (b,(CompactBuffer(10),CompactBuffer(20)))
      * (a,(CompactBuffer(10, 20),CompactBuffer(30)))
      * (d,(CompactBuffer(10),CompactBuffer()))
      */
    rdd1.cogroup(rdd2).foreach(println)
  }
}

4.25 keyo序列化

在分散式應用中,經常會進行IO操作,傳遞物件,而網路傳輸過程中就必須要序列化。

Java序列化可以序列化任何類,比較靈活,但是相當慢,並且序列化後物件的提交也比較大。

Spark出於效能考慮,在2.0以後,開始支援kryo序列化機制,速度是Serializable的10倍以上,當RDD在Shuffle資料的時候,簡單資料型別,簡單資料型別陣列,字串型別已經使用kryo來序列化。

object Demo26 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("keyo")
      .setMaster("local[*]")
      //替換預設序列化機制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //註冊需要使用的kryo序列化自定義類
      .registerKryoClasses(Array(classOf[MySearcher]))

    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c"))
    val rdd2: RDD[String] = MySearcher("hadoop").getMathcRddByQuery(rdd1)
    rdd2.foreach(println)
  }
}

case class MySearcher(val query: String) {
  def getMathcRddByQuery(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

4.26 依賴

  • 窄依賴:(不會shuffle)

    • 如果RDD2由RDD1計算得到,則RDD2就是子RDD,RDD1就是父RDD
    • 如果依賴關係在設計的時候就可以確定,而不需要考慮父RDD分割槽中的記錄,並且父RDD中的每個分割槽最多隻有一個子分割槽,這就叫窄依賴
    • 父RDD的每個分割槽中的資料最多被一個子RDD的分割槽使用
  • 寬依賴:(會shuffle)

    • 寬依賴往往對應著shuffle操作,需要在執行過程中將同一個父RDD的分割槽傳入到不同的子RDD分割槽中。
    • 對於寬依賴,重算的父RDD分割槽對應多個子RDD分割槽,這樣實際上父RDD 中只有一部分的資料是被用於恢復這個丟失的子RDD分割槽的,另一部分對應子RDD的其它未丟失分割槽,這就造成了多餘的計算;
    • 寬依賴中子RDD分割槽通常來自多個父RDD分割槽,極端情況下,所有的父RDD分割槽都要進行重新計算。

4.27 持久化

object Demo27 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("a", "b", "c"))
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("執行flatMap操作")
      x.split("")
    })
    val rdd3: RDD[(String, Int)] = rdd2.map((_, 1))

    /** 持久化到記憶體 */
    //rdd3.cache() //持久化到記憶體
    /**
      * 持久化到磁碟
      * DISK_ONLY:持久化到磁碟
      * DISK_ONLY_2:持久化到磁碟並且存一個副本(2個檔案)
      * MEMORY_ONLY:持久化到記憶體
      * MEMORY_ONLY_2:持久化到記憶體並且存一個副本(2個檔案)
      * MEMORY_ONLY_SER:持久化到記憶體,並且序列化
      * MEMORY_ONLY_SER_2:持久化到記憶體,並且序列化,還要存一個副本(2個檔案)
      * MEMORY_AND_DISK:持久化到記憶體和磁碟
      * MEMORY_AND_DISK_2:持久化到記憶體和磁碟並且存一個副本(2個檔案)
      * MEMORY_AND_DISK_SER:持久化到記憶體和磁碟,並且序列化
      * MEMORY_AND_DISK_SER_2:持久化到記憶體和磁碟,並且序列化,還要存一個副本(2個檔案)
      * OFF_HEAP:持久化在堆外記憶體中,Spark自己管理的記憶體
      * */
    rdd3.persist(StorageLevel.DISK_ONLY) //持久化到磁碟

    rdd3.collect.foreach(x => print(x + "\t"))
    println("------------")
    //輸出語句不會執行
    rdd3.collect.foreach(x => print(x + "\t"))
  }
}

4.28 checkpoint

持久化只是將資料儲存在BlockManager中,而RDD的Lineage是不變的,但是checkpoint執行完後,RDD已經沒有之前所謂的依賴了,而只是一個強行為其設定的checkpointRDD,RDD的Lineage改變了。

持久化的資料丟失可能性更大,磁碟、記憶體都有可能會存在資料丟失情況。但是checkpoint的資料通常是儲存在如HDFS等容錯、高可用的檔案系統,資料丟失可能性較小。

預設情況下,如果某個RDD沒有持久化,但是設定了checkpoint Job想要將RDD的資料寫入檔案系統,需要全部重新計算一次,再將計算出來的RDD資料checkpoint到檔案系統,所以,建議對checkpoint的RDD使用十九畫,這樣RDD只需要計算一次就可以了。

object Demo28 {
  def main(args: Array[String]): Unit = {
    //設定當前使用者
    System.setProperty("HADOOP_USER_NAME", "Heaton")
    val conf = new SparkConf().setAppName("checkpoint").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //設定checkpoint目錄
    sc.setCheckpointDir("hdfs://cdh01.cm:8020/test")
    val rdd1: RDD[String] = sc.parallelize(Array("abc"))
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))

    /**
      * 標記RDD2的checkpoint
      * RDD2會被儲存到檔案中,並且會切斷到父RDD的引用,該持久化操作,必須在job執行之前呼叫
      * 如果不進行持久化操作,那麼在儲存到檔案的時候需要重新計算
      **/
    rdd2.cache()
    rdd2.collect.foreach(x => print(x + "\t"))
    rdd2.collect.foreach(x => print(x + "\t"))
  }
}

4.29 累加器

4.29.1 累加器問題丟擲

object Demo29 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    var a = 1
    rdd1.foreach(x => {
      a += 1
      println("rdd:  "+a)
    })
    println("-----")
    println("main:  "+a)

    /**
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * rdd:  3
      * rdd:  4
      * -----
      * main:  1
      * */
  }
}

從上面可以看出,2個問題:

  1. 變數是在RDD分割槽中進行累加,並且2個RDD分割槽中的變數不同
  2. 最後並沒有main方法中的變數值改變

考慮到main方法中的a變數是在Driver端,而RDD分割槽又是在Excutor端進行計算,所以只是拿了一個Driver端的映象,而且不同步回Driver端

在實際開發中,我們需要進行這種累加,這時就用到了累加器

4.29.2 累加器案例

Spark提供了一些常用累加器,主要針對值型別

object Demo30 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc: util.LongAccumulator = sc.longAccumulator("acc")
    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  "+acc.value)
    })
    println("-----")
    println("main:  "+acc.count)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * -----
      * main:  5
      * */
  }
}

如上程式碼,我們發現累加器是分割槽內先累加,再分割槽間累加

4.29.3 自定義累加器

  • 案例一:自定義Int累加器
object Demo31 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //註冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  " + acc.value)
    })
    println("-----")
    println("main:  " + acc.value)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  3
      * rdd:  2
      * -----
      * main:  5
      **/
  }
}

class MyAccumulator extends AccumulatorV2[Int, Int] {
  var sum: Int = 0

  //判斷累加的值是不是空
  override def isZero: Boolean = sum == 0

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Int] = {
    val accumulator = new MyAccumulator
    accumulator.sum = sum
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    sum = 0
  }

  //分割槽內的累加
  override def add(v: Int): Unit = {
    sum += v
  }

  //分割槽間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Int]): Unit = {
    other match {
      case o: MyAccumulator => this.sum += o.sum
      case _ =>
    }
  }

  override def value: Int = this.sum
}
  • 案例二:自定義map平均值累加器
object Demo32 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //註冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(x)
    })
    println("main:  " + acc.value)

    /**main:  Map(sum -> 15.0, count -> 17.0, avg -> 0.8823529411764706) */
  }
}

class MyAccumulator extends AccumulatorV2[Int, Map[String, Double]] {
  var map: Map[String, Double] = Map[String, Double]()

  //判斷累加的值是不是空
  override def isZero: Boolean = map.isEmpty

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Map[String, Double]] = {
    val accumulator = new MyAccumulator
    accumulator.map ++= map
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    map = Map[String, Double]()
  }

  //分割槽內的累加
  override def add(v: Int): Unit = {
    map += "sum" -> (map.getOrElse("sum", 0d) + v)
    map += "count" -> (map.getOrElse("sum", 0d) + 1)
  }

  //分割槽間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Map[String, Double]]): Unit = {
    other match {
      case o: MyAccumulator =>
        this.map += "sum" -> (map.getOrElse("sum", 0d) + o.map.getOrElse("sum", 0d))
        this.map += "count" -> (map.getOrElse("count", 0d) + o.map.getOrElse("count", 0d))
      case _ =>
    }
  }

  override def value: Map[String, Double] = {
    map += "avg" -> map.getOrElse("sum", 0d) / map.getOrElse("count", 1d)
    map
  }
}

4.30 廣播變數

廣播變數在每個節點上儲存一個只讀的變數的快取,而不用給每個task來傳送一個copy

object Demo33 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.parallelize(Array("a", "b"))
    val broadArr: Broadcast[Array[Int]] = sc.broadcast(Array(1, 2))
    rdd.foreach(x => {
      val value: Array[Int] = broadArr.value
      println(value.toList)
    })
    /**
      * List(1, 2)
      * List(1, 2)
      * */
  }
}

5 Spark SQL

Spark SQL是Spark用於結構化資料處理的Spark模組。如:Mysql,Hbase,Hive

Spark SQL將SQL轉換成RDD,然後提交到叢集執行,執行效率非常快,而且使只會寫SQL的同學可以直接開發

Spark SQL提供了2個程式設計抽象,等同於Spark Core中的RDD,分別是:DataFrame,DataSet

5.1 DataFrame

與RDD類似,DataFrame是一個分散式的資料容器

DataFrame更像是傳統資料庫的二維表格,除了資料以外,還記錄了資料的結構資訊(Schema)

與Hive類似,DataFrame也支援巢狀資料型別(Struct、Array、Map)

  • 底層架構

  • Predicate Pushdown 機制

5.2 DataSet

DataSet是DataFrame的一個擴充套件,是SparkSQL1.6後新增的資料抽象,API友好

scala樣例類支援非常好,用樣例類在DataSet中定義資料結構資訊,樣例類中每個屬性的沒成直接對映到DataSet中的欄位名稱。

DataFrame是DataSet的特例,DataFrame=DataSet[Row],可以通過as方法將DataFrame轉換成DataSet,Row是一個型別,可以是Person、Animal,所有的表結構資訊都用Row來表示

DataFrame只知道欄位,不知道欄位型別,而DataSet不僅知道欄位,還知道型別。

DataSet具有強型別的資料集合,需要提供對應