1. 程式人生 > >Spark RDD運算元介紹

Spark RDD運算元介紹

Spark學習筆記總結

01. Spark基礎

1. 介紹

Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
Spark是MapReduce的替代方案,而且相容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。

2. Spark-Shell

  1. spark-shell是Spark自帶的互動式Shell程式,使用者可以在該命令列下用scala編寫spark程式。
  2. 直接啟動spark-shell,實質是spark的local模式,在master:8080中並未顯示客戶端連線。
  3. 叢集模式:
    /usr/local/spark/bin/spark-shell \
    --master spark://172.23.27.19:7077 \
    --executor-memory 2g \
    --total-executor-cores 2
  4. spark-shell中編寫wordcount
    sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/").flatMap(.split(" ")).map((,1)).reduceByKey(+).sortBy(_._2,false).collect

3. RDD介紹與屬性

1. 介紹

RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變(建立了內容不可變)、可分割槽、裡面的元素可平行計算的集合。

2. 屬性:

nkfed8Z.png

  1. 由多個分割槽組成。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。
  2. 一個計算函式用於每個分割槽。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。
  3. RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。資料丟失時,根據依賴重新計算丟失的分割槽而不是整個分割槽。
  4. 一個Partitioner,即RDD的分片函式。預設是HashPartition
  5. 分割槽資料的最佳位置去計算。就是將計算任務分配到其所要處理資料塊的儲存位置。資料本地化。
3. 建立方式:
  1. 可通過並行化scala集合建立RDD
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  2. 通過HDFS支援的檔案系統建立,RDD裡沒有真的資料,只是記錄了元資料
    val rdd2 = sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/")

檢視該rdd的分割槽數量
rdd1.partitions.length

3. 基礎的transformation和action

RDD中兩種運算元:
transformation轉換,是延遲載入的

常用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求並集:注意型別要一致
distinct:去重
(3)join:型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
(4)groupByKey:在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD
但是效率reduceByKey較高,因為有一個本地combiner的過程。
(5)cartesian笛卡爾積

常用的action
(1)collect()、count()
(2)reduce:通過func函式聚集RDD中的所有元素
(3)take(n):取前n個;top(2):排序取前兩個
(4)takeOrdered(n),排完序後取前n個

4. 較難的transformation和action

參考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》

(1)mapPartitions(func)和
mapPartitions(func):
獨立地在RDD的每一個分片上執行,但是返回值;foreachPartition(func)也常用,不需要返回值

mapPartitionsWithIndex(func):
可以看到分割槽的編號,以及該分割槽資料。
類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,func的函式型別必須是
(Int, Interator[T]) => Iterator[U]

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator}
rdd1.mapPartitionsWithIndex(func).collect

(2)aggregate
action操作,
第一個引數是初始值,
第二個引數:是2個函式[每個函式都是2個引數(第一個引數:先對個個分割槽進行的操作, 第二個:對個個分割槽合併後的結果再進行合併), 輸出一個引數]

例子:

rdd1.aggregate(0)(_+_, _+_)
//前一個是對每一個分割槽進行的操作,第二個是對各分割槽結果進行的結果

rdd1.aggregate(5)(math.max(_, _), _ + _)
//結果:5 + (5+9) = 19

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
//結果:24或者42

val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
//結果01或者10

(3)aggregateByKey
將key值相同的,先區域性操作,再整體操作。。和reduceByKey內部實現差不多

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//結果:Array((dog,12), (cat,17), (mouse,6))

PS:
和reduceByKey(+)呼叫的都是同一個方法,只是aggregateByKey要底層一些,可以先區域性再整體操作。

(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底層。
第一個引數x:原封不動取出來, 第二個引數:是函式, 區域性運算, 第三個:是函式, 對區域性運算後的結果再做運算
每個分割槽中每個key中value中的第一個值,

val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

第一個引數的含義:
每個分割槽中相同的key中value中的第一個值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相當於hello的第一個1, good中的1

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
//每個會多加3個10

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
//將key相同的資料,放入一個集合中

(5)collectAsMap
Action
Map(b -> 2, a -> 1)//將Array的元祖轉換成Map,以後可以通過key取值

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
//可以下一步使用

(6)countByKey
根據key計算key的數量
Action

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey
rdd1.countByValue//將("a", 1)當做一個元素,統計其出現的次數

(7)flatMapValues
對每一個value進行操作後壓平

相關推薦

Spark RDD運算元介紹

Spark學習筆記總結 01. Spark基礎 1. 介紹 Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。 Spark是MapReduce的替代方案,而且

spark RDD運算元 parallelize,makeRDD,textFile

- parallelize 將一個存在的集合,變成一個RDD。這種方式試用於學習spark和做一些spark的測試 第一個引數一是一個 Seq集合 第二個引數分割槽數 var array = List(1, 2, 3, 4, 5, 6, 7

spark RDD運算元大全

RDD作為spark核心的資料抽象,有關RDD的原始碼可看spark原始碼《一》RDD,有大量的api,也就是運算元 之前寫過兩篇運算元原始碼,一篇觸發runJob()的運算元,一篇是基於combineByKey()運算元的原始碼,有興趣的可以去看下。 目錄 map()&&

spark RDD運算元(二) filter,map ,flatMap

作者: 翟開順 首發:CSDN 先來一張spark快速大資料中的圖片進行快速入門,後面有更詳細的例子 filter 舉例,在F:\sparktest\sample.txt 檔案的內容如下 aa bb cc aa aa aa dd

spark RDD運算元(十一)之RDD Action 儲存操作saveAsTextFile,saveAsSequenceFile,saveAsObjectFile,saveAsHadoopFile 等

關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、Spark RDD儲存操作、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile,saveAsHadoopFile、saveAsHa

Spark-RDD運算元

一、Spark-RDD運算元簡介 RDD(Resilient Distributed DataSet)是分散式資料集。RDD是Spark最基本的資料的抽象。 scala中的集合。RDD相當於一個不可變、可分割槽、裡面的元素可以平行計算的集合。 RDD特點: 具

spark RDD運算元(一) parallelize,makeRDD,textFile

作者: 翟開順 首發:CSDN parallelize 呼叫SparkContext 的 parallelize(),將一個存在的集合,變成一個RDD,這種方式試用於學習spark和做一些spark的測試 scala版本 def paral

spark RDD運算元(四)之建立鍵值對RDD mapToPair flatMapToPair

mapToPair 舉例,在F:\sparktest\sample.txt 檔案的內容如下 aa bb cc aa aa aa dd dd ee ee ee ee ff aa bb zks ee kks ee zz zks 將每一行的第一個單詞

Spark RDD運算元【四】

自己總結了常用的部分運算元,方便自己理解和查閱 Spark RDD運算元列表 1. collectAsMap 2.count,countByKey,countByValue 3. filter,fi

spark RDD運算元(十)之PairRDD的Action操作countByKey, collectAsMap

countByKey def countByKey(): Map[K, Long] 以RDD{(1, 2),(2,4),(2,5), (3, 4),(3,5), (3, 6)}為例 rdd.cou

RDD運算元介紹

一、RDD運算元簡介spark在執行過程中通過運算元對RDD進行計算,運算元是RDD中定義的函式,可以對RDD中資料進行轉換和操作,如下圖輸入:spark程式中資料從外部資料空間輸入到spark中的資料塊,通過BlockManager進行管理執行:在spark資料形成RDD後

spark RDD,DataFrame,DataSet 介紹

列式存儲 ren gre rds 包含 執行 這一 ces 中一 彈性分布式數據集(Resilient Distributed Dataset,RDD) RDD是Spark一開始就提供的主要API,從根本上來說,一個RDD就是你的數據的一個不可變的分布式元素集

RDD常用運算元介紹只mappatitionwithIndex和mappatition

mappatition和mappatitionWithIndex mappatition 該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的過。 比如,

spark RDD常用運算元(一)

- filter 演算法解釋 filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 FilteredRDD

spark RDD常用運算元(三)

- first、take、collect、count、top、takeOrdered、foreach、fold、reduce、countByValue、lookup 演算法解釋 first:返回第一個元素 take:rdd.t

spark RDD常用運算元(二)

- reduceByKey 演算法解釋 reduceByKey 是比 combineByKey 更簡單的一種情況,只是兩個值合併成一個值,( Int, Int V)to (Int, Int C),比如疊加。所以 createCombiner reduceBykey 很簡

Spark基礎 -- Spark Shell -- RDD -- 運算元

Spark基礎 – Spark Shell – RDD – 運算元 文章目錄 Spark基礎 -- Spark Shell -- RDD -- 運算元 一、簡介 二、Spark 1.6.3部署

SparkRDD運算元-轉換運算元

  RDD-Transformation 轉換(Transformation)運算元就是對RDD進行操作的介面函式,其作用是將一個或多個RDD變換成新的RDD。 使用Spark進行資料計算,在利用建立運算元生成RDD後,資料處理的演算法設計和程式編寫的最關鍵部分,就是利用

Spark RDD-1-常用運算元

目錄   1、RDD簡介 2、RDD建立 3、常用RDD運算元 (1)Action RDD (2)單個RDD的 Transformation (惰性) (3)多個RDD的Transformation 1、RDD簡介 Spark對資料的一種核心抽象,R

Spark-RDD特點及RDD運算元

目錄 RDD 1.五個特性 RDD運算元 1.transformations類運算元 2.action類運算元 3.控制類運算元 RDD    &n