1. 程式人生 > >Spark資料過濾、自定義分割槽、Shuffer調優 經典案例(詳解)

Spark資料過濾、自定義分割槽、Shuffer調優 經典案例(詳解)

案例: 根據學科取得最受歡迎的老師的前兩名 這個是資料

http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/wangwu
http://bigdata.edu360.cn/wangwu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/wangwu
http://bigdata.edu360.cn/wangwu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/wangwu
http://bigdata.edu360.cn/wangwu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/zhaoliu
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://javaee.edu360.cn/laoyang
http://python.edu360.cn/laoli
http://python.edu360.cn/laoliu
http://python.edu360.cn/laoli
http://python.edu360.cn/laoli
                                 `基本寫法`   ------->在List中進行排序會產生記憶體溢位
package day03



/**
  *
  * 根據學科取得最受歡迎的老師前2名
  */
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacherWithObject {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf()
    conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒
    val sc = new SparkContext(conf)

    //讀取資料
    val lines: RDD[String] = sc.textFile("D:\\data\\teacher.log")

    //整理資料,每個老師記一次數
    val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })

    //聚合
    val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(_+_)
    println(reduced.collect().toBuffer)

    //根據學科進行 分組
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

    println(grouped.collect().toBuffer)

    //排序,這裡的排序取前兩名, 取到的資料是scala集合list中進行排序的
    //先分組,在組內進行排序,這CompactBuffer是迭代器,繼承了序列,然後將迭代器轉換成list進行排序
    //在某種極端的情況,_表示迭代分割槽的資料,這裡是將迭代器的資料一次性的拉去過來後進行toList,如果資料量非常的大,這裡肯定會出現OOM(記憶體溢位)
    val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy( - _._2).take(2))

    //println(sorted.collect().toBuffer)

    val result = sorted.collect()
    result.foreach(println)

    //釋放資源
    sc.stop()
  }
}

                將資料過濾,同一個key在一個RDD,在RDD中進行排序就不會記憶體溢位(如果排不下的話就會到磁碟,所以不會溢位)
package day03


/**
  * 根據學科取得最受歡迎的老師前2名(過濾後排序)
  *   ((bigdata, wangwu),10)
  *   ((javaee,laoyang),8)
  *
  *   資料:
  *     http://bigdata.edu360.cn/wangwu
  *     http://bigdata.edu360.cn/wangwu
  *     http://javaee.edu360.cn/zhaoliu
  *     http://javaee.edu360.cn/zhaoliu
  *     ......
  */
import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacherWithObject2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf()
    conf.setAppName("FavTeacherWithObject2").setMaster("local")
    val subjects = Array("bigdata", "javaee", "php")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:\\data\\in\\teacher\\teacher.log")
    //整理資料
    val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      val teacher = line.substring(index + 1)
      val httpHost = line.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })

    //和一組合在一起(不好,呼叫了兩次map方法)
    //val map: RDD[((String, String), Int)] = sbjectAndteacher.map((_, 1))

    //聚合,將學科和老師聯合當做key
    val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)

    //cache到記憶體
    //val cached = reduced.cache()

    //scala的集合排序是在記憶體中進行的,但是記憶體有可能不夠用
    //可以呼叫RDD的sortby方法,記憶體+磁碟進行排序

    for (sb <- subjects) {
      //該RDD中對應的資料僅有一個學科的資料(因為過濾過了)
      val filtered: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)

      //現在呼叫的是RDD的sortBy方法,(take是一個action,會觸發任務提交)
      val favTeacher = filtered.sortBy(_._2, false).take(2)

      //列印
      println(favTeacher.toBuffer)
    }

    sc.stop()


  }
}

                       key資料量太大的時候就會使得key在一個分割槽中,從而造成排序混亂,所以自定義分割槽
package day03

/**
  * 根據學科取得最受歡迎的老師前2名(自定義分割槽)
  *   ((bigdata, wangwu),10)
  *   ((javaee,laoyang),8)
  *
  *   資料:
  *     http://bigdata.edu360.cn/wangwu
  *     http://bigdata.edu360.cn/wangwu
  *     http://javaee.edu360.cn/zhaoliu
  *     http://javaee.edu360.cn/zhaoliu
  *     ......
  * Created by zhangjingcun on 2018/9/19 8:36.
  * */
import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable
object FavTeacherWithObject03 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val topN = args(0).toInt
    val conf = new SparkConf()
    conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒
    val sc = new SparkContext(conf)

    //讀取資料
    val lines: RDD[String] = sc.textFile("D:\\data\\in\\teacher\\teacher.log")

    //整理資料,每個老師記一次數
    val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })

    //聚合,將學科和老師聯合當做key
    val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(_+_)

    //計算有多少學科
    val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

    //自定義一個分割槽器,並且按照指定的分割槽器進行分割槽
    val sbPatitioner = new SubjectParitioner(subjects);

    //partitionBy按照指定的分割槽規則進行分割槽
    //呼叫partitionBy時RDD的Key是(String, String)
    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)

    //如果一次拿出一個分割槽(可以操作一個分割槽中的資料了)
    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
      //將迭代器轉換成list,然後排序,在轉換成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).iterator
    })

    //
    val r: Array[((String, String), Int)] = sorted.collect()

    println(r.toBuffer)


    sc.stop()


  }
}

//自定義分割槽器
class SubjectParitioner(sbs: Array[String]) extends Partitioner {

  //相當於主構造器(new的時候回執行一次)
  //用於存放規則的一個map
  val rules = new mutable.HashMap[String, Int]()
  var i = 0
  for(sb <- sbs) {
    //rules(sb) = i
    rules.put(sb, i)
    i += 1
  }

  //返回分割槽的數量(下一個RDD有多少分割槽)
  override def numPartitions: Int = sbs.length

  //根據傳入的key計算分割槽標號
  //key是一個元組(String, String)
  override def getPartition(key: Any): Int = {
    //獲取學科名稱
    val subject = key.asInstanceOf[(String, String)]._1
    //根據規則計算分割槽編號,相當於執行apply方法
    rules(subject)
  }
}

                在上面的程式碼中有兩個shuffer過程reduceByKey和partitionBy,但是可以合成一個shuffer
package day03

/**
  * 根據學科取得最受歡迎的老師前2名(自定義分割槽)
  *   ((bigdata, wangwu),10)
  *   ((javaee,laoyang),8)
  *
  *   資料:
  *     http://bigdata.edu360.cn/wangwu
  *     http://bigdata.edu360.cn/wangwu
  *     http://javaee.edu360.cn/zhaoliu
  *     http://javaee.edu360.cn/zhaoliu
  *     ......
  * Created by zhangjingcun on 2018/9/19 8:36.
  * */
import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable
object FavTeacherWithObject04 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val topN = args(0).toInt
    val conf = new SparkConf()
    conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒
    val sc = new SparkContext(conf)

    //讀取資料
    val lines: RDD[String] = sc.textFile("D:\\data\\in\\teacher\\teacher.log")

    //整理資料,每個老師記一次數
    val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })


    //計算有多少學科
    val subjects: Array[String] = subjectAddTeacher.map(_._1._1).distinct().collect()

    //自定義一個分割槽器,並且按照指定的分割槽器進行分割槽
    val sbPatitioner = new SubjectParitioner2(subjects);

    //聚合,將學科和老師聯合當做key,**這時候兩個合併成一個shuffer**
    val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(sbPatitioner,_+_)
    //partitionBy按照指定的分割槽規則進行分割槽
    //呼叫partitionBy時RDD的Key是(String, String)
    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)

    //如果一次拿出一個分割槽(可以操作一個分割槽中的資料了)
    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
      //將迭代器轉換成list,然後排序,在轉換成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).iterator
    })

    //
    val r: Array[((String, String), Int)] = sorted.collect()

    println(r.toBuffer)


    sc.stop()


  }
}

//自定義分割槽器
class SubjectParitioner2(sbs: Array[String]) extends Partitioner {

  //相當於主構造器(new的時候回執行一次)
  //用於存放規則的一個map
  val rules = new mutable.HashMap[String, Int]()
  var i = 0
  for(sb <- sbs) {
    //rules(sb) = i
    rules.put(sb, i)
    i += 1
  }

  //返回分割槽的數量(下一個RDD有多少分割槽)
  override def numPartitions: Int = sbs.length

  //根據傳入的key計算分割槽標號
  //key是一個元組(String, String)
  override def getPartition(key: Any): Int = {
    //獲取學科名稱
    val subject = key.asInstanceOf[(String, String)]._1
    //根據規則計算分割槽編號,相當於執行apply方法
    rules(subject)
  }
}