2018-08-07 期 MapReduce模擬實現熱銷商品排行
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* MapReduce實現熱銷商品TopN排行
* 這裏按照商品購買次數排名在前面的為熱銷商品
* 輸入數據:
* order.data1...10 10個訂單文件,每個文件5000-10000條的購買記錄,格式如下:
* orderid userid payment productid
* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185
c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117
1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124
e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197
......
* 最終輸出數據(TopN):
熱銷商品排行Top10
商品ID 銷售數量
21010129 871
21010182 852
21010153 839
21010131 837
21010142 835
21010159 833
21010117 830
21010110 828
21010141 824
21010198 823
*
* 實現邏輯:
* Mapper端:
* (1)實現數據分片,將讀取的數據分片通過map方法處理後輸出到Combiner
* (2)數據的輸出格式
* <k2>Text <v2>Intwritable
* 21010185 <1>
* 21010117 <1>
* 21010185 <1>
* ... ...
* Combiner端:
* (1)Combiner是一種特殊的Reducer,使用Combiner需要註意不要改變程序原有邏輯,且保障Mapper端和Reducer端的數據類型一致
* (2)這裏使用Combiner主要是為了實現
* 1)每個商品購買次數求和
* 2)對於每個局部的Combiner任務,對接收到Mapper端輸出的數據處理後進行局部TopN排行,這樣可以避免不必要的數據傳遞到Reducer端,同時提高Reducer程序的執行效率
* (3)TopN中的N由Hadoop的configuration中set(K,V)來設置,這樣可以保障運行在各個機器上的任務可以獲取到這個全局唯一的N值
* (4)處理後數據輸出格式如下:
* <k2`> <v2`>
* 21010185 <30>
* 21010117 <20>
* ... ...
* 註意:這裏輸出為局部TopN排行
*
* Reducer端:
* (1)Reducer端主要對Combiner端輸出的多個局部排行的TopN條數據進行全局排行匯總
* (2)由於最終輸出只會到一個文件,因此需要保障Reducer Tasks任務數為1
* (3)通過Reducer處理後,最終輸出為
* <k3> <v4>
* 21010185 <30>
* 21010117 <20>
* ... ...
*
* @author songjq
*
*/
public class HotProductTopN {
/**
* Mapper端:
* (1)實現數據分片,將讀取的數據分片通過map方法處理後輸出到Combiner
* (2)數據的輸出格式
* <k2>Text <v2>Intwritable
* 21010185 <1>
* 21010117 <1>
* 21010185 <1>
* ... ...
* @author songjq
*
*/
static class HotProductTopNMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text tkey = new Text();
private IntWritable tvalue = new IntWritable();
/*
* 讀取文件分片,並處理後輸出到Combiner
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//讀入一行數據
String line = v1.toString();
//分詞處理
String[] order = line.split(",");
if(null!=order && order.length == 4) {
//商品ID
String productId = order[3];
tkey.set(productId);
tvalue.set(1);
//通過context將數據傳遞到Combiner
context.write(tkey, tvalue);
}else {
return;
}
}
}
/**
* * Combiner端:
* (1)Combiner是一種特殊的Reducer,使用Combiner需要註意不要改變程序原有邏輯,且保障Mapper端和Reducer端的數據類型一致
* (2)這裏使用Combiner主要是為了實現
* 1)每個商品購買次數求和
* 2)對於每個局部的Combiner任務,對接收到Mapper端輸出的數據處理後進行局部TopN排行,這樣可以避免不必要的數據傳遞到Reducer端,同時提高Reducer程序的執行效率
* (3)TopN中的N由Hadoop的configuration中set(K,V)來設置,這樣可以保障運行在各個機器上的任務可以獲取到這個全局唯一的N值
* (4)處理後數據輸出格式如下:
* <k2`> <v2`>
* 21010185 <30>
* 21010117 <20>
* ... ...
* 註意:這裏輸出為局部TopN排行
* @author songjq
*
*/
static class HotProductTopNCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
private TreeSet<String[]> treeSet = null;
//全局前N條商品排名
private Integer N = null;
/*
* 初始化方法,在reduce方法調用前執行,只會被執行一次
* 通過該方法,我們可以獲取全局N變量的值,且可以初始化TopN的treeset集合。
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//獲取全局N
N = Integer.valueOf(context.getConfiguration().get("Global_N"));
//實例化treeSet,並對其內容按照商品購買次數進行排序
treeSet = new TreeSet<String[]>(new Comparator<String[]>() {
@Override
public int compare(String[] o1, String[] o2) {
Integer count1 = Integer.valueOf(o1[1]);
Integer count2 = Integer.valueOf(o2[1]);
int result = 0;
if(count1>count2) {
result = -1;
}else if(count1<count2) {
result = 1;
}
return result;
}
});
}
/*
* 對相同的ProductId求和,並將其加到treeSet集合,treeSet只存放排名TopN的N條商品
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void reduce(Text k3_, Iterable<IntWritable> v3_, Context ctx)
throws IOException, InterruptedException {
//商品次數求和
Integer count = 0;
for(IntWritable val:v3_) {
count += val.get();
}
//將商品放入treeSet集合
String[] arys = {k3_.toString(),count.toString()};
treeSet.add(arys);
//treeSet記錄超過N條,就刪除最後一條數據
if(treeSet.size()>N) {
treeSet.pollLast();
}
}
/*
* cleanup在reduce調用結束後執行
* 這裏利用cleanup方法將treeSet集合中數據寫出去
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for(String[] ary:treeSet) {
context.write(new Text(ary[0]), new IntWritable(Integer.valueOf(ary[1])));
}
}
}
/**
* * Reducer端:
* (1)Reducer端主要對Combiner端輸出的多個局部排行的TopN條數據進行全局排行匯總
* (2)由於最終輸出只會到一個文件,因此需要保障Reducer Tasks任務數為1
* (3)通過Reducer處理後,最終輸出為
* <k3> <v4>
* 21010185 <30>
* 21010117 <20>
* ... ...
* @author songjq
*
*/
static class HotProductTopNReducer extends Reducer<Text, IntWritable, Text, Text>{
//實現思路和Combiner一致
//存放TopN記錄 HashMap<"ProductId", count>
private TreeSet<String[]> treeSet = null;
//全局前N條商品排名
private Integer N = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//獲取全局N
N = Integer.valueOf(context.getConfiguration().get("Global_N"));
//實例化treeSet,並對其內容按照商品購買次數進行排序
treeSet = new TreeSet<String[]>(new Comparator<String[]>() {
@Override
public int compare(String[] o1, String[] o2) {
Integer count1 = Integer.valueOf(o1[1]);
Integer count2 = Integer.valueOf(o2[1]);
int result = 0;
if(count1>count2) {
result = -1;
}else if(count1<count2) {
result = 1;
}
return result;
}
});
}
/*
* 對Combiner輸出的數據進行全局排行
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3,
Context ctx) throws IOException, InterruptedException {
//匯總Combiner任務輸出過來的商品次數
int count = 0;
for(IntWritable val:v3) {
count+=val.get();
}
String[] arys = {k3.toString(),String.valueOf(count)};
treeSet.add(arys);
//treeSet超過N條記錄,則刪除最後一個節點
if(treeSet.size()>N) {
treeSet.pollLast();
}
}
/*
* reduce方法結束後執行,這裏將treeSet結果集寫到HDFS
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
context.write(new Text("熱銷商品排行Top"+N), new Text());
context.write(new Text("商品ID"), new Text("銷售數量"));
for(String[] ary:treeSet) {
context.write(new Text(ary[0]), new Text(ary[1]));
}
}
}
/**
* 提交任務Job
* @throws Exception
*/
@Test
public void HotProductTopNJob() throws Exception {
Configuration conf = new Configuration();
conf.set("Global_N", "10");
Job job = Job.getInstance(conf);
job.setJarByClass(HotProductTopN.class);
//Mapper
job.setMapperClass(HotProductTopNMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//Combiner
job.setCombinerClass(HotProductTopNCombiner.class);
//Reducer
job.setReducerClass(HotProductTopNReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//必須設置為1
job.setNumReduceTasks(1);
//輸入路徑
FileInputFormat.setInputPaths(job, "D:\\test\\tmp\\userTopN");
job.setInputFormatClass(TextInputFormat.class);
//輸出路徑
Path outpath = new Path("D:\\test\\tmp\\TopNout");
outpath.getFileSystem(conf).delete(outpath, true);
FileOutputFormat.setOutputPath(job, outpath);
job.waitForCompletion(true);
}
}
2018-08-07 期 MapReduce模擬實現熱銷商品排行