
Java: MapReduce principles and an introductory example: wordcount


MapReduce Principles

MapperTask -> Shuffle (partition, sort, group) -> ReducerTask
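
To make the pipeline concrete, here is a minimal local sketch in plain Java, with no Hadoop involved (FlowDemo and its sample input are illustrative only): the map stage emits <word, 1> pairs, the shuffle stage groups and sorts them by key, and the reduce stage sums each group.

package mr;

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Local illustration of map -> shuffle -> reduce; no Hadoop cluster involved. */
public class FlowDemo {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello Jack", "hello Tom");

        // Map stage: split each line into words; every word emits a <word, 1> pair
        List<Map.Entry<String, Long>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                mapped.add(new SimpleEntry<>(word, 1L));
            }
        }

        // Shuffle stage: group the pairs by key; TreeMap keeps the keys sorted,
        // mirroring the sort step of the real shuffle
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> pair : mapped) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce stage: sum the grouped values for each key
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long count = 0;
            for (long v : e.getValue()) {
                count += v;
            }
            System.out.println(e.getKey() + "\t" + count);  // prints: Jack 1, Tom 1, hello 2
        }
    }
}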


MapReduce Execution Steps

  1. Map tasks
    1. Read the input file line by line, parse each line into a <key, value> pair, and call the map function
    2. Apply the map logic to the key/value pair, producing new key/value pairs
    3. Partition the data (see the partitioner sketch after this list)
  2. Reduce tasks
    1. Copy the map outputs to the reduce nodes, then merge and sort them
    2. Apply the reduce logic to each key and its grouped values, producing new key/value pairs
    3. Write the results to files
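
The partition step decides which reducer receives each key; Hadoop's default HashPartitioner uses hash(key) % numReduceTasks. Below is a minimal sketch of a custom partitioner (AlphaPartitioner is a hypothetical name, not part of the wordcount example that follows): words starting with a–m go to reducer 0, the rest to reducer 1.

package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/** Sketch of a custom partitioner: route keys to reducers by first letter. */
public class AlphaPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String word = key.toString();
        // Guard against empty keys and the single-reducer case
        if (word.isEmpty() || numPartitions < 2) {
            return 0;
        }
        char first = Character.toLowerCase(word.charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;
    }
}

In the driver it would be registered with job.setPartitionerClass(AlphaPartitioner.class) together with job.setNumReduceTasks(2).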

wordcount Example

  1. Prepare the input file
    vim word.txt
hello Jack
hello Tom
hello Jimi
hello Mili
hello Make
  2. Upload the file to HDFS
hadoop fs -put word.txt /word.txt
hadoop fs -ls /   # verify the upload
  3. Run the job
cd hadoop-2.8.5/share/hadoop/mapreduce

hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /word.txt /wcout
  4. View the results. The keys appear sorted because the shuffle sorts them by raw byte order, where uppercase letters precede lowercase, so hello comes last.
hadoop fs -ls /wcout
hadoop fs -cat /wcout/part-r-00000

Jack    1
Jimi    1
Make    1
Mili    1
Tom     1
hello   5

Java Example

  1. Mapper
package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Extend Mapper to implement the map computation.
 * The key/value types must be serializable (Writable), since they are sent over the network.
 */
public class MapDemo extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Receive one line of input
        String line = value.toString();

        // Split the line into words
        String[] words = line.split(" ");

        // Emit a count of 1 for every word
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
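
Since map is invoked once per input line, the version above allocates a new Text and LongWritable for every word. A common variant (a sketch with the same behavior; MapDemoReuse is a hypothetical name) reuses the Writable instances, which is safe because context.write() serializes the pair immediately.

package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/** Same mapper logic, with reused Writable objects to cut per-record allocation. */
public class MapDemoReuse extends Mapper<LongWritable, Text, Text, LongWritable> {

    private final Text outKey = new Text();
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            outKey.set(word);               // reuse the same Text instance
            context.write(outKey, ONE);     // safe: write() serializes immediately
        }
    }
}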
  2. Reducer
package mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Extend Reducer to implement the reduce computation.
 */
public class ReduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException
    {
        // Define the counter
        long count = 0;

        // Sum the values for this key
        for (LongWritable counter : values)
        {
            count += counter.get();
        }

        // Write the final count for this key
        context.write(key, new LongWritable(count));
    }
}
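
Because summation is associative and commutative, this same reducer can also run on the map side as a combiner, pre-aggregating counts before the shuffle and cutting network traffic. If desired, it would be registered in the driver shown next with one extra line (a sketch; the job variable is the Job object built below):

// Optional: pre-aggregate counts on the map side before the shuffle
job.setCombinerClass(ReduceDemo.class);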

  3. Job
package mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Counts the occurrences of each word.
 * Run with: hadoop jar hdfsdemo.jar
 * Pass the actual input and output file paths as arguments.
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Build the Job object
        Job job = Job.getInstance(new Configuration());

        // Note: the class that contains the main method
        job.setJarByClass(WordCount.class);

        // Set the input file path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Set the Mapper properties
        job.setMapperClass(MapDemo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Set the Reducer properties
        job.setReducerClass(ReduceDemo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Set the output file path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);

    }
}
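
As a variant, the driver can implement Hadoop's Tool interface and be launched through ToolRunner, which parses generic options (for example -D mapreduce.job.reduces=2) from the command line and lets the job's success or failure drive the process exit code. A sketch, with WordCountTool as a hypothetical name:

package mr;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** Driver variant: ToolRunner parses generic options into the Configuration. */
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D options parsed by ToolRunner
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountTool.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapDemo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(ReduceDemo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Propagate success/failure to the caller's exit code
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordCountTool(), args));
    }
}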

Package the WordCount class into a jar, upload it to the server, and run:

hadoop jar hdfsdemo.jar /word.txt /out

View the output file; it is identical to the output of Hadoop's built-in wordcount:

Jack    1
Jimi    1
Make    1
Mili    1
Tom     1
hello   5

Summary

Import the dependency jars from:
hadoop-2.8.5/share/hadoop/mapreduce/
(with Maven, the org.apache.hadoop:hadoop-client:2.8.5 artifact provides the same classes)

Writing a custom job

  1. Analyze the business logic and determine the input and output formats
  2. Extend Mapper
  3. Extend Reducer
  4. Wire the Mapper and Reducer together through a Job object