
SparkCore Operators by Example: Intersection, Difference, and Union (intersection, subtract, union, distinct, subtractByKey)

1. Intersection: intersection

1.1 Source code

  /**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did.
   *
   * @note This method performs a shuffle internally.
   */
  // The result is deduplicated, and computing it requires a shuffle.
  // Both RDDs must have the same element type T, which is also the element type of the result.
  def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }

Source code analysis:

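  1. thisRDD.intersection(otherRDD) computes the intersection of thisRDD and otherRDD. The result contains no duplicate elements, even if an element occurs several times in either RDD;
  2. intersection is a shuffle operator (it creates a ShuffleDependency);
  3. internally it calls cogroup, keeps only the keys whose groups are non-empty on both sides, and returns those keys;
  4. Note: many operators that combine two RDDs by grouping records with the same key are built on cogroup(otherDataset, [numPartitions]), for example join, leftOuterJoin, and intersection. This is not universal: subtractByKey uses a dedicated SubtractedRDD instead.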
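To make the cogroup-based implementation concrete, here is a minimal local sketch that models it with plain Scala collections instead of RDDs, so it runs without Spark. The object and helper names (`IntersectionSketch`, `cogroupLike`, `intersectionLike`) are hypothetical, and grouping the elements themselves stands in for Spark's `(v, null)` pairs.

```scala
// Local model of RDD.intersection built on a cogroup-style grouping.
// Plain Scala collections stand in for RDDs; no Spark is needed to run this.
object IntersectionSketch {
  // Stand-in for this.map(v => (v, null)).cogroup(other.map(v => (v, null))):
  // for every key seen on either side, pair up the left group and the right group.
  def cogroupLike[T](left: Seq[T], right: Seq[T]): Map[T, (Seq[T], Seq[T])] = {
    val leftGroups = left.groupBy(identity)
    val rightGroups = right.groupBy(identity)
    (leftGroups.keySet ++ rightGroups.keySet).iterator.map { k =>
      (k, (leftGroups.getOrElse(k, Seq.empty[T]), rightGroups.getOrElse(k, Seq.empty[T])))
    }.toMap
  }

  // Same shape as the Spark code: filter on "both groups non-empty", then keep the keys.
  def intersectionLike[T](left: Seq[T], right: Seq[T]): Set[T] =
    cogroupLike(left, right)
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keySet

  def main(args: Array[String]): Unit = {
    val list1 = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
    val list2 = List(4, 5, 6, 7, 8, 9, 10)
    // 7 occurs twice in list1 but only once in the result, because keys are unique
    println(intersectionLike(list1, list2).toList.sorted) // List(4, 5, 6, 7)
  }
}
```

Deduplication falls out of the design: once elements become keys, each distinct value appears as exactly one key, no matter how many times it occurred.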

1.2 Code example:

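   // Assumes an existing SparkContext named sc
   import org.apache.spark.rdd.RDD

   val list1 = List(1,2,3,4,5,6,7,7,20)
   val list2 = List(4,5,6,7,8,9,10)
   val rdd1: RDD[Int] = sc.parallelize(list1, 3) // 3 partitions; if omitted, Spark uses sc.defaultParallelism
   val rdd2: RDD[Int] = sc.parallelize(list2)
   // Intersection: rdd1 ∩ rdd2
   rdd1.intersection(rdd2).foreach(println)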

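Output (the order depends on partitioning and task scheduling; 7 appears only once even though list1 contains it twice):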

6
4
7
5

2. Difference: subtract

2.1 Source code

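  // By default the result reuses this RDD's partitioner; if there is none, it uses a
  // HashPartitioner with this RDD's number of partitions.
  /**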
   * Return an RDD with the elements from `this` that are not in `other`.
   * 
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

  // Passing numPartitions sets the partition count of the result (a HashPartitioner is used).
  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    subtract(other, new HashPartitioner(numPartitions))
  }

  // Passing a Partitioner lets you control how the result is partitioned.
  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(
      other: RDD[T],
      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }
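The source shows that subtract wraps each element as a key with a dummy value, calls subtractByKey, and keeps the keys. The local sketch below models that pipeline with plain Scala collections; the names `SubtractSketch`, `subtractByKeyLike`, and `subtractLike` are hypothetical, and Unit `()` replaces Spark's `null` as the dummy value. It also shows a consequence of the design: unlike intersection, subtract keeps duplicates from the left side.

```scala
// Local model of RDD.subtract expressed through a subtractByKey-style step.
// Plain Scala collections stand in for RDDs; () replaces Spark's null dummy value.
object SubtractSketch {
  // Stand-in for subtractByKey: keep every left pair whose key never occurs on the right.
  def subtractByKeyLike[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val rightKeys = right.map(_._1).toSet
    left.filterNot { case (k, _) => rightKeys.contains(k) }
  }

  // Stand-in for this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  def subtractLike[T](left: Seq[T], right: Seq[T]): Seq[T] =
    subtractByKeyLike(left.map(x => (x, ())), right.map(x => (x, ()))).map(_._1)

  def main(args: Array[String]): Unit = {
    // Duplicates on the left survive: 3 appears twice in the result
    println(subtractLike(List(1, 2, 3, 3, 4), List(4, 5))) // List(1, 2, 3, 3)
  }
}
```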

    

2.2 Code examples
2.2.1 The RDDs involved must have exactly the same element type

    // Prepare the datasets
    val list1 = List(1,2,3,4,5,6,7,7,20)
    val list2 = List(4,5,6,7,8,9,10)
    val array = Array("hello huangbo","hello xuzheng","hello huangxiaoming")
    val kv = Array(("a",1), ("b",2),("c",3),("a",1),("b",1),("c",1))
    val rdd1: RDD[Int] = sc.parallelize(list1 , 3) 
    val rdd2: RDD[Int] = sc.parallelize(list2)
    val rdd3: RDD[String] = sc.makeRDD(array)
    // A key-value PairRDD
    val rdd4: RDD[(String, Int)] = sc.makeRDD(kv) // pair-RDD operations treat the first tuple element as the key (implicit conversion to PairRDDFunctions)
    
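    /** Compute the difference
      * subtract(): set difference; both RDDs must have the same element type.
      *     1. For single-value elements, the difference is taken directly on the values;
      *     2. For (K, V) elements, the whole tuple is compared, not just the key.
      */
    val subtractRes: RDD[Int] = rdd1.subtract(rdd2)
    subtractRes.foreach(x => print(x + "\t")); println() // difference (order may vary): 3	1	2	20
    //rdd3.subtract(rdd4)  // does not compile: RDD[String] and RDD[(String, Int)] have different element types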

2.2.2 When the elements are tuples, the tuple component types must match as well:

    // Wrong: both RDDs hold tuples, but the tuple type parameters differ, so subtract does not compile
    val list01 = Array(("a",1), ("b",2), ("c",3))
    val rdd01: RDD[(String, Int)] = sc.parallelize(list01)
    val list02 = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
    val rdd02: RDD[(String, Any)] = sc.makeRDD(list02)
    //rdd01.subtract(rdd02).foreach(print) // does not compile: (String, Int) vs (String, Any)

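RDD[T] is invariant in T, so an RDD[(String, Int)] is not an RDD[(String, Any)]. You can, however, use subtyping to widen the element type to a common supertype, here (String, Any), so that both sides have the same type: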

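    // Correct: the element types now match; the result is (a,1)(b,2), since ("c",3) occurs in both
    // Ascribe the element type (String, Any) explicitly so both sides agree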
    val list03: Array[(String, Any)] = Array(("a",1), ("b",2), ("c",3))
    val rdd03 = sc.parallelize(list03)
    val list04: Array[(String, Any)] = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
    val rdd04 = sc.makeRDD(list04)
    rdd03.subtract(rdd04).foreach(print)
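The widening trick works because the collection behind the data (Seq, and Tuple2 inside it) is covariant, while the container the method is defined on (RDD) is invariant. The sketch below illustrates this without Spark, using a hypothetical invariant class `Bag[T]` as a stand-in for RDD[T]; `Bag`, `WideningSketch`, and its members are illustrative names only.

```scala
// Illustrates invariance and widening with a hypothetical invariant stand-in for RDD[T].
object WideningSketch {
  // Like RDD[T], Bag[T] is invariant in T.
  final class Bag[T](val items: Seq[T]) {
    // Whole-element difference, which is what RDD.subtract compares
    def subtract(other: Bag[T]): Bag[T] = {
      val otherSet = other.items.toSet
      new Bag(items.filterNot(otherSet.contains))
    }
  }

  val narrow: Bag[(String, Int)] = new Bag(Seq(("a", 1), ("b", 2), ("c", 3)))
  val mixed: Bag[(String, Any)] =
    new Bag(Seq(("a", "lily"), ("b", "lucy"), ("c", "rose"), ("c", 3)))

  // narrow.subtract(mixed) would not compile: Bag[(String, Int)] is not a Bag[(String, Any)].
  // Seq and Tuple2 are covariant, so the items can be re-wrapped with the wider element type.
  val widened: Bag[(String, Any)] = new Bag[(String, Any)](narrow.items)

  // ("c", 3) occurs on both sides, so only (a,1) and (b,2) remain
  def result: Seq[(String, Any)] = widened.subtract(mixed).items

  def main(args: Array[String]): Unit =
    println(result) // List((a,1), (b,2))
}
```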

3. Difference by key: subtractByKey

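thisPairRDD.subtractByKey(otherPairRDD) identifies elements by key only: it removes every pair of thisPairRDD whose key appears in otherPairRDD, regardless of the values. The value types of the two RDDs may even differ.

Note: both operands must be PairRDDs (RDD[(K, V)]) with the same key type.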

Code example

    /**
      * subtractByKey(otherRDD): difference on keys only; returns the elements of the main RDD whose key does not appear in otherRDD;
      *           ---- for PairRDDs only
      */
    val rdd10 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
    val rdd11 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3)))
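    // Result: (d,5), because "d" is the only key of rdd10 that does not appear in rdd11 (("a",5) is removed too, despite its different value)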
    rdd10.subtractByKey(rdd11).foreach(print)
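The difference between subtract and subtractByKey is easiest to see on the same data. This local sketch reuses the values of rdd10 and rdd11 as plain Scala sequences (no Spark); `ByKeySketch`, `subtractLike`, and `subtractByKeyLike` are hypothetical names modeling the two operators' semantics, not Spark APIs.

```scala
// Local comparison of subtract (whole tuples) and subtractByKey (keys only).
object ByKeySketch {
  val rdd10: Seq[(String, Int)] = Seq(("a", 1), ("b", 2), ("c", 3), ("a", 5), ("d", 5))
  val rdd11: Seq[(String, Int)] = Seq(("a", 1), ("b", 2), ("c", 3))

  // Like subtract: an element is removed only if the identical (key, value) tuple is on the right
  def subtractLike[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val rightSet = right.toSet
    left.filterNot(rightSet.contains)
  }

  // Like subtractByKey: only keys are compared, so the right side may use another value type W
  def subtractByKeyLike[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val rightKeys = right.map(_._1).toSet
    left.filterNot { case (k, _) => rightKeys.contains(k) }
  }

  def main(args: Array[String]): Unit = {
    println(subtractLike(rdd10, rdd11))                // List((a,5), (d,5))
    println(subtractByKeyLike(rdd10, rdd11))           // List((d,5))
    println(subtractByKeyLike(rdd10, Seq(("a", "x")))) // List((b,2), (c,3), (d,5))
  }
}
```

With whole-tuple comparison ("a",5) survives because no identical tuple exists on the right; with key comparison it is dropped because key "a" exists there.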

4. Union

4.1 Concatenation operator: union

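/** Set operations
   * union(): simply concatenates the two RDDs; it neither shuffles nor removes duplicates (so it is not a union in the mathematical sense)
   * count(): returns the number of elements in the RDD
   */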
/*
    rdd1 = {1,2,3,4,5,6,7,7,20}
    rdd2 = {4,5,6,7,8,9,10}
 */
    println(rdd1.union(rdd2).count()) // 16 elements (9 + 7)
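Because union is plain concatenation, its effect on the data can be modeled locally with `++` on Scala lists. This is a sketch without Spark; `UnionSketch` is a hypothetical name, and the lists reuse the values of rdd1 and rdd2.

```scala
// Local model of RDD.union: plain concatenation, duplicates are kept.
object UnionSketch {
  val list1 = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
  val list2 = List(4, 5, 6, 7, 8, 9, 10)

  val concatenated: List[Int] = list1 ++ list2

  def main(args: Array[String]): Unit = {
    println(concatenated.size)          // 16
    println(concatenated.count(_ == 7)) // 3: twice from list1, once from list2
  }
}
```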

4.2 Set union (union first, then distinct)

    // Set union: concatenate with union, then remove duplicates with distinct (distinct shuffles here)
    rdd1.union(rdd2).distinct().foreach(println)
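The union-then-distinct pattern yields the mathematical union. The local sketch below (no Spark; `UnionDistinctSketch` is a hypothetical name) computes it two ways: with `distinct`, and with key-based deduplication that roughly mirrors the idea behind RDD.distinct's `map(x => (x, null)).reduceByKey(...)` path, using Unit `()` in place of `null`.

```scala
// Local model of rdd1.union(rdd2).distinct(): the mathematical union.
object UnionDistinctSketch {
  val list1 = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
  val list2 = List(4, 5, 6, 7, 8, 9, 10)

  // Concatenate, then drop repeated values (first occurrence wins)
  val setUnion: List[Int] = (list1 ++ list2).distinct

  // Key-based deduplication: each value becomes a map key, so repeats collapse
  val viaKeys: Set[Int] = (list1 ++ list2).map(x => (x, ())).toMap.keySet

  def main(args: Array[String]): Unit = {
    println(setUnion.sorted) // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20)
    println(setUnion.size)   // 11
  }
}
```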