1. 程式人生 > >大數據算法設計模式(1) - topN spark實現

大數據算法設計模式(1) - topN spark實現

lin pair run remove integer fun zab map [0

topN算法,spark實現

package com.kangaroo.studio.algorithms.topn;


import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; import java.io.Serializable; import java.util.*; public class TopnSpark implements Serializable { private JavaSparkContext jsc; Broadcast<Integer> topNum; private
String inputPath; /* * 構造函數 * 1. 初始化JavaSparkContext * 2. 初始化廣播變量topN個數, 可以被所有partition共享 * 3. 初始化輸入路徑 * */ public TopnSpark(Integer Num, String path) { jsc = new JavaSparkContext(); topNum = jsc.broadcast(Num); inputPath = path; }
/* * 程序入口函數 * */ public void run() { /* * 讀入inputPath中的數據 * */ JavaRDD<String> lines = jsc.textFile(inputPath, 1); /* * 將rdd規約到9個分區 * */ JavaRDD<String> rdd = lines.coalesce(9); /* * 將輸入轉化為kv格式 * key是規約的主鍵, value是排序參考的個數 * 註: 這裏的key並不唯一, 即相同的key可能有多條記錄, 所以下面我們規約key成唯一鍵 * 輸入:line, 輸出:kv * */ JavaPairRDD<String, Integer> kv = rdd.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { String[] tokens = s.split(","); return new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1])); } }); /* * 規約主鍵成為唯一鍵 * 輸入:kv, 輸出:kv * */ JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); /* * 計算各個分區的topN * 這裏通過廣播變量拿到了topN具體個數, 每個分區都保留topN, 所有分區總個數: partitionNum * topN * 輸入:kv, 輸出:SortMap, 長度topN * */ JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() { public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) throws Exception { final int N = topNum.getValue(); SortedMap<Integer, String> topN = new TreeMap<Integer, String>(); while (iter.hasNext()) { Tuple2<String, Integer> tuple = iter.next(); topN.put(tuple._2, tuple._1); if (topN.size() > N) { topN.remove(topN.firstKey()); } } return Collections.singletonList(topN); } }); /* * 規約所有分區的topN SortMap, 得到最終的SortMap, 長度topN * reduce過後, 數據已經到了本地緩存, 這是最後結果 * 輸入: SortMap, 長度topN, 當然有partitionNum個, 輸出:SortMap, 長度topN * */ SortedMap<Integer, String> finalTopN = partitions.reduce(new Function2<SortedMap<Integer, String>, SortedMap<Integer, String>, SortedMap<Integer, String>>() { public SortedMap<Integer, String> call(SortedMap<Integer, String> m1, SortedMap<Integer, String> m2) throws Exception { final int N = topNum.getValue(); SortedMap<Integer, String> topN = new TreeMap<Integer, String>(); for (Map.Entry<Integer, String> entry : m1.entrySet()) { topN.put(entry.getKey(), entry.getValue()); if (topN.size() > N) { topN.remove(topN.firstKey()); } } for (Map.Entry<Integer, String> entry : m2.entrySet()) { topN.put(entry.getKey(), entry.getValue()); if (topN.size() > N) { topN.remove(topN.firstKey()); } } return topN; } }); /* * 將本地緩存的最終結果打印出來 * */ for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) { System.out.println(entry.getKey() + " -- " + entry.getValue()); } } public static void main(String[] args) { String inputPath = args[0]; TopnSpark topnMapper = new TopnSpark(10, inputPath); topnMapper.run(); } }

大數據算法設計模式(1) - topN spark實現