1. 程式人生 > >Spark之RDD程式設計

Spark之RDD程式設計

RDD,全稱Resilient Distributed
Datasets(彈性分散式資料集),是Spark最為核心的概念,是Spark對資料的抽象。RDD是分散式的元素集合,每個RDD只支援讀操作,且每個RDD都被分為多個分割槽儲存到叢集的不同節點上。除此之外,RDD還允許使用者顯示的指定資料儲存到記憶體和磁碟中,掌握了RDD程式設計是SPARK開發的第一步。

1.RDD讀取資料

檔案讀取

scala> val lines = sc.textFile("README.md")
scala> lines.collect()

並行化讀取

scala> var
lines = sc.parallelize(List("i love you")) scala> lines.collect()

2.RDD操作之transformation

RDD的操作可分兩種:
1 轉化操作(transformation) : 由一個RDD生成一個新的RDD
2 行動操作(action) : 對RDD中的元素進行計算,並把結果返回
參考api: org.apache.spark.rdd.RDD
3.下劃線用法

//以下兩句等同
_.split(" ")
line => line.split(" ")
//等同
_ + _
(left, right
) => left + right

2.1.map()

返回:一個新的RDD

scala> val rdd = sc.parallelize(List(1,2,3,3))
scala> val rrr=rdd.map(x => x +1 )
scala> rrr.collect()
        Array[Int] = Array(2, 3, 4, 4)
scala> rrr.foreach(println)
2
4
4
3

2.2.flatmap()

返回:迭代器的所有元素組成一個新的RDD

val rdd1 = sc.parallelize(List
(1,2,3,3)) scala> rdd1.map(x=>x+1).collect Array[Int] = Array(2, 3, 4, 4) scala> rdd1.flatMap(x=>x.to(3)).collect Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)

從上面可以看出flatMap先map然後再flat

2.3.filter()

對每個元素進行篩選,返回符合條件的元素組成的一個新RDD

scala> val rdd = sc.parallelize(List(1,2,3,3))
scala> rdd.filter(x => x != 1).collect()
     Array[Int] = Array(2, 3, 3)

兩變數過濾操作

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))

對value做控制,key不加限制條件

scala> val result = rdd.filter{case(x,y)=>y%3==0}
scala> result.foreach(println)
(3,6)
scala> val result = rdd.filter{case(x,y)=>y<=4}
scala> result.foreach(println)
(1,2)
(3,4)

對key做控制,value不控制

scala> val result = rdd.filter{case(x,y)=>x<3}
scala> result.foreach(println)
(1,2)

2.4 distinct()

distinct() : 去掉重複元素

scala> rdd.distinct().collect()
      Array[Int] = Array(1, 2, 3)

2.5 sample(withReplacement,fration,[seed])

sample():對RDD取樣

  • 第一個引數如果為true,可能會有重複的元素,如果為false,不會有重複的元素;
  • 第二個引數取值為[0,1],最後的資料個數大約等於第二個引數乘總數;
  • 第三個引數為隨機因子。
 scala> rdd.sample(false,0.5).collect()
     Array[Int] = Array(3, 3)

scala> rdd.sample(false,0.5).collect()
     Array[Int] = Array(1, 2)

scala> rdd.sample(false,0.5,10).collect()
    Array[Int] = Array(2, 3)

2.6排序

rdd.sortByKey():返回一個根據鍵排序的RDD

資料排序,可以通過接受ascending的引數表示我們是否想要結果按升序排序(預設是true)

scala> val result = rdd.sortByKey().collect()
          Array[(Int, Int)] = Array((1,2), (3,4), (3,6))

2.7reduce

rdd.reduceByKey(func):合併具有相同key的value值

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val result = rdd.reduceByKey((x,y)=>x+y)
scala> result.foreach(println)
(1,2)
(3,10)

2.8group

rdd.groupByKey():對具有相同鍵的進行分組 [資料分組]

scala> val result = rdd.groupByKey()
scala> result.foreach(println)
       (3,CompactBuffer(4, 6))
       (1,CompactBuffer(2))

2.9 mapValues

rdd.mapValues(func):對pairRDD中的每個值應用func 鍵不改變

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val result = rdd.mapValues(x=>x+1)
scala> result.foreach(println)
(1,3)
(3,5)
(3,7)

3.針對兩個pair RDD 的轉化操作

3.1 rdd.subtractByKey( other )

刪除掉RDD中與other RDD中鍵相同的元素

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val other = sc.parallelize(List((3,9)))
scala> val result = rdd.subtractByKey(other)
scala> result.foreach(println)
(1,2)

3.2 join內連線

rdd,other是3.1的變數

scala> val result = rdd.join(other)
scala> result.foreach(println)
(3,(4,9))
(3,(6,9))

3.3 右外連線

rdd,other是3.1的變數

scala> val result = rdd.rightOuterJoin(other)
scala> result.foreach(println)
     (3,(Some(4),9))
     (3,(Some(6),9))

3.4 左外連線

scala> val result = rdd.leftOuterJoin(other)
scala> result.foreach(println)
   (3,(4,Some(9)))
   (3,(6,Some(9)))
   (1,(2,None))

3.5按相同鍵分組

scala> val result = rdd.cogroup(other)
scala> result.foreach(println)
(1,(CompactBuffer(2),CompactBuffer()))
(3,(CompactBuffer(4, 6),CompactBuffer(9)))

3.6 使用reduceByKey()和mapValues()計算每個鍵對應累加值

x._1表示取元組的第一個值

scala> val rdd = sc.parallelize(List(Tuple2("panda",0),Tuple2("pink",3),Tuple2("pirate",3),Tuple2("panda",1),Tuple2("pink",4)))
scala> val result = rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
scala> result.foreach(println)
(pirate,(3,1))
(panda,(1,2))
(pink,(7,2))

3.7計單詞(wordcount)

用map reduce實現:

scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
scala> val words = rdd.flatMap(line => line.split(" "))
scala> val result = words.map(x=>(x,1)).reduceByKey((x,y) => x+y)
scala> result.foreach(println)
(cyan,1)
(love,1)
(thinkgamer,,1)
(am,1)
(i,2)

用countByValue實現:

scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
scala> val result = rdd.flatMap(x=>x.split(" ")).countByValue()
scala> result.foreach(println)
(am,1)
(thinkgamer,,1)
(i,2)
(love,1)
(cyan,1)

4.RDD操作之action

下面例子以 val rdd = sc.parallelize(List(1,2,3,3))為輸入

4.1 collect()返回所有元素

scala> rdd.collect()
       Array[Int] = Array(1, 2, 3, 3)

4.2 count() :返回元素個數

scala> rdd.count()
         Long = 4

4.3 countByValue() : 各個元素出現的次數

scala> rdd.countByValue()
     scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

4.4 take(num) : 返回num個元素

scala> rdd.take(2)
       Array[Int] = Array(1, 2)

4.5 top(num) : 返回前num個元素

scala> rdd.top(2)
         Array[Int] = Array(3, 3)

4.6 foreach(func):對每個元素使用func

scala> rdd.foreach(x => println(x*2))

相關推薦

SparkRDD程式設計

RDD,全稱Resilient Distributed Datasets(彈性分散式資料集),是Spark最為核心的概念,是Spark對資料的抽象。RDD是分散式的元素集合,每個RDD只支援讀操作,且每個RDD都被分為多個分割槽儲存到叢集的不同節點上

跟我一起學Spark——RDD Join中寬依賴與窄依賴的判斷

1.規律    如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴   除此之外的,rdd 的join api是寬依賴 2.Join的理解  

SparkRDD運算元-轉換運算元

  RDD-Transformation 轉換(Transformation)運算元就是對RDD進行操作的介面函式,其作用是將一個或多個RDD變換成新的RDD。 使用Spark進行資料計算,在利用建立運算元生成RDD後,資料處理的演算法設計和程式編寫的最關鍵部分,就是利用

SparkRDD的屬性

1.一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。

spark你媽喊你回家吃飯-05】RDD程式設計旅基礎篇-01

1.RDD工作流程 1.1 RDD理解 RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的理解對我們學習RDD的API是非常有幫助的。本文所有示例程式

[2.3]Spark DataFrame操作(二)通過程式設計動態完成RDD與DataFrame的轉換

參考 場景 一、上一篇部落格將待分析資料影射成JavaBean的欄位,然後通過def createDataFrame(data:java.util.List[_],beanClass:Class[_]):DataFrame完成了RDD與DataFra

Spark核心程式設計RDD持久化詳解

RDD持久化原理 Spark非常重要的一個功能特性就是可以將RDD持久化在記憶體中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition。這樣的

[1.2]Spark core程式設計(一)RDD總論與建立RDD的三種方式

參考 場景 RDD的理解 一、RDD是基於工作集的應用抽象;是分散式、函數語言程式設計的抽象。 MapReduce:基於資料集的處理。兩者的共同特徵:位置感知(具體資料在哪裡)、容錯、負載均衡。 基於資料集的處理:從物理儲存裝置上載入資料,然

Spark快速大數據分析RDD基礎

數學 ref 內存 相關 應用 級別 要求 分數 png Spark 中的RDD 就是一個不可變的分布式對象集合。每個RDD 都被分為多個分區,這些分區運行在集群中的不同節點上。RDD 可以包含Python、Java、Scala中任意類型的對象,甚至可以包含用

spark筆記RDD的緩存

process color RoCE 就是 發現 mark 其他 動作 blog Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或者緩存數據集。當持久化某個RDD後,每一個節點都將把計算分區結果保存在內存中,對此RDD或衍生出的RDD進行的其他動作中重用

spark coreRDD編程

緩存 code 會有 核心 hdf 機器 end action rdd   spark提供了對數據的核心抽象——彈性分布式數據集(Resilient Distributed Dataset,簡稱RDD)。RDD是一個分布式的數據集合,數據可以跨越集群中的

spark筆記RDD容錯機制checkpoint

原理 chain for 機制 方式 方法 相對 例如 contex 10.checkpoint是什麽(1)、Spark 在生產環境下經常會面臨transformation的RDD非常多(例如一個Job中包含1萬個RDD)或者具體transformation的RDD本身計算

11.spark sqlRDD轉換DataSet

Once lds nco ldd 方法 att context gin statement 簡介 ??Spark SQL提供了兩種方式用於將RDD轉換為Dataset。 使用反射機制推斷RDD的數據結構 ??當spark應用可以推斷RDD數據結構時,可使用這種方式。這種

Spark函數詳解系列RDD基本轉換

9.png cal shuff reac 數組a water all conn data 摘要: RDD:彈性分布式數據集,是一種特殊集合 ? 支持多種來源 ? 有容錯機制 ? 可以被緩存 ? 支持並行操作,一個RDD代表一個分區裏的數據集 RDD有兩種操作算子: Tra

學習大資料課程 spark 基於記憶體的分散式計算框架(二)RDD 程式設計基礎使用

學習大資料課程 spark 基於記憶體的分散式計算框架(二)RDD 程式設計基礎使用   1.常用的轉換 假設rdd的元素是: {1,2,2,3}   很多初學者,對大資料的概念都是模糊不清的,大資料是什麼,能做什麼,學的時候,該按照什麼線路去學習,學完

spark RDD官網RDD程式設計指南

http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在較高的層次上,每個Spark應用程式都包含一個驅動程式,該程式執行使用者的主要功能並在叢集上執行各

零基礎入門大資料sparkrdd部分運算元詳解

先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉

零基礎入門大資料探勘sparkrdd

本節簡單介紹一下spark下的基本資料結構RDD,方便理解後續的更多操作。 那麼第一個問題,什麼是rdd。我們知道,大資料一般儲存在分散式叢集裡面,那麼你在對其進行處理的時候總得把它讀出來吧,讀出來後總得把它存成某種格式的檔案吧,就好比程式語言裡面的,這個資料是陣列,那麼你可以以陣列

Spark函式詳解系列RDD基本轉換

摘要:  RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集  RDD有兩種操作運算元:      Transformation(轉換):Transformation屬於延遲計

# Apache Spark系列技術直播# 第五講【 Spark RDD程式設計入門 】

內容提要:本次講座主要涵蓋Spark RDD程式設計入門基礎,包括: Spark、RDD簡介 RDD API簡介 打包與spark-submit 效能分析與調優基礎 主講人:王道遠(健身) 阿里巴巴計算平臺EMR技術專家 直播時間:2018.12.13(本週四)19:00 - 20