1. 程式人生 > >2018-08-07 期 MapReduce模擬實現熱銷商品排行

2018-08-07 期 MapReduce模擬實現熱銷商品排行

utf java 數據傳遞 進行 sta tex except 調用 class

package cn.sjq.mr.sort;

import java.io.IOException;

import java.util.Comparator;

import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* MapReduce實現熱銷商品TopN排行

* 這裏按照商品購買次數排名在前面的為熱銷商品

* 輸入數據:

* order.data1...10 10個訂單文件,每個文件5000-10000條的購買記錄,格式如下:

* orderid userid payment productid

* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185

c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117

1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124

e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197

......

* 最終輸出數據(TopN):

熱銷商品排行Top10

商品ID 銷售數量

21010129 871

21010182 852

21010153 839

21010131 837

21010142 835

21010159 833

21010117 830

21010110 828

21010141 824

21010198 823

*

* 實現邏輯:

* Mapper端:

* (1)實現數據分片,將讀取的數據分片通過map方法處理後輸出到Combiner

* (2)數據的輸出格式

* <k2>Text <v2>Intwritable

* 21010185 <1>

* 21010117 <1>

* 21010185 <1>

* ... ...

* Combiner端:

* (1)Combiner是一種特殊的Reducer,使用Combiner需要註意不要改變程序原有邏輯,且保障Mapper端和Reducer端的數據類型一致

* (2)這裏使用Combiner主要是為了實現

* 1)每個商品購買次數求和

* 2)對於每個局部的Combiner任務,對接收到Mapper端輸出的數據處理後進行局部TopN排行,這樣可以避免不必要的數據傳遞到Reducer端,同時提高Reducer程序的執行效率

* (3)TopN中的N由Hadoop的configuration中set(K,V)來設置,這樣可以保障運行在各個機器上的任務可以獲取到這個全局唯一的N值

* (4)處理後數據輸出格式如下:

* <k2`> <v2`>

* 21010185 <30>

* 21010117 <20>

* ... ...

* 註意:這裏輸出為局部TopN排行

*

* Reducer端:

* (1)Reducer端主要對Combiner端輸出的多個局部排行的TopN條數據進行全局排行匯總

* (2)由於最終輸出只會到一個文件,因此需要保障Reducer Tasks任務數為1

* (3)通過Reducer處理後,最終輸出為

* <k3> <v4>

* 21010185 <30>

* 21010117 <20>

* ... ...

*

* @author songjq

*

*/

public class HotProductTopN {

/**

* Mapper端:

* (1)實現數據分片,將讀取的數據分片通過map方法處理後輸出到Combiner

* (2)數據的輸出格式

* <k2>Text <v2>Intwritable

* 21010185 <1>

* 21010117 <1>

* 21010185 <1>

* ... ...

* @author songjq

*

*/

static class HotProductTopNMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

private Text tkey = new Text();

private IntWritable tvalue = new IntWritable();

/*

* 讀取文件分片,並處理後輸出到Combiner

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

//讀入一行數據

String line = v1.toString();

//分詞處理

String[] order = line.split(",");

if(null!=order && order.length == 4) {

//商品ID

String productId = order[3];

tkey.set(productId);

tvalue.set(1);

//通過context將數據傳遞到Combiner

context.write(tkey, tvalue);

}else {

return;

}

}

}

/**

* * Combiner端:

* (1)Combiner是一種特殊的Reducer,使用Combiner需要註意不要改變程序原有邏輯,且保障Mapper端和Reducer端的數據類型一致

* (2)這裏使用Combiner主要是為了實現

* 1)每個商品購買次數求和

* 2)對於每個局部的Combiner任務,對接收到Mapper端輸出的數據處理後進行局部TopN排行,這樣可以避免不必要的數據傳遞到Reducer端,同時提高Reducer程序的執行效率

* (3)TopN中的N由Hadoop的configuration中set(K,V)來設置,這樣可以保障運行在各個機器上的任務可以獲取到這個全局唯一的N值

* (4)處理後數據輸出格式如下:

* <k2`> <v2`>

* 21010185 <30>

* 21010117 <20>

* ... ...

* 註意:這裏輸出為局部TopN排行

* @author songjq

*

*/

static class HotProductTopNCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

private TreeSet<String[]> treeSet = null;

//全局前N條商品排名

private Integer N = null;

/*

* 初始化方法,在reduce方法調用前執行,只會被執行一次

* 通過該方法,我們可以獲取全局N變量的值,且可以初始化TopN的treeset集合。

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void setup(Context context) throws IOException, InterruptedException {

//獲取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//實例化treeSet,並對其內容按照商品購買次數進行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

Integer count1 = Integer.valueOf(o1[1]);

Integer count2 = Integer.valueOf(o2[1]);

int result = 0;

if(count1>count2) {

result = -1;

}else if(count1<count2) {

result = 1;

}

return result;

}

});

}

/*

* 對相同的ProductId求和,並將其加到treeSet集合,treeSet只存放排名TopN的N條商品

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text k3_, Iterable<IntWritable> v3_, Context ctx)

throws IOException, InterruptedException {

//商品次數求和

Integer count = 0;

for(IntWritable val:v3_) {

count += val.get();

}

//將商品放入treeSet集合

String[] arys = {k3_.toString(),count.toString()};

treeSet.add(arys);

//treeSet記錄超過N條,就刪除最後一條數據

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

/*

* cleanup在reduce調用結束後執行

* 這裏利用cleanup方法將treeSet集合中數據寫出去

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void cleanup(Context context) throws IOException, InterruptedException {

for(String[] ary:treeSet) {

context.write(new Text(ary[0]), new IntWritable(Integer.valueOf(ary[1])));

}

}

}

/**

* * Reducer端:

* (1)Reducer端主要對Combiner端輸出的多個局部排行的TopN條數據進行全局排行匯總

* (2)由於最終輸出只會到一個文件,因此需要保障Reducer Tasks任務數為1

* (3)通過Reducer處理後,最終輸出為

* <k3> <v4>

* 21010185 <30>

* 21010117 <20>

* ... ...

* @author songjq

*

*/

static class HotProductTopNReducer extends Reducer<Text, IntWritable, Text, Text>{

//實現思路和Combiner一致

//存放TopN記錄 HashMap<"ProductId", count>

private TreeSet<String[]> treeSet = null;

//全局前N條商品排名

private Integer N = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

//獲取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//實例化treeSet,並對其內容按照商品購買次數進行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

Integer count1 = Integer.valueOf(o1[1]);

Integer count2 = Integer.valueOf(o2[1]);

int result = 0;

if(count1>count2) {

result = -1;

}else if(count1<count2) {

result = 1;

}

return result;

}

});

}

/*

* 對Combiner輸出的數據進行全局排行

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text k3, Iterable<IntWritable> v3,

Context ctx) throws IOException, InterruptedException {

//匯總Combiner任務輸出過來的商品次數

int count = 0;

for(IntWritable val:v3) {

count+=val.get();

}

String[] arys = {k3.toString(),String.valueOf(count)};

treeSet.add(arys);

//treeSet超過N條記錄,則刪除最後一個節點

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

/*

* reduce方法結束後執行,這裏將treeSet結果集寫到HDFS

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

context.write(new Text("熱銷商品排行Top"+N), new Text());

context.write(new Text("商品ID"), new Text("銷售數量"));

for(String[] ary:treeSet) {

context.write(new Text(ary[0]), new Text(ary[1]));

}

}

}

/**

* 提交任務Job

* @throws Exception

*/

@Test

public void HotProductTopNJob() throws Exception {

Configuration conf = new Configuration();

conf.set("Global_N", "10");

Job job = Job.getInstance(conf);

job.setJarByClass(HotProductTopN.class);

//Mapper

job.setMapperClass(HotProductTopNMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//Combiner

job.setCombinerClass(HotProductTopNCombiner.class);

//Reducer

job.setReducerClass(HotProductTopNReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

//必須設置為1

job.setNumReduceTasks(1);

//輸入路徑

FileInputFormat.setInputPaths(job, "D:\\test\\tmp\\userTopN");

job.setInputFormatClass(TextInputFormat.class);

//輸出路徑

Path outpath = new Path("D:\\test\\tmp\\TopNout");

outpath.getFileSystem(conf).delete(outpath, true);

FileOutputFormat.setOutputPath(job, outpath);

job.waitForCompletion(true);

}

}


2018-08-07 期 MapReduce模擬實現熱銷商品排行