
Spark-Core in Detail: The Basics

I. RDD

1. What is an RDD

To simplify things for the user, Spark abstracts over all of the underlying data as RDDs and exposes a rich set of methods on them in an object-oriented style; these methods are what you use to compute on an RDD and produce output.
RDD stands for Resilient Distributed Dataset.

2. Characteristics of RDDs

1. Immutable: every operation on an RDD produces a new RDD.
2. Partitionable: an RDD stores its data split across partitions.
3. Resilient (see the sketch after this list):
<1> Storage resilience: data moves between memory and disk automatically.
<2> Fault-tolerance resilience: lost data can be recovered automatically.
<3> Computation resilience: failed computations are retried.
<4> Partitioning resilience: data can be repartitioned as needed.
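As a concrete illustration of the storage point, an RDD can be persisted with a storage level that spills from memory to disk automatically. A minimal sketch, assuming a running spark-shell where sc is available:

import org.apache.spark.storage.StorageLevel

val bigRdd = sc.makeRDD(1 to 1000000)
// Cached partitions stay in memory while they fit and spill to disk when they do not.
bigRdd.persist(StorageLevel.MEMORY_AND_DISK)
bigRdd.count()   // the first action materializes and caches the RDD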

3. What does Spark actually do

(figure omitted)

4. RDDs are lazily evaluated: their operations are divided into transformations and actions, and it is the actions that trigger execution
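A minimal spark-shell style sketch of this laziness: the transformation only records the lineage, and nothing is computed until an action runs.

val nums = sc.makeRDD(1 to 10)
val doubled = nums.map(_ * 2)   // transformation: builds a new RDD, nothing executes yet
doubled.collect()               // action: triggers the actual computation, returns Array(2, 4, ..., 20)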

II. RDD Methods

1. Creating an RDD

<1> Creating an RDD from a collection

Method: use makeRDD or parallelize

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }



def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
    assertNotStopped()
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
  }



def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
  /** Note: the node(s) on which an individual data item is stored can be specified (see the makeRDD overload above). */



<2> Creating an RDD from external storage
<3> Creating an RDD by transforming another RDD (see the sketch below)
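A minimal sketch of both approaches (the HDFS path is only an illustrative placeholder):

// <2> From external storage: every line of the file becomes one element of the RDD.
val lines = sc.textFile("hdfs://namenode:9000/data/words.txt")
// <3> From another RDD: any transformation, such as map, produces a new RDD.
val lengths = lines.map(_.length)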

2. RDD Types

<1> Value RDDs

For example RDD[Int], RDD[(Int, Int)]

<2> Key-value RDDs

For example RDD[(Int, Int)], RDD[(Int, (Int, Int))]
Note: every key-value RDD can also use the operations available on value RDDs (illustrated below).
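A small illustration of that note: a key-value RDD accepts the generic (value) RDD operations such as filter, in addition to key-value-only operations such as reduceByKey.

val pairs = sc.makeRDD(Seq((1, 10), (2, 20), (1, 30)))
pairs.filter { case (_, v) => v > 10 }   // generic RDD operation
pairs.reduceByKey(_ + _)                 // key-value-only operation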

3. Common RDD Operators

Transformation

<1>map
/**
   * Return a new RDD by applying a function to all elements of this RDD.
   * A one-to-one transformation: applies f to every element and produces a new RDD holding all the resulting elements.
   */
  def map[U: ClassTag](f: T => U): RDD[U] 
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res1.map(x=>x*2).collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> sc.makeRDD("zhangzhangshisb").collect
res3: Array[Char] = Array(z, h, a, n, g, z, h, a, n, g, s, h, i, s, b)
<2>filter
/**
   * Return a new RDD containing only the elements that satisfy a predicate.
   * Filtering transformation: keeps only the elements that satisfy the predicate.
   */
  def filter(f: T => Boolean): RDD[T] 
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res1.map(x=>x*2)
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27

scala> res7.filter(_>10)
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29

scala> res8.collect
res9: Array[Int] = Array(12, 14, 16, 18, 20)
<3>flatMap
/**
   * Applies f to each element to produce a collection of results, then flattens all of them into a single flat RDD.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res1.flatMap(1 to _)
res10: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at flatMap at <console>:27

scala> res10.collect
res11: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
<4>mapPartitions
  /**
   * Operates on the RDD one partition at a time: f receives an iterator over every element in a partition and returns an iterator of results.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] 
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25

scala> res1.mapPartitions(x=>x.filter(_ % 2==0).map("Partition"+_))
res12: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitions at <console>:27

scala> res12.collect
res13: Array[String] = Array(Partition2, Partition4, Partition6, Partition8, Partition10)
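A typical reason to reach for mapPartitions instead of map is to pay a per-partition cost only once (opening a connection, building a parser, and so on). A minimal sketch that computes one sum per partition:

val nums = sc.makeRDD(1 to 10, 2)
val partitionSums = nums.mapPartitions { iter =>
  // anything set up here runs once per partition, not once per element
  var sum = 0
  iter.foreach(sum += _)
  Iterator(sum)               // emit a single result per partition
}
// with this partitioning, partitionSums.collect yields Array(15, 40)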
<5>mapPartitionsWithIndex
  /**
   * Like mapPartitions, but the function also receives the partition index as its first argument.
   * The number of partitions can also be set manually when the RDD is created,
   * as the session below shows.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] 
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)

scala> rdd.partitions.size
res16: Int = 5

scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at mapPartitionsWithIndex at <console>:27

scala> res17.collect
res18: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50])
<6>sample
  /**
   * Random sampling, in a nutshell.
   * withReplacement:
   * true  = sample with replacement,
   * false = sample without replacement
   * fraction:
   * the expected fraction of elements to select
   * seed:
   * the seed for the random number generator
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] 
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd.sample(true,0.5,5).collect
res19: Array[Int] = Array(8, 8, 8, 9, 14, 14, 16, 16, 20, 21, 22, 22, 25, 27, 28, 28, 32, 33, 36, 36, 45, 45, 48, 48, 49, 49)
<7>union
  /**
   * Unions this RDD with another one and returns the combined RDD; the two RDDs must have the same element type.
   */
  def union(other: RDD[T]): RDD[T]
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(10 to 15)).collect
res21: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
<8>intersection
  /**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   * Returns the intersection of the two RDDs.
   */
  def intersection(other: RDD[T]): RDD[T] 
scala> sc.makeRDD(5 to 9).intersection(sc.makeRDD(0 to 15)).collect
res22: Array[Int] = Array(6, 8, 7, 9, 5)
<9>distinct
  /**
   * Return a new RDD containing the distinct elements in this RDD.
   * Removes duplicate elements; note that this shuffles the data across partitions.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(0 to 15)).distinct.collect
res23: Array[Int] = Array(4, 0, 8, 12, 13, 1, 9, 5, 14, 6, 10, 2, 15, 11, 7, 3)
<10>partitionBy
  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] 
scala> var rdd=sc.makeRDD(1 to 80,8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:27

scala> res0.collect
res1: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50], 5:[51,52,53,54,55,56,57,58,59,60], 6:[61,62,63,64,65,66,67,68,69,70], 7:[71,72,73,74,75,76,77,78,79,80])

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> rdd.map(x=>(x,x)).partitionBy(new HashPartitioner(5))
res2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:28

scala> res2.partitions.size
res5: Int = 5

scala> res2.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res6: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:30

scala> res6.collect
res7: Array[String] = Array(0:[(5,5),(10,10),(15,15),(20,20),(25,25),(30,30),(35,35),(40,40),(45,45),(50,50),(55,55),(60,60),(65,65),(70,70),(75,75),(80,80)], 1:[(1,1),(6,6),(11,11),(16,16),(21,21),(26,26),(31,31),(36,36),(41,41),(46,46),(51,51),(56,56),(61,61),(66,66),(71,71),(76,76)], 2:[(2,2),(7,7),(12,12),(17,17),(22,22),(27,27),(32,32),(37,37),(42,42),(47,47),(52,52),(57,57),(62,62),(67,67),(72,72),(77,77)], 3:[(3,3),(8,8),(13,13),(18,18),(23,23),(28,28),(33,33),(38,38),(43,43),(48,48),(53,53),(58,58),(63,63),(68,68),(73,73),(78,78)], 4:[(4,4),(9,9),(14,14),(19,19),(24,24),(29,29),(34,34),(39,39),(44,44),(49,49),(54,54),(59,59),(64,64),(69,69),(74,74),(79,79)])
<11>reduceByKey
  /**
   * Aggregates the values of each key using the given function.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] 
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25

scala> rdd1.reduceByKey(_+_).collect
res8: Array[(Int, Int)] = Array((2,12), (1,8), (3,6))
<12>groupByKey
  /**
   * Groups all values with the same key into a single collection. In practice reduceByKey is used more often, since it combines values before the shuffle.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] 
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25

scala> rdd1.groupByKey().collect
res10: Array[(Int, Iterable[Int])] = Array((2,CompactBuffer(3, 4, 5)), (1,CompactBuffer(1, 2, 5)), (3,CompactBuffer(6)))
<13>combineByKey
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] 

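The three functions play the following roles: createCombiner turns the first value seen for a key within a partition into an initial accumulator, mergeValue folds every further value of that key into the accumulator, and mergeCombiners merges accumulators for the same key across partitions. The spark-shell session that follows computes a per-key average with these functions; the same idea written out as a standalone sketch:

val scores = sc.makeRDD(Seq(("a", 90), ("a", 80), ("b", 89)))
val sumAndCount = scores.combineByKey(
  (v: Int) => (v, 1),                                            // createCombiner: (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners
val avgByKey = sumAndCount.mapValues { case (sum, count) => sum / count }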

scala> var rdd2 = sc.makeRDD(Array(("a",90),("a",80),("a",87),("b",89),("b",74),("c",77),("c",99)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd2.combineByKey(v=>(v,1),(c:(Int,Int),v)=>(c._1+v,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))
res2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[1] at combineByKey at <console>:27

scala> rdd2.collect
res3: Array[(String, Int)] = Array((a,90), (a,80), (a,87), (b,89), (b,74), (c,77), (c,99))

scala> res2.map{case(k,v:(Int,Int))=>(k,v._1/v._2)}
res6: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:29

scala> res6.collect
res7: Array[(String, Int)] = Array((b,81), (a,85), (c,88))
<14>aggregateByKey
  /**
   * Within each partition, seqOp folds the values of each key into an accumulator that starts
   * from zeroValue (in the example below it keeps the per-key max within a partition).
   * This is done for every partition of the RDD, and combOp then merges the per-partition
   * results for each key (here by adding them).
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] 
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.collect
res0: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))

scala> var agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

scala> agg.collect
res1: Array[(Int, Int)] = Array((1,5), (2,21), (3,9))
<15>foldByKey
//A simplified aggregateByKey in which the same function is used both within and across partitions
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> var fold = rdd.foldByKey(0)(_+_)
fold: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at foldByKey at <console>:26

scala> fold.collect
res2: Array[(Int, Int)] = Array((1,5), (2,25), (3,9))
<16>sortByKey
//Sorts by key. If the key type has no built-in ordering, have it extend Ordered (implement compare) so keys can be compared.
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] 
//A Boolean argument chooses ascending or descending order; the default is ascending.
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.sortByKey().collect
res3: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))
<17>sortBy
//More flexible than sortByKey: sorts the RDD by the result of applying f to each element
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 
scala> var rdd1 = sc.makeRDD(1 until(9))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24

scala> rdd1.sortBy(x=>x).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

scala> rdd1.sortBy(1 / _).collect
res10: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 1)
<18>join
//Joins two key-value RDDs on their keys (see the outer-join sketch after this example):
//join keeps only the keys present in both RDDs
//leftOuterJoin keeps every key from the left RDD
//rightOuterJoin keeps every key from the right RDD
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] 
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24

scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24

scala> rdd.join(rdd1).collect
res15: Array[(Int, (Int, Int))] = Array((2,(3,4))) 
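For the outer variants, the side that may be missing is wrapped in an Option. A minimal sketch reusing rdd and rdd1 from above (element order may vary):

rdd.leftOuterJoin(rdd1).collect    // e.g. Array((1,(2,None)), (2,(3,Some(4))))
rdd.rightOuterJoin(rdd1).collect   // e.g. Array((2,(Some(3),4)), (3,(None,5)))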
<19>cogroup
//For each key, gathers the values from both RDDs into a pair of Iterables
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] 
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24

scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24

scala> rdd.cogroup(rdd1).collect
res16: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((2,(CompactBuffer(3),CompactBuffer(4))), (1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(),CompactBuffer(5))))
<20>coalesce
//Reduces the number of partitions: when there are far more partitions than nodes, packing small partitions together lowers the scheduling and computation overhead
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] 
scala> rdd.partitions.size
res18: Int = 2

scala> rdd.coalesce(1)
res19: org.apache.spark.rdd.RDD[(Int, Int)] = CoalescedRDD[35] at coalesce at <console>:27

scala> res19.partitions.size
res20: Int = 1
<21>repartition
//Shuffles the data and redistributes it into the given number of partitions (see the note after this example)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 
scala> rdd.repartition(1)
res21: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[39] at repartition at <console>:27

scala> res21.partitions.size
res22: Int = 1
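As a side note, repartition is implemented as coalesce with shuffling forced on, so the two calls below do the same thing:

rdd.repartition(4)
rdd.coalesce(4, shuffle = true)   // full shuffle, data is redistributed across 4 partitions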

Action

A Transformation always returns an RDD,
whereas an Action never returns an RDD.

<1>reduce
//Aggregates the elements of the dataset using the function f
def reduce(f: (T, T) => T): T
scala> var rdd=sc.makeRDD(Array[Int](1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at makeRDD at <console>:24

scala> rdd.reduce(_*_)
res26: Int = 362880
<2>collect
//Returns all elements of the dataset to the driver as an array
def collect(): Array[T]
scala>  rdd.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
<3>count
//Returns the number of elements in the dataset
def count(): Long
scala> rdd.count
res28: Long = 9
<4>first
//Returns the first element
def first(): T
scala> rdd.first
res29: Int = 1
<5>take
//Returns an array made up of the first num elements
def take(num: Int): Array[T]
scala> rdd.take(5)
res30: Array[Int] = Array(1, 2, 3, 4, 5)
<6>takeSample
//Returns a random sample of num elements from the dataset as an array
//the Boolean decides whether elements may be sampled more than once (with replacement)
def takeSample(
      withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] 
scala> rdd.takeSample(true,5)
res31: Array[Int] = Array(1, 1, 5, 9, 4)
<7>takeOrdered
//Returns the first num elements in sorted order
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
scala> rdd.takeOrdered(4)
res34: Array[Int] = Array(1, 2, 3, 4) 
<8>aggregate
//Like the transformation aggregateByKey, but it is an action and the result type U need not match the RDD's element type.
//Note that zeroValue is applied once per partition in seqOp and once more in combOp: with the two partitions here the per-partition products are 24 and 15120, so the example below yields 24 + 15120 + 1 = 15145.
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
scala> rdd.aggregate(1)(
     | (_*_),
     | (_+_))
res35: Int = 15145
<9>fold
//Fold operation: a simplified aggregate in which seqOp and combOp are the same function
def fold(zeroValue: T)(op: (T, T) => T): T
scala> rdd.fold(1)(_*_)
res42: Int = 362880
<10>saveAsTextFile
//Saves the dataset as text files (see the sketch below)
//path is typically an HDFS path
def saveAsTextFile(path: String): Unit
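A minimal sketch (the output path is only a placeholder):

rdd.saveAsTextFile("hdfs://namenode:9000/output/numbers")
// reading it back gives an RDD[String], one element per line
val reloaded = sc.textFile("hdfs://namenode:9000/output/numbers")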
<11>saveAsSequenceFile
//Saves a key-value RDD as a Hadoop SequenceFile
def saveAsSequenceFile(
      path: String,
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit 
<12>saveAsObjectFile
//Saves the dataset as a SequenceFile of serialized objects (see the sketch below)
def saveAsObjectFile(path: String): Unit
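A minimal round-trip sketch for the two save formats above (paths are placeholders; the element type must be supplied when reading an object file back):

val pairs = sc.makeRDD(Seq((1, 2), (3, 4)))
pairs.saveAsSequenceFile("hdfs://namenode:9000/output/pairs-seq")
pairs.saveAsObjectFile("hdfs://namenode:9000/output/pairs-obj")
val restored = sc.objectFile[(Int, Int)]("hdfs://namenode:9000/output/pairs-obj")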
<13>countByKey
//Returns the number of elements for each key as a Map
def countByKey(): Map[K, Long]
scala> var rdd=sc.makeRDD((List((1,2),(1,100),(1,14),(2,18),(2,14),(3,77))))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24

scala> rdd.countByKey
res47: scala.collection.Map[Int,Long] = Map(2 -> 2, 1 -> 3, 3 -> 1)
<14>foreach
//Like Scala's foreach: applies f to every element (the function runs on the executors, not on the driver)
def foreach(f: T => Unit): Unit
scala> rdd.foreach(print(_))
(1,2)(1,100)(1,14)(2,18)(2,14)(3,77)

4. Statistical Methods on RDDs

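For numeric RDDs, the commonly used statistics methods (provided through DoubleRDDFunctions) can be sketched as follows:

val nums = sc.makeRDD(1 to 100)
nums.count      // number of elements
nums.sum        // sum of the elements
nums.mean       // arithmetic mean
nums.max        // largest element
nums.min        // smallest element
nums.variance   // variance
nums.stdev      // standard deviation
nums.stats      // count, mean, stdev, max, min and sum in a single pass (a StatCounter)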