1. 程式人生 > >Spark RDD API 參考示例(一)

Spark RDD API 參考示例(一)

1、aggregate

原型
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

含義
aggregate是一個聚合函式,一個RDD分割槽後,產生多個Partition,在aggregate中需要指定兩個處理函式,第一個函式用於對每個分割槽內部處理,第二個函式用於分割槽之間的處理。aggregate 的初始值作用於前後兩個部分, 分割槽是嚴格按照順序的,順序不同aggregate 結果就不同。

示例

//分割槽順序嚴重aggregate的結果,分割槽特點,先平均分配,多的依次放到序號大的分割槽
val z = sc.parallelize(List(1,2,3,4,5,6), 2) //使用parallelize產生2個分割槽,可以使用mapPartitionWithIndex檢視分割槽 //分割槽結果是(1,2,3)和(4,5,6) z.aggregate(0)(math.max,_+_) res0: Int = 9 特別注意:初始值 0 是加在分割槽的左邊 //第一個引數0 表示將0放入到每個分割槽中,然後每個分割槽進行計算。每個分割槽之間彙總時,再次將0放入其中計算 //math.max表示對每個分割槽內部取最大值,第一個分割槽有(1,2,3),再加上0,所以是(0,1,2,3)取最大值 //_+_表示每個分割槽之間進行相加
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) x.aggregate(5)(math.max,_+_) res1: Int = 25 //分析三個分割槽為(1,2,3),(4,5,6),(7,8,9)其中aggregate的初始值為5 //每次分割槽計算資料為(5,1,2,3),(5,4,5,6),(5,7,8,9),分別取最大值,得到(5,6,9) //分割槽之間使用 _+_ 計算時,再次將5,放入其中,所以變成(5,5,6,9) //最後將其相加變成5+5+6+9=25 //字串型別處理 val z = sc.parallelize(List("a"
,"b","c","d","e","f"),2) z.aggregate("x")(_ + _, _+_) res2: String = xxdefxabc //第一個分割槽集合中有("x","a","b","c") 分割槽內部的聚合結果為"xabc" //由於是並行的聚合,所以"xabc"和"xdef"誰在前面和後面並不知道 //分割槽之間進行聚合時,集合變成了("x","xabc","xdef") //兩個高階的示例 val z = sc.parallelize(List("12","23","345","4567"),2) z.aggregate("")((x,y) => math.max(x.length, y.length).toString, _+_) res3: String = 42 //分為兩個區("12","23")和("345","4567") //由於需要使用兩個函式,所以這裡定義了一個閉包函式,集合中的每兩個元素,取出最大的長度。 //每個分割槽進行比較時集合為("","12","34")、("","345","4567") //每個分割槽聚合之後的結果為("2","4") //分割槽之間進行聚合時,集合為("","2","4"),再次聚合結構可能是"24"也可能是"42" val z = sc.parallelize(List("12","23","345","4567"),2) z.aggregate("asd")((x,y) => math.min(x.length, y.length).toString, _+_) res4: String = asd11 //分割槽後的結果為("12","23") ("345","456") //分割槽內部應用初始值之後的結果為("asd","12","34") ("asd","345","456") //每個分割槽中有三個元素,每個分割槽內部需要多次聚合,分為("asd","12")和("34") //("asd","12")聚合結果是"2", 再與("34")聚合結果是"1" //所以("asd","12","34") 聚合結果是"1" //同理("asd","345","456") 聚合結果是"1" //兩個分割槽之間再次聚合,將初始值應用到其中,得到"asd11" val z = sc.parallelize(List("12","23","345",""),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString,_+_) res5: String = 10 //分割槽後的結果為("12","23") ("345","") //分割槽內部應用初始值之後的結果為("","12","23") ("","345","") //有三個元素,需要多次聚合("","12") ("23") //("","12")聚合結果是"0" 再與("23")聚合,得到的結果是"1" //("","345")聚合結果是"0" 再與("")聚合,得到的結果是"0" //最後兩次結果相加,得到的是"10" 特別注意:為了說明分割槽順序的重要性,請看下面的例子 val z = sc.parallelize(List("12","23","","345"),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString,_+_) res6: String = 11 //分割槽後的結果為("12","23") ("","345") //分割槽內部應用初始值之後的結果為("","12","23") ("","","345") //有三個元素,需要多次聚合("","12") ("23") //("","12")聚合結果是"0" 再與("23")聚合,得到的結果是"1" //("","")聚合結果是"0" 再與("345")聚合,得到的結果是"1" //最後兩次結果相加,得到的是"11"

2、aggregateByKey[Pair]

原型
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

含義
aggregateByKey 也是一個聚合函式,一個RDD分割槽後,產生多個Partition,在aggregateByKey中需要指定兩個處理函式,第一個函式用於對每個分割槽內部處理,第二個函式用於分割槽之間的處理。aggregateByKey 的初始值只作用於每個分割槽內部,不影響分割槽之間聚合,分割槽是嚴格按照順序的,順序不同aggregateByKey 結果就不同。

示例

val z = sc.parallelize(List( ("cat",2), ("cat", 5), ("pig", 4),("cat", 12), ("dog", 12)), 2)
z.aggregateByKey(0)(math.max(_, _), _ + _).collect
res1: Array((dog,12), (pig,4), (cat,17))  
//分割槽之後的結果是(("cat",2),("cat",5))   (("pig",4),("cat",12),("dog",12))
//應用aggregate的初始值後,第二個分割槽為: 
//([("pig",0),("pig","4")],[("cat",0),("cat",12)],[("dog",0),("dog",12)])
//使用math.max函式後,結果為(("pig","4"),("cat",12),("dog",12))
//如果使用math.min函式後,結果就為(("pig",0),("cat",0),("dog",0))
//使用函式_+_聚合時,不會應用初始值

z.aggregateByKey(100)(math.max(_, _), _ + _).collect
res2: Array((dog,100), (pig,100), (cat,200))

3、cartesian

原型
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

含義
cartesian 笛卡爾積,兩個RDD中的元素兩兩組合

示例

val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
//結果是兩者由小到大的順序排列
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

4、checkpoint

原型
def checkpoint()

含義
checkpoint 檢查點機制,假設你在迭代1000次的計算中在第999次失敗了,然後你沒有checkpoint,你只能重新開始恢復了,如果恰好你在第998次迭代的時候你做了一個checkpoint,那麼你只需要恢復第998次產生的rdd,然後再執行2次迭代完成總共1000的迭代,這樣效率就很高,比較適用於迭代計算非常複雜的情況,也就是恢復計算代價非常高的情況,適當進行checkpoint會有很大的好處。

示例

sc.setCheckpointDir("hdfs://192.168.10.71:9000/wc")
//檢查點目錄必須存在
val a = sc.parallelize(1 to 5)
a.checkpoint
//將其結果檢查點更新
a.collect
res1: Long = 5

5、coalesce, repartition

原型
def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]
def repartition ( numPartitions : Int ): RDD [T]

含義
repartition 表示對已經分割槽的資料重新分割槽,是coalesce 的一個簡單應用

示例

val y = sc.parallelize(1 to 10, 5)
y.coalesce(2, false).collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
//coalesce分割槽中false表示直接分割槽,不對其重新打亂順序

val z = y.coalesce(2, true)
val x = y.repartition(2)
z.collect
res2: Array[Int] = Array(3, 5, 9, 1, 7, 2, 8, 4, 6, 10)
//coalesce分割槽中false表示重新打亂順序,再分割槽,和repartition相等,預設是不打亂順序的

6、cogroup [Pair], groupWith [Pair]

原型
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
含義
cogroup 超級分組,可以將3個key-value型別的RDD進行分組

示例

//3個key-value型別的RDD分組聚合
val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
val z = sc.parallelize(List((3,"iphone"),(1,"xiaomi"),(4,"huawei")),2)
x.cogroup(y,z).collect

res1: Array((4,(CompactBuffer(kiwi),CompactBuffer(iPad),CompactBuffer(huawei))), (2,(CompactBuffer(banana),CompactBuffer(),CompactBuffer())), (1,(CompactBuffer(apple),CompactBuffer(laptop, desktop),CompactBuffer(xiaomi))), (3,(CompactBuffer(orange),CompactBuffer(),CompactBuffer(iphone))), (5,(CompactBuffer(),CompactBuffer(computer),CompactBuffer())))

//分析:如果x中不存在這個元素,那麼就會將其值置為空

7、collect, toArray

原型
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
def toArray(): Array[T]
含義
collect 將RDD資料轉換成Scala中的陣列並返回

示例


val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res1: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

8、collectAsMap

原型
def collectAsMap(): Map[K, V]

含義
collectAsMap 和collect非常類似,但是,是將資料轉換成key-value型別的Map

示例


val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
//以List中的數字作為key,出現次數作為value
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

9、combineByKey[Pair]

原型
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

含義
combineByKey 高效的實現按照key來聚合,通過在每一個分割槽內部先聚合,再在分割槽之間聚合。每個分割槽內部是通過迭代方式來聚合,效率非常高。

示例

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
//將資料分成三個區,使用數字當作key,字串作為value

val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
//combineByKey的三個引數的意思是:
//第一個分割槽為((1,"dog"), (1,"cat"), (2,"gnu"))
//List(_) 每個分割槽中的key,每次取該分割槽中的一個key
//第一次取1,初始時x中的List為空,找到第一個key為1的dog,將dog放入到x中的List,然後再找到cat
//第二次取2,初始時x中的List為空,找到第一個key為2的gnu
//第一個分割槽得到了((1,("cat","dog")),(2,("gnu")))
//同理,第二個分割槽得到了((2,("salmon","rabbit")),(1,("turkey")))
//同理,第三個分割槽得到了(2,("wolf","bear","bee"))
//最後,每個分割槽之間進行合併,將相同的key的資料合併

d.collect
res1: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))

10、context, sparkContext

原型
def compute(split: Partition, context: TaskContext): Iterator[T]

含義
context 返回建立RDD時的SparkContext

示例

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.context
res1: org.apache.spark.SparkContext = [email protected]74c08828

11、count

原型
def count(): Long

含義
count 返回RDD中儲存的元素的個數

示例

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4

12、countApproxDistinct

原型
def countApproxDistinct(relativeSD: Double = 0.05): Long

含義
countApproxDistinct 近似統計RDD中不重複的值的個數,可以通過relativeSD 來確定統計精度,這種方式適合大規模,分散式的資料快速統計

示例

val z = sc.parallelize(List(1,3,4,1,2,3))
z.countApproxDistinct(0.01)
res1: Long = 4
//countApproxDistinct統計的是去掉重複的數值

val a = sc.parallelize(1 to 10000, 20)
val b = a++a++a++a++a
b.countApproxDistinct(0.1)
res2: Long = 8224

b.countApproxDistinct(0.05)
res3: Long = 9750

b.countApproxDistinct(0.01)
res4: Long = 9947

b.countApproxDistinct(0.001)
res5: Long = 10000

13、countApproxDistinctByKey [Pair]

原型
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)]

含義
countApproxDistinctByKeycountApproxDistinct 類似,但是他是統計元組中的相同的key的個數,相比於其他統計方式,這種統計速度更快

示例

val a = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
val b = sc.parallelize(a.takeSample(true, 100, 0), 5)
//產生100個隨機的,從a中選取的字元,並分為5個區

val c = sc.parallelize(1 to b.count().toInt, 5)
//產生1-100的陣列,並分為5個區

val d = b.zip(c)
//將其轉換為map型別
//按照key來進行統計
d.countApproxDistinctByKey(0.1).collect
res1: Array((Rat,27), (Cat,25), (Dog,29), (Gnu,24))

d.countApproxDistinctByKey(0.01).collect
res2:  Array((Rat,27), (Cat,24), (Dog,27), (Gnu,22))

14、countByKey [Pair]

原型
def countByKey(): Map[K, Long]

含義
countByKeycount 類似,但是他是用於統計元組中每個相同的key出現的次數,最後返回的是一個元組

示例

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res1: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

15、countByValue [Pair]

原型
def countByValue(): Map[T, Long]

含義
countByValue 計算RDD中不同值出現的次數,以value作為統計的標準。分割槽統計時,這個操作會將資訊合併到單個reduce中去。

示例


val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b.countByValue
res1: Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)

相關推薦

Spark RDD API 參考示例

1、aggregate 原型 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 含義 aggregate是一個聚

Spark RDD API 參考示例

28、getCheckpointFile 原型 def getCheckpointFile: Option[String] 含義 getCheckpointFile 返回RDD的che

Spark RDD API 參考示例

57、sample 原型 def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] 含義 sa

Kafka新版消費者API示例

Kafka的高階消費者(high-level consumer)和低階消費者(low-level consumer,底層用SimpleConsumer實現)是舊版本的consumer中的。 新版本的consumer中沒有這兩個概念。新版本把高階消費者和低階消費者整合到一起了,對應KafkaCon

vue-router單頁應用簡單示例

問題 clas 做了 設置 new scope 文件的 log target 請先完成了項目初始化,具體請看我另一篇博文。vue項目初始化 看一下完成的效果圖,很典型的單頁應用。 .vue後綴名的單文件組件 這裏先說一下我對組件的理解。組件,顧名思義就是一組元素組成的

PHP實現RESTful風格的API實例

request restful ref turn function 數據操作 dex 進行 所有 (關於Yii2的RESTful教程請看: Yii2框架RESTful API教程) 最近看了一些關於RESTful的資料,自己動手也寫了一個RESTful實例,以下是源碼

Spring Boot參考教程 SpringBoot概述及Hello World

自動 構建項目 測試 run world tar 建模 持久化 開啟 前言 筆者閑來無事,寫此文檔,不足之處,海涵! 本文檔將詳細介紹Sping Boot特性,使用方法,及與第三方框架的集成應用。使開發人員可以快速的了解SpringBoot,熟練的使用S

Apache Spark大數據分析入門

做的 項目 persist fig shell命令 tutorial math 提高 welcom 摘要:Apache Spark的出現讓普通人也具備了大數據及實時數據分析能力。鑒於此,本文通過動手實戰操作演示帶領大家快速地入門學習Spark。本文是Apache Spark

LVS的簡單示例

LVS的簡單示例(一)LVS的簡單示例 1.使用NAT模型的TCP協議類型的lvs服務負載均衡一個php應用,如Discuz!論壇或者phpMyAdmin; 必要條件:三臺主機(一個調度器兩個服務器) 聲明:172.16.1.11作為調度器,192.168.100.2和192.168.100.3

Linux shell 示例

echo -e linu 定義變量 cheng lee export let linux amd 一、環境系統:Centos6.6 x64shell:bash、sh [centos@Shell ~]$ hostname Shell [centos@Shell ~]$ ls

Spark官方文檔翻譯~Overview

安裝 pre mac os home 翻譯 size ber uri ems Spark官方文檔翻譯,有問題請及時指正,謝謝。 Overview頁 http://spark.apache.org/docs/latest/index.html Spark概述 Apac

慕課 從零到spark進階之路

1.RDD RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念。 (0)隨便找個點理解以下 最重要的記住,RRD是不可變的,也就是說,已有的RDD不能被修改或者更新,但可以從已有的RDD轉化成一個新的RDD. 上面的

Zookeeper C API應用示例3——配置管理非同步API

場景描述同:https://blog.csdn.net/qq_41688455/article/details/83780854 服務端程式碼如下: #include <stdio.h> #include <unistd.h> #include <std

Zookeeper C API應用示例1——配置管理同步API

場景描述 服務端監控/configure目錄; 客戶端對/configure目錄讀/寫資料,建立/刪除子節點 服務端: 監控/configure目錄,有資料更新時,輸出/configure中的資料;子節點建立/刪除時,服務程式列出當前的子目錄列表。 程式碼如下: #include &

Java呼叫C++ API完整示例dll

最近有一個和香港的對接專案。在通訊問題上出現了卡殼。港方提供的是一個java庫,需要和我們這邊進行交易策略對接。交易策略是以協議的方式,通過網路通訊傳送到我們的系統。由於我們缺少穩定可靠的java通訊元件,但我們具有一個非常可靠的C++通訊元件。因此就萌發了將現有的C++通訊元件封裝為可供

Django 入門示例

1、建立專案 django-admin startproject dj-demo 2、初始化 python manage.py migrate 3、部署執行 python manage.py runserver 0.0.0.0:8000 4、部署執行(指定埠,開

selenium webdriver API詳解

  本系列主要講解webdriver常用的API使用方法(注意:使用前請確認環境是否安裝成功,瀏覽器驅動是否與谷歌瀏覽器版本對應)   一:開啟某個網址  from selenium import webdriver # 匯入webdriverdriver = webdriver.Chro

Netty入門示例

以下演示的是一個時間伺服器。 依次啟動Server,Client;Client從伺服器上讀取得時間後列印的控制檯上。 ChannelInitializer,ChannelInboundHandlerAdapter Server 提供時間服務。 監聽8080埠,子類TimeSe

Spark從入門到精通

什麼是Spark 大資料計算框架 離線批處理 大資料體系架構圖(Spark) Spark包含了大資料領域常見的各種計算框架:比如Spark Core用於離線計算,Spark SQL用於互動式查詢,Spark Streaming用於實時流式計算,Spark MLib用於機器學習,Spark

Spark Streaming狀態管理函式——updateStateByKey和mapWithState

updateStateByKey和mapWithState   什麼是狀態管理函式   updateStateByKey   mapWithState   updateStateByKey和mapWithState的區別   適用場景 什麼是狀態管理函