1. 程式人生 > >Spark常用運算元詳解

Spark常用運算元詳解



Spark的運算元的分類

   從大方向來說,Spark 運算元大致可以分為以下兩類:

     1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理

     Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。

     2)Action 行動運算元:這類運算元會觸發 SparkContext 提交 Job 作業

      Action 運算元會觸發 Spark 提交作業(Job),並將資料輸出 Spark系統。

從小方向來說,Spark 運算元大致可以分為以下三類:

  1)Value資料型別的Transformation運算元,這種變換並不觸發提交作業,針對處理的資料項是Value型的資料。
  2)Key-Value資料型別的Transfromation運算元,這種變換並不觸發提交作業,針對處理的資料項是Key-Value型的資料對。

  3)Action運算元,這類運算元會觸發SparkContext提交Job作業。

1)Value資料型別的Transformation運算元  

  一、輸入分割槽與輸出分割槽一對一型

    1、map運算元

    2、flatMap運算元

    3、mapPartitions運算元

    4、glom運算元

  二、輸入分割槽與輸出分割槽多對一型 

    5、union運算元

    6、cartesian運算元

  三、輸入分割槽與輸出分割槽多對多型

    7、grouBy運算元

  四、輸出分割槽為輸入分割槽子集型

    8、filter運算元

    9、distinct運算元

    10、subtract運算元

    11、sample運算元

        12、takeSample運算元

   五、Cache型

    13、cache運算元  

    14、persist運算元

2)Key-Value資料型別的Transfromation運算元

  一、輸入分割槽與輸出分割槽一對一

    15、mapValues運算元

  二、對單個RDD或兩個RDD聚集

   單個RDD聚集

    16、combineByKey運算元

    17、reduceByKey運算元

    18、partitionBy運算元

   兩個RDD聚集

    19、Cogroup運算元

三、連線

    20、join運算元

    21、leftOutJoin和 rightOutJoin運算元

 3)Action運算元

  一、無輸出

    22、foreach運算元

二、HDFS

    23、saveAsTextFile運算元

    24、saveAsObjectFile運算元

  三、Scala集合和資料型別

    25、collect運算元

    26、collectAsMap運算元

      27、reduceByKeyLocally運算元

      28、lookup運算元

    29、count運算元

    30、top運算元

    31、reduce運算元

    32、fold運算元

    33、aggregate運算元

    1. Transformations 運算元
 (1)map

  將原來 RDD 的每個資料項通過map 中的使用者自定義函式 f對映轉變為一個新的元素。原始碼中 map 運算元相當於初始化一個 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。

     圖 1中每個方框表示一個 RDD 分割槽,左側的分割槽經過使用者自定義函式 f:T->U對映為右側的新 RDD 分割槽。但是,實際只有等到 Action運算元觸發後,這個 f 函式才會和其他函式在一個stage 中對資料進行運算。在圖 1 中的第一個分割槽,資料記錄 V1 輸入 f,通過 f 轉換輸出為轉換後的分割槽中的資料記錄 V’1。
                            

      圖1    map 運算元對 RDD 轉換                   

    (2)flatMap
     將原來 RDD 中的每個元素通過函式 f 轉換為新的元素,並將生成的 RDD 的每個集合中的元素合併為一個集合,內部建立 FlatMappedRDD(this,sc.clean(f))。
  圖 2 表 示 RDD 的 一 個 分 區 ,進 行 flatMap函 數 操 作, flatMap 中 傳 入 的 函 數 為 f:T->U,T和 U 可以是任意的資料型別。將分割槽中的資料通過使用者自定義函式 f 轉換為新的資料。外部大方框可以認為是一個 RDD 分割槽,小方框代表一個集合。 V1、 V2、 V3 在一個集合作為 RDD 的一個數據項,可能儲存為陣列或其他容器,轉換為V’1、 V’2、 V’3 後,將原來的陣列或容器結合拆散,拆散的資料形成為 RDD 中的資料項。

        圖2     flapMap 運算元對 RDD 轉換


    (3)mapPartitions
      mapPartitions 函 數 獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作。 內 部 實 現 是 生 成
MapPartitionsRDD。圖 3 中的方框代表一個 RDD 分割槽。圖 3 中,使用者通過函式 f (iter)=>iter.f ilter(_>=3) 對分割槽中所有資料進行過濾,大於和等於 3 的資料保留。一個方塊代表一個 RDD 分割槽,含有 1、 2、 3 的分割槽過濾只剩下元素 3。

    圖3  mapPartitions 運算元對 RDD 轉換

  (4)glom

  glom函式將每個分割槽形成一個數組,內部實現是返回的GlommedRDD。 圖4中的每個方框代表一個RDD分割槽。圖4中的方框代表一個分割槽。 該圖表示含有V1、 V2、 V3的分割槽通過函式glom形成一陣列Array[(V1),(V2),(V3)]。

      圖 4   glom運算元對RDD轉換

     (5)union
      使用 union 函式時需要保證兩個 RDD 元素的資料型別相同,返回的 RDD 資料型別和被合併的 RDD 元素資料型別相同,並不進行去重操作,儲存所有元素。如果想去重
可以使用 distinct()。同時 Spark 還提供更為簡潔的使用 union 的 API,通過 ++ 符號相當於 union 函式操作
     圖 5 中左側大方框代表兩個 RDD,大方框內的小方框代表 RDD 的分割槽。右側大方框代表合併後的 RDD,大方框內的小方框代表分割槽。

  含有V1、V2、U1、U2、U3、U4的RDD和含有V1、V8、U5、U6、U7、U8的RDD合併所有元素形成一個RDD。V1、V1、V2、V8形成一個分割槽,U1、U2、U3、U4、U5、U6、U7、U8形成一個分割槽。

  圖 5  union 運算元對 RDD 轉換 



  (6)cartesian
       對 兩 個 RDD 內 的 所 有 元 素進 行 笛 卡 爾 積 操 作。 操 作 後, 內 部 實 現 返 回CartesianRDD。圖6中左側大方框代表兩個 RDD,大方框內的小方框代表 RDD 的分割槽。右側大方框代表合併後的 RDD,大方框內的小方框代表分割槽。圖6中的大方框代表RDD,大方框中的小方框代表RDD分割槽。
      例 如: V1 和 另 一 個 RDD 中 的 W1、 W2、 Q5 進 行 笛 卡 爾 積 運 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。
     

       圖 6  cartesian 運算元對 RDD 轉換

  (7)groupBy
  groupBy :將元素通過函式生成相應的 Key,資料就轉化為 Key-Value 格式,之後將 Key 相同的元素分為一組。
  函式實現如下:
  1)將使用者函式預處理:
  val cleanF = sc.clean(f)
  2)對資料 map 進行函式操作,最後再進行 groupByKey 分組操作。

     this.map(t => (cleanF(t), t)).groupByKey(p)
  其中, p 確定了分割槽個數和分割槽函式,也就決定了並行化的程度。

  圖7 中方框代表一個 RDD 分割槽,相同key 的元素合併到一個組。例如 V1 和 V2 合併為 V, Value 為 V1,V2。形成 V,Seq(V1,V2)。

  圖 7 groupBy 運算元對 RDD 轉換

  (8)filter
    filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 FilteredRDD(this,sc.clean(f))。
    下面程式碼為函式的本質實現:
    deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
  圖 8 中每個方框代表一個 RDD 分割槽, T 可以是任意的型別。通過使用者自定義的過濾函式 f,對每個資料項操作,將滿足條件、返回結果為 true 的資料項保留。例如,過濾掉 V2 和 V3 保留了 V1,為區分命名為 V’1。

  圖 8  filter 運算元對 RDD 轉換
     

  (9)distinct

  distinct將RDD中的元素進行去重操作。圖9中的每個方框代表一個RDD分割槽,通過distinct函式,將資料去重。 例如,重複資料V1、 V1去重後只保留一份V1。

    圖9  distinct運算元對RDD轉換

  (10)subtract

  subtract相當於進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。圖10中左側的大方框代表兩個RDD,大方框內的小方框代表RDD的分割槽。 右側大方框
代表合併後的RDD,大方框內的小方框代表分割槽。 V1在兩個RDD中均有,根據差集運算規則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。
  

          圖10   subtract運算元對RDD轉換

  (11)sample
       sample 將 RDD 這個集合內的元素進行取樣,獲取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、隨機種子,進而決定取樣方式。內部實現是生成 SampledRDD(withReplacement, fraction, seed)。
  函式引數設定:
‰   withReplacement=true,表示有放回的抽樣。
‰   withReplacement=false,表示無放回的抽樣。
  圖 11中 的 每 個 方 框 是 一 個 RDD 分 區。 通 過 sample 函 數, 採 樣 50% 的 數 據。V1、 V2、 U1、 U2、U3、U4 取樣出資料 V1 和 U1、 U2 形成新的 RDD。

     

       圖11  sample 運算元對 RDD 轉換

  (12)takeSample

  takeSample()函式和上面的sample函式是一個原理,但是不使用相對比例取樣而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行
Collect(),返回結果的集合為單機的陣列。
  圖12中左側的方框代表分散式的各個節點上的分割槽,右側方框代表單機上返回的結果陣列。 通過takeSample對資料取樣,設定為取樣一份資料,返回結果為V1。

    圖12    takeSample運算元對RDD轉換

  (13)cache
     cache將 RDD 元素從磁碟快取到記憶體。 相當於 persist(MEMORY_ONLY) 函式的功能。
     圖13 中每個方框代表一個 RDD 分割槽,左側相當於資料分割槽都儲存在磁碟,通過 cache 運算元將資料快取在記憶體。
      

      圖 13 Cache 運算元對 RDD 轉換

  (14)persist
      persist 函式對RDD 進行快取操作。資料快取在哪裡依據 StorageLevel 這個列舉型別進行確定。 有以下幾種型別的組合(見10), DISK 代表磁碟,MEMORY 代表記憶體, SER 代表資料是否進行序列化儲存。

  下面為函式定義, StorageLevel 是列舉型別,代表儲存模式,使用者可以通過圖 14-1 按需進行選擇。
  persist(newLevel:StorageLevel)
  圖 14-1 中列出persist 函式可以進行快取的模式。例如,MEMORY_AND_DISK_SER 代表資料可以儲存在記憶體和磁碟,並且以序列化的方式儲存,其他同理。

            圖 14-1  persist 運算元對 RDD 轉換

  圖 14-2 中方框代表 RDD 分割槽。 disk 代表儲存在磁碟, mem 代表儲存在記憶體。資料最初全部儲存在磁碟,通過 persist(MEMORY_AND_DISK) 將資料快取到記憶體,但是有的分割槽無法容納在記憶體,將含有 V1、 V2、 V3 的RDD儲存到磁碟,將含有U1,U2的RDD仍舊儲存在記憶體。

      圖 14-2   Persist 運算元對 RDD 轉換

  (15)mapValues
      mapValues :針對(Key, Value)型資料中的 Value 進行 Map 操作,而不對 Key 進行處理。

    圖 15 中的方框代表 RDD 分割槽。 a=>a+2 代表對 (V1,1) 這樣的 Key Value 資料對,資料只對 Value 中的 1 進行加 2 操作,返回結果為 3。

     

      圖 15   mapValues 運算元 RDD 對轉換

  (16)combineByKey
  下面程式碼為 combineByKey 函式的定義:
  combineByKey[C](createCombiner:(V) C,
  mergeValue:(C, V) C,
  mergeCombiners:(C, C) C,
  partitioner:Partitioner,
  mapSideCombine:Boolean=true,
  serializer:Serializer=null):RDD[(K,C)]

說明:
‰   createCombiner: V => C, C 不存在的情況下,比如通過 V 建立 seq C。
‰   mergeValue: (C, V) => C,當 C 已經存在的情況下,需要 merge,比如把 item V
加到 seq C 中,或者疊加。
   mergeCombiners: (C, C) => C,合併兩個 C。
‰   partitioner: Partitioner, Shuff le 時需要的 Partitioner。
‰   mapSideCombine : Boolean = true,為了減小傳輸量,很多 combine 可以在 map
端先做,比如疊加,可以先在一個 partition 中把所有相同的 key 的 value 疊加,
再 shuff le。
‰   serializerClass: String = null,傳輸需要序列化,使用者可以自定義序列化類:

  例如,相當於將元素為 (Int, Int) 的 RDD 轉變為了 (Int, Seq[Int]) 型別元素的 RDD。圖 16中的方框代表 RDD 分割槽。如圖,通過 combineByKey, 將 (V1,2), (V1,1)資料合併為( V1,Seq(2,1))。
  

      圖 16  comBineByKey 運算元對 RDD 轉換

  (17)reduceByKey
     reduceByKey 是比 combineByKey 更簡單的一種情況,只是兩個值合併成一個值,( Int, Int V)to (Int, Int C),比如疊加。所以 createCombiner reduceBykey 很簡單,就是直接返回 v,而 mergeValue和 mergeCombiners 邏輯是相同的,沒有區別。
    函式實現:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
  圖17中的方框代表 RDD 分割槽。通過使用者自定義函式 (A,B) => (A + B) 函式,將相同 key 的資料 (V1,2) 和 (V1,1) 的 value 相加運算,結果為( V1,3)。
     

相關推薦

Spark常用運算元彙總 : 實戰案例、Java版本、Scala版本

官網API地址: JavaRDD:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaRDD  JavaPairRDD:http://spark.apache.or

Spark常用運算元

Spark的運算元的分類   從大方向來說,Spark 運算元大致可以分為以下兩類:     1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理。     Transformation 操作是延遲計算的,也就是說從一個RDD

《深入理解Spark》之Spark常用運算元(java版+spark1.6.1)

最近公司要用Java開發Spark專案,以前用的是Scala語言,今天就把Spark常用的運算元使用java語言實現了一遍 XML Code  1 2 3 4 5 6 7 8 9 10 11 12

Spark 系列(四)—— RDD常用運算元

一、Transformation spark 常用的 Transformation 運算元如下表: Transformation 運算元 Meaning(含義) map(func) 對原 RDD 中每個元素運用 func 函式,並生成新的 RDD filter(func) 對原 RDD 中每

spark運算元

combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 定義: def combineByKey[C]( createCombiner: V => C, mergeVal

零基礎入門大資料之spark中rdd部分運算元

先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉

spark運算元------Action運算元介紹

本文首發自個人部落格:https://blog.smile13.com/articles/2018/11/30/1543589289882.html 一、無輸出的運算元 1.foreach運算元 功能:對 RDD 中的每個元素都應用 f 函式操作,無返回值。 原始碼:

spark運算元------Transformation運算元介紹

本文首發自個人部落格:https://blog.smile13.com/articles/2018/12/02/1543738193387.html 一、Value資料型別的Transformation運算元  1.輸入分割槽與輸出分割槽一對一型別的運算元 1.1.map運算元

spark運算元------spark運算元分類

本文首發自個人部落格:https://blog.smile13.com/articles/2018/12/02/1543738098914.html 1.spark運算元分類 1.1Transformation運算元 Transformation運算元不觸發提交作業,完成作業中間

Spark RDD使用5--Action運算元

本質上在Actions運算元中通過SparkContext執行提交作業的runJob操作,觸發了RDD DAG的執行。  根據Action運算元的輸出空間將Action運算元進行分類:無輸出、 HDFS、 Scala集合和資料型別。 無輸出 foreach 對RDD中的每個元素

《深入理解Spark》之運算元

 XML Code  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

Docker常用命令

nbsp 詳解 .cn 本地 test 並且 www 更多 top docker ps 查看當前正在運行的容器 docker ps -a 查看所有容器的狀態 docker start/stop id/name 啟動/停止某個容器 docker attach id 進

09-nginx常用配置

accept 用戶與用戶組 oot 進程資源 常用 brush pan nts 還要 Nginx配置分為各個配置塊,主配置塊負責全局配置,子配置塊可以繼承全局配置,也可以相應的配置不同設置。 main block:主配置(全局配置) event{

yum常用命令

yum工具、解決依賴安裝 yum命令 yum使用技巧 YUM常用命令詳解 yum是一個用於管理rpm包的後臺程序,用python寫成,可以非常方便的解決rpm的依賴關系。在建立好yum服務器後,yum客戶端可以通過 http、ftp方式獲得軟件包,並使用方便的命令直接管理、更新所有的rpm包

Tomcat學習總結(6)——Tomca常用配置

mar evel 代碼段 between 取消 新建 unp -h tom 註:Tomcat 8需要JRE7以上的JRE 1. Tomcat環境變量設置 1.1 Java環境變量設置 右鍵計算機—屬性—高級系統設置—環境變量,在”系統環境變量”,設置如下三個變量(如果變量已

python os.path模塊常用方法

linu margin 作文 擴展 .py sans csv pytho 宋體 python os.path模塊常用方法詳解 轉發自:http://www.cnblogs.com/wuxie1989/p/5623435.html os.path模塊主要用於文件的

常用編碼

sci bsp 日語 字母 圖像 路徑 以及 兼容 保留 GBK,ISO-8859-1,GB2312的本質區別編碼有幾種 ,計算機最初是在美國等國家發明的 所以表示字符只有簡單的幾個字母只要對字母進行編碼就好 我們標準碼 iso-8859-1 這就是一個標準但是後來計算機普

Input輸入對象常用方法

知新樹 寧金峰 Input對象可以獲取用戶所有行為的輸入,如鼠標、鍵盤、加速度、陀螺儀、按鈕等,所以掌握Input對象就可以在外部輸入信息和系統之間進行交互。 Input對象的主要變量:mousePonsition 當前鼠標的像素坐標anyKeyDown 用戶點擊任何鍵或鼠標按鈕,第一幀返回tru

Redis中Key相關的常用指令

redis key vaule Redis是一個開源的使用ANSI C 語言編寫、支持網絡、同memcache相比在Redis下可以實現基於內存亦可持久化的日誌型、Key-Value 類型的NoSQL數據庫,且在Redis中Key的類型也更為豐富。所以較為廣泛的在生產環境中使用,在這裏就說一

git 常用命令

git最重要2命令:git clone git://github.com/schacon/grit.git 從服務器上將代碼給拉下來git pull 本地與服務器端同步一、 Git 命令初識在正式介紹Git命令之前,先介紹一下Git 的基本命令和操作,對Git命令有一個總體的認識示例:從Git 版本庫的初始化