1. 程式人生 > >Hadoop之WordCount詳解

Hadoop之WordCount詳解

ride 開始 zookeepe ati 程序 form 數組 -c 狀態

花了好長時間查找資料理解、學習、總結 這應該是一篇比較全面的MapReduce之WordCount文章了 耐心看下去

1,創建本地文件

在hadoop-2.6.0文件夾下創建一個文件夾data,在其中創建一個text文件

mkdir data
cd data
vi hello

再在當前文件夾中創建一個apps文件夾,方便後續傳jar包

mkdir apps

將文本文件傳到HDFS的根目錄下

bin/hdfs dfs -put data/hello /

2,程序打jar包並上傳到apps目錄

3,執行Hadoop命令

bin/hadoop jar apps/WordClass-***.jar /hello /out

4,查看輸出結果

技術分享

將HDFS根目錄下的/out輸出文件傳到本地目錄中查看,通常有兩個文件:

技術分享

5,WordCount程序詳解

這部分是最重要的,但是也是最容易讓人犯暈的部分,涉及到許多mapreduce的原理,但是學習就是這樣,你越難吃透的東西,通常越重要

先把程序貼上來:

package cn.hx.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp { //自定義的mapper,繼承org.apache.hadoop.mapreduce.Mapper public static class MyMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String line
= value.toString();
//split 函數是用於按指定字符(串)或正則去分割某個字符串,結果以字符串數組形式返回,這裏按照"\t"來分割text文件中字符,即一個制表符,這就是為什麽我在文本中用了空格分割,導致最後的結果有很大的出入。 String[] splited
= line.split("\t");
      //foreach 就是 for(元素類型t 元素變量x:遍歷對象obj){引用x的java語句}
for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } } } public static class MyReducer extends org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long count = 0L; for (LongWritable v2 : v2s) { count += v2.get(); } LongWritable v3 = new LongWritable(count); context.write(k2, v3); } } //客戶端代碼,寫完交給ResourceManager框架去執行 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName()); //打成jar執行 job.setJarByClass(WordCountApp.class); //數據在哪裏? FileInputFormat.setInputPaths(job, args[0]); //使用哪個mapper處理輸入的數據? job.setMapperClass(MyMapper.class); //map輸出的數據類型是什麽? job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //使用哪個reducer處理輸入的數據? job.setReducerClass(MyReducer.class); //reduce輸出的數據類型是什麽? job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //數據輸出到哪裏? FileOutputFormat.setOutputPath(job, new Path(args[1])); //交給yarn去執行,直到執行結束才退出本程序 job.waitForCompletion(true); } }

POM文件:

技術分享

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.hx.test</groupId>
    <artifactId>WordClass</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>WordCount</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>

                        <archive>
                            <manifest>
                                <mainClass>cn.hx.test.WordClass</mainClass>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                            </manifest>

                        </archive>
                        <classesDirectory>
                        </classesDirectory>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

現在來一部分 一部分的理解程序:

要寫一個mapreduce程序,首先要實現一個map函數和reduce函數。我們看看map的方法:

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)

這裏有三個參數,前面兩個LongWritable key, Text value就是輸入的key和value,第三個參數Context context這是可以記錄輸入的key和value,例如:

context.write(new Text(word), new LongWritable(1));

此外context還會記錄map運算的狀態。

對於reduce函數的方法:

 protected void reduce(Text k2, Iterable<LongWritable> v2s,Reducer<Text, LongWritable, Text, LongWritable>.Context context) 

reduce函數的輸入也是一個key/value的形式,不過它的value是一個叠代器的形式Iterable<IntWritable> values,也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context作用一致。

下面就是main函數的調用了:

 Configuration conf = new Configuration();

運行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件裏的信息,我們開發mapreduce時候只是在填空,在map函數和reduce函數裏編寫實際進行的業務邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎麽操作啊,比如hdfs在哪裏啊,mapreduce的jobstracker在哪裏啊,而這些信息就在conf包下的配置文件裏。

接下來關於Job的使用代碼的註釋中寫得很清楚了

6,MapReduce的運行機制

技術分享

仔細研究圖 可以更加利於理解

首先講講物理實體,參入mapreduce作業執行涉及4個獨立的實體:

  1. 客戶端(client):編寫mapreduce程序,配置作業,提交作業,這就是程序員完成的工作;
  2. JobTracker:初始化作業,分配作業,與TaskTracker通信,協調整個作業的執行;
  3. TaskTracker:保持與JobTracker的通信,在分配的數據片段上執行Map或Reduce任務,TaskTracker和JobTracker的不同有個很重要的方面,就是在執行任務時候TaskTracker可以有n多個,JobTracker則只會有一個(JobTracker只能有一個就和hdfs裏namenode一樣存在單點故障,我會在後面的mapreduce的相關問題裏講到這個問題的)
  4. Hdfs:保存作業的數據、配置信息等等,最後的結果也是保存在hdfs上面

MapReduce的運行機制:

首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業也就是job,接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值,接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那麽job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至於輸入分片後面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。分配好資源後,JobTracker就會初始化作業,初始化主要做的是將Job放入一個內部的隊列,讓配置好的作業調度器能調度到這個作業,作業調度器會初始化這個job,初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。

初始化完畢後,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制定期發送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監控tasktracker是否存活,也可以獲取tasktracker處理的狀態和問題,同時tasktracker也可以通過心跳裏的返回值獲取jobtracker給它的操作指令。任務分配好後就是執行任務了。在任務執行時候jobtracker可以通過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也可以本地監控自己的狀態和進度。當jobtracker獲得了最後一個完成指定任務的tasktracker操作成功的通知時候,jobtracker會把整個job狀態置為成功,然後當客戶端查詢job運行狀態時候(註意:這個是異步操作),客戶端會查到job完成的通知的。如果job中途失敗,mapreduce也會有相應機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。

下面我從邏輯實體的角度講解mapreduce運行機制,這些按照時間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段

1. 輸入分片(input split):在進行map計算之前,mapreduce會根據輸入文件計算輸入分片(input split),每個輸入分片(input split)針對一個map任務,輸入分片(input split)存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置的數組,輸入分片(input split)往往和hdfs的block(塊)關系很密切,假如我們設定hdfs的塊的大小是64mb,如果我們輸入有三個文件,大小分別是3mb、65mb和127mb,那麽mapreduce會把3mb文件分為一個輸入分片(input split),65mb則是兩個輸入分片(input split)而127mb也是兩個輸入分片(input split),換句話說我們如果在map計算前做輸入分片調整,例如合並小文件,那麽就會有5個map任務將執行,而且每個map執行的數據大小不均,這個也是mapreduce優化計算的一個關鍵點。

2. map階段:就是程序員編寫好的map函數了,因此map函數效率相對好控制,而且一般map操作都是本地化操作也就是在數據存儲節點上進行;

3. combiner階段:combiner階段是程序員可以選擇的,combiner其實也是一種reduce操作,因此我們看見WordCount類裏是用reduce進行加載的。Combiner是一個本地化的reduce操作,它是map運算的後續操作,主要是在map計算出中間文件前做一個簡單的合並重復key值的操作,例如我們對文件裏的單詞頻率做統計,map計算時候如果碰到一個hadoop的單詞就會記錄為1,但是這篇文章裏hadoop可能會出現n多次,那麽map輸出文件冗余就會很多,因此在reduce計算前對相同的key做一個合並操作,那麽文件會變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計算力寬帶資源往往是計算的瓶頸也是最為寶貴的資源,但是combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,例如:如果計算只是求總數,最大值,最小值可以使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。

4. shuffle階段:將map的輸出作為reduce的輸入的過程就是shuffle了,這個是mapreduce優化的重點地方。這裏我不講怎麽優化shuffle階段,講講shuffle階段的原理,因為大部分的書籍裏都沒講清楚shuffle階段。Shuffle一開始就是map階段做輸出操作,一般mapreduce計算的都是海量數據,map輸出時候不可能把所有文件都放到內存操作,因此map寫入磁盤的過程十分的復雜,更何況map輸出時候要對結果進行排序,內存開銷是很大的,map在做輸出時候會在內存裏開啟一個環形內存緩沖區,這個緩沖區專門用來輸出的,默認大小是100mb,並且在配置文件裏為這個緩沖區設定了一個閥值,默認是0.80(這個大小和閥值都是可以在配置文件裏進行配置的),同時map還會為輸出操作啟動一個守護線程,如果緩沖區的內存達到了閥值的80%時候,這個守護線程就會把內容寫到磁盤上,這個過程叫spill,另外的20%內存可以繼續寫入要寫進磁盤的數據,寫入磁盤和寫入內存操作是互不幹擾的,如果緩存區被撐滿了,那麽map就會阻塞寫入內存的操作,讓寫入磁盤操作完成後再繼續執行寫入內存操作,前面我講到寫入磁盤前會有個排序操作,這個是在寫入磁盤操作時候進行,不是在寫入內存時候進行的,如果我們定義了combiner函數,那麽排序前還會執行combiner操作。

每次spill操作也就是寫入磁盤操作時候就會寫一個溢出文件,也就是說在做map輸出有幾次spill就會產生多少個溢出文件,等map輸出全部做完後,map會合並這些輸出文件。這個過程裏還會有一個Partitioner操作,對於這個操作很多人都很迷糊,其實Partitioner操作和map階段的輸入分片(Input split)很像,一個Partitioner對應一個reduce作業,如果我們mapreduce操作只有一個reduce操作,那麽Partitioner就只有一個,如果我們有多個reduce操作,那麽Partitioner對應的就會有多個,Partitioner因此就是reduce的輸入分片,這個程序員可以編程控制,主要是根據實際key和value的值,根據實際業務類型或者為了更好的reduce負載均衡要求進行,這是提高reduce效率的一個關鍵所在。到了reduce階段就是合並map輸出文件了,Partitioner會找到對應的map輸出文件,然後進行復制操作,復制操作時reduce會開啟幾個復制線程,這些線程默認個數是5個,程序員也可以在配置文件更改復制線程的個數,這個復制過程和map寫入磁盤過程類似,也有閥值和內存大小,閥值一樣可以在配置文件裏配置,而內存大小是直接使用reduce的tasktracker的內存大小,復制時候reduce還會進行排序操作和合並文件操作,這些操作完了就會進行reduce計算了。

5. reduce階段:和map函數一樣也是程序員編寫的,最終結果是存儲在hdfs上的。

Mapreduce的相關問題

① jobtracker的單點故障:jobtracker和hdfs的namenode一樣也存在單點故障,單點故障一直是hadoop被人詬病的大問題,為什麽hadoop的做的文件系統和mapreduce計算框架都是高容錯的,但是最重要的管理節點的故障機制卻如此不好,我認為主要是namenode和jobtracker在實際運行中都是在內存操作,而做到內存的容錯就比較復雜了,只有當內存數據被持久化後容錯才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個持久化都會有延遲,因此真的出故障,任然不能整體恢復,另外hadoop框架裏包含zookeeper框架,zookeeper可以結合jobtracker,用幾臺機器同時部署jobtracker,保證一臺出故障,有一臺馬上能補充上,不過這種方式也沒法恢復正在跑的mapreduce任務。

② 做mapreduce計算時候,輸出一般是一個文件夾,而且該文件夾是不能存在,而且這個檢查做的很早,當我們提交job時候就會進行,mapreduce之所以這麽設計是保證數據可靠性,如果輸出目錄存在reduce就搞不清楚你到底是要追加還是覆蓋,不管是追加和覆蓋操作都會有可能導致最終結果出問題,mapreduce是做海量數據計算,一個生產計算的成本很高,例如一個job完全執行完可能要幾個小時,因此一切影響錯誤的情況mapreduce是零容忍的。

③ Mapreduce還有一個InputFormat和OutputFormat,我們在編寫map函數時候發現map方法的參數是之間操作行數據,沒有牽涉到InputFormat,這些事情在我們new Path時候mapreduce計算框架幫我們做好了,而OutputFormat也是reduce幫我們做好了,我們使用什麽樣的輸入文件,就要調用什麽樣的InputFormat,InputFormat是和我們輸入的文件類型相關的,mapreduce裏常用的InputFormat有FileInputFormat普通文本文件,SequenceFileInputFormat是指hadoop的序列化文件,另外還有KeyValueTextInputFormat。OutputFormat就是我們想最終存儲到hdfs系統上的文件格式了,這個根據你需要定義了,hadoop有支持很多文件格式。

Hadoop之WordCount詳解