1. 程式人生 > >spark原始碼解讀1之Partitioner

spark原始碼解讀1之Partitioner

spark原始碼解讀系列環境:spark-1.5.2、hadoop-2.6.0、scala-2.10.4

1.理解

Partitioner類

Partitioner類是用於處理key-value型別的RDD,根據key進行元素劃分。

Partitioner是一個抽象類。只有兩個方法:numPartitions和getPartition(key: Any)。

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}  

Partitioner同時有一個伴生物件,有defaultPartitioner方法,預設是:HashPartitioner

object Partitioner {
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}

Spark的Partitioner類的子類

在Spark的Partitioner類中,存在兩類Partitioner函式:HashPartitioner和RangePartitioner,它們都是繼承自Partitioner,主要提供了每個RDD有幾個分割槽(numPartitions)以及對於給定的值返回一個分割槽ID(0~numPartitions-1),也就是決定這個值是屬於那個分割槽的。

(1)HashPartitioner

 HashPartitioner分割槽的原理很簡單,對於給定的key,計算其hashCode,併除於分割槽的個數取餘,如果餘數小於0,則用餘數+分割槽的個數,最後返回的值就是這個key所屬的分割槽ID。

子類繼承父抽象類,需要實現抽象類中的抽象方法。Partitioner主要有兩個:numPartitions和getPartition。

HashPartitioner實現:
def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

numPartitions好理解,getPartition使用了Utils的方法:

  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

主要是由於key.hashCode有可能為賦值,需要判斷和處理。

另外HashPatitioner重寫了兩個方法:

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions

這個好理解。

(2)RangePartitioner

從HashPartitioner分割槽的實現原理我們可以看出,其結果可能導致每個分割槽中資料量的不均勻,極端情況下會導致某些分割槽擁有RDD的全部資料,這顯然不是我們需要的。而RangePartitioner分割槽則儘量保證每個分割槽中資料量的均勻,而且分割槽與分割槽之間是有序的,也就是說一個分割槽中的元素肯定都是比另一個分割槽內的元素小或者大;但是分割槽內的元素是不能保證順序的。簡單的說就是將一定範圍內的數對映到某一個分割槽內。

RangePartitioner分割槽器的主要作用就是將一定範圍內的數對映到某一個分割槽內,所以它的實現中分界的演算法尤為重要

rangeBounds的邊界方法rangeBounds,主要是利用 RangePartitioner伴生物件的sketch方法進行抽樣,抽樣方法採取的是水塘抽樣(Reservoir Sampling),詳細可以參考【8】、【9】。
水塘抽樣(Reservoir Sampling)的主要思想是:

在序列流中取一個數,如何確保隨機性,即取出某個資料的概率為:1/(已讀取資料個數)

  假設已經讀取n個數,現在保留的數是Ax,取到Ax的概率為(1/n)。

  對於第n+1個數An+1,以1/(n+1)的概率取An+1,否則仍然取Ax。依次類推,可以保證取到資料的隨機性。

  數學歸納法證明如下:

    當n=1時,顯然,取A1。取A1的概率為1/1。

           假設當n=k時,取到的資料Ax。取Ax的概率為1/k。

           當n=k+1時,以1/(k+1)的概率取An+1,否則仍然取Ax。

    (1)如果取Ak+1,則概率為1/(k+1);

    (2)如果仍然取Ax,則概率為(1/k)*(k/(k+1))=1/(k+1)

採用水塘抽樣(Reservoir Sampling)主要是解決Spark 1.1中rangeBounds需要先知道rdd的size,然後再去抽樣的的問題,因為這種需要兩次遍歷RDD,而水塘抽樣(Reservoir Sampling)可以在不知道總size的情況下進行抽樣,特別適用於資料在記憶體存不下的情況。

RangePartitioner伴生物件的sketch方法:

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }

 RangePartitioner.sketch的第一個引數是rdd.map(_._1),也就是把父RDD的key傳進來,因為分割槽只需要對Key進行操作即可。該函式返回值是val (numItems, sketched) ,其中numItems相當於記錄rdd元素的總數;而sketched的型別是Array[(Int, Int, Array[K])],記錄的是分割槽的編號、該分割槽中總元素的個數以及從父RDD中每個分割槽取樣的資料。

 sketch函式對父RDD中的每個分割槽進行取樣,並記錄下分割槽的ID和分割槽中資料總和。

 reservoirSampleAndCount函式就是典型的水塘抽樣實現,唯一不同的是該演算法還記錄下i的值,這個就是該分割槽中元素的總和。

如果獲取的資料分佈不均勻,則邊界方法rangeBounds會再次抽樣,但是隻對抽象數少於要求的partition進行sample,其他抽樣好的不會。

最後獲取到每個partition中每個樣本和對應的weight( 類似candidates += ((key, weight))),weight為partition中元素數量與抽樣數量的比值,對於重新抽樣的,則為1.

最後再進行邊界確定:determineBounds

 /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight > target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}

determineBounds主要對 candidates先按key進行排序,然後獲取總抽樣元素除以partition大小即為每個partition理論的大小,即程式碼中的step。
然後再對排序好的ordered進行遍歷,當所代表的權重大於step的整數倍時,返回此時的key,作為劃分條件。然後依次類推,獲得每個partition的邊界key。

總結:RangePartitioner是採取抽樣的策略,每個partition理論的是抽取20個元素,實際採用水塘抽樣(Reservoir Sampling)時為了避免抽樣少於期望,會乘以3.然後再用determineBounds對抽樣資料進行排序,weight是每個key所代表的抽樣數量,再按weight確定每個partition接近理論的邊界,並進行返回,即為partitionid(getPartition返回值)。

getPartition查詢某個元素應該所屬的partitionid時,如果partition數量過大,會採取二分查詢。
原始碼中為了避免抽樣數量少於partition數量,也採取了相對應的措施。

(3)GridPartitioner

除了上述兩個子類,Partitioner的子類還有:mllib中的GridPartitioner(org.apache.spark.mllib.linalg.distributed.GridPartitioner),是一個網格Partitioner,採用了規則的網格劃分座標。
numPartitions等於行和列的partitioners之積:

override val numPartitions: Int = rowPartitions * colPartitions 

根據(i,j)獲取partition的id:

  private def getPartitionId(i: Int, j: Int): Int = {
    require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
    require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
    i / rowsPerPart + j / colsPerPart * rowPartitions
  }

PartitionIdPassthrough

還有:PartitionIdPassthrough(org.apache.spark.sql.execution.PartitionIdPassthrough),是一個虛擬的partitioner,這些partition的id已經提前計算好了,所以直接返回。

/**
 * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
 * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
 */
private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

ReferencePartitioner

除了spark原始碼中,spark上層的系統也有partitioner類的子類,比如Adam系統中的:ReferencePartitioner(org.bdgenomics.adam.rdd.ReferencePartitioner),ReferencePartitioner主要是根據參考位置或者參考域進行劃分,劃分一句是referenceName。

GenomicPositionPartitioner和GenomicRegionPartitioner

Adam中還有:
GenomicPositionPartitioner(org.bdgenomics.adam.rdd.GenomicPositionPartitioner)和GenomicRegionPartitioner(org.bdgenomics.adam.rdd.GenomicRegionPartitioner)
這兩個partitioner主要是將ReferencePosition物件劃分到分開的,物理位置相關的基因組位置。這樣主要是方便基因資料可以根據空間分佈進行劃分,然後進行計算。

2.程式碼:

自己寫的Partitioner:主要對url的資料進行劃分,按域名來,如果域名相同,則hashCode相同。
比如”https://github.com/xubo245/SparkLearning“和”https://github.com/xubo245“網址不同但域名相同,但想劃分到一個Partition中,於是可以改寫getPartition的數量

package org.apache.spark.sourceCode.partitionerLearning

import org.apache.spark.Partitioner

/**
  * Created by xubo on 2016/10/8.
  */
class URLHashPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if (code < 0) {
      code + numPartitions
    } else {
      code
    }
  }

  override def equals(other: Any): Boolean = other match {
    case url: URLHashPartitioner =>
      url.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

object URLHashPartitioner {
  def main(args: Array[String]) {

  }
}

測試程式碼:

package org.apache.spark.sourceCode.partitionerLearning

import org.apache.spark.util.SparkLearningFunSuite

/**
  * Created by xubo on 2016/10/8.
  */
class PartitionerTest extends SparkLearningFunSuite {
  test("URLHashPartitioner") {
    var url1 = new URLHashPartitioner(20)
    var partition1 = url1.getPartition("https://github.com/xubo245/SparkLearning")
    var partition2 = url1.getPartition("https://github.com/xubo245")
    println(partition1)
    println(partition2)
  }
}

SparkLearningFunSuite主要是方便測試,可以在參考【3】中看到原始碼

3.結果:

14
14

參考中【7】描述的比較詳細,背景也介紹的很好,建議多參考。

參考

【1】http://spark.apache.org/
【2】http://spark.apache.org/docs/1.5.2/programming-guide.html
【3】https://github.com/xubo245/SparkLearning
【4】book:《深入理解spark核心思想與原始碼分析》
【5】book:《spark核心原始碼分析和開發實戰》
【6】https://www.iteblog.com/archives/1368
【7】https://www.iteblog.com/archives/1522
【8】https://www.iteblog.com/archives/1525
【9】http://www.cnblogs.com/xudong-bupt/p/4053652.html