Understanding Spark in Depth: Common Spark Operators Explained (Java version, Spark 1.6.1)
My company recently decided to develop its Spark projects in Java. Having always used Scala before, I spent today reimplementing the commonly used Spark operators in Java.
package com.lyzx.spark.streaming;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Exercises the common Spark operators from Java.
 * Since Java is a strongly typed language, lambda expressions are used
 * wherever possible to keep the code concise.
 */
public class Day01 {

    /**
     * Tests the map and mapToPair operators.
     * map: a mapping operator; it transforms the data in each partition of the
     *      RDD into another format/type, i.e. JavaRDD[M] => JavaRDD[N], calling
     *      the function once per element.
     * mapToPair: also a mapping operator; it maps an RDD into a [K,V]-format RDD.
     * mapPartitions: calls the mapping function once per partition, an
     *      optimization over map.
     * mapPartitionsWithIndex: calls the mapping function once per partition and
     *      also passes in each partition's number, i.e. its index.
     * @param ctx
     */
    public void testMap(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        rdd.map(x -> x + 100)
           .mapToPair(x -> new Tuple2<>(x, x))
           .mapPartitions(itr -> {
               List<String> result = new ArrayList<>();
               while (itr.hasNext()){
                   Tuple2<Integer,Integer> t = itr.next();
                   result.add(t._1 + "-" + t._2);
               }
               // in the Spark 1.x Java API, mapPartitions expects an Iterable
               return result;
           })
           .mapPartitionsWithIndex((index, itr) -> {
               List<String> result = new ArrayList<>();
               System.out.println("===" + index);
               while (itr.hasNext()){
                   String item = itr.next();
                   System.out.print(" " + item);
                   result.add(item);
               }
               return result.iterator();
           }, true)
           .foreach(x -> System.out.println(x));
    }

    /**
     * Tests the filter operator.
     * The predicate is evaluated against every element and returns a boolean;
     * elements for which it returns true are kept, the rest are dropped.
     * @param ctx
     */
    public void testFilter(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        // x stands for each element of the RDD; every element is tested in turn
        JavaPairRDD<Integer,Integer> pairRDD = rdd.filter(x -> x > 5)
                                                  .mapToPair(x -> new Tuple2<>(x, x + 1000));
        pairRDD.foreach(x -> System.out.println(x));
    }

    /**
     * Tests the flatMap operator: a map followed by a flatten.
     * The map step below returns a String[] per input line; the flatten step
     * then turns every element of that array into an element of its own.
     * @param ctx
     */
    public void testFlatMap(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("hello java","hello c++","hello c","hello java","hello 鳴人 "," hello 雛田"));
        List<Tuple2<String,Integer>> items = rdd.flatMap(x -> Arrays.asList(x.trim().split(" ")))
                                                .mapToPair(x -> new Tuple2<>(x, 1))
                                                .reduceByKey((x, y) -> x + y)
                                                .take(3);
        Iterator<Tuple2<String,Integer>> itr = items.iterator();
        while (itr.hasNext()){
            System.out.println(itr.next());
        }
    }

    /**
     * sample: the random-sampling operator.
     * withReplacement: whether sampled elements are put back; true means with replacement
     * fraction: the expected fraction to sample, e.g. 0.2 means roughly 20%
     * seed: the random seed, used to draw the same elements while debugging
     * @param ctx
     */
    public void testSample(JavaSparkContext ctx){
        String[] table = {"A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"};
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList(table));
        JavaRDD<String> sampleRdd = rdd.sample(false, 0.2);
        sampleRdd.foreach(x -> System.out.println(x));
    }
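    /*
     * A minimal sketch of the three-argument sample overload (the method name
     * testSampleWithSeed is mine, for illustration): the Javadoc above mentions
     * a seed, but the two-argument call never passes one. Fixing the seed makes
     * the sample reproducible from run to run, which helps while debugging.
     */
    public void testSampleWithSeed(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        // same seed => same elements on every run
        rdd.sample(false, 0.5, 42L)
           .foreach(x -> System.out.println(x));
    }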
    /**
     * An RDD can also be treated as a set:
     * union: the union of two RDDs, without deduplication
     * intersection: the intersection
     * subtract: the set difference
     * cartesian: the Cartesian product
     * @param ctx
     */
    public void testSetOperate(JavaSparkContext ctx){
        JavaRDD<Integer> r1 = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        JavaRDD<Integer> r2 = ctx.parallelize(Arrays.asList(10,11,12,13,14,15,16,17,18,19));
        // union keeps duplicates, so distinct() is applied to remove them
        r1.union(r2)
          .distinct()
          .foreach(x -> System.out.print(x + " "));
        System.out.println("=======================");
        // intersection
        r1.intersection(r2)
          .foreach(x -> System.out.print(x + "="));
        System.out.println("=======================");
        // set difference: r1 - r2
        r1.subtract(r2)
          .foreach(x -> System.out.print(x + " <> "));
        // Cartesian product
        System.out.println("=======================");
        r1.cartesian(r2)
          .foreach(x -> System.out.print(x + "|"));
    }

    /**
     * groupByKey: groups data by key; all values that share a key are collected
     * into one Iterable, producing a tuple of (shared key, Iterable of values).
     * Note: this operator only works on [K,V]-format RDDs.
     * For a [K,V]-format RDD (JavaPairRDD) always specify the generic types,
     * otherwise the concrete types are unavailable; e.g. in the foreach below,
     * x would be an Object rather than a Tuple2.
     *
     * JavaPairRDD[K,V] => JavaPairRDD[K,Iterable<V>]
     * e.g.:
     * rdd1: [(1,1),(2,2),(3,3),(1,1),(2,2)]
     * rdd2 = rdd1.groupByKey()
     * rdd2: [(1,[1,1]),(2,[2,2]),(3,[3])]
     * The result is a tuple whose key is the shared key and whose value is the
     * Iterable of all values that had that key.
     * @param ctx
     */
    public void testGroupByKey(JavaSparkContext ctx){
        List<Tuple2<String,Integer>> pairData = Arrays.asList(new Tuple2<>("A",1),
                new Tuple2<>("B",2),
                new Tuple2<>("C",3),
                new Tuple2<>("B",2),
                new Tuple2<>("C",3));
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(pairData);
        r1.groupByKey()
          .foreach(x -> {
              System.out.println(x._1);
              Iterable<Integer> itr = x._2;
              for (Integer i : itr){
                  System.out.print(i + " ");
              }
              System.out.println();
          });
    }

    /**
     * Aggregates by key.
     * The reduce function takes two arguments of the same type (T t1, T t2) and
     * returns a T, so the values sharing a key, e.g. ["one", Iterable<1,2,...,10>],
     * are folded pairwise: (Int,Int) => Int, e.g. (x,y) => x+y.
     * First 1 is bound to x and 2 to y; their sum 3 is bound back to x, then the
     * third element is bound to y and added, and so on.
     * @param ctx
     */
    public void testReduceByKey(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("A","B","D","Z","X","Y","A","B","D","Z","X","Y","A","B","D","A","B","D","A","B","D","A","B","D"));
        rdd.mapToPair(x -> new Tuple2<>(x, 1))
           .reduceByKey((x, y) -> x + y)
           .sortByKey(false)
           .foreach(x -> System.out.println(x));
    }

    /**
     * join: like an equi inner join in SQL.
     * Values that share a key are put into one tuple; this example prints:
     * (B,(2,B2))
     * (B,(2,B2))
     * (C,(3,C2))
     * (C,(3,C2))
     * @param ctx
     */
    public void testJoin(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("A",1),
                new Tuple2<>("B",2),
                new Tuple2<>("C",3),
                new Tuple2<>("A",1),
                new Tuple2<>("B",2),
                new Tuple2<>("C",3)
        ));
        JavaPairRDD<String,String> r2 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("B","B2"),
                new Tuple2<>("C","C2"),
                new Tuple2<>("D","D2")
        ));
        r1.join(r2)
          .foreach(x -> System.out.println(x));
    }

    /**
     * cogroup: also an equi join, except that values sharing a key are first
     * aggregated on the map side.
     * In the example below:
     * the values for B in r1 are first gathered into one Iterable, [B,Iterable<200,500>];
     * the values for B in r2 are gathered likewise, [B,Iterable<2,5>];
     * finally the two are combined into one tuple, so B ends up as
     * [B,(Iterable<200,500>,Iterable<2,5>)].
     * The value for each key is thus a tuple holding the two Iterables.
     * @param ctx
     */
    public void testCogroup(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("A",100),
                new Tuple2<>("B",200),
                new Tuple2<>("C",300),
                new Tuple2<>("A",400),
                new Tuple2<>("B",500),
                new Tuple2<>("C",600)
        ));
        JavaPairRDD<String,Integer> r2 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("B",2),
                new Tuple2<>("C",3),
                new Tuple2<>("D",4),
                new Tuple2<>("B",5),
                new Tuple2<>("C",6),
                new Tuple2<>("D",7)
        ));
        r1.cogroup(r2)
          .foreach(x -> {
              Tuple2<Iterable<Integer>,Iterable<Integer>> t2 = x._2;
              // the values coming from r1
              Iterable<Integer> itr1 = t2._1;
              for (Integer item : itr1){
                  System.out.print(":" + item + " ");
              }
              // the values coming from r2
              Iterable<Integer> itr2 = t2._2;
              for (Integer item : itr2){
                  System.out.println(":::" + item + " ");
              }
          });
    }

    /**
     * coalesce: the repartitioning operator.
     * When the second argument is true the repartitioning always takes effect
     * and involves a shuffle; with false, no shuffle is performed and the
     * partition count can only shrink.
     * @param ctx
     */
    public void testCoalesce(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> rdd = ctx.parallelize(Arrays.asList("A","B","D","Z","X","Y","A","B","D","Z","X","Y"), 3)
                                             .mapToPair(x -> new Tuple2<>(x, 1));
        rdd.coalesce(2)
           .mapPartitionsWithIndex((index, itr) -> {
               // collect into a list: returning the consumed iterator itself
               // would leave nothing for the downstream foreach to print
               List<Tuple2<String,Integer>> result = new ArrayList<>();
               System.out.println(">>::" + index);
               while (itr.hasNext()){
                   Tuple2<String,Integer> t = itr.next();
                   System.out.print(t + " ");
                   result.add(t);
               }
               return result.iterator();
           }, false)
           .foreach(x -> System.out.println(x));
    }
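    /*
     * A minimal sketch contrasting coalesce with repartition (the method name
     * testRepartition is mine, for illustration): repartition(n) is simply
     * coalesce(n, true), i.e. the always-shuffling variant, which is also the
     * only way to grow the partition count.
     */
    public void testRepartition(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2);
        System.out.println(rdd.repartition(4).partitions().size());     // 4
        System.out.println(rdd.coalesce(4, true).partitions().size());  // 4
        // without a shuffle, coalesce can only shrink the partition count
        System.out.println(rdd.coalesce(4, false).partitions().size()); // still 2
    }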
ctx.parallelize(Arrays.asList("F","G","H","A","B","C","D","E","I","L","M","N")); JavaRDD cacheRdd = rdd.cache(); List<String> elements = cacheRdd.takeSample(false,5); for(String item : elements){ System.out.println(item); } List<String> orderedList = cacheRdd.takeOrdered(2); for(String item : orderedList){ System.out.println("::"+item); } } public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Day01").setMaster("local"); JavaSparkContext ctx = new JavaSparkContext(conf); //Q1:在foreachRDD中怎麼取出二元組的每一項 Day01 t = new Day01(); // t.testMap(ctx); // t.testFilter(ctx); // t.testFlatMap(ctx); // t.testSample(ctx); // t.testSetOperate(ctx); // t.testGroupByKey(ctx); // t.testReduceByKey(ctx); // t.testJoin(ctx); t.testCogroup(ctx); // t.testCoalesce(ctx); // t.testReduce(ctx); // t.testTakeSample(ctx); } |