Hadoop之MapReduce實戰
原文地址: itweknow.cn/detail?id=6… ,歡迎大家訪問。
MapReduce是一種程式設計模型,"Map(對映)"和"Reduce(歸約)",是它們的主要思想,我們通過Map函式來分散式處理輸入資料,然後通過Reduce彙總結果並輸出。其實這個概念有點類似於我們Java8中的StreamApi,有興趣的同學也可以去看看。
MapReduce任務過程分為兩個處理階段,map階段和reduce階段。每個階段都以鍵-值對作為輸入輸出,鍵和值的型別由我們自己指定。通常情況map的輸入內容鍵是LongWritable型別,為某一行起始位置相對於檔案起始位置的偏移量;值是Text型別,為該行的文字內容。
前提條件
- 一個maven專案。
- 一臺執行著hadoop的linux機器或者虛擬機器,當然了hadoop叢集也可以,如果你還沒有的話可以戳這裡。
我們編寫一個MapReduce程式的一般步驟是:(1)map程式。(2)reduce程式。(3)程式驅動。下面我們就根據這個順序來寫一個簡單的示例,這個例子是用來統計檔案中每個字元出現的次數並輸出。
專案依賴
我們先來解決一下依賴問題,在pom.xml
中新增如下內容。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId >hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
複製程式碼
Map程式
我們繼承Mapper
類並重寫了其map方法。Map階段輸入的資料是從hdfs中拿到的原資料,輸入的key為某一行起始位置相對於檔案起始位置的偏移量,value為該行的文字。輸出的內容同樣也為鍵-值對,這個時候輸出資料的鍵值對的型別可以自己指定,在本例中key是Text型別的,value是LongWritable型別的。輸出的結果將會被髮送到reduce函式進一步處理。
public class CharCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 將這一行文字轉為字元陣列
char[] chars = value.toString().toCharArray();
for (char c : chars) {
// 某個字元出現一次,便輸出其出現1次。
context.write(new Text(c + ""), new LongWritable(1));
}
}
}
複製程式碼
Reduce程式
我們繼承Reducer
類並重寫了其reduce方法。在本例中Reduce階段的輸入是Map階段的輸出,輸出的結果可以作為最終的輸出結果。相信你也注意到了,reduce方法的第二個引數是一個Iterable,MapReduce會將map階段中相同字元的輸出彙總到一起作為reduce的輸入。
public class CharCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
複製程式碼
驅動程式
到目前為止,我們已經有了map程式和reduce程式,我們還需要一個驅動程式來執行整個作業。可以看到我們在這裡初始化了一個Job物件。Job物件指定整個MapReduce作業的執行規範。我們用它來控制整個作業的運作,在這裡我們指定了jar包位置還有我們的Map
程式、Reduce
程式、Map
程式的輸出型別、整個作業的輸出型別還有輸入輸出檔案的地址。
public class CharCountDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// Hadoop會自動根據驅動程式的類路徑來掃描該作業的Jar包。
job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
// 指定mapper
job.setMapperClass(CharCountMapper.class);
// 指定reducer
job.setReducerClass(CharCountReducer.class);
// map程式的輸出鍵-值對型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 輸出鍵-值對型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 輸入檔案的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 輸入檔案路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
複製程式碼
你會發現我們初始化了一個空的Configuration,但是並沒有進行任何的配置,其實當我們將其執行在一個執行著hadoop的機器上時,它會預設使用我們機器上的配置。在後續的文章中我也會寫一下如何在程式中進行配置。
執行MapReduce作業
-
打包作業,我們需要將我們的MapReduce程式打成jar包。
mvn package -Dmaven.test.skip=true 複製程式碼
生成的jar包我們可以在target目錄下找到。
-
將jar包複製到hadoop機器上。
-
在HDFS上準備好要統計的檔案,我準備的檔案在HDFS上的
/mr/input/
目錄下,內容如下。hello hadoop hdfs.I am coming. 複製程式碼
-
執行jar
hadoop jar mr-test-1.0-SNAPSHOT.jar cn.itweknow.mr.CharCountDriver /mr/input/ /mr/output/out.txt 複製程式碼
-
檢視結果
先檢視輸出目錄,結果如下,最終輸出的結果就存放在/mr/output/part-r-00000
檔案中。[email protected]test:~# hadoop fs -ls /mr/output Found 2 items -rw-r--r-- 1 root supergroup 0 2018-12-24 10:33 /mr/output/_SUCCESS -rw-r--r-- 1 root supergroup 68 2018-12-24 10:33 /mr/output/part-r-00000 複製程式碼
檢視結果檔案的具體內容:
[email protected]test:~# hadoop fs -cat /mr/output/part-r-00000 4 . 2 I 1 a 2 c 1 d 2 e 1 f 1 g 1 h 3 i 1 l 2 m 2 n 1 o 4 p 1 s 1 複製程式碼
最後,送上本文的原始碼地址,戳這裡哦。