spark中各種transformation運算元操作(scala版)

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Demos of common Spark RDD transformation operators (map, filter, flatMap,
 * groupByKey, reduceByKey, sortByKey), each runnable standalone in local mode.
 *
 * Note: the original text used typographic "smart quotes" and had the
 * underscores stripped from `reduceByKey(_ + _)`; both are restored here.
 */
object TransformationOperation {

  def main(args: Array[String]): Unit = {
    // Uncomment exactly one demo to run it.
    //mapTest()
    //filterTest()
    //flatMap()
    //groupByKeyTest()
    //reduceByKeyTest()
    sortByKeyTest()
  }

  /**
   * map operator.
   * Multiplies every element of the collection by 2 and prints each result.
   */
  def mapTest() {
    val conf = new SparkConf()
      .setAppName("map")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)
    val multipleNumberRDD = numberRDD.map { number => number * 2 }
    multipleNumberRDD.foreach { num => println(num) }
    // Release the local SparkContext so repeated runs don't leak contexts.
    sc.stop()
  }

  /**
   * filter operator.
   * Keeps only the even numbers from the collection and prints them.
   */
  def filterTest() {
    val conf = new SparkConf()
      .setAppName("filter")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numbers, 1)
    val evenNumberRDD = numberRDD.filter { number => number % 2 == 0 }
    evenNumberRDD.foreach { num => println(num) }
    sc.stop()
  }

  /**
   * flatMap operator.
   * Splits lines of text into individual words and prints each word.
   */
  def flatMap() {
    val conf = new SparkConf()
      .setAppName("flatMap") // was "filter" — copy-paste error in the original
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = Array("hello you", "hello me", "hello world")
    val linesRDD = sc.parallelize(lines, 1)
    val wordsRDD = linesRDD.flatMap { line => line.split(" ") }
    wordsRDD.foreach { word => println(word) }
    sc.stop()
  }

  /**
   * groupByKey operator.
   * Example: group scores by class name and print each class's scores.
   */
  def groupByKeyTest() {
    val conf = new SparkConf()
      .setAppName("groupByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scores = Array(("class1", 80), ("class2", 90), ("class1", 65), ("class2", 85))
    val scoresRDD = sc.parallelize(scores, 1)
    val groupedScores = scoresRDD.groupByKey()
    groupedScores.foreach(score => {
      println("class:" + score._1)
      score._2.foreach { s => println(s) }
      println("===============")
    })
    sc.stop()
  }

  /**
   * reduceByKey operator.
   * Example: sum all scores per class and print the totals.
   */
  def reduceByKeyTest() {
    val conf = new SparkConf()
      .setAppName("reduceByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scores = Array(("class1", 80), ("class2", 90), ("class1", 65), ("class2", 85))
    val scoresRDD = sc.parallelize(scores, 1)
    // `_ + _` sums the values for each key (the underscores were lost in the original text).
    val totalScores = scoresRDD.reduceByKey(_ + _)
    totalScores.foreach(classScore => println(classScore._1 + ":" + classScore._2))
    sc.stop()
  }

  /**
   * sortByKey operator.
   * Example: sort student scores in descending order (ascending = false)
   * and print "score : name" pairs.
   */
  def sortByKeyTest() {
    val conf = new SparkConf()
      .setAppName("sortByKey") // was "reduceByKey" — copy-paste error in the original
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scores = Array((60, "leo"), (100, "ksc"), (99, "my"), (10, "jack"))
    val scoresRDD = sc.parallelize(scores, 1)
    val sortedScores = scoresRDD.sortByKey(false, 1)
    sortedScores.foreach(score => println(score._1 + " : " + score._2))
    sc.stop()
  }
}