1. 程式人生 > >Spark RDD-1-常用運算元

Spark RDD-1-常用運算元

目錄

 

1、RDD簡介

2、RDD建立

3、常用RDD運算元

(1)Action RDD

(2)單個RDD的 Transformation (惰性)

(3)多個RDD的Transformation


1、RDD簡介

Spark對資料的一種核心抽象,Resilient Distributed Dataset,彈性分散式資料集,不可變,是val型別

RDD資料儲存在記憶體中,採購伺服器時,需選擇記憶體較大的機器,計算能力強

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

        儲存型:硬碟大

        計算型:記憶體大

彈性:服務閒置較多時,減少服務;服務訪問壓力過大時,增加服務

分散式:叢集上多個節點,不一定某部分資料分給誰;

資料集:資料的容器,比如之前的map、list、set。

2、RDD建立

  1. 資料來源在程式內部,練習用,實際開發一般不會是這種方式,預設分割槽數2
    val data = "Though my daily life is extremely monotonous"
    sc.parallelize(data.split(" "))
  2. 外部資料來源
    sc.textFile("spark/src/test.txt")
    1. testFile:
      /**
         * Read a text file from HDFS, a local file system (available on all nodes), or any * * 
         * Hadoop-supported file system URI, and return it as an RDD of Strings.
         * 
         * @path: 
         *   比如local file:"src/test.txt"
         *      Hadoop-supported:local,"file://test.txt"
         *                        hdfs, "hdfs://namenode:test.txt"
         */
        def textFile(path: String,
                     minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
          assertNotStopped()
          hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
            minPartitions).map(pair => pair._2.toString).setName(path)
        }
      1. path:本地檔案路徑,或者所有hadoop支援的檔案系統uri(寫法見程式碼中path舉例)
      2. minPatitions:指定RDD的分割槽數(RDD一個分割槽,就是一個Task),預設是
        defaultMinPartitions: Int = math.min(defaultParallelism, 2) // defaultParallelism是配置檔案中指定的或程式中設定的引數
      3. 支援萬用字元*:比如textFile("/my/*.txt")、textFile("my/*")
      4. 支援讀取壓縮檔案:textFile("/my/*.gz")
      5. 支援指定多個檔案:textFile("/my/text1.txt","/my/text2.txt")

 

3、常用RDD運算元

(1)Action RDD

  1. foreach:遍歷每個元素,無返回值,一般用在將結果儲存到資料庫中使用
  2. saveAsTextFile儲存到hdfs,RDD每個partition存到hdfs的一個block塊
  3. saveAsObjectFile:儲存到hdfs,將每個partition的資料序列化後,以sequenceFile(序列化)格式存到hdfs
  4. collect:將RDD轉換為本地資料集合
    1. collectAsMap:PairRDD型別,可以轉換為本地的map型別
  5. count:RDD集合中資料量
  6. countByValue:各個元素在RDD中出現的次數,比如結果為{(2,1), (3,2)} ,2出現1次,3出現2次
  7. countByKey():Returns a hashmap of (K, Int) pairs with the count of each key
  8. lookup(k: Key):(PairRDD型別)選出與k匹配元素,將結果以sequence形式返回本地集
  9. top(num:Int) / take(num:Int):取出最大(最小)的num個元素
    1. 可以結合count做分頁
    2. 可以用來批量傳輸資料
  10. first:取第一個元素
  11. reduce(f: (T, T) => T): T     
    1. 二元運算函式聚合rdd的元素,最後合成一個值
    2. 聚合,each partition先做聚合,再mergeResult將分割槽聚合結果再聚合
  12. fold(zeroValue: T)(op: (T, T) => T):設定了初始值的聚合,也是先對each partition內用op聚合,再將每個partition的計算結果用op再聚合
    // 21,分割槽內聚合得11,再對所有分割槽結果再聚合得21
    println(sc.parallelize(List(1), 1).fold(10)((x:Int,y:Int)=>x+y))) 
    // 33,兩個分割槽結果分別是11,12,之後再聚合10+11,再+12,33
    println(sc.parallelize(List(1,2), 2).fold(10)((x:Int,y:Int)=>x+y)) 
    1. 類似reduce,區別是多了初始值,各個分割槽算一次zeroValue,最後聚合分割槽結果時再算一次
  13. aggregate:先歸併後聚合
    1. 類似fold:區別是聚合前後資料型別可以不一致,而reduce/fold聚合前後型別一致
    2. 計算:注意初始值會多次加入到結果中,分割槽數(M)+最後combine result次數=初始值參與M+1次
      	println(sc.parallelize(List(1, 2,3,4,5), 2).aggregate("s")(
      		(str: String, num: Int) => {
      			println(str + "\t\t\t" + num)
      			s"${str}_${num}"
      		},
      		(str1: String, str2: String) => {
      			println(str1 + "\t" + str2)
      			s"${str1}/${str2}"
      		}
      	))
      
      結果
      s			1
      s_1			2
      s			3
      s_3			4
      s_3_4			5
      s	s_3_4_5
      s/s_3_4_5	s_1_2
      s/s_3_4_5/s_1_2       // s 初始值加入了3次
      1. seqOp:an operator used to accumulate results within a partition,初始值是zeroValue
      2. combOp:an associative operator used to combine results from different partitions,初始值是zeroValue

(2)單個RDD的 Transformation (惰性)

  1. filter:過濾結果集
  2. distinct:去重,返回新的RDD
  3. map:遍歷元素執行操作函式,返回值型別由操作函式決定,一個數據建立一個連線
  4. mapPartition
    1. 和map類似,都是遍歷,區別是遍歷的是分割槽,函式是作用於每個分割槽而不是每個元素
    2. 適用於需要頻繁建立外部連結的情況
      1. 比如spark streaming消費kafka訊息之後需要操作資料庫時,需要建立資料庫連結,此時用mapPartition而非map,一個分割槽建立一個連結,以減少連線數
  5. mapPartitionWithIndex:和mapPartition一樣,只不過多提供了一個引數partition index,內部原始碼都是通過new MapPartitionsRDD
    val source1 = sc.parallelize(List("apple", "apple", "pig", "apple", "pig"), 2)
    source1.mapPartitionsWithIndex{
    		(index, iter) => {
    			var res = List[(String,Int)]()
    			while(iter.hasNext)
    				res = res.::(iter.next(), index)
    			res.iterator
    		}
    	}.foreach(println)
    結果:
    (apple,0)
    (apple,0)
    (pig,1)
    (apple,1)
    (pig,1)
    
  6. flatMap:壓平,將map後每個元素的子元素取出來(比如map處理後每個元素是個list,list裡每個元素提出來)
    val sourceFlatMap = sc.parallelize(List(("apple,banana"), ("pig,cat,dog")))
    println(sourceFlatMap.flatMap(_.split(",")).collect.mkString(" "))
    
    結果:
    apple banana pig cat dog
    1. 拉平元素
    2. 類似map,區別是可以把二維RDD轉成一維,如果map後不是集合,最終結果和map沒區別
  7. flatMapValues[U](f: V => TraversableOnce[U])
    val source = sc.parallelize(List(("fruit", "apple,banana"), ("animal", "pig,cat,dog")))
    println(source.flatMapValues(_.split(",")).collect.mkString(" * "))
    
    結果
    (fruit,apple) * (fruit,banana) * (animal,pig) * (animal,cat) * (animal,dog)
    1. 拉平value
  8. keyBy[K](f: T => K):將rdd轉成 key-value元組資料結構的pair rdd,根據value(T)得出k

    val source1 = sc.parallelize(List("apple", "banana", "pig", "cat", "dog"))
    println(source1.keyBy(_.length).collect.mkString(" "))
    
    結果:
    (5,apple) (6,banana) (3,pig) (3,cat) (3,dog)
  9. groupBy[K](f: T => K, p: Partitioner):
     def groupBy[K](f: T => K, p: Partitioner)
                   (implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope {
        val cleanF = sc.clean(f)
        this.map(t => (cleanF(t), t)).groupByKey(p)
      }
    1. 函式f:用來指定key,將rdd中每個元素,轉成K
    2. 根據K,做groupByKey
  10. combineByKey(createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
    def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
      }
    1. 下面的groupByKey、reduceByKey、aggregateByKey內部,都用了和combineByKey同樣的combineByKeyWithClassTag
      1. createCombiner:分割槽內,遍歷分割槽內元素,遇到一個新的key就用這個函式操作一次轉成C型別,不是新的key就不操作了(不是新的key操作的是下面的mergeValue)
      2. mergeValue:分割槽內的,對上面結果C,又遇到同樣的key,做mergeValue操作
      3. mergeCombiners:分割槽間函式,合併分割槽結果
    2. 比如求平均值
      val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
      val d1 = sc.parallelize(initialScores)
      type MVType = (Int, Double) //定義一個元組型別(科目計數器,分數)
      d1.combineByKey(
        score => (1, score),
        (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
        (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
      ).map { case (name, (num, socre)) => (name, socre / num) }.collect
      

      res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))

  11. groupByKey():RDD[(K, Iterable[V])]:按key分組
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // 1、V型別轉成Buffer,不做計算
        val createCombiner = (v: V) => CompactBuffer(v)
    // 2、merge V into Buffer,第一步計算,但其實沒算,只是把值放到buffer中
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    // 3、combine two CompactBuffer[V] into one CompactBuffer[V]
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
     }
    1. pair RDD的運算元
    2. 較耗時,如果是希望做聚合,using `PairRDDFunctions.aggregateByKey`or `PairRDDFunctions.reduceByKey效能更好
      1. 原因是:groupByKey不能在map端做combine,需要把所有資料都insert into compactBuffer中,然後combine,這樣會造成 more objects in the old gen(jvm老生帶會有大量沒用的物件)
    3. 每次的結果,相同key對應的Iterable,元素順序可能不同
  12. reduceByKey
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // 比較groupByKey,這裡func作用了兩個地方,一個是merge,一個是combine
    // merge:相當於是在本地map端先彙總
    // combine:之後到reduce端再次彙總
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }
    1. 類似與groupByKey,區別是先在map端做一次聚合,再到reduce再聚合
    2. map端做一次彙總,減少資料IO傳輸,效能比groupBykey好
  13. aggregateByKey:
    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
          combOp: (U, U) => U): RDD[(K, U)]
    1. 類似與reduceByKey,也是分別在map和reduce端做聚合,區別是多了初始值的設定
    2. zeroValue:初始值,對每種key作用一次
    3. seqOp:在分割槽內,Aggregate the values of each key(聚合每個key的values),用zeroValue,結果型別是U
    4. comOp:a operation for merging two U's between partitions,在分割槽間執行
  14. sortByKey:
    1. 預設對Key做升序排序
    2. 如果是輸出到檔案,會寫入多個 part-X 檔案中

(3)多個RDD的Transformation

  1. union:兩個RDD並行處理
    1. 沒有shuffle,只是將兩個rdd合併,可能涉及資料的移動
  2. intersection:交集且去重 (比如用在物品歸類)

  3. cartesian:笛卡兒積

  4. subtract(other: RDD[T]):作差(不去重)

    1. Return an RDD with the elements from `this` that are not in `other`.
  5. zip:兩個rdd一對一關聯處理元素

    1. 要求兩個rdd的元素數量要相同,有相同的值

    2. 比如表垂直拆分之後合併就用zip

  6. join:pair RDD的操作,通過key進行關聯查詢,內部通過cogroup實現

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues( pair =>
    // pair的兩個iterator都有值,才能到yield
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
        )
      }
    1. 返回資料形式為: (k, (v1, v2)) tuple

  7. leftOuterJoin:pair RDD的操作,左關聯查詢,內部通過cogroup實現

  8. rightOuterJoin:pair RDD的操作,右,內部通過cogroup實現

  9. fullOuterJoin:pair RDD的操作,全連線,內部通過cogroup實現,左邊沒有,以右邊為主,右邊沒有,以左邊為主