BigData_A_A_03-YARN-資源管理和任務排程(2)共同好友(spark)
阿新 • • 發佈:2018-12-31
楔子
最近看了hadoop求共同好友,也瞭解一些spark基本程式設計,感覺 思路 一致,可以試試spark
demo
是否是直接好友,因為存在這種情況,他倆是直接好友,但是他倆同時也是別人的間接好友,這種情況排除
,因此兩次flatMap
,第二次就是為了排除第一次 中包含的他倆是直接好友的情況,但是這種存在一個問題,排除使用的是集合的操作,如果資料量大,可能有問題。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org. apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.storage.StorageLevel;
import cn.sxt.config.HadoopConfig;
import cn.zhuzi.spark.official. SparkUtils;
import scala.Tuple2;
public class SparkFof {
public static void main(String[] args) {
JavaSparkContext jsContext = SparkUtils.getJavaSparkContext();
JavaRDD<String> textFile = jsContext.textFile(HadoopConfig.getInputPath("data/sxt/friend"));
textFile.persist(StorageLevel.MEMORY_AND_DISK_SER ());
JavaRDD<String> flatMap = textFile.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String t) throws Exception {
ArrayList<String> resuList = new ArrayList<String>();
String[] split = t.split(" ");
// 此處只是給 一個人的共同好友佩對 比如 tom hello hadoop cat
// 就輸出 hello:hadoop ,hello:cat ,hadoop:cat ,
// 然後 按照 Wordcount 那樣求和 此處出現 一個問題,
// TODO 這樣計算過程中,他倆是別人的好友,可能儲存在他倆是直接好友
for (int i = 1; i < split.length; i++) {
for (int j = i + 1; j < split.length; j++) {
resuList.add(FofMapper.friends(split[i], split[j]));
}
}
return resuList.iterator();
}
});
// 直接好友
JavaRDD<String> flatMapFriend = textFile.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String t) throws Exception {
ArrayList<String> resuList = new ArrayList<String>();
String[] split = t.split(" ");
for (int i = 1; i < split.length; i++) {
resuList.add(FofMapper.friends(split[i], split[0]));
}
return resuList.iterator();
}
});
ArrayList<String>list =new ArrayList<String>(flatMap.collect()) ;// flatMap的集合不可修改
ArrayList<String> collect2 = new ArrayList<String>(flatMapFriend.collect());
list.removeAll(collect2);
// flatMap中藥排除他倆是直接好友
JavaRDD<String> parallelize = jsContext.parallelize(list);
JavaPairRDD<String, Integer> mapToPair = parallelize.mapToPair(t -> new Tuple2<String, Integer>(t, 1));
// 分組聚合
JavaPairRDD<String, Integer> res = mapToPair.reduceByKey((a, b) -> a + b);
List<Tuple2<String, Integer>> collect = res.collect();
for (Tuple2<String, Integer> tuple2 : collect) {
System.out.println(tuple2);
}
jsContext.close();
}
}