1. 程式人生 > >Spark:基本工作原理與RDD

Spark:基本工作原理與RDD

Spark的基本工作原理

我們從巨集觀講解Spark的基本工作原理,幫助你全面瞭解佈局
在這裡插入圖片描述

1、客戶端:
客戶端也就是專業們常說的Client端,這裡的是表示我們在本地編寫Spark程式,然後必須找一個能夠連線Spark叢集,並提交程式進行執行的機器

2、讀取資料:
在準備執行Spark程式的同時,是不是也要有資料來源進行處理的呢,這裡我們介紹幾種常見的讀取資料來源,是Hadoop叢集中的HDFS、Hive也有可能是搭建在叢集上的HBase;還有MySQL等DB資料庫;或者是在程式中我們設定的集合資料。

3、Spark分散式叢集
Spark叢集是一種分散式計算、是一種迭代式計算、是一種基於記憶體計算。

分散式計算,這是Spark最基本的特徵,計算時候資料會分佈存放到各個叢集節點,來並行分散式計算。如圖的第一個操作map,是對於節點1、2、3上面的資料進行map運算元操作,處理後的資料可能會轉移到其他節點的記憶體中,這裡假設到了4、5、6節點,處理後的資料有可能多或是變少,這個需要看我們具體的處理方式。第二個操作reduce,是將map處理後的資料再次進行處理。

這也就得到Spark是一種迭代式計算模型,一次計算邏輯中可以分為N個階段,上一個階段結果資料成為了下一個階段的輸入資料,這樣就不只是想mapreduce計算一樣了,只有兩個階段map和reduce,就結束一個job的執行,必須得落地到HDFS。而Spark在各個階段計算轉換中一直保持基於記憶體迭代式計算,所以Spark相對於MapReduce來說計算模型可以提供更加強大的計算邏輯功能。

4、結果資料輸出:
基於Hadoop的HDFS、Hive或是HBase;MySQL等DB資料;或是直接輸出返回給客戶端。

RDD特徵概要總結

RDD(Resilient Distributed Dataset)叫做彈性分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。

  1. RDD在抽象上來說是一種元素集合,包含了資料。它是被分割槽的,分為多個分割槽,每個分割槽分佈在叢集中的不同節點上,從而讓RDD中的資料可以被並行操作。(分散式資料集)
  2. RDD通常通過Hadoop上的檔案,即HDFS檔案或者Hive表,來進行建立;有時也可以通過應用程式中的集合來建立。
  3. RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致資料丟了,那麼RDD會自動通過自己的資料來源重新計算該partition。這一切對使用者是透明的。
  4. RDD的資料預設情況下存放在記憶體中的,但是在記憶體資源不足時,Spark會自動將RDD資料寫入磁碟。(彈性)

在這裡插入圖片描述

Spark的核心程式設計

  1. 定義初始的RDD,就是說,你要定義第一個RDD是從哪裡,讀取資料,hdfs、linux本地檔案、程式中的集合。
  2. 定義對RDD的計算操作,這個在spark裡稱之為運算元,map、reduce、flatMap、groupByKey,比mapreduce提供的map和reduce強大的太多太多了。
  3. 其實就是迴圈往復的過程,第一個計算完了以後,資料可能就會到了新的一批節點上,也就是變成一個新的RDD。然後再次反覆,針對新的RDD定義計算操作
  4. 最後,就是獲得最終的資料,將資料儲存起來。

使用Java開發wordcount程式

  1. 建立SparkConf物件,設定Spark應用的配置資訊
    使用setMaster()可以設定Spark應用程式要連線的Spark叢集的master節點的url
SparkConf conf = new SparkConf()
				.setAppName("WordCountLocal")
				.setMaster("local"); //local代表,在本地執行
  1. 建立JavaSparkContext物件
    在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,甚至是python編寫都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括排程器(DAGSchedule、TaskScheduler),還會去到Spark Master節點上進行註冊,等等
JavaSparkContext sc = new JavaSparkContext(conf);
  1. 要針對輸入源(hdfs檔案、本地檔案,等等),建立一個初始的RDD
    輸入源中的資料會打散,分配到RDD的每個partition中,從而形成一個初始的分散式的資料集
JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");
  1. 對初始RDD進行transformation操作,也就是一些計算操作
    通常操作會通過建立function,並配合RDD的map、flatMap等運算元來執行
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID = 5859923875545688649L;

			@Override
			public Iterable<String> call(String line) throws Exception {
				return Arrays.asList(line.split(" "));
			}
		});
  1. 接著,需要將每一個單詞,對映為(單詞, 1)的這種格式
    mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

			private static final long serialVersionUID = 5925817248239252608L;

			@Override
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		});
  1. 需要以單詞作為key,統計每個單詞出現的次數
    使用reduceByKey這個運算元,對每個key對應的value,都進行reduce操作,reduce操作,相當於是把第一個值和第二個值進行計算,然後再將結果與第三個值進行計算。
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

			private static final long serialVersionUID = 8794397384148863606L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
  1. 到這裡為止,我們通過幾個Spark運算元操作,已經統計出了單詞的次數
    之前我們使用的flatMap、mapToPair、reduceByKey這種操作,都叫做transformation操作,一個Spark應用中,光是有transformation操作,是不行的,是不會執行的,必須要有一種叫做action。
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

			private static final long serialVersionUID = 2448389534049516517L;

			@Override
			public void call(Tuple2<String, Integer> wordCount) throws Exception {
				System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
			}
		});

8.釋放資源

sc.close();

9.將檔案上傳到hdfs上去

hadoop fs -put spark.txt /spark.txt

10.使用spark-submit提交到spark叢集進行執行

  • 將SparkConf的setMaster()方法給刪掉,預設它自己會去連線
  • pom.xml裡配置的maven外掛,對工程進行打包
    在這裡插入圖片描述
  • 編寫spark-submit指令碼
  /usr/local/spark/bin/spark-submit \
  --class cn.spark.sparktest.core.WordCountCluster \
  --num-executors 3 \
  --driver-memory 100m \
  --executor-memory 100m \
  --executor-cores 3 \
  /usr/local/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies \
  • 執行spark-submit指令碼,提交spark應用到叢集執行

使用Scala開發wordcount程式

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {

  def main(args: Array[String]): Unit = {
   
    val conf = new SparkConf().setAppName("WordCount")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1)

    val words = lines.flatMap(line => line.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.foreach(wordCount => println(wordCount._1 + wordCount._2))

  }

}

wordcount程式原理深度剖析

在這裡插入圖片描述

Spark架構原理

  • Client:客戶端程序,負責提交作業到Master。
  • Master:主控節點,負責接收Client提交的作業,管理Worker,並命令Worker啟動Driver和Executor
  • Worker:程序,slave節點上的守護程序,負責管理本節點的資源,定期向Master彙報心跳,接收Master的命令,啟動Driver和Executor。用自己記憶體,儲存RDD的某個或某些partition
  • Driver:程序,一個Spark作業執行時包括一個Driver程序,也是作業的主程序,負責作業的解析、生成Stage並排程Task到Executor上。包括DAGScheduler,TaskScheduler。
  • Executor:程序,即真正執行作業的地方,一個叢集一般包含多個Executor,每個Executor接收Driver的命令Launch Task,一個Executor可以執行一到多個Task。
  • Task:執行緒,對RDD的partition資料執行指定的運算元操作,形成新的RDD的partition。

在這裡插入圖片描述

  1. 啟動Spark叢集,其實就是通過執行spark-all.sh指令碼來啟動master節點和worker節點,啟動了一個個對應的master程序和worker程序
  2. worker啟動之後,向master程序傳送註冊資訊(該過程基於AKKA Actor事件驅動模型);
  3. worker向master註冊成功之後,會不斷向master傳送心跳包,監聽master節點是否存活(該過程基於AKKA Actor事件驅動模型)
  4. driver向Spark叢集提交作業,通過spark-submit.sh指令碼,向master節點申請資源(該過程基於AKKA Actor事件驅動模型)
  5. master收到Driver提交的作業請求之後,向worker節點指派任務,其實就是讓其啟動對應的executor程序
  6. worker節點收到master節點發來的啟動executor程序任務,就啟動對應的executor程序,同時向master彙報啟動成功,處於可以接收任務的狀態
  7. 當Executor程序啟動成功後,就像Driver程序反向註冊,以此來告訴driver,誰可以接收任務,執行spark作業(該過程基於AKKA Actor事件驅動模型)
  8. driver接收到註冊之後,就知道了向誰傳送spark作業,這樣在spark叢集中就有一組獨立的Executor程序為該driver服務
  9. SparkContext重要元件執行——DAGScheduler和TaskScheduler,DAGScheduler根據寬依賴將作業劃分為若干stage,併為每一個階段組裝一批task組成taskset(task裡面就包含了序列化之後的我們編寫的spark transformation);然後將taskset交給TaskScheduler,由其將任務分發給對應的Executor
  10. Executor程序接收到driver傳送過來的taskset,進行反序列化,然後將這些task封裝進一個叫taskrunner的執行緒中,放到本地執行緒池中,排程我們的作業的執行
  11. Executor接收到task之後,會啟動多個執行緒來執行task
  12. task就會對RDD的partiton資料執行指定的運算元操作,形成新的RDD的partition