1. 程式人生 > >Spark中repartition和partitionBy的區別

Spark中repartition和partitionBy的區別

是我 item its alt ive 同時 tint nts exe

repartition 和 partitionBy 都是對數據進行重新分區,默認都是使用 HashPartitioner,區別在於partitionBy 只能用於 PairRDD,但是當它們同時都用於 PairRDD時,結果卻不一樣:

技術分享圖片

不難發現,其實 partitionBy 的結果才是我們所預期的,我們打開 repartition 的源碼進行查看:

/**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   *
   * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
   
*/ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } /** * Return a new RDD that is reduced into `numPartitions` partitions. * * This results in a narrow dependency, e.g. if you go from 1000 partitions * to 100 partitions, there will not be a shuffle, instead each of the 100 * new partitions will claim 10 of the current partitions. If a larger number * of partitions is requested, it will stay at the current number of partitions. * * However, if you‘re doing a drastic coalesce, e.g. to numPartitions = 1, * this may result in your computation taking place on fewer nodes than * you like (e.g. one node in the case of numPartitions = 1). To avoid this, * you can pass shuffle = true. This will add a shuffle step, but means the * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). * * @note With shuffle = true, you can actually coalesce to a larger number * of partitions. This is useful if you have a small number of partitions, * say 100, potentially with a few partitions being abnormally large. Calling * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. The optional partition coalescer * passed in must be serializable.
*/ def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.
") if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position + 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions, partitionCoalescer).values } else { new CoalescedRDD(this, numPartitions, partitionCoalescer) } }

即使是RairRDD也不會使用自己的key,repartition 其實使用了一個隨機生成的數來當做 Key,而不是使用原來的 Key!!

Spark中repartition和partitionBy的區別