Spark資料過濾、自定義分割槽、Shuffer調優 經典案例(詳解)
阿新 • • 發佈:2018-12-11
案例: 根據學科取得最受歡迎的老師的前兩名 這個是資料
http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/wangwu http://bigdata.edu360.cn/wangwu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/wangwu http://bigdata.edu360.cn/wangwu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/lisi http://bigdata.edu360.cn/wangwu http://bigdata.edu360.cn/wangwu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/zhaoliu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://python.edu360.cn/laoli http://python.edu360.cn/laoliu http://python.edu360.cn/laoli http://python.edu360.cn/laoli
`基本寫法` ------->在List中進行排序會產生記憶體溢位
package day03 /** * * 根據學科取得最受歡迎的老師前2名 */ import java.net.URL import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FavTeacherWithObject { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒 val sc = new SparkContext(conf) //讀取資料 val lines: RDD[String] = sc.textFile("D:\\data\\teacher.log") //整理資料,每個老師記一次數 val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => { val teacher = line.substring(line.lastIndexOf("/") + 1) val url = new URL(line).getHost val subject = url.substring(0, url.indexOf(".")) ((subject, teacher), 1) }) //聚合 val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(_+_) println(reduced.collect().toBuffer) //根據學科進行 分組 val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) println(grouped.collect().toBuffer) //排序,這裡的排序取前兩名, 取到的資料是scala集合list中進行排序的 //先分組,在組內進行排序,這CompactBuffer是迭代器,繼承了序列,然後將迭代器轉換成list進行排序 //在某種極端的情況,_表示迭代分割槽的資料,這裡是將迭代器的資料一次性的拉去過來後進行toList,如果資料量非常的大,這裡肯定會出現OOM(記憶體溢位) val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy( - _._2).take(2)) //println(sorted.collect().toBuffer) val result = sorted.collect() result.foreach(println) //釋放資源 sc.stop() } }
將資料過濾,同一個key在一個RDD,在RDD中進行排序就不會記憶體溢位(如果排不下的話就會到磁碟,所以不會溢位)
package day03 /** * 根據學科取得最受歡迎的老師前2名(過濾後排序) * ((bigdata, wangwu),10) * ((javaee,laoyang),8) * * 資料: * http://bigdata.edu360.cn/wangwu * http://bigdata.edu360.cn/wangwu * http://javaee.edu360.cn/zhaoliu * http://javaee.edu360.cn/zhaoliu * ...... */ import java.net.URL import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FavTeacherWithObject2 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() conf.setAppName("FavTeacherWithObject2").setMaster("local") val subjects = Array("bigdata", "javaee", "php") val sc = new SparkContext(conf) val lines = sc.textFile("D:\\data\\in\\teacher\\teacher.log") //整理資料 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //和一組合在一起(不好,呼叫了兩次map方法) //val map: RDD[((String, String), Int)] = sbjectAndteacher.map((_, 1)) //聚合,將學科和老師聯合當做key val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_) //cache到記憶體 //val cached = reduced.cache() //scala的集合排序是在記憶體中進行的,但是記憶體有可能不夠用 //可以呼叫RDD的sortby方法,記憶體+磁碟進行排序 for (sb <- subjects) { //該RDD中對應的資料僅有一個學科的資料(因為過濾過了) val filtered: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb) //現在呼叫的是RDD的sortBy方法,(take是一個action,會觸發任務提交) val favTeacher = filtered.sortBy(_._2, false).take(2) //列印 println(favTeacher.toBuffer) } sc.stop() } }
key資料量太大的時候就會使得key在一個分割槽中,從而造成排序混亂,所以自定義分割槽
package day03
/**
* 根據學科取得最受歡迎的老師前2名(自定義分割槽)
* ((bigdata, wangwu),10)
* ((javaee,laoyang),8)
*
* 資料:
* http://bigdata.edu360.cn/wangwu
* http://bigdata.edu360.cn/wangwu
* http://javaee.edu360.cn/zhaoliu
* http://javaee.edu360.cn/zhaoliu
* ......
* Created by zhangjingcun on 2018/9/19 8:36.
* */
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object FavTeacherWithObject03 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val topN = args(0).toInt
val conf = new SparkConf()
conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒
val sc = new SparkContext(conf)
//讀取資料
val lines: RDD[String] = sc.textFile("D:\\data\\in\\teacher\\teacher.log")
//整理資料,每個老師記一次數
val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => {
val teacher = line.substring(line.lastIndexOf("/") + 1)
val url = new URL(line).getHost
val subject = url.substring(0, url.indexOf("."))
((subject, teacher), 1)
})
//聚合,將學科和老師聯合當做key
val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(_+_)
//計算有多少學科
val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
//自定義一個分割槽器,並且按照指定的分割槽器進行分割槽
val sbPatitioner = new SubjectParitioner(subjects);
//partitionBy按照指定的分割槽規則進行分割槽
//呼叫partitionBy時RDD的Key是(String, String)
val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)
//如果一次拿出一個分割槽(可以操作一個分割槽中的資料了)
val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
//將迭代器轉換成list,然後排序,在轉換成迭代器返回
it.toList.sortBy(_._2).reverse.take(topN).iterator
})
//
val r: Array[((String, String), Int)] = sorted.collect()
println(r.toBuffer)
sc.stop()
}
}
//自定義分割槽器
class SubjectParitioner(sbs: Array[String]) extends Partitioner {
//相當於主構造器(new的時候回執行一次)
//用於存放規則的一個map
val rules = new mutable.HashMap[String, Int]()
var i = 0
for(sb <- sbs) {
//rules(sb) = i
rules.put(sb, i)
i += 1
}
//返回分割槽的數量(下一個RDD有多少分割槽)
override def numPartitions: Int = sbs.length
//根據傳入的key計算分割槽標號
//key是一個元組(String, String)
override def getPartition(key: Any): Int = {
//獲取學科名稱
val subject = key.asInstanceOf[(String, String)]._1
//根據規則計算分割槽編號,相當於執行apply方法
rules(subject)
}
}
在上面的程式碼中有兩個shuffer過程reduceByKey和partitionBy,但是可以合成一個shuffer
package day03
/**
* 根據學科取得最受歡迎的老師前2名(自定義分割槽)
* ((bigdata, wangwu),10)
* ((javaee,laoyang),8)
*
* 資料:
* http://bigdata.edu360.cn/wangwu
* http://bigdata.edu360.cn/wangwu
* http://javaee.edu360.cn/zhaoliu
* http://javaee.edu360.cn/zhaoliu
* ......
* Created by zhangjingcun on 2018/9/19 8:36.
* */
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object FavTeacherWithObject04 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val topN = args(0).toInt
val conf = new SparkConf()
conf.setAppName("FavTeacher").setMaster("local[2]") //local[*]表示用多個執行緒跑,2表示用兩個執行緒
val sc = new SparkContext(conf)
//讀取資料
val lines: RDD[String] = sc.textFile("D:\\data\\in\\teacher\\teacher.log")
//整理資料,每個老師記一次數
val subjectAddTeacher: RDD[((String, String), Int)] = lines.map(line => {
val teacher = line.substring(line.lastIndexOf("/") + 1)
val url = new URL(line).getHost
val subject = url.substring(0, url.indexOf("."))
((subject, teacher), 1)
})
//計算有多少學科
val subjects: Array[String] = subjectAddTeacher.map(_._1._1).distinct().collect()
//自定義一個分割槽器,並且按照指定的分割槽器進行分割槽
val sbPatitioner = new SubjectParitioner2(subjects);
//聚合,將學科和老師聯合當做key,**這時候兩個合併成一個shuffer**
val reduced: RDD[((String, String), Int)] = subjectAddTeacher.reduceByKey(sbPatitioner,_+_)
//partitionBy按照指定的分割槽規則進行分割槽
//呼叫partitionBy時RDD的Key是(String, String)
val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)
//如果一次拿出一個分割槽(可以操作一個分割槽中的資料了)
val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
//將迭代器轉換成list,然後排序,在轉換成迭代器返回
it.toList.sortBy(_._2).reverse.take(topN).iterator
})
//
val r: Array[((String, String), Int)] = sorted.collect()
println(r.toBuffer)
sc.stop()
}
}
//自定義分割槽器
class SubjectParitioner2(sbs: Array[String]) extends Partitioner {
//相當於主構造器(new的時候回執行一次)
//用於存放規則的一個map
val rules = new mutable.HashMap[String, Int]()
var i = 0
for(sb <- sbs) {
//rules(sb) = i
rules.put(sb, i)
i += 1
}
//返回分割槽的數量(下一個RDD有多少分割槽)
override def numPartitions: Int = sbs.length
//根據傳入的key計算分割槽標號
//key是一個元組(String, String)
override def getPartition(key: Any): Int = {
//獲取學科名稱
val subject = key.asInstanceOf[(String, String)]._1
//根據規則計算分割槽編號,相當於執行apply方法
rules(subject)
}
}