hadoop的第一個hello world程序(wordcount)
在hadoop生態中,wordcount是hadoop世界的第一個hello world程序。
wordcount程序是用於對文本中出現的詞計數,從而得到詞頻,本例中的詞以空格分隔。
關於mapper、combiner、shuffler、reducer等含義請參照Hadoop權威指南裏的說明。
1、hadoop平臺搭建
參照之前的帖子搭一個偽分布式的hadoop就可以。鏈接:https://www.cnblogs.com/asker009/p/9126354.html
2、新建一個普通console程序,引入maven框架。
引入hadoop核心依賴,註意hadoop平臺用的3.1版本,引入的依賴盡量使用這個版本,以免出現版本兼容問題
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.0</version> </dependency>
檢查版本
[hadoop@hp4411s ~]$ hadoop version Hadoop 3.1.0 Source code repository https://github.com/apache/hadoop -r 16b70619a24cdcf5d3b0fcf4b58ca77238ccbe6d Compiled by centos on 2018-03-30T00:00Z Compiled with protoc 2.5.0 From source with checksum 14182d20c972b3e2105580a1ad6990 This command was run using/opt/hadoop/hadoop-3.1.0/share/hadoop/common/hadoop-common-3.1.0.jar
3、編寫mapper
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @Author: xu.dm * @Date: 2019/1/29 16:44 * @Description: 讀取采用空格分隔的字符,並且每個詞計數為1 */ public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> { @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words) { System.out.println(word); context.write(new Text(word), new IntWritable(1)); } } }
4、編寫reducer
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @Author: xu.dm * @Date: 2019/1/29 16:44 * @Description:累加由map傳遞過來的計數 */ public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val:values) { sum+=val.get(); } context.write(key,new IntWritable(sum)); } }
5、關於shuffle過程,shuffle過程是由hadoop系統內部完成,shuffle是在map和reduce之間,對map的結果進行清洗、組合的過程。
借用hadoop權威指南裏的一個圖來類比說明
假設我們的數據樣本是:
那麽在map階段形成的數據是:
hadoop 1 hadoop 1 abc 1 abc 1 test 1 test 1 wow 1 wow 1 wow 1 ... ...
經過shuffle後大概是這樣:
hadoop [1,1] abc [1,1] test [1,1] wow [1,1,1] ... ...
其中還有排序什麽的。
shuffle其實就是性能關鍵點。shuffle的結果傳遞給reduce,reduce根據需求決定如何處理這些數據,本例中就是簡單的求和。
6、程序入口,任務調度執行等
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static void main(String[] args) throws Exception { if(args.length!=2) { System.err.println("使用格式:WordCount <input path> <output path>"); System.exit(-1); } Configuration conf =new Configuration(); Job job = Job.getInstance(conf,"word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); // job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
7、放入hadoop平臺中執行
1、打成wordcount.jar包
2、上傳jar包到hadoop用戶目錄下
3、在hadoop用戶目錄下,用vi生成一個測試文檔wc.input,裏面隨意填入一些詞,用空格分隔詞。本例中是:
[hadoop@hp4411s ~]$ cat wc.input
hadoop hadoop abc abc test test wow
wow wow
dnf dnf dnf dnf
wow
hd cd
ef hs
xudemin wow wow
xudemin dnf dnf
dnf mytest
4、將wc.input上傳到hdfs文件系統中的/demo/input
hadoop fs -mkdir -p /demo/input
hadoop fs -put wc.input /demo/input
hadoop fs -ls /demo/input
5、用hadoop執行jar包,輸出結果到/demo/output,註意output目錄不能存在,hadoop會自己建立這個目錄,這是hadoop內部的一個機制,如果有這個目錄,程序無法執行。
hadoop jar wordcount.jar /demo/input /demo/output
6、查看運行結果,目錄下有_SUCCESS文件,表示執行成功,結果在part-r-00000中
[hadoop@hp4411s ~]$ hadoop fs -ls /demo/output
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2019-01-30 03:42 /demo/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 73 2019-01-30 03:42 /demo/output/part-r-00000
7、查看part-r-00000
[hadoop@hp4411s ~]$ hadoop fs -cat /demo/output/part-r-00000
abc 2
cd 1
dnf 7
ef 1
hadoop 2
hd 1
hs 1
mytest 1
test 2
wow 6
xudemin 2
8、關於combiner,上述執行job的時候,程序註釋了一段代碼// job.setCombinerClass(WordCountReducer.class);
在Hadoop中,有一種處理過程叫Combiner,與Mapper和Reducer在處於同等地位,但其執行的時間介於Mapper和Reducer之間,其實就是Mapper和Reducer的中間處理過程,Mapper的輸出是Combiner的輸入,Combiner的輸出是Reducer的輸入。
combiner是什麽作用?
因為hadoop的數據實際上是分布在各個不同的datanode,在mapper後,數據需要在從datanode上傳輸,如果數據很大很多,則會在網絡上花費不少時間,而combiner可以先對數據進行處理,減少傳輸量。
處理的方式是自定義的,本例中,combiner可以先對數據累加,實際上是執行了WordCountReducer類的內容,但是combine因為不是最後階段,所以它只是幫組程序先累加了部分數據,並沒有累加所有數據。
實際已經減少了mapper傳遞的kv數據量,最終到reducer階段需要累加的數據已經減少了。
註意:combine是不會改變最終的reducer的結果,它是一個優化手段
用hadoop權威指南裏天氣數據的例子更深入解釋:
例如獲取歷年的最高溫度例子,以書中所說的1950年為例,在兩個不同分區上的Mapper計算獲得的結果分別如下: 第一個Mapper結果:(1950, [0, 10, 20]) 第二個Mapper結果:(1950, [25, 15]) 如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示: MaxTemperature:(1950, [0, 10, 20, 25, 15]) 最終獲取的結果是25。 如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示: 第一個Combiner結果:(1950, [20]) 第二個Combiner結果:(1950, [25]) 然後這兩個Combiner的結果會輸出到Reducer中處理,如下所示 MaxTemperature:(1950, [20, 25]) 最終獲取的結果是25。 由上可知:這兩種方法的結果是一致的,使用Combiner最大的好處是節省網絡傳輸的數據,這對於提高整體的效率是非常有幫助的。 但是,並非任何時候都可以使用Combiner處理機制,例如不是求歷年的最高溫度,而是求平均溫度,則會有另一種結果。同樣,過程如下, 如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示: AvgTemperature:(1950, [0, 10, 20, 25, 15]) 最終獲取的結果是14。 如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示: 第一個Combiner結果:(1950, [10]) 第二個Combiner結果:(1950, [20]) 然後這兩個Combiner的結果會輸出到Reducer中處理,如下所示 AvgTemperature:(1950, [10, 20]) 最終獲取的結果是15。 由上可知:這兩種方法的結果是不一致的,所以在使用Combiner時,一定是優化的思路,但是不能影響到最終結果。
hadoop的第一個hello world程序(wordcount)