1. 程式人生 > >Spark取出(Key,Value)型資料中Value值為前n條資料

Spark取出(Key,Value)型資料中Value值為前n條資料

     最近在使用Spark進行一些日誌分析,需要對日誌中的一些(key,value)型資料進行排序,並取出value最多的10條資料。經過查詢資料,發現Spark中的top()函式可以取出排名前n的元素,以及sortBy()函式可以對(key,value)資料根據value進行排序,原以為一切都很好解決,但是實際情況並沒有得到想要的結果資料,研究了部分原始碼,才最終達到了想要的資料,特在此備註和分享。

   前期遇到的坑

   剛開始,通過查詢資料,知道Spark可以使用sortByKey()和sortBy() 兩個函式對(key,value)型資料排序。於是,直接使用sortByKey()進行排序,排完之後才發現,排序的時候是根據Key排序,而我需要先對Key進行彙總,再根據Value進行排序。顯然,sortByKey不能滿足需求!

    於是,開始嘗試使用sortBy()函式,使用方法為 rdd.sortBy(_._2,false),即可對value進行降序排序。在測試的時候,我使用了rdd.sortBy(_._2,false).collect()進行排序和彙總,但是collect()函式會將所有的資料彙總到Driver,當資料量太大時對導致Driver中的記憶體不足。於是,想著只取將10條資料返回給Driver。經過查詢,知道top()函式可以取出前10條資料。

   top()函式中的坑及其解決方法

    知道可以用top()函式取出前10條資料,以為這麼簡單就能得到想要的資料,好激動!誰知,我還是高興得太早了-_-
    我取出Value值排名前10的資料的實現程式碼為: rdd.sortBy(_._2,false).top(10)   程式碼簡潔清晰,感覺一切都是這麼美好。然而,當我放到叢集上執行時,得到的結果卻大大出乎我的意料。Value值為1的資料都被取出來了,Value值較大的資料反而沒有取到,神馬情況??     想了半天,也不知道是哪裡出了問題。沒辦法,只能研究原始碼了。於是,花了點時間研究將sortBy()函式的原始碼看了一遍,發現sortBy()函式的實現其實最終是呼叫了sortByKey()函式進行排序。但是,這跟最後取得Value值為前10的目標沒有衝突呀!於是,繼續研究top()函式的原始碼。由於之前一直想著top()函式就是取出排序之後的前n條資料,已經有這個慣性思維一直在腦海中,所以在前兩次看top()原始碼的時候,也沒發現什麼異常。下面為top()函式的原始碼實現:
   

   

   後來,靜下心來再看了幾遍,終於發現不是太對了。我的資料型別是(key,value)格式的,rdd.sortBy(_._2,false)中實現了根據value值排序的目的,但是 .top(10) 卻取出了key為前10的資料。top()函式原始碼中,對RDD中的資料進行了reduce操作,並將結果進行排序。所以,rdd.sortBy(_._2,false).top(10) 這段程式碼先是對(key,value)資料根據value進行排序,而top()函式中,資料又再次對key進行了排序,導致之前根絕value排序的結果亂序了,所以最後取到的是key排在前10的資料。這就是導致問題的原因,終於被我發現了!

    問題雖然被發現了,但是怎麼解決呢?說實話,我對Scala也不是太瞭解,只能去QQ群裡請教了一些大神。有一位叫做老徐的大神幫我給出瞭解決方法: rdd.sortBy(_._2,false).top(10)(Ordering.by(e => e._2))。再次執行,果然能得到正確結果。後來再仔細想想,覺得sortBy()函式有點多餘,於是變成rdd.top(10)(Ordering.by(e => e._2))。至此,已經能對(key,value)型別的資料進行彙總,然後根據value值進行排序,最後取出value排名前10的資料了。

   take()函式實現目標

   在請教大神的時候,偶然接觸到了take()函式,經過測試: rdd.sortBy(_._2,false).take(10) 這段程式碼能得到value排名前10的資料。

   檢視take()函式的原始碼,如下:

/**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @note due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.size == 0) {
            numPartsToTry = partsScanned * 4
          } else {
            // the left side of max is >=1 whenever partsScanned >= 2
            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
          }
        }

        val left = num - buf.size
        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
  }
   該函式的註解指出,take()函式通過掃描一個數據分割槽,並取出該分割槽中的前n個數據,避免了其它分割槽資料的檢索。最主要的是,該函式沒有對父RDD中的資料進行重新分割槽,所以,資料的分割槽和排序順序並沒有改變,因此能取出value排名前10的資料。

   總結

   經過上面的這些折騰,發現top()函式中所遇到的坑的實質是由於(key,value)資料在sortBy(_._2)函式中根據value進行排序的時候,會進行Shuffle操作,根據value值將原來的資料進行重新分割槽。而sortBy()對資料排序之後,在top()函式中進行排序時,會根據key進行Shuffle操作,並得到根據key排序和分割槽之後的新RDD,所以導致最後的結果跟預期的不一致。

    只要肯花時間,自己的潛力還是可以挖掘出來的!