1. 程式人生 > >Spark RDD API 參考示例(五)

Spark RDD API 參考示例(五)

57、sample

原型
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T]

含義
sample 隨機挑選RDD中的一部分產生新的RDD,withReplacement表示是否允許重複挑選,fraction表示挑選比例,seed表示隨機初始化種子

示例

val a = sc.parallelize(1 to 20, 3)
//不允許重複挑選,也就是不能有相同的資料
a.sample(false,0.3,0).collect
res1: Array[Int] = Array(3, 10, 13, 15
, 16, 19, 20) //可以重複挑選 a.sample(true,0.3,1).collect res2: Array[Int] = Array(4, 6, 7, 7, 12, 14)

58、sampleByKey [Pair]

原型
def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]

含義
sampleByKey 先將每一個key相同的聚合在一起,然後在相同的key之間按照指定的權重篩選,使用者需要自定義一個Map來確定每個key篩選的比例。

示例

val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater")))
//定製自己想要篩選的key,然後放到sampleByKey中去
val sampleMap = List((7, 0.4), (6, 0.6)).toMap
randRDD.sampleByKey(false, sampleMap,42).collect

res1: Array[(Int, String)] = Array((6,book), (7
,tv), (7,heater))

59、saveAsObjectFile

原型
def saveAsObjectFile(path: String)

含義
saveAsObjectFile 把RDD資料儲存成二進位制格式

示例

val x = sc.parallelize(1 to 10, 3)
//儲存成二進位制檔案
x.saveAsObjectFile("objFile")
//讀取上面儲存的而二進位制檔案,[Int]用於提示物件型別
val y = sc.objectFile[Int]("objFile")
y.collect
res1: Array[Int] =  Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

60、saveAsSequenceFile [SeqFile]

原型
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None)

含義
saveAsSequenceFile 把RDD資料儲存成Hadoop 序列檔案

示例

//將這個資料儲存到hdfs中去
val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
//這個功能非常實用,可以直接操作hdfs檔案
v.saveAsSequenceFile("hdfs://192.168.10.71:9000/wc2")

61、saveAsTextFile

原型
def saveAsTextFile(path: String)
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec])

含義
saveAsTextFile 把RDD資料儲存成文字檔案

示例

val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
//儲存時,還可以定製壓縮格式,這樣可以節省空間
import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])

62、stats [Double]

原型
def stats(): StatCounter

含義
stats 統計學方法,同時計算RDD中的平均值,方差,標準差,最大值和最小值

示例

val z = sc.parallelize(1 to 10,3)
z.stats
//同時計算統計值,
res1: (count: 10, mean: 5.500000, stdev: 2.872281, max: 10.000000, min: 1.000000)

63、sortBy

原型
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

含義
sortBy 用於排序,第一個引數用於指定將待排序的RDD元素轉換成要排序的key,根據這個指定的key來排序,第二個引數如果為true表示升序,否則為降序

示例

//對於普通的單個元素,按照一般的排序
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
//true指定排序方式為升序
y.sortBy(c => c, true).collect
res1: Array[Int] = Array(1, 1, 2, 3, 5, 7)

//false表示按照降序排序
y.sortBy(c => c, false).collect
res2: Array[Int] = Array(7, 5, 3, 2, 1, 1)

//對於元組,則需要指定將元組對映到要排序的key上
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))

//此處表示按照每個元組的第一個值排序
z.sortBy(c => c._1, true).collect
res3: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))

//此處表示按照每個元組的第二個值排序
z.sortBy(c => c._2, true).collect
res4: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))

64、sortByKey [Ordered]

原型
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]

含義
sortByKeysortBy 的一個簡單應用,他使用的時候不需要使用者指定排序key,而是直接排序。只能對key-value型別使用。

示例

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res1: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))

65、stdev [Double], sampleStdev [Double]

原型
def stdev(): Double
def sampleStdev(): Double

含義
stdev 呼叫stats 函式,取得RDD中的標準差,divides by N
sampleStdev 呼叫stats 函式,取得RDD中的標準差,divides by N-1,對於資料量很大的情況下適用

示例

//這裡可以體現stdev和sampleStdev的區別
val d = sc.parallelize(List(0.0, 0.0, 1.0), 3)
d.stdev
res14: Double = 0.4714045207910317
d.sampleStdev
res15: Double = 0.5773502691896257

66、subtract

原型
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]

含義
subtract 求兩個集合的差,A-B

示例

//和數學上的集合相減類似
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res1: Array[Int] = Array(6, 9, 4, 7, 5, 8)

67、subtractByKey [Pair]

原型
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

含義
subtractByKey 兩個RDD先按照key分組,二者相減A-B,key相同,都會消除。如果A中沒有,B中有,那麼就會丟棄

示例

//丟棄A中不存在的,key相同的都會消除
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "wolf"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid","abcdefg"), 2)
val d = c.keyBy(_.length)
b.subtractByKey(d).collect

res15: Array[(Int, String)] = Array((4,lion),(4,wolf))

68、sum [Double], sumApprox [Double]

原型
def sum(): Double
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

含義
sum 計算RDD中元素的總和,要求輸入一個數字型別的RDD

示例

val x = sc.parallelize(1 to 10)
x.sum
res1: Double = 55.0

68、sum [Double], sumApprox [Double]

原型
def sum(): Double
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

含義
sum 計算RDD中元素的總和,要求輸入一個數字型別的RDD

示例

val x = sc.parallelize(1 to 10)
x.sum
res1: Double = 55.0