1. 程式人生 > >MapReduce編程模型詳解(基於Windows平臺Eclipse)

MapReduce編程模型詳解(基於Windows平臺Eclipse)

lib read 找到 lin @override ext logs 設置 otf

本文基於Windows平臺Eclipse,以使用MapReduce編程模型統計文本文件中相同單詞的個數來詳述了整個編程流程及需要註意的地方。不當之處還請留言指出。

前期準備

hadoop集群的搭建

編程環境搭建

1、將官網下載的hadoop安裝包解壓,並記住下圖所示的目錄

技術分享圖片

2、創建java project,右鍵工程--->build path--->Configure build path

技術分享圖片

3、進行如下圖操作

技術分享圖片

4、新建MapReduce編程要使用的環境包,如下圖操作

技術分享圖片

5、將下圖所示的commom包以及lib文件夾下所有的包導入

技術分享圖片

6、將下圖所示的hdfs包和lib文件夾下所有的包導入

技術分享圖片

7、將下圖所示的包以及lib文件夾下所有的包導入

技術分享圖片

8、將下圖所示的包以及lib文件夾下的所有包導入

技術分享圖片

9、將新建的好的hadoop_mr庫導入

技術分享圖片

編寫map階段的map函數

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 第一個參數:默認情況下是mapreduce框架所讀文件的起始偏移量,類型為Long,在mr框架中類型為LongWritable
 * 第二個參數:默認情況下是框架所讀到的內容,類型為String,在mr框架中為Text
 * 第三個參數:框架輸出數據的key,在該單詞統計的編程模型中輸出的是單詞,類型為String,在mr框架中為Text
 * 第四個參數:框架輸出數據的value,在此是每個所對應單詞的個數,類型為Integer,在mr框架中為IntWritable
 * @author Administrator
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//  map階段的邏輯
//  對每一行輸入數據調用一次我們自定義的map()方法
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
//      將傳入的每一行數據轉為String
        String line = value.toString();
//      根據空格將單詞劃分
        String[] words = line.split(" ");
        
        for(String word: words){
            //將word作為輸出的key,1作為輸出的value    <word,1>
            context.write(new Text(word), new IntWritable(1));
        }
//      mr框架不會在map處理完一行數據就發給reduce,會先將結果收集
    }
}

編寫reduce階段的reduce函數

package com.cnblogs._52mm;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * reduce的輸入是map的輸出
 * 第一個和第二個參數分別是map的輸出類型
 * 第三個參數是reduce程序處理完後的輸出值key的類型,單詞,為Text類型
 * 第四個參數是輸出的value的類型,每個單詞所對應的總數,為IntWritable類型
 * @author Administrator
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * map輸出的內容相當於:
     *          <i,1><i,1><i,1><i,1><i,1><i,1>...
     *          <am,1><am,1><am,1><am,1><am,1><am,1>...
     *          <you,1><you,1><you,1><you,1><you,1><you,1>...   
     */
    
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        
//      Iterator<IntWritable> iterator = values.iterator();
//      while(iterator.hasNext()){
//          count += iterator.next().get();
//      }
        
        for(IntWritable value: values){
            count += value.get();
        }
        
        context.write(key, new IntWritable(count));
    }
}

編寫驅動類

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * 相當於yarn集群的客戶端,封裝mapreduce的相關運行參數,指定jar包,提交給yarn
 * @author Administrator
 *
 */
public class WordCountDriver {
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        Configuration conf = new Configuration();
//      將默認配置文件傳給job
        Job job = Job.getInstance(conf);
        
//      告訴yarn  jar包在哪
        job.setJarByClass(WordCountDriver.class);
        
        //指定job要使用的map和reduce
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        
//      指定map的輸出類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
//      指定最終輸出的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
//      job的輸入數據所在的目錄
//      第一個參數:給哪個job設置
//      第二個參數:輸入數據的目錄,多個目錄用逗號分隔
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
//      job的數據輸出在哪個目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //將jar包和配置文件提交給yarn
//      submit方法提交作業就退出該程序
//      job.submit();
        
//      waitForCompletion方法提交作業並等待作業執行
//      true表示將作業信息打印出來,該方法會返回一個boolean值,表示是否成功運行
        boolean result = job.waitForCompletion(true);
//      mr運行成功返回true,輸出0表示運行成功,1表示失敗
        System.exit(result?0:1);
    }
    
}

運行MapReduce程序

1、打jar包(鼠標右鍵工程-->Export)

技術分享圖片

技術分享圖片

2、上傳到hadoop集群上(集群中的任何一臺都行),運行

#wordcounrt.jar是剛剛從eclipse打包上傳到linux的jar包
#com.cnblogs._52mm.WordCountDriver是驅動類的全名
#hdfs的/wordcount/input目錄下是需要統計單詞的文本
#程序輸出結果保存在hdfs的/wordcount/output目錄下(該目錄必須不存在,由hadoop程序自己創建)
hadoop jar wordcount.jar com.cnblogs._52mm.WordCountDriver /wordcount/input /wordcount/output

技術分享圖片

3、也可用yarn的web界面查看作業信息

技術分享圖片

ps:在這裏可以看到作業的詳細信息,失敗還是成功一目了然

4、查看輸出結果

hadoop fs -cat /wordcount/output/part-r-00000

也可查看hdfs的web界面
技術分享圖片

報錯解決

Error: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

該錯誤是由於編寫代碼時impor了錯誤的包導致的(我錯在Text包導錯了),仔細檢查一下,改正後重新打jar包上傳。

 Output directory hdfs://mini1:9000/wordcount/output already exists

顯然,該錯誤是由於reduce的輸出目錄必須是不存在才行,不能自己在hdfs上手動創建輸出目錄。

總結

  • map函數和reduce函數的輸入輸出類型要用hadoop提供的基本類型(可優化網絡序列化傳輸)
  • LongWritable類型相當於java的Long類型,IntWritable類型相當於java的Integer類型,Text類型相當於java的String類型
  • reduce函數的輸入類型等於map函數的輸出類型
  • Job對象控制整個作業的執行。
  • job對象的setJarByClass()方法傳遞一個類,hadoop利用這個類來找到相應的jar文件
  • 運行作業前,輸出目錄不應該存在,否則hadoop會報錯(為了防止覆蓋了之前該目錄下已有的數據)
  • setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型,這兩個函數的輸出類型一般相同,如果不同,則通過setMapOutputKeyClass()和setMapOutputValueClass()來設置map函數的輸出類型。
  • 輸入數據的類型默認是TextInputFormat(文本),可通過InputFormat類來改變。
  • Job中的waitForCompletion()方法提交作業並等待執行完成,傳入true作為參數則會將作業的詳細信息打印出來。作業執行成功返回true,執行失敗返回false。

作者:py小傑

博客地址:http://www.cnblogs.com/52mm/

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯開頭給出原文鏈接。

MapReduce編程模型詳解(基於Windows平臺Eclipse)