1. 程式人生 > >40-天亮大資料系列教程之Spark常用運算元分析與應用

40-天亮大資料系列教程之Spark常用運算元分析與應用

目錄
1、運算元概述
2、Spark運算元介紹與應用
3、經典運算元練習
詳情
1、運算元概述

  • 什麼是運算元

    • 英文翻譯為:Operator(簡稱op)

    • 狹義:指從一個函式空間到另一個函式空間(或它自身)的對映。

    • 廣義:指從一個空間到另一個空間的對映

    • 通俗理解:指事物(資料或函式)從一個狀態到另外一個狀態的過程抽象。
      實質就是對映,就是關係,就是變換。 * 運算元的重要作用

    • 運算元越少,靈活性越低,則實現相同功能的程式設計複雜度越高,運算元越多則反之。

      • 老手機與智慧手機

        • 老電腦與新電腦之HDMI (跟VGA是對等的)
      • 運算元越少,表現力越差,面對複雜場景則易用性較差。運算元越多的則反之。

        • 黑白彩電與彩色電視
        • 彩色電視和智慧電視
    • MapReduce 與 Spark運算元比較

      • MapReduce只有2個運算元,Map和Reduce,絕大多數應用場景下,均需要複雜編碼才能達到使用者需求。
      • Spark有80多個運算元,進行充分的組合應用後,能滿足絕大多數的應用場景。

2、Spark運算元介紹與應用

  • 2.1 運算元分類
    • 1)轉換運算元(Transformation):此種運算元不觸發提交作業,只有作業被提交後才會真正啟動轉換計算。

    • Value型轉換運算元 : 其處理的資料項是Value型

    • 輸入分割槽與輸出分割槽一對一型

      • map運算元
      • flatMap運算元
      • mapPartitions運算元
      • glom運算元
  • 輸入分割槽與輸出分割槽多對一型
    • union運算元

    • cartesian運算元

    • 輸入分割槽與輸出分割槽多對多型
      * grouBy運算元
      * 輸出分割槽為輸入分割槽子集型

        	* filter運算元
        	* distinct運算元
        	* subtract運算元
        	* sample運算元
        	* takeSample運算元
        * Cache型
      
        	* cache運算元
        	* persist運算元
      
      • Key-Value型轉換運算元:其處理的資料是Key-Value型

        • 輸入分割槽與輸出分割槽一對一

          • mapValues運算元
        • 對單個RDD聚集

          • combineByKey運算元
          • reduceByKey運算元
          • partitionBy運算元
        • 對兩個RDD聚集

          • cogroup運算元
        • 連線

          • join運算元
          • leftOutJoin運算元
          • rightOutJoin運算元
    • 2) 行動運算元(Action):此種運算元會觸發SparkContext提交作業。

      • 無輸出(是指不輸出hdfs、本地檔案當中)

        • foreach運算元
      • HDFS

        • saveAsTextFile運算元
        • saveAsObjectFile運算元
      • Scala集合和資料型別

        • collect運算元
        • collectAsMap運算元
        • reduceByKeyLocally運算元
        • lookup運算元
        • count運算元
        • top運算元
        • reduce運算元
        • fold運算元
        • aggregate運算元
    • 2.2 常用運算元分析與應用

      • 2.2.1 Value型轉換運算元
          1. map

            • 類比於mapreduce中的map操作,給定一個輸入通過map函式映到成一個新的元素輸出
    • case_1
      val first = sc.parallelize(List(“Hello”,“World”,“天亮教育”,“大資料”),2)val second= first.map(_.length)second.collect

    • case_2
      val first = sc.parallelize(1 to 5,2)first.map(1 to _).collect

      1. flatMap
      • 給定一個輸入,將返回的所有結果打平成一個一維集合結構

        • case_1

val first = sc.parallelize(1 to 5,2)first.flatMap(1 to _).collect
* case_2

val first = sc.parallelize(List(“one”,“two”,“three”),2)first.flatMap(x => List(x,x,x)).collect
* case_3

val first = sc.parallelize(List(“one”,“two”,“three”),2)first.flatMap(x => List(x+"_1",x+"_2",x+"_3")).collect
* 3) mapPartitions

	* 以分割槽為單位進行計算處理,而map是以每個元素為單位進行計算處理。
	* 當在map過程中需要頻繁建立額外物件時,如檔案輸出流操作、jdbc操作、Socket操作等時,當用mapPartitions運算元

		* case_1

val rdd=sc.parallelize(Seq(1,2,3,4,5),3)var rdd2=rdd.mapPartitions(partition=>{ partition.map(num => num * num) })rdd2.max
* case_2

val rdd=sc.parallelize(Seq(1,2,3,4,5),3)var rdd2=rdd.mapPartitions(partition=>{ partition.flatMap(1 to _) })rdd2.count
* 4) glom

	* 以分割槽為單位,將每個分割槽的值形成一個數組

val a = sc.parallelize(Seq(“one”,“two”,“three”,“four”,“five”,“six”,“seven”),3)a.glom.collect
* 5) union運算元

	* 將兩個RDD合併成一個RDD,並不去重
	* 會發生多分割槽合併成一個分割槽的情況

val a = sc.parallelize(1 to 4, 2)val b = sc.parallelize(3 to 6, 2)a.union(b).collect(a ++ b).collect(a union b).collect
* 6) groupBy運算元

	* 輸入分割槽與輸出分割槽多對多型

val a = sc.parallelize(Seq(1,3,4,5,9,100,200), 3)a.groupBy(x => { if (x > 10) “>10” else “<=10” }).collect
* 7) filter運算元

	* 輸出分割槽為輸入分割槽子集型

val a = sc.parallelize(1 to 21, 3)val b = a.filter(_ % 4 == 0)b.collect
* 8 ) distinct運算元

	* 輸出分割槽為輸入分割槽子集型

		* case_1

val a = sc.parallelize(1 to 4, 2)val b = sc.parallelize(3 to 6, 2)a.union(b).distinct().collect
* case_2

val c = sc.parallelize(List(“張三”, “李四”, “李四”, “王五”), 2)c.distinct.collect
* 9) cache運算元

	* cache 將 RDD 元素從磁碟快取到記憶體。 相當於 persist(MEMORY_ONLY) 函式的功能。
	* 主要應用在當RDD資料反覆被使用的場景下

		* case_1

val a = sc.parallelize(1 to 4, 2)val b = sc.parallelize(3 to 6, 2)a.union(b).counta.union(b).distinct().collect
* case_2

val a = sc.parallelize(1 to 4, 2)val b = sc.parallelize(3 to 6, 2)val c=a.union(b).cachec.countc.distinct().collect
* 2.2.2 Key-Value型轉換運算元:其處理的資料是Key-Value型

	* mapValues運算元

		* 輸入分割槽與輸出分割槽一對一
		* 針對(Key, Value)型資料中的 Value 進行 Map 操作,而不對 Key 進行處理。

val first = sc.parallelize(List((“張一”,1),(“張二”,2),(“張三”,3),(“張四”,4)),2)val second= first.mapValues(x=>x+1)second.collect
* combineByKey運算元

	* 定義

def combineByKey[C](createCombiner: (V) => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(String, C)]
* createCombiner: 對每個分割槽內的同組內元素如何聚合,形成一個累加器
* mergeValue: 將前邊的累加器與新遇到的值進行合併的方法
* mergeCombiners: 每個分割槽都是獨立處理,故同一個鍵可以有多個累加器。如果有兩個或者更

  多的分割槽都有對應同一個鍵的累加器,用方法將各個分割槽的結果進行合併。
* case_1

val first = sc.parallelize(List((“張一”,1),(“李一”,1),(“張一”,2),(“張一”,3),(“李一”,3),(“李三”,3),(“張四”,4)),2)val second= first.combineByKey(List(_), (x:List[Int], y:Int) => y :: x, (x:List[Int], y:List[Int]) => x ::: y)second.collect
* reduceByKey運算元

	* 按key聚合後對組進行歸約處理,如求和、連線等操作

val first = sc.parallelize(List(“小米”, “華為”, “小米”, “小米”, “華為”, “蘋果”), 2)val second = first.map(x => (x, 1))second.reduceByKey(_ + _).collect
* join運算元

	* 對K-V結構的RDD進行按K的join操作,最後將V部分做flat打平操作。

val first = sc.parallelize(List((“張一”,11),(“李二”,12)),2)val second = sc.parallelize(List((“張一”,21),(“李二”,22),(“王五”,23)),2)first.join(second).collect
* 2.2.3 行動運算元(Action):此種運算元會觸發SparkContext提交作業。觸發了RDD DAG 的執行。
* 無輸出型:不落地到檔案或是hdfs的作用

	* foreach運算元

val first = sc.parallelize(List(“小米”, “華為”, “小米”, “小米”, “華為”, “蘋果”), 2)first.foreach(println _)
* HDFS輸出型

	* saveAsTextFile運算元

val first = sc.parallelize(List(“小米”, “華為”, “小米”, “小米”, “華為”, “蘋果”), 2)//指定本地儲存的目錄first.saveAsTextFile(“file:///home/spark/text”) //指定hdfs儲存的目錄,預設亦儲存在hdfs中first.saveAsTextFile(“spark_shell_output_1”)
* Scala集合和資料型別

	* collect運算元

		* 相當於toArray操作,將分散式RDD返回成為一個scala array陣列結果,實際是Driver所在的機器節點,再針對該結果操作    

val first = sc.parallelize(List(“小米”, “華為”, “小米”, “小米”, “華為”, “蘋果”), 2)first.collect
* collectAsMap運算元

	* 相當於toMap操作,將分散式RDD的kv對形式返回成為一個的scala map集合,實際是Driver所在的機器節點,再針對該結果操作  

val first = sc.parallelize(List((“張一”,1),(“李一”,1),(“張一”,2),(“張一”,3),(“李一”,3),(“李三”,3),(“張四”,4)),2)first.collectAsMap
* lookup運算元

	* 對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。

val first = sc.parallelize(List(“小米”, “華為”, “華米”, “大米”, “蘋果”,“米老鼠”), 2)val second=first.map(x=>({if(x.contains(“米”)) “有米” else “無米”},x))second.lookup(“有米”)
* reduce運算元

	* 先對兩個元素進行reduce函式操作,然後將結果和迭代器取出的下一個元素進行reduce函式操作,直到迭代器遍歷完所有元素,得到最後結果。

//求value型列表的和val a = sc.parallelize(1 to 10, 2)a.reduce(_ + _)//求key-value型列表的value的和val a = sc.parallelize(List((“one”,1),(“two”,2),(“three”,3),(“four”,4)), 2)a.reduce((x,y)=>(“sum”,x._2 + y._2))._2
* fold運算元

	* fold運算元簽名:  def fold(zeroValue: T)(op: (T, T) => T): T
	* 其實就是先對rdd分割槽的每一個分割槽進行op函式,在呼叫op函式過程中將zeroValue參與計算,最後在對所有分割槽的結果呼叫op函式,同理此處zeroValue再次參與計算。

//和是41,公式=(1+2+3+4+5+6+10)+10sc.parallelize(List(1, 2, 3, 4, 5, 6), 1).fold(10)(+)//和是51,公式=(1+2+3+10)+(4+5+6+10)+10=51sc.parallelize(List(1, 2, 3, 4, 5, 6), 2).fold(10)(+)//和是61,公式=(1+2+10)+(3+4+10)+(5+6+10)+10=61sc.parallelize(List(1, 2, 3, 4, 5, 6), 3).fold(10)(+)3、經典運算元練習
* cartesian運算元
* subtract運算元
* sample運算元
* takeSample運算元
* persist運算元
* cogroup運算元
* leftOutJoin運算元
* rightOutJoin運算元
* saveAsObjectFile運算元
* count運算元
* top運算元
* aggregate運算元

天亮教育是一家從事大資料雲端計算、人工智慧、教育培訓、產品開發、諮詢服務、人才優選為一體的綜合型網際網路科技公司。
公司由一批BAT等一線網際網路IT精英人士建立,
以"快樂工作,認真生活,打造高階職業技能教育的一面旗幟"為願景,胸懷"讓天下沒有難找的工作"使命,
堅持"客戶第一、誠信、激情、擁抱變化"的價值觀,
全心全意為學員賦能提效,踐行技術改變命運的初心。

歡迎關注天亮教育公眾號,大資料技術資料與課程、招生就業動態、教育資訊動態、創業歷程分享一站式分享,官方微信公眾號二維碼:
這裡寫圖片描述