1. 程式人生 > >Hadoop - HDFS - MapReduce - YARN - HA詳解

Hadoop - HDFS - MapReduce - YARN - HA詳解

Hadoop

為什麼要有Hadoop?

      從計算機誕生到現今,積累了海量的資料,這些海量的資料有結構化、半結構化、非

結構的資料,並且這些海量的資料儲存和檢索就成為了一大問題。

      我們都知道大資料技術難題在於一個數據複雜性、資料量、大規模的資料計算。

Hadoop就是為了解決這些問題而出現的。

 

Hadoop的誕生

      Doug Cutting是Lucene的作者,當時Lucene面臨和谷歌同樣的問題,就是海量的資料儲存和檢索,於是就誕生了Nutch。

    在這之後,谷歌的大牛就為解決這個問題發了三篇論文(GFS、Map-Reduce、BigTable),這三篇論文總體表達的意思就是部署多臺廉價的伺服器叢集,通過分散式的方式將海量資料儲存在這個叢集上,然後利用叢集上的所有機器進行資料計算,這樣谷歌就不用買很多很貴的伺服器,只需要把普通的機器組合在一起。

Doug Cutting等人就去研究這三篇論文,發現價值巨大,於是Doug Cutting等人在Nutch上實現了GFS和Map-Reduce,使得Nutch的效能飆升。

      於是Doug Cutting等人就把這兩部分納入到Hadoop專案中,主要還是為了將Hadoop專案作為一個大資料整體化的解決方案。

      所以為什麼後面就出現了Hadoop而不是在Nutch上去做整體化大資料解決方案。

      這三篇論文對應Hadoop的元件:

      GFS -> HDFS                   檔案系統

      Map-Reduce -> MR             計算框架

      BigTable -> Hbase               資料庫系統

 

 

什麼是Hadoop?

      Hadoop是Apache下的一個分散式系統基礎架構,主要是為了解決海量資料儲存和海量的資料計算問題。

      在這個基礎之上發展出了的更多的技術,使得Hadoop稱為大資料技術生態圈之一。

     

 

Hadoop發行版本

1、Apache版本最原始的版本

2、Clodera版本,在大型網際網路企業中用的比較多,軟體免費,通過服務收費。

3、Hortonworks文件比較好

 

 

特點

      高可靠:維護多個副本,假設計算元素和儲存出現故障時,可以對失敗節點重新分佈處理

      高擴充套件:在叢集間分配任務資料,可方便的擴充套件數以千計的節點

      高效性:並行工作

      高容錯:自動儲存多個副本,並且能夠對失敗任務重新分配

 

Hadoop組成

HDFS:一個高可靠高吞吐量的分散式檔案系統

   NameNode(nn):儲存檔案的元資料,如:檔名、檔案目錄結構等資訊

   DataNode(dn)在檔案系統儲存檔案塊資料,以及資料的校驗和,也就是真正儲存檔案內容的,只是檔案大的時候會切割成一小塊一小塊的。

   SecondayNameNode(2nn)用於監控HDFS狀態的輔助後臺程式,每隔一段時間就獲取HDFS的快照,就是備份和監控狀態

 

 

Yarn:作業排程與叢集資源管理框架。(Hadoop2.0加入)

   ResourceManager(rm)處理客戶端請求、啟動和監控MRAppMaster、監控NodeManager,以及資源分配和排程。

   NodeManager(nn)單個節點上的資源管理、處理來自ResourceManager的命令,處理來自MRAppMaster的命令。

   MRAppMaster資料切分、為應用程式申請資源,並分配內部任務、任務監控和容錯。

   Container對任務執行環境的抽象,封裝了CPU、記憶體等多維資源以及環境變數、啟動命令等任務執行相關資訊(hadoop內部檔案操作命令和Liunx差不多)

 

 

MapReduce:分散式離線平行計算框架。

   Map階段:並行處理資料

   Reduce階段:對Map階段處理的結果資料進行彙總

   Common:支援其他模組的工具模組

 

 

理解Hadoop組成

      有一個建築工地的建造時間很緊急,設立了一個支援小組,支援各個小分隊(Common),首先1000包水泥,這些水泥要進行儲存(HDFS),假設這些水泥有防水的和不防水的,防水的水泥存到倉庫1(HDFS-dn),不防水的儲存到倉庫2(HDFS-dn),那麼就要進行記錄,哪些水泥存放到哪裡了(HDFS-nn),因為趕工期擔心水泥可能會因為潮溼那些問題,出現不可用,所以又準備了1000包水泥,並且每天都要對這些水泥進行檢查(HDFS-2nn)。

      如果一個小分隊要領取水泥就要和工地倉儲管理人員申請,倉儲管理人員同意了,就要向公司申請人員來搬水泥(Yarn-MRAppMaster),開始調動這些人員搬運水泥(Yarn-rm),小分隊領取到了水泥之後,開始決定給修外牆的多少包水泥(Yarn-nm)。

      修外牆小組就開始拿著水泥幹活了(MapReduce-Map),直到整棟樓的外牆修好了(MapReduce-Reduce),第N棟也是如此(MapReduce-Map)。

 

 

Hadoop內為什麼要如此劃分?

資料存放在Hadoop,那麼Hadoop必然需要對資料進行管理,如果沒有一個專門管理資料儲存的元件或資料運算的元件,全部都融合在一個東西里面就會顯得很臃腫,並且元件之間只需要通過介面進行溝通,那麼各自的元件就可以僅僅自身的需求做優化等,那麼就不會影響到其他的元件。

各自的元件只需要做好自己的事情,對外提供介面接收相應的資料及返回資料,只要符合我元件規範的就執行,不符合就不執行,而不需要關心其他,專心做自己的事情,也可以使得元件之間可以單獨的執行。

 

 

Hadoop目錄

      bin程式級命令(hdfs、Yarn等)

      etc配置檔案

      include類庫等檔案

      lib類庫等檔案

      libexec類庫等檔案

      sbinhadoop系統命令(關閉、啟動等)

      share官方提供的案例等

 

 

Hadoop執行模式

      本地模式:不需要啟動單獨程序,直接執行,一般測試和開發使用,一臺機器就可以執行,如果是在Liunx,跑的是本地,可以直接通過命令執行相應的jar包。

     

偽分散式模式:等同於分散式,但只有一個節點,具有叢集的配置資訊和執行,由於偽分散式只有一臺機器,可以不啟動Yarn,那麼也就算是Hadoop的HDFS啟動了,直接執行MapReduce程式的話,結果都在HDFS上,不在是在本地,如果需要交由YARN上進行資源排程和分配任務,則需要配置Yarn地址,以及指定資料獲取方式。

 

      完全分散式模式:多個節點一起執行,可以指定不同節點幹不同的活,比如機器1幹NameNode的活,機器2幹ResourceManger的活。

 

注意:啟動NameNode時,DataNode會記錄NameNode資訊(id),當快取的NameNode記錄刪除了,這個時候啟動就會報錯,這個時候就需要將NameNode格式化(刪除所有資料),之後在重新啟動。

 

HDFS

HDFS是什麼?

      HDFS就是一個分散式檔案儲存系統,通過目錄樹來定位檔案,由於分散式特點那麼叢集中的伺服器就有各自的角色。

 

 

特點

      低成本:由於是眾多伺服器組成的,那麼在某伺服器掛了,只需要付出一臺廉價的伺服器。

      高容錯性:HDFS是由眾多伺服器實現的分散式儲存,每個檔案都會有冗餘備份,那麼如果儲存資料的某個伺服器掛了,那麼還有備份的資料,允許伺服器出現故障。

      高吞吐量:HDFS是一次寫多次讀的訪問模型,不允許修改檔案,並簡化了資料的一致性問題。

      就近原則:在資料附近執行程式,也體現出來移動計算比移動資料效率高。

      可移植性:HDFS可以實現不同平臺之間的移植。

 

 

應用場景

      一次寫入,多次讀取,且不支援檔案的修改。

      適合資料分析場景,不適合網盤應用。

 

 

HDFS資料塊

      HDFS的檔案在物理上是分塊儲存的,1.x版本的資料塊預設大小是64MB,2.x版本的資料塊預設塊大小是128MB,這個值是可以通過配置引數(dfs.blocksize)進行調整的。

      HDFS的塊比磁碟的塊大,目的就在於要減少定址的開銷(標準:定址時間只佔傳輸時間的1%),如果塊設定的夠大,從磁碟傳輸資料的時間明顯就大於定位這個塊開始位置所需要的檔案,因此傳輸一個由多個塊組成的檔案的時間取決於磁碟傳輸速率。

 

 

HDFS常用命令(和Liunx差不多)

基本命令:hadoop fs

檢視幫助:hadoop fs 或 hadoop fs -help(詳情)

建立目錄:hadoop fs -mkdir /usr

檢視目錄資訊:hadoop fs -ls /usr

本地剪下,貼上到叢集:hadoop fs -moveFromLocal test.txt /usr/

追加一個檔案到已存在檔案的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt

顯示檔案內容:hadoop fs -cat /usr/test.txt

顯示一個檔案末尾:hadoop fs -tail /usr/ test.txt

以字元形式列印一個檔案內容:hadoop fs -text /usr/test.txt

修改檔案所屬許可權(-chgrp、-chomd、chown)(liunx一樣用法): hadoop fs -chmod 777 /usr/test.txt

從本地複製到hdfs:hadoop fs -copyFormLocal text.txt /usr/test

hdfs複製到本地:hadoop fs -copyToLocal /usr/ text.txt ./

從hdfs路徑拷貝到hdfs另一個路徑:hadoop fs -cp /usr/dir1 /usr/dir2

在hdfs目錄中移動檔案:hadoop fs -mv /usr/test.txt /usr/dir

從hdfs下載檔案到本地:hadoop fs -get /usr/test.txt ./

合併下載多個檔案:hadoop fs -getmerge /usr /*.txt ./result.txt

上傳檔案等於copyFormLocal:hadoop fs -put test.txt /usr

刪除檔案或資料夾:hadoop fs -rmr /usr/test.txt

刪除空目錄:hadoop fs -rmdir /usr/test3

統計檔案系統可用空間資訊(-h格式化資訊):hadoop fs -df -h

統計資料夾大小資訊:hadoop fs -du -h /

統計制定目錄下的檔案節點資料量(巢狀級,當前檔案個數,大小):hadoop fs -count -h /usr

設定檔案的副本數:hadoop fs -setrep 3 /usr/test.txt

 

 

NameNode

NameNode和SecondaryNameNode工作機制

第一階段:NameNode的工作

      1、第一次啟動namenode格式化後,建立fsimage和edits檔案,如果不是第一次啟動,直接載入編輯日誌和映象檔案到記憶體。

      2、客戶端對元資料進行操作請求

      3、NameNode記錄操作日誌,更新滾動日誌。

      4、NameNode在記憶體中對資料進行操作

 

第二階段:Secondary NameNode的工作

      1、Secondary NameNode詢問NameNode是否需要checkpoint,直接帶回NameNode檢查結果。

      2、Secondary NameNode請求執行checkpoint

      3、NameNode滾動正在寫的eits日誌

      4、將滾動前的編輯日誌和映象檔案拷貝到Secondary NameNode

      5、Secondary NameNode載入編輯日誌和映象檔案到記憶體並且合併

      6、生成新的映象檔案fsimage.chkpoint

      7、拷貝fsimage.chkpoint到NameNode

      8、NameNode將fsimage.chkpoint重新命名為fsimage

 

 

說明

Fsimage檔案:HDFS檔案系統元資料的一個永久檢查點,其中包含HDFS檔案系統所有目錄和檔案,以及node序列化資訊。

 

Edits檔案:存放HDFS檔案系統的所有更新操作,檔案系統客戶端執行的所有寫操作日誌都會記錄到edits檔案。

 

Secondary NameNode在主NameNode掛了,可以從Secondary NameNode中恢復資料,但是由於同步的條件限制,會出現資料不一致。

 

 

DataNode

工作機制

 

 

 

叢集安全模式

      NameNode啟動時,受限將映象檔案載入進去記憶體,並編輯日誌檔案中的各項操作,一旦記憶體中成功建立檔案系統元資料映象,則建立一個新的fsimage檔案和一個空的編輯日誌。

      此時的NameNode開始監聽DataNode請求,但此刻,NameNode是執行在安全模式,則此時NameNode檔案系統對於客戶端來說是只可讀

      系統中資料塊檔案並不是由NameNode維護的,而是以塊列表的形式儲存在DataNode,在系統正常操作期間,NameNode會在記憶體中保留所有塊位置影像資訊。

      在安全模式下,各個DataNode會向NameNode傳送最新的塊列表資訊,NameNode瞭解到足夠多的塊資訊之後,即可高效執行檔案系統。

      如果滿足最小複本條件,NameNode會在30秒後就退出安全模式,最小複本條件指的是整個檔案系統中99%的塊都滿足最小複本級別,在啟動一個剛剛格式化的HDFS叢集時,因為系統中還沒有塊,所以NameNode不會進入安全模式。

      叢集啟動完成後自動退出安全模式。

 

安全模式的應用場景

      銀行對賬、維護。

 

 

Java操作HDFS

Demo

 

public static void main(String[] args) throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {

       

        //配置資訊

        Configuration configuration = new Configuration();

 

        //獲取檔案系統

        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");

       

        //拷貝本地檔案到叢集

        fileSystem.copyFromLocalFile(new Path("e:/hdfs/test.txt"), new Path("/usr/hdfs/test.txt"));

       

        //關閉

        fileSystem.close();

}

   

 

HDFS資料流

IO流寫流程

 

IO流方式上傳檔案 (Java)

 

public void fileUpload() throws IOException, InterruptedException, URISyntaxException {

        //配置

        Configuration configuration = new Configuration();

       

        //檔案系統

        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"),configuration,"levi");

       

        //獲取輸出流(上傳到伺服器) - 伺服器

        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/usr/hdfs/test03.txt"));

       

        //檔案輸入流(本地上傳)

        FileInputStream fileInputStream = new java.io.FileInputStream(new File("E:/hdfs/test03.txt"));

       

        //流對接

        IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);

       

        fsDataOutputStream.hflush();

        IOUtils.closeStream(fileInputStream);

        IOUtils.closeStream(fsDataOutputStream);

       

        //關閉

        fileSystem.close();

       

    }

 

IO流讀流程

 

 

IO流方式下載檔案 (Java)

public void readFile() throws IOException, InterruptedException, URISyntaxException {

       //配置

       Configuration configuration = new Configuration();

      

       //檔案系統

       FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");

      

       //輸入流(下載) 伺服器

       FSDataInputStream fsDataInputStream = fileSystem.open(new Path("/usr/hdfs/hadoop-2.7.2.tar.gz"));

      

       //輸出(本地)

       FileOutputStream fileOutputStream = new FileOutputStream(new File("E:/hdfs/block.txt"));

      

       //流對接

       //--- 第一塊

       /*byte[] buf = new byte[1024];

       for (int i = 0; i < 1024*128; i++) {

           fsDataInputStream.read(buf);

           fileOutputStream.write(buf);

       }

       //關閉

       fsDataInputStream.close();

       fileOutputStream.close();*/

 

       //--- 第二塊

       fsDataInputStream.seek(1024 * 1024 * 128);//定位這個位置開始讀

       IOUtils.copyBytes(fsDataInputStream, fileOutputStream, 1024);

       IOUtils.closeStream(fileOutputStream);

       IOUtils.closeStream(fsDataInputStream);

      

       fileSystem.close();

    }

 

副本節點選擇

      在海量資料的處理中,節點之間的資料傳輸速率是很重要,特別是在頻寬很稀缺的情況下,而節點和節點之間的距離越遠,那麼必然會影響資料的傳輸。

      在成千的伺服器叢集中,Hadoop是怎麼選擇副本節點呢?

 

低版本Hadoop

第一個副本在客戶端所處的節點上,但是如果客戶端是在叢集外,隨機選取一個節點

第二個副本和第一個副本位於不同機架的隨機節點上,也就是不和第一個副本在相同機架。

第三個副本和第二個副本位於相同機架,節點隨機

 

Hadoop2.5版本以上

      第一個副本在客戶端所處節點上。如果客戶端在叢集外,隨機選一個

      第二個副本和第一個副本位於相同機架,隨機節點

      第三個副本位於不同機架,節點隨機

 

 

HDFS誤區

小檔案儲存

每個檔案均按照塊儲存,每個塊的元資料儲存在NamNode的記憶體中(一個檔案/目錄/檔案塊一般佔有150位元組的元資料記憶體空間),因此Hadoop儲存小檔案會非常低效,因為大量小檔案會耗盡NameNode中大部分記憶體,但儲存小檔案所需要的磁碟容量和儲存這些檔案原始內容所需要的磁碟空間相比也不會增多。

 

例如:上傳一個檔案1MB,那麼這個檔案會在HDFS中的一個塊儲存著,這個塊預設是128MB,那麼是不是佔用了128MB的磁碟空間呢?

      每一個塊128MB只是HDFS的邏輯上的劃分,所以在磁碟佔用空間還是1MB,只有當一個或多個檔案在一個塊內超過128MB,之後將這個檔案進行切割。

     

 

 

副節點處理

HDFS是先把當前這個節點處理完,在去處理副本節點的。

 

 

回收站

      回收站預設是不啟用的,在core-site.xml檔案中的配置fs.trash.interval預設是為0.

 

 

HDFS全過程

 

MapReduce

MapReduce是什麼?

      MapReduce是一個分散式運算程式的程式設計框架,是使用者開發基於Hadoop的資料分析應用的核心框架。

      MapReduce核心功能是將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的分散式運算程式,併發的執行在一個Hadoop叢集上。

 

作用

      由於硬體資源限制,海量資料無法在單機上處理,單機版程式擴充套件到叢集進行分散式運算,增加程式的複雜度和開發難度。

    MapReduce框架就是要使得開發人員開源將絕大部分工作集中在業務邏輯的開發上,而分散式運算的複雜性交由MapReduce來處理。

 

 

特點

      適合資料複雜度運算

      不適合演算法複雜度運算

      不適合實時計算、流式計算

 

 

核心思想

 

分散式的運算程式最少需要分成兩個階段:

第一個階段:MapTask併發例項,完全並行執行,互不相干

第二個階段:ReduceTask併發例項,互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出

 

MapReduce程式設計模型只能包含一個Map階段和Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。

 

總結

      Map並行處理任務(運算)。

      Reduce:等待相關的所有Map處理完任務,在將任務資料彙總輸出。

      MRAppMaster負責整個程式的過程排程和狀態協調。

 

 

MapReduce程序

      一個完整的MapReduce程式在分散式允許時有三類例項程序:

      MRAppMaster負責整個程式的過程排程和狀態協調。

      MapTask負責Map階段的整個資料處理流程。

      ReduceTask負責Reduce階段的整個資料處理流程。

 

 

序列化

      序列化就是把記憶體中的物件轉換成位元組序列(或其他資料傳輸協議),以便於儲存(持久化)和網路傳輸。

      而序列化就是Map到Reducer的橋樑。

 

      Java序列化是一個重量級的序列化框架(Serializable),使用這個框架進行序列化後會附帶很多額外資訊(各種校驗資訊、header等),不便於網路傳輸,所以Hadoop自己開發了一套序列化機制(Writable),精確、高效。

     

Java型別

Hadoop Writable型別

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

 

備註:自定義的反序列類中的write方法和read方法中DataOutput和DataInput這兩個類所提供的方法中,對應Java型別String的方法,分別是writeUTF()和readUTF()。

 

例項(統計單詞)

 

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

   

    private Text key = new Text();

   

    @Override

    protected void map(LongWritable key, Text value, Context context)

           throws IOException, InterruptedException {

       //讀取每一行

       String line = value.toString();

      

       //切割出每一個單詞

       String [] words = line.split("\t");

      

       //將讀取到的每一個單詞都寫出,並且值都為1,因為是在map計算完後到reduce進行彙總,形成Key 多個Value

       for (String word : words) {

           //每次檔案內的讀取一行都呼叫一次map,那樣就形成了呼叫多次map,那樣的話就不用建立多個key物件了

           this.key.set(word);

           context.write(this.key, new IntWritable(1));

       }

    }

}

 

public class WorkCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

   

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

       //這裡就形成 多個值的彙總結果,那麼將這個值多個進行彙總後,統一歸併到一個key,就形成了一個key對應多個value

       int count = 0;

       for (IntWritable value : values) {

           count += value.get();

       }

       context.write(key, new IntWritable(count));

      

    }

}

 

 

public class WordCountDriver {

    public static void main(String[] args) throws Exception {

       //配置

       Configuration configuration = new Configuration();

      

       //任務執行

       Job job = Job.getInstance(configuration);

       job.setJarByClass(WordCountDriver.class);

      

       //運算類和彙總類

       job.setMapperClass(WordCountMapper.class);

       job.setReducerClass(WorkCountReducer.class);

      

       //運算和彙總輸入和輸出

       job.setMapOutputKeyClass(Text.class);

       job.setMapOutputValueClass(IntWritable.class);

      

       //最終輸出

       job.setOutputKeyClass(Text.class);

       job.setOutputValueClass(IntWritable.class);

      

       //運算檔案的輸入和結果輸出

       FileInputFormat.setInputPaths(job, new Path("E:/hadooptest/mapreduce/input"));

       FileOutputFormat.setOutputPath(job, new Path("E:/hadooptest/mapreduce/output"));

      

       //提交

       job.submit();

      

       //等待

       boolean result = job.waitForCompletion(true);

       System.exit(result ? 0 : 1);

    }

}

 

程式流程分析

1、MapReduce程式讀取輸入目錄存放的相應檔案。

2、客戶端在submit方法執行之前,獲取到待處理的資料資訊,讓後根據急群眾引數配置形成一個任務分配規劃。

      1、建立連線

      2、建立提交任務的代理(本地:LocalRunner、遠端:YarnRunner)

      3、建立給叢集提交資料的stag路徑

      4、獲取到任務id,並建立任務路徑

      5、獲取到任務jar包,拷貝jar包到叢集(這個jar就是程式執行的業務程式碼)

      6、計算切片,生成切片規劃檔案

computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB

      7、提交任務,返回提交狀態

 

3、客戶端提交job.split、jar包、job.xml等檔案給Yarn,Yarn中的resourcemanager啟動MRAppMater。

4、MRAppMater啟動後根據job的描述資訊,計算出需要的MapTask例項數量,然後向叢集申請機器,啟動相應數量的Map Task程序。

5、MapTask利用客戶指定的InputFormat來讀取資料,形成KV對。

6、MapTask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算。

7、map()運算完畢後將運算結果的KV對,手機到MapTask快取。

8、MapTask快取中的KV對按照K分割槽排序後不斷寫到磁碟檔案。

9、MRAppMaster監控到所有MapTask程序任務完成後,會根據使用者指定的引數啟動相應數量的ReduceTask程序,並告知ReduceTask程序要處理的資料分割槽。

10、ReduceTask程序啟動後,根據MRAppMaster告知待處理資料所在位置,從N臺MapTask執行所在的機器上獲取到N個MapTask輸出結果檔案,並在本地執行重新歸併排序,按照相同Key的KV為一個組,呼叫客戶定義的reduce()方法進行邏輯運算。

11、ReduceTask運算完畢後,呼叫客戶指定的OuputFormat將結果資料輸出(檔案)到外部儲存。

 

說明:

      切片是邏輯上的切片

      規劃檔案就是裡面描述了切多少個片,每個片是怎麼樣的。

 

 

資料切片

      MapTask的並行任務是否越多越好?並行度是如何決定的?MapTask到底開多少個合適?

     

      1、一個job的map()階段並行度(MapTask開幾個),由客戶端在提交job時決定。

      2、每一個Split切片分配一個MapTask並行例項處理。

      3、預設情況下切片大小=塊大小(blocksize)

      4、切片時不考慮資料集整體,而是針對每一個檔案單獨切片(這個是邏輯上的劃分)

 

 

切片流程

      1、獲取到資料儲存目錄

      2、找到要便利處理目錄下的每一個檔案

      3、讀取第一個檔案test.txt(257MB)

           1、獲取檔案大小

           2、計算分片大小,每次切片時,都要判斷剩下的部分是否大於塊大小的1.1倍,大於就在劃分一個塊切片

           切片:

第一塊:128MB

                 第二塊:129MB / 128MB = 1.0078125

                 1.0078125 < 1.1 = 不在切片,反之繼續切

                 原始碼:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));

           3、將切片資訊寫到一個切片規劃檔案(說明檔案)中

           4、整個切片的核心過程在於getSplit()方法(看submit()原始碼)中完成,資料切片只是邏輯上對輸入資料進行切片,並不會在磁碟上,將檔案切分進行儲存。

           InputSplit只是記錄了分片的元資料資訊。比如:起始位置、長度、所在的節點列表等。

 

注意:塊是HDFS上物理儲存的資料,切片只是邏輯上的劃分。

           5、提交切片規劃檔案(說明檔案)到Yarn上,Yarn上的MrAppMaster就根據切片規劃檔案(說明檔案)計算開啟的MapTask個數(多少個切片就多少個MapTask)。

 

 

FileInputFormat中預設的切片機制

      1、簡單按照檔案內容長度切片

      2、切片大小,預設是塊大小

      3、切片時不考慮資料集整體性,而是逐個檔案的單獨切片,迴圈遍歷每一個檔案。

     

      MaxSize(切片最大值):如果比塊大小還小,則會讓切片變小。

MinSize(切片最小值):如果比塊大小還大,則會讓切片變得比塊還大。

     

假設:塊大小128MB

                      MaxSize設為100MB

                      切片後的儲存佔塊大小100MB

 

 

小檔案切片處理

      如果有大量的小檔案,而每一個檔案都是一個單獨的切片,都會各自交給一個MapTask處理,那麼需要開啟大量的MapTask,則會產生大量的MapTask,導致處理效率低下。

 

解決方案

      1、在資料處理前端,先把小檔案合併成大檔案,在上傳到HDFS做後續分析

      2、如果已經有大量的小檔案存在HDFS,使用CombineFileInputFormat進行處理,CombineFileInputFormat的切片邏輯跟TextFileInputFormat不同,他可以將多個小檔案邏輯上規劃到一個切片中,這樣多個小檔案就可以交給一個MapTask。

      3、優先滿足最小切片大小,不超過最大切片大小的前提下。

 

 

 

檔案合併

 

//-------- 使用提供的自定義類,指定切片大小

job.setInputFormatClass(CombineTextInputFormat.class);

//最大輸入切片大小,一個檔案的大小是4M就開始切,演算法是1.1

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

//最小輸入切片大小,多個檔案合併到了一起,到了2M就切,演算法是1.1倍,優先滿足最小切片大小

CombineTextInputFormat.setMinInp