1. 程式人生 > >spark中的scalaAPI之RDDAPI常用操作

spark中的scalaAPI之RDDAPI常用操作

appname 轉換 成了 size pre esc atm rgs new

package com.XXX
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
//spark中的RDD測試
object RddTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd api test")
    val sc = SparkContext.getOrCreate(conf)
// mapTest(sc) // distinctTest(sc) // filterTest(sc) // keyByTest(sc) // sortByTest(sc) // topNTest(sc) // repartitionTest(sc) // groupByTest(sc) aggSumTest(sc) sc.stop() } def mapTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3) val mapResult
= file.map(x =>{//map的特點是一個輸入對應一條輸出,沒有返回值,對應的返回值會是() NIL val info = x.split("\\t") (info(0),info(1))//轉換成了元組 }) //take是一個action,作用是取出前n條數據發送到driver,一般用於開發測試 mapResult.take(10).foreach(println) //map和mapPartition的區別:map是一條記錄一條記錄的轉換,mapPartition是 //一個partition(分區)轉換一次 val mapPartitionResult = file.mapPartitions(x => {//
一個分區對應一個分區 var info = new Array[String](3) for(line <- x) yield{//yield:作用:有返回值,所有的記錄返回之後是一個集合 info = line.split("\\t") (info(0),info(1)) } }) mapPartitionResult.take(10).foreach(println) // 把一行轉為多行記錄,使用flatMap展平,把一條new_tweet記錄轉成兩條login記錄 val flatMapTest = file.flatMap(x=>{ val info = x.split("\\t") info(1) match { case "new_tweet"=> for (i <- 1 to 2) yield s"${info(0)} login ${info(2)}" case _ => Array(x) } }) flatMapTest.take(10).foreach(println) println(file.count()) println(flatMapTest.count()) } //distinct:排重,把重復的數據去掉,不是數據的轉換,屬於數據的聚合 def distinctTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3) val userRdd = file.map(x=>x.split("\\t")(0)).distinct() userRdd.foreach(println) } //filter:過濾 def filterTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3) val loginFilter = file.filter(x=>x.split("\\t")(1)=="login") loginFilter.take(10).foreach(println) println(loginFilter.count()) } //keyBy,輸入作為value,key由算計計算而來 def keyByTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3) val userActionType = file.keyBy(x=>{ val info = x.split("\\t") s"${info(0)}--${info(1)}" }) userActionType.take(10).foreach(println) } //sortBy排序 def sortByTest(sc:SparkContext) = { val file = sc.textFile("file:///C:\\Users\\zuizui\\Desktop\\README.txt") //數據量小的話,想進行群排序,吧numPartitions設置成1 //默認為聖墟,姜旭吧第二個參數設置為false // val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,numPartitions = 1)//後面有不同數量的空格時,使用\\s+來split val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,false,numPartitions = 1)//後面有不同數量的空格時,使用\\s+來split sortBy.foreach(println) } def topNTest(sc:SparkContext) = { val list = List(1,23,34,54,56,100)//把集合轉化為RDD使用parallelize,或者mkRDD val rdd = sc.parallelize(list,2) //添加飲食準換,使takeOrdered,和top的排序順序變反 implicit val tonordered = new Ordering[Int]{ override def compare(x: Int, y: Int): Int = y.compareTo(x) } val takeOrdered = rdd.takeOrdered(3)//從小到大取出前三條 takeOrdered.foreach(println) val topN = rdd.top(3)//從大到小取出前三條 topN.foreach(println) } //重新分區 def repartitionTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt") val result = file.repartition(5)//repartition是寬依賴,所謂寬依賴就是 //原來RDD的每一個分區中的數據都會分別吧部分數據寫入到新的RDD的每個分區中 //窄依賴:就是原來RDD的分區中的一個分區數據完全寫入到新的RDD中的一個分區中 //窄依賴減少網絡間的傳輸 file.foreachPartition(x=>{ var sum = 0 x.foreach(x=>sum+=1) println(s"該分區的數據有${sum}") }) result.foreachPartition(x=>{ var sum = 0 x.foreach(x=>sum+=1) println(s"該分區的數據有${sum}") }) val coalesce = result.coalesce(3)//使用窄依賴,原來有五個分區,現在變成三個的話, //其中的一個不變,另外四個分區中的兩兩分別通過窄依賴添加到另外兩個新的分區中 coalesce.foreachPartition(x=>{ var sum = 0 x.foreach(x=>sum+=1) println(s"coalesce該分區的數據有${sum}") }) } def groupByTest(sc:SparkContext)= { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt") val groupedBy = file.groupBy(x=>x.split("\\t")(0)) //group by 容易發生數傾斜 groupedBy.foreachPartition(x=>{ println(s"groupByRDD分區,該分區共有:${x.size}條記錄") }) groupedBy.foreach(x=>{ println(s"groupByRDD的一條記錄,key為${x._1},value上集合記錄條數是:${x._2.size}") }) groupedBy.foreach(x => { var sum = 0 x._2.foreach(line => { line.split("\\t")(1) match { case "login" => sum += 1 case _ => } }) println(s"用戶:${x._1}的登錄次數是:$sum") }) } def aggSumTest(sc:SparkContext) = { val list = List(1,2,4,5) val rdd = sc.parallelize(list,3) //reduce 計算sum val reduceResult = rdd.reduce((v1,v2)=>v1+v2) //fold計算sum val flodResult = rdd.fold(0)((v1,v2)=>v1+v2) //aggregate把元素連接成一個字符串 val aggResult = rdd.aggregate("")((c,v)=>{ c match { case "" => v.toString case _ => s"$c,$v" } },(c1,c2)=>{ c1 match { case ""=> c2 case _=>s"$c1,$c2" } }) println(s"reduceResult:$reduceResult") println(s"flodResult:$flodResult") println(s"aggResult:$aggResult") } def persistTest(sc:SparkContext) = { val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt") // file.cache() file.persist(StorageLevel.MEMORY_ONLY)//相當於cache(),智加載在內存中 //計算用戶數量 //計算ip數量 //計算每個用戶在每一個ip上的數量 } }

spark中的scalaAPI之RDDAPI常用操作