1. 程式人生 > >mapreduce的cleanUp和setUp的特殊用法(TopN問題)和常規用法

mapreduce的cleanUp和setUp的特殊用法(TopN問題)和常規用法

ado clean 進入 htm 這一 很多 set mapred net

一:特殊用法

我們上來不講普通用法,普通用法放到最後。我們來談一談特殊用法,了解這一用法,讓你的mapreduce編程能力提高一個檔次,毫不誇張!!!扯淡了,讓我們進入正題:

我們知道reduce和map都有一個局限性就是map是讀一行執行一次,reduce是每一組執行一次

但是當我們想全部得到數據之後,按照需求刪選然後再輸出怎麽辦?

這時候只使用map和reduce顯然是達不到目的的?

那該怎麽呢?這時候我們想到了 setUp和cleanUp的特性,只執行一次。

這樣我們對於最終數據的過濾,然後輸出要放在cleanUp中。這樣就能實現對數據,不一組一組輸出,而是全部拿到,最後過濾輸出。經典運用常見,mapreduce分析數據然後再求數據的topN 問題。

以求出單詞出現次數前三名為例

MAPREDUCE求topn問題

以wordcount為例,求出單詞出現數量前三名
數據:

love you do

you like me

me like you do

love you do

you like me

me like you do

love you do

you like me

me like you do

love you do

you like me

分析:

我們知道mapreduce有分許聚合的功能,所以第一步就是:

把每個單詞讀出來,然後在reduce中聚合,求出每個單詞出現的次數

但是怎麽控制只輸出前三名呢?

我們知道,map是讀一行執行一次,reduce是每一組執行一次

所以只用map,和reduce是無法控制輸出的次數的

但是我們又知道,無論map或者reduce都有 setUp 和cleanUp而且這兩個執行一次

所以我們可以在reduce階段把每一個單詞當做key,單詞出現的次數當做value,每一組存放到一個map裏面,此時只存,不寫出。在reduce的cleanUp階段map排序,然後輸出前三名

代碼:

maper代碼

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)

throws IOException, InterruptedException {

String[] split = value.toString().split(" ");

for (String word : split) {

context.write(new Text(word), new IntWritable(1));

}

}

}

reduce代碼

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

Map<String,Integer> map=new HashMap<String, Integer>();

protected void reduce(Text key, Iterable<IntWritable> iter,

Reducer<Text, IntWritable, Text, IntWritable>.Context conext) throws IOException, InterruptedException {

int count=0;

for (IntWritable wordCount : iter) {

count+=wordCount.get();

}

String name=key.toString();

map.put(name, count);

}

@Override

protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)

throws IOException, InterruptedException {

//這裏將map.entrySet()轉換成list

List<Map.Entry<String,Integer>> list=new LinkedList<Map.Entry<String,Integer>>(map.entrySet());

//通過比較器來實現排序

Collections.sort(list,new Comparator<Map.Entry<String,Integer>>() {

//降序排序

@Override

public int compare(Entry<String, Integer> arg0,Entry<String, Integer> arg1) {

return (int) (arg1.getValue() - arg0.getValue());

}

});

for(int i=0;i<3;i++){

context.write(new Text(list.get(i).getKey()), new IntWritable(list.get(i).getValue()));

}

}}

job客戶端代碼

public class JobClient{
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://wangzhihua1:9000/");
conf.set("mapreduce.framework", "local");
Job job = Job.getInstance(conf);

// 封裝本mr程序相關到信息到job對象中
//job.setJar("d:/wc.jar");
job.setJarByClass(JobClient.class);

// 指定mapreduce程序用jar包中的哪個類作為Mapper邏輯類
job.setMapperClass(WcMapper.class);
// 指定mapreduce程序用jar包中的哪個類作為Reducer邏輯類
job.setReducerClass(WcReducer.class);

// 告訴mapreduce程序,我們的map邏輯輸出的KEY.VALUE的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 告訴mapreduce程序,我們的reduce邏輯輸出的KEY.VALUE的類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 告訴mapreduce程序,我們的原始文件在哪裏
FileInputFormat.setInputPaths(job, new Path("d:/wc/input/"));
// 告訴mapreduce程序,結果數據往哪裏寫
FileOutputFormat.setOutputPath(job, new Path("d:/wc/output/"));

// 設置reduce task的運行實例數
job.setNumReduceTasks(1); // 默認是1

// 調用job對象的方法來提交任務
job.submit();
}

}

二 常規用法

在hadoop的源碼中,基類Mapper類和Reducer類中都是只包含四個方法:setup方法,cleanup方法,run方法,map方法。如下所示:

技術分享圖片

其方法的調用方式是在run方法中,如下所示:

  可以看出,在run方法中調用了上面的三個方法:setup方法,map方法,cleanup方法。其中setup方法和cleanup方法默認是不做任何操作,且它們只被執行一次。但是setup方法一般會在map函數之前執行一些準備工作,如作業的一些配置信息等;cleanup方法則是在map方法運行完之後最後執行 的,該方法是完成一些結尾清理的工作,如:資源釋放等。如果需要做一些配置和清理的工作,需要在Mapper/Reducer的子類中進行重寫來實現相應的功能。map方法會在對應的子類中重新實現,就是我們自定義的map方法。該方法在一個while循環裏面,表明該方法是執行很多次的。run方法就是每個maptask調用的方

技術分享圖片

hadoop中的MapReduce框架裏已經預定義了相關的接口,其中如Mapper類下的方法setup()和cleanup()。

setup(),此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變量或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重復,程序運行效率不高!

cleanup(),此方法被MapReduce框架僅且執行一次,在執行完畢Map任務後,進行相關變量或資源的釋放工作。若是將釋放資源工作放入方法map()中,也會導致Mapper任務在解析、處理每一行文本後釋放資源,而且在下一行文本解析前還要重復初始化,導致反復重復,程序運行效率不高!
所以,建議資源初始化及釋放工作,分別放入方法setup()和cleanup()中進行。

mapreduce的cleanUp和setUp的特殊用法(TopN問題)和常規用法