1. 程式人生 > >BigData_A_A_03-YARN-資源管理和任務排程(2)共同好友(spark)

BigData_A_A_03-YARN-資源管理和任務排程(2)共同好友(spark)

楔子

最近看了hadoop求共同好友,也瞭解一些spark基本程式設計,感覺 思路 一致,可以試試spark

demo

是否是直接好友,因為存在這種情況,他倆是直接好友,但是他倆同時也是別人的間接好友,這種情況排除,因此兩次flatMap,第二次就是為了排除第一次 中包含的他倆是直接好友的情況,但是這種存在一個問題,排除使用的是集合的操作,如果資料量大,可能有問題。

程式碼位置

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.
apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.storage.StorageLevel; import cn.sxt.config.HadoopConfig; import cn.zhuzi.spark.official.
SparkUtils; import scala.Tuple2; public class SparkFof { public static void main(String[] args) { JavaSparkContext jsContext = SparkUtils.getJavaSparkContext(); JavaRDD<String> textFile = jsContext.textFile(HadoopConfig.getInputPath("data/sxt/friend")); textFile.persist(StorageLevel.MEMORY_AND_DISK_SER
()); JavaRDD<String> flatMap = textFile.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String t) throws Exception { ArrayList<String> resuList = new ArrayList<String>(); String[] split = t.split(" "); // 此處只是給 一個人的共同好友佩對 比如 tom hello hadoop cat // 就輸出 hello:hadoop ,hello:cat ,hadoop:cat , // 然後 按照 Wordcount 那樣求和 此處出現 一個問題, // TODO 這樣計算過程中,他倆是別人的好友,可能儲存在他倆是直接好友 for (int i = 1; i < split.length; i++) { for (int j = i + 1; j < split.length; j++) { resuList.add(FofMapper.friends(split[i], split[j])); } } return resuList.iterator(); } }); // 直接好友 JavaRDD<String> flatMapFriend = textFile.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String t) throws Exception { ArrayList<String> resuList = new ArrayList<String>(); String[] split = t.split(" "); for (int i = 1; i < split.length; i++) { resuList.add(FofMapper.friends(split[i], split[0])); } return resuList.iterator(); } }); ArrayList<String>list =new ArrayList<String>(flatMap.collect()) ;// flatMap的集合不可修改 ArrayList<String> collect2 = new ArrayList<String>(flatMapFriend.collect()); list.removeAll(collect2); // flatMap中藥排除他倆是直接好友 JavaRDD<String> parallelize = jsContext.parallelize(list); JavaPairRDD<String, Integer> mapToPair = parallelize.mapToPair(t -> new Tuple2<String, Integer>(t, 1)); // 分組聚合 JavaPairRDD<String, Integer> res = mapToPair.reduceByKey((a, b) -> a + b); List<Tuple2<String, Integer>> collect = res.collect(); for (Tuple2<String, Integer> tuple2 : collect) { System.out.println(tuple2); } jsContext.close(); } }