1. 程式人生 > >hadoop的第一個hello world程序(wordcount)

hadoop的第一個hello world程序(wordcount)

說明 內部 interrupt cat 兼容 str 文件 extend 代碼

在hadoop生態中,wordcount是hadoop世界的第一個hello world程序。

wordcount程序是用於對文本中出現的詞計數,從而得到詞頻,本例中的詞以空格分隔。

關於mapper、combiner、shuffler、reducer等含義請參照Hadoop權威指南裏的說明。

1、hadoop平臺搭建

參照之前的帖子搭一個偽分布式的hadoop就可以。鏈接:https://www.cnblogs.com/asker009/p/9126354.html

2、新建一個普通console程序,引入maven框架。

引入hadoop核心依賴,註意hadoop平臺用的3.1版本,引入的依賴盡量使用這個版本,以免出現版本兼容問題

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1
.0</version> </dependency>

檢查版本

[hadoop@hp4411s ~]$ hadoop version
Hadoop 3.1.0
Source code repository https://github.com/apache/hadoop -r 16b70619a24cdcf5d3b0fcf4b58ca77238ccbe6d
Compiled by centos on 2018-03-30T00:00Z
Compiled with protoc 2.5.0
From source with checksum 14182d20c972b3e2105580a1ad6990
This command was run using 
/opt/hadoop/hadoop-3.1.0/share/hadoop/common/hadoop-common-3.1.0.jar

3、編寫mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: xu.dm
 * @Date: 2019/1/29 16:44
 * @Description: 讀取采用空格分隔的字符,並且每個詞計數為1
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            System.out.println(word);
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

4、編寫reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: xu.dm
 * @Date: 2019/1/29 16:44
 * @Description:累加由map傳遞過來的計數
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable val:values)
        {
            sum+=val.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

5、關於shuffle過程,shuffle過程是由hadoop系統內部完成,shuffle是在map和reduce之間,對map的結果進行清洗、組合的過程。

借用hadoop權威指南裏的一個圖來類比說明

技術分享圖片

假設我們的數據樣本是:

技術分享圖片

那麽在map階段形成的數據是:

hadoop 1
hadoop 1
abc 1
abc 1
test 1
test 1
wow 1
wow 1
wow 1
... ...

經過shuffle後大概是這樣:

hadoop [1,1]
abc [1,1]
test [1,1]
wow [1,1,1]
... ...

其中還有排序什麽的。

shuffle其實就是性能關鍵點。shuffle的結果傳遞給reduce,reduce根據需求決定如何處理這些數據,本例中就是簡單的求和。

6、程序入口,任務調度執行等

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        if(args.length!=2)
        {
            System.err.println("使用格式:WordCount <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf =new Configuration();
        Job job = Job.getInstance(conf,"word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCountMapper.class);
//        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

7、放入hadoop平臺中執行

1、打成wordcount.jar包
2、上傳jar包到hadoop用戶目錄下
3、在hadoop用戶目錄下,用vi生成一個測試文檔wc.input,裏面隨意填入一些詞,用空格分隔詞。本例中是:
[hadoop@hp4411s ~]$ cat wc.input
hadoop hadoop abc abc test test wow
wow wow
dnf dnf dnf dnf
wow
hd cd
ef hs
xudemin wow wow
xudemin dnf dnf
dnf mytest
4、將wc.input上傳到hdfs文件系統中的/demo/input
hadoop fs -mkdir -p /demo/input
hadoop fs -put wc.input /demo/input
hadoop fs -ls /demo/input

5、用hadoop執行jar包,輸出結果到/demo/output,註意output目錄不能存在,hadoop會自己建立這個目錄,這是hadoop內部的一個機制,如果有這個目錄,程序無法執行。
hadoop jar wordcount.jar /demo/input /demo/output

6、查看運行結果,目錄下有_SUCCESS文件,表示執行成功,結果在part-r-00000中
[hadoop@hp4411s ~]$ hadoop fs -ls /demo/output
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2019-01-30 03:42 /demo/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 73 2019-01-30 03:42 /demo/output/part-r-00000

7、查看part-r-00000
[hadoop@hp4411s ~]$ hadoop fs -cat /demo/output/part-r-00000
abc 2
cd 1
dnf 7
ef 1
hadoop 2
hd 1
hs 1
mytest 1
test 2
wow 6
xudemin 2

8、關於combiner,上述執行job的時候,程序註釋了一段代碼// job.setCombinerClass(WordCountReducer.class);

在Hadoop中,有一種處理過程叫Combiner,與Mapper和Reducer在處於同等地位,但其執行的時間介於Mapper和Reducer之間,其實就是Mapper和Reducer的中間處理過程,Mapper的輸出是Combiner的輸入,Combiner的輸出是Reducer的輸入。

combiner是什麽作用?

因為hadoop的數據實際上是分布在各個不同的datanode,在mapper後,數據需要在從datanode上傳輸,如果數據很大很多,則會在網絡上花費不少時間,而combiner可以先對數據進行處理,減少傳輸量。
處理的方式是自定義的,本例中,combiner可以先對數據累加,實際上是執行了WordCountReducer類的內容,但是combine因為不是最後階段,所以它只是幫組程序先累加了部分數據,並沒有累加所有數據。
實際已經減少了mapper傳遞的kv數據量,最終到reducer階段需要累加的數據已經減少了。

註意:combine是不會改變最終的reducer的結果,它是一個優化手段

用hadoop權威指南裏天氣數據的例子更深入解釋:

例如獲取歷年的最高溫度例子,以書中所說的1950年為例,在兩個不同分區上的Mapper計算獲得的結果分別如下:

第一個Mapper結果:(1950, [0, 10, 20])

第二個Mapper結果:(1950, [25, 15])

如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示:

MaxTemperature:(1950, [0, 10, 20, 25, 15])

最終獲取的結果是25。

如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示:

第一個Combiner結果:(1950, [20])

第二個Combiner結果:(1950, [25])

然後這兩個Combiner的結果會輸出到Reducer中處理,如下所示

MaxTemperature:(1950, [20, 25])

最終獲取的結果是25。

由上可知:這兩種方法的結果是一致的,使用Combiner最大的好處是節省網絡傳輸的數據,這對於提高整體的效率是非常有幫助的。

但是,並非任何時候都可以使用Combiner處理機制,例如不是求歷年的最高溫度,而是求平均溫度,則會有另一種結果。同樣,過程如下,

如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示:

AvgTemperature:(1950, [0, 10, 20, 25, 15])

最終獲取的結果是14。

如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示:

第一個Combiner結果:(1950, [10])

第二個Combiner結果:(1950, [20])

然後這兩個Combiner的結果會輸出到Reducer中處理,如下所示

AvgTemperature:(1950, [10, 20])

最終獲取的結果是15。

由上可知:這兩種方法的結果是不一致的,所以在使用Combiner時,一定是優化的思路,但是不能影響到最終結果。

hadoop的第一個hello world程序(wordcount)