Spark核心程式設計建立RDD及transformation和action詳解和案例
建立RDD
進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。
Spark Core提供了三種建立RDD的方式,包括:使用程式中的集合建立RDD;使用本地檔案建立RDD;使用HDFS檔案建立RDD。
個人經驗認為:
1、使用程式中的集合建立RDD,主要用於進行測試,可以在實際部署到叢集執行之前,自己使用集合構造測試資料,來測試後面的spark應用的流程。
2、使用本地檔案建立RDD,主要用於臨時性地處理一些儲存了大量資料的檔案。
3、使用HDFS檔案建立RDD,應該是最常用的生產環境處理方式,主要可以針對HDFS上儲存的大資料,進行離線批處理操作。
並行化集合建立RDD
如果要通過並行化集合來建立RDD,需要針對程式中的集合,呼叫SparkContext的parallelize()方法。Spark會將集合中的資料拷貝到叢集上去,形成一個分散式的資料集合,也就是一個RDD。相當於是,集合中的部分資料會到一個節點上,而另一部分資料會到其他節點上。然後就可以用並行的方式來操作這個分散式資料集合,即RDD。
java
package cn.spark.study.core;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
* 並行化集合建立RDD
* 案例:累加1到10
* @author Administrator
*
*/
public class ParallelizeCollection {
public static void main(String[] args) {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("ParallelizeCollection")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 要通過並行化集合的方式建立RDD,那麼就呼叫SparkContext以及其子類,的parallelize()方法
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 執行reduce運算元操作
// 相當於,先進行1 + 2 = 3;然後再用3 + 3 = 6;然後再用6 + 4 = 10。。。以此類推
int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer num1, Integer num2) throws Exception {
return num1 + num2;
}
});
// 輸出累加的和
System.out.println("1到10的累加和:" + sum);
// 關閉JavaSparkContext
sc.close();
}
}
scala
package cn.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author Administrator
*/
object ParallelizeCollection {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("ParallelizeCollection")
.setMaster("local")
val sc = new SparkContext(conf)
val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numberRDD = sc.parallelize(numbers, 5)
val sum = numberRDD.reduce(_ + _)
println("1到10的累加和:" + sum)
}
}
呼叫parallelize()時,有一個重要的引數可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition執行一個task來進行處理。Spark官方的建議是,為叢集中的每個CPU建立2~4個partition。Spark預設會根據叢集的情況來設定partition的數量。但是也可以在呼叫parallelize()方法時,傳入第二個引數,來設定RDD的partition數量。比如parallelize(arr, 10)
使用本地檔案和HDFS建立RDD
Spark是支援使用任何Hadoop支援的儲存系統上的檔案建立RDD的,比如說HDFS、Cassandra、HBase以及本地檔案。通過呼叫SparkContext的textFile()方法,可以針對本地檔案或HDFS檔案建立RDD。
有幾個事項是需要注意的:
1、如果是針對本地檔案的話,如果是在windows上本地測試,windows上有一份檔案即可;如果是在spark叢集上針對linux本地檔案,那麼需要將檔案拷貝到所有worker節點上。
2、Spark的textFile()方法支援針對目錄、壓縮檔案以及萬用字元進行RDD建立。
3、Spark預設會為hdfs檔案的每一個block建立一個partition,但是也可以通過textFile()的第二個引數手動設定分割槽數量,只能比block數量多,不能比block數量少。
java
package cn.spark.study.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* 使用本地檔案建立RDD
* 案例:統計文字檔案字數
* @author Administrator
*
*/
public class LocalFile {
public static void main(String[] args) {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("LocalFile")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子類的textFile()方法,針對本地檔案建立RDD
JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
// 統計文字檔案內的字數
JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String v1) throws Exception {
return v1.length();
}
});
int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("檔案總字數是:" + count);
// 關閉JavaSparkContext
sc.close();
}
}
package cn.spark.study.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* 使用HDFS檔案建立RDD
* 案例:統計文字檔案字數
* @author Administrator
*
*/
public class HDFSFile {
public static void main(String[] args) {
// 建立SparkConf
// 修改:去除setMaster()設定,修改setAppName()
SparkConf conf = new SparkConf()
.setAppName("HDFSFile");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子類的textFile()方法,針對HDFS檔案建立RDD
// 只要把textFile()內的路徑修改為hdfs檔案路徑即可
JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");
// 統計文字檔案內的字數
JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String v1) throws Exception {
return v1.length();
}
});
int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("檔案總字數是:" + count);
// 關閉JavaSparkContext
sc.close();
}
}
scala
package cn.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author Administrator
*/
object LocalFile {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("LocalFile")
.setMaster("local");
val sc = new SparkContext(conf)
val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1);
val count = lines.map { line => line.length() }.reduce(_ + _)
println("file's count is " + count)
}
}
package cn.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author Administrator
*/
object HDFSFile {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("HDFSFile") ;
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1);
val count = lines.map { line => line.length() }.reduce(_ + _)
println("file's count is " + count)
}
}
使用本地檔案和HDFS建立RDD
Spark的textFile()除了可以針對上述幾種普通的檔案建立RDD之外,還有一些特列的方法來建立RDD:
1、SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小檔案,返回
transformation和action介紹
Spark支援兩種RDD操作:transformation和action。transformation操作會針對已有的RDD建立一個新的RDD;而action則主要是對RDD進行最後的操作,比如遍歷、reduce、儲存到檔案等,並可以返回結果給Driver程式。
例如,map就是一種transformation操作,它用於將已有RDD的每個元素傳入一個自定義的函式,並獲取一個新的元素,然後將所有的新元素組成一個新的RDD。而reduce就是一種action操作,它用於對RDD中的所有元素進行聚合操作,並獲取一個最終的結果,然後返回給Driver程式。
transformation的特點就是lazy特性。lazy特性指的是,如果一個spark應用中只定義了transformation操作,那麼即使你執行該應用,這些操作也不會執行。也就是說,transformation是不會觸發spark程式的執行的,它們只是記錄了對RDD所做的操作,但是不會自發的執行。只有當transformation之後,接著執行了一個action操作,那麼所有的transformation才會執行。Spark通過這種lazy特性,來進行底層的spark應用執行的優化,避免產生過多中間結果。
action操作執行,會觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行。這是action的特性。
transformation和action原理剖析
案例:統計檔案字數
// 這裡通過textFile()方法,針對外部檔案建立了一個RDD,lines,但是實際上,程式執行到這裡為止,spark.txt檔案的資料是不會載入到記憶體中的。lines,只是代表了一個指向spark.txt檔案的引用。
val lines = sc.textFile("spark.txt")
// 這裡對lines RDD進行了map運算元,獲取了一個轉換後的lineLengths RDD。但是這裡連資料都沒有,當然也不會做任何操作。lineLengths RDD也只是一個概念上的東西而已。
val lineLengths = lines.map(line => line.length)
// 之列,執行了一個action操作,reduce。此時就會觸發之前所有transformation操作的執行,Spark會將操作拆分成多個task到多個機器上並行執行,每個task會在本地執行map操作,並且進行本地的reduce聚合。最後會進行一個全域性的reduce聚合,然後將結果返回給Driver程式。
val totalLength = lineLengths.reduce(_ + _)
案例:統計檔案每行出現的次數
Spark有些特殊的運算元,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其實只是針對特殊的RDD的。即包含key-value對的RDD。而這種RDD中的元素,實際上是scala中的一種型別,即Tuple2,也就是包含兩個值的Tuple。
在scala中,需要手動匯入Spark的相關隱式轉換,import org.apache.spark.SparkContext._。然後,對應包含Tuple2的RDD,會自動隱式轉換為PairRDDFunction,並提供reduceByKey等方法。
val lines = sc.textFile(“hello.txt”)
val linePairs = lines.map(line => (line, 1))
val lineCounts = linePairs.reduceByKey(_ + _)
lineCounts.foreach(lineCount => println(lineCount._1 + ” appears ” + llineCount._2 + ” times.”))
常用transformation介紹
常用action介紹
transformation操作開發實戰(重點)
java
package cn.spark.study.core;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* transformation操作實戰
* @author Administrator
*
*/
@SuppressWarnings(value = {"unused", "unchecked"})
public class TransformationOperation {
public static void main(String[] args) {
// map();
// filter();
// flatMap();
// groupByKey();
// reduceByKey();
// sortByKey();
// join();
cogroup();
}
/**
* map運算元案例:將集合中每一個元素都乘以2
*/
private static void map() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("map")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map運算元,將集合中的每個元素都乘以2
// map運算元,是對任何型別的RDD,都可以呼叫的
// 在java中,map運算元接收的引數是Function物件
// 建立的Function物件,一定會讓你設定第二個泛型引數,這個泛型型別,就是返回的新元素的型別
// 同時call()方法的返回型別,也必須與第二個泛型型別同步
// 在call()方法內部,就可以對原始RDD中的每一個元素進行各種處理和計算,並返回一個新的元素
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(
new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
// 傳入call()方法的,就是1,2,3,4,5
// 返回的就是2,4,6,8,10
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// 列印新的RDD
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* filter運算元案例:過濾集合中的偶數
*/
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("filter")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter運算元,過濾出其中的偶數
// filter運算元,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的返回型別是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就返回true;否則,不想保留這個元素,返回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,只有2,4,6,8,10這幾個偶數,會返回true
// 所以,只有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* flatMap案例:將文字行拆分為多個單詞
*/
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 並行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap運算元,將每一行文字,拆分為多個單詞
// flatMap運算元,在java中,接收的引數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型型別,即,代表了返回的新元素的型別
// call()方法,返回的型別,不是U,而是Iterable<U>,這裡的U也與第二個泛型型別相同
// flatMap其實就是,接收原始RDD中的每個元素,並進行各種邏輯的計算和處理,返回可以返回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 返回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* groupByKey案例:按照班級對成績進行分組
*/
private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey運算元,對每個班級的成績進行分組
// groupByKey運算元,返回的還是JavaPairRDD
// 但是,JavaPairRDD的第一個泛型型別不變,第二個泛型型別變成Iterable這種集合型別
// 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很方便地處理某個分組內的資料
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 列印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* reduceByKey案例:統計每個班級的總分
*/
private static void reduceByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("reduceByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行reduceByKey運算元
// reduceByKey,接收的引數是Function2型別,它有三個泛型引數,實際上代表了三個值
// 第一個泛型型別和第二個泛型型別,代表了原始RDD中的元素的value的型別
// 因此對每個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入
// 因此此處,會自動定義兩個泛型型別,代表call()方法的兩個傳入引數的型別
// 第三個泛型型別,代表了每次reduce操作返回的值的型別,預設也是與原始RDD的value型別相同的
// reduceByKey演算法返回的RDD,還是JavaPairRDD<key, value>
JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
// 對每個key,都會將其value,依次傳入call方法
// 從而聚合出每個key對應的一個value
// 然後,將每個key對應的一個value,組合成一個Tuple2,作為新RDD的元素
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 列印totalScores RDD
totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* sortByKey案例:按照學生分數進行排序
*/
private static void sortByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"),
new Tuple2<Integer, String>(100, "marry"),
new Tuple2<Integer, String>(80, "jack"));
// 並行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
// 對scores RDD執行sortByKey運算元
// sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
// 返回的,還是JavaPairRDD,其中的元素內容,都是和原始的RDD一模一樣的
// 但是就是RDD中的元素的順序,不同了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
// 列印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* join案例:列印學生成績
*/
private static void join() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("join")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60));
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// 使用join運算元關聯兩個RDD
// join以後,還是會根據key進行join,並返回JavaPairRDD
// 但是JavaPairRDD的第一個泛型型別,之前兩個JavaPairRDD的key的型別,因為是通過key進行join的
// 第二個泛型型別,是Tuple2<v1, v2>的型別,Tuple2的兩個泛型分別為原始RDD的value的型別
// join,就返回的RDD的每一個元素,就是通過key join上的一個pair
// 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
// 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
// join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
// 列印studnetScores RDD
studentScores.foreach(
new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
/**
* cogroup案例:列印學生成績
*/
private static void cogroup() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("cogroup")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60),
new Tuple2<Integer, Integer>(1, 70),
new Tuple2<Integer, Integer>(2, 80),
new Tuple2<Integer, Integer>(3, 50));
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// cogroup與join不同
// 相當於是,一個key join上的所有value,都給放到一個Iterable裡面去了
// cogroup,不太好講解,希望大家通過動手編寫我們的案例,仔細體會其中的奧妙
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores =
students.cogroup(scores);
// 列印studnetScores RDD
studentScores.foreach(
new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(
Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
}
scala
package cn.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* @author Administrator
*/
object TransformationOperation {
def main(args: Array[String]) {
// map()
// filter()
// flatMap()
// groupByKey()
// reduceByKey()
// sortByKey()
// join()
cogroup()
}
def map() {
val conf = new SparkConf()
.setAppName("map")
.setMaster("local")
val sc = new SparkContext(conf)
val numbers = Array(1, 2, 3, 4, 5)
val numberRDD = sc.parallelize(numbers, 1)
val multipleNumberRDD = numberRDD.map { num => num * 2 }
multipleNumberRDD.foreach { num => println(num) }
}
def filter() {
val conf = new SparkConf()
.setAppName("filter")
.setMaster("local")
val sc = new SparkContext(conf)
val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numberRDD = sc.parallelize(numbers, 1)
val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }
evenNumberRDD.foreach { num => println(num) }
}
def flatMap() {
val conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local")
val sc = new SparkContext(conf)
val lineArray = Array("hello you", "hello me", "hello world")
val lines = sc.parallelize(lineArray, 1)
val words = lines.flatMap { line => line.split(" ") }
words.foreach { word => println(word) }
}
def groupByKey() {
val conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local")
val sc = new SparkContext(conf)
val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
Tuple2("class1", 90), Tuple2("class2", 60))
val scores = sc.parallelize(scoreList, 1)
val groupedScores = scores.groupByKey()
groupedScores.foreach(score => {
println(score._1);
score._2.foreach { singleScore => println(singleScore) };
println("=============================")
})
}
def reduceByKey() {
val conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local")
val sc = new SparkContext(conf)
val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
Tuple2("class1", 90), Tuple2("class2", 60))
val scores = sc.parallelize(scoreList, 1)
val totalScores = scores.reduceByKey(_ + _)
totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))
}
def sortByKey() {
val conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local")
val sc = new SparkContext(conf)
val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"),
Tuple2(100, "marry"), Tuple2(85, "jack"))
val scores = sc.parallelize(scoreList, 1)
val sortedScores = scores.sortByKey(false)
sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
}
def join() {
val conf = new SparkConf()
.setAppName("join")
.setMaster("local")
val sc = new SparkContext(conf)
val studentList = Array(
Tuple2(1, "leo"),
Tuple2(2, "jack"),
Tuple2(3, "tom"));
val