MapReduce程序之TopN問題(排行榜問題)
阿新 • • 發佈:2018-03-09
大數據 Hadoop MapReduce Java [toc]
MapReduce程序之TopN問題(排行榜問題)
需求
有下面的文本文件:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/topn$ cat senventeen_a.txt 1,9819,100,121 2,8918,2000,111 3,2813,1234,22 4,9100,10,1101 5,3210,490,111 6,1298,28,1211 7,1010,281,90 8,1818,9000,20 yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/topn$ cat senventeen_b.txt 100,3333,10,100 101,9321,1000,293 102,3881,701,20 103,6791,910,30 104,8888,11,39
以逗號作為分隔符,每一列分別為orderid,userid,payment,productid
,現在需要按照payment
從大到小求出TopN,比如top10,其輸出結果應該如下:
1 9000
2 2000
3 1234
4 1000
5 910
6 701
7 490
8 281
9 100
10 28
此外,TopN中的N應該是動態的,由輸入的參數來決定,根據引寫一個MapReduce程序來進行處理。
程序思路分析
如下:
Mapper: /** * Mapper,因為Block中的每一個split都會交由一個Mapper Task來進行處理,對於TopN問題,可以考慮每一個Mapper Task的輸出 * 可以為這個split中的前N個值,最後每個數據到達Reducer的時候,就可以大大減少原來需要比較的數據量,因為在Reducer處理之前 * Map Task已經幫我們把的數據量大大減少了,比如,在MapReduce中,默認情況下一個Block就為一個split,當然這個是可以設置的 * 而一個Block為128M,顯然128M能夠存儲的文本文件也是相當多的,假設現在我的數據有10個Block,即1280MB的數據,如果要求Top10 * 的問題,此時,這些數據需要10個Mapper Task來進行處理,那麽在每個Mapper Task中先求出前10個數,最後這10個數再交由Reducer來進行處理 * 也就是說,在我們的這個案例中,Reducer需要處理排序的數有100個,顯然經過Map處理之後,Reducer的壓力就大大減少了。 * 那麽如何實現每個Mapper Task中都只輸出10個數呢?這時可以使用一個set來緩存數據,從而達到先緩存10個數的目的,詳細可以參考下面的代碼。 */ Reducer: /** * Reducer,將Mapper Task輸出的數據排序後再輸出 * 處理思路與Mapper是類似的 */ TopN中的N值問題: // 向conf中傳入參數 // 在MapReduce中,因為計算是分散到每個節點上進行的 // 也就是將我們的Maper和Reducer也是分散到每個節點進行的 // 所以不能在TopNJob中設置一個全局變量來對N進行設置(雖然在本地運行時是沒有問題的,但在集群運行時會有問題) // 因此MapReduce提供了在Configuration對象中設置參數的方法 // 通過在Configuration對象中設置某些參數,可以保證每個節點的Mapper和Reducer都能夠讀取到N
MapReduce程序
關於如何處理TopN問題的思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序。
package com.uplooking.bigdata.mr.topn; import com.uplooking.bigdata.common.utils.MapReduceJobUtil; import com.uplooking.bigdata.mr.secondsort.AccessLogWritable; import com.uplooking.bigdata.mr.secondsort.SecondSortJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; /** * MapReduce程序之TopN問題 */ public class TopNJob { /** * 驅動程序,使用Job工具類來生成job */ public static void main(String[] args) throws Exception { if (args == null || args.length < 3) { System.err.println("Parameter Errors! Usages:<inputpath> <outputpath> <topN>"); System.exit(-1); } // 向conf中傳入參數 // 在MapReduce中,因為計算是分散到每個節點上進行的 // 也就是將我們的Maper和Reducer也是分散到每個節點進行的 // 所以不能在TopNJob中設置一個全局變量來對N進行設置(雖然在本地運行時是沒有問題的,但在集群運行時會有問題) // 因此MapReduce提供了在Configuration對象中設置參數的方法 // 通過在Configuration對象中設置某些參數,可以保證每個節點的Mapper和Reducer都能夠讀取到N Configuration conf = new Configuration(); conf.set("topN", args[2]); Job job = MapReduceJobUtil.buildJob(conf, TopNJob.class, args[0], TextInputFormat.class, TopNJobMapper.class, IntWritable.class, NullWritable.class, new Path(args[1]), TextOutputFormat.class, TopNReducer.class, IntWritable.class, IntWritable.class); // ReduceTask必須設置為1 job.setNumReduceTasks(1); job.waitForCompletion(true); } /** * Mapper,因為Block中的每一個split都會交由一個Mapper Task來進行處理,對於TopN問題,可以考慮每一個Mapper Task的輸出 * 可以為這個split中的前N個值,最後每個數據到達Reducer的時候,就可以大大減少原來需要比較的數據量,因為在Reducer處理之前 * Map Task已經幫我們把的數據量大大減少了,比如,在MapReduce中,默認情況下一個Block就為一個split,當然這個是可以設置的 * 而一個Block為128M,顯然128M能夠存儲的文本文件也是相當多的,假設現在我的數據有10個Block,即1280MB的數據,如果要求Top10 * 的問題,此時,這些數據需要10個Mapper Task來進行處理,那麽在每個Mapper Task中先求出前10個數,最後這10個數再交由Reducer來進行處理 * 也就是說,在我們的這個案例中,Reducer需要處理排序的數有100個,顯然經過Map處理之後,Reducer的壓力就大大減少了。 * 那麽如何實現每個Mapper Task中都只輸出10個數呢?這時可以使用一個set來緩存數據,從而達到先緩存10個數的目的,詳細可以參考下面的代碼。 */ public static class TopNJobMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> { TreeSet<Integer> cachedTopN = null; Integer N = null; /** * 每個Mapper Task執行前都會先執行setup函數 * map函數是每行執行一次 */ @Override protected void setup(Context context) throws IOException, InterruptedException { // TreeSet定義的排序規則為倒序,後面做數據的處理時只需要pollLast最後一個即可將 // TreeSet中較小的數去掉 cachedTopN = new TreeSet<Integer>(new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { int ret = 0; if (o1 > o2) { ret = -1; } else if (o1 < o2) { ret = 1; } return ret; } }); // 拿到傳入參數時的topN中的N值 N = Integer.valueOf(context.getConfiguration().get("topN")); } /** * 將split中前N個數篩選出來 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 解析每一行 String[] fields = value.toString().split(","); if (fields == null || fields.length < 3) { return; } // 轉換payment為數字,如果出現異常,終止當前map函數的執行 Integer payment = null; try { payment = Integer.valueOf(fields[2]); } catch (NumberFormatException e) { e.printStackTrace(); return; } // 將數字寫入到TreeSet當中 cachedTopN.add(payment); // 判斷cachedTopN中的元素個數是否已經達到N個,如果已經達到N個,則去掉最後一個 if (cachedTopN.size() > N) { cachedTopN.pollLast(); } } /** * 每個Mapper Task執行結束後才會執行cleanup函數 * 將map函數篩選出來的前N個數寫入到context中作為輸出 * 將 * map函數是每行執行一次 */ @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (Integer num : cachedTopN) { context.write(new IntWritable(num), NullWritable.get()); } } } /** * Reducer,將Mapper Task輸出的數據排序後再輸出 * 處理思路與Mapper是類似的 */ public static class TopNReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> { TreeSet<Integer> cachedTopN = null; Integer N = null; /** * 初始化一個TreeSet */ @Override protected void setup(Context context) throws IOException, InterruptedException { // TreeSet定義的排序規則為倒序,後面做數據的處理時只需要pollLast最後一個即可將 // TreeSet中較小的數去掉 cachedTopN = new TreeSet<Integer>(new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { int ret = 0; if (o1 > o2) { ret = -1; } else if (o1 < o2) { ret = 1; } return ret; } }); // 拿到傳入參數時的topN中的N值 N = Integer.valueOf(context.getConfiguration().get("topN")); } /** * 篩選Reducer Task中的前10個數 */ @Override protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { cachedTopN.add(Integer.valueOf(key.toString())); // 判斷cachedTopN中的元素個數是否已經達到N個,如果已經達到N個,則去掉最後一個 if (cachedTopN.size() > N) { cachedTopN.pollLast(); } } /** * 將reduce函數篩選出來的前N個數寫入到context中作為輸出 */ @Override protected void cleanup(Context context) throws IOException, InterruptedException { int index = 1; for(Integer num : cachedTopN) { context.write(new IntWritable(index), new IntWritable(num)); index++; } } } }
測試
這裏使用本地環境來運行MapReduce程序,輸入的參數如下:
/Users/yeyonghao/data/input/topn /Users/yeyonghao/data/output/mr/topn 10
也可以將其打包成jar包,然後上傳到Hadoop環境中運行。
運行程序後,查看輸出結果如下:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/topn$ cat part-r-00000
1 9000
2 2000
3 1234
4 1000
5 910
6 701
7 490
8 281
9 100
10 28
可以看到,我們的MapReduce程序已經完成了TopN問題的處理,並且其中的N值是動態的,可以根據參數來動態確定。
MapReduce程序之TopN問題(排行榜問題)