1. 程式人生 > >Hadoop On Yarn Mapreduce執行原理與常用資料壓縮格式

Hadoop On Yarn Mapreduce執行原理與常用資料壓縮格式

我們通過提交jar包,進行MapReduce處理,那麼整個執行過程分為五個環節:

1、向client端提交MapReduce job.

2、隨後yarn的ResourceManager進行資源的分配.

3、由NodeManager進行載入與監控containers.

4、通過applicationMaster與ResourceManager進行資源的申請及狀態的互動,由NodeManagers進行MapReduce執行時job的管理.

5、通過hdfs進行job配置檔案、jar包的各節點分發。

Hadoop

Job 提交過程

job的提交通過 呼叫submit()方法 建立一個 JobSubmitter 例項,並 呼叫submitJobInternal() 方法。整個job的執行過程如下:

1、向ResourceManager申請application ID,此ID為該MapReduce的jobId。

2、檢查output的路徑是否正確,是否已經被建立。

3、計算input的splits。

4、拷貝執行job 需要的jar包、配置檔案以及計算input的split 到各個節點。

5、在ResourceManager中呼叫submitAppliction()方法,執行job

Job 初始化過程

1、當resourceManager收到了submitApplication()方法的呼叫通知後,scheduler開始分配Container,隨之ResouceManager傳送applicationMaster程序,告知每個nodeManager管理器。

2、 由applicationMaster決定 如何執行tasks,如果job資料量比較小,applicationMaster便選擇 將tasks執行在一個JVM中 。那麼如何判別這個job是大是小呢?當一個job的 mappers數量小於10個 , 只有一個reducer或者讀取的檔案大小要小於一個HDFS block時 ,(可通過修改配置項mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 進行調整)

3、在執行tasks之前,applicationMaster將會 呼叫setupJob()方法 ,隨之建立output的輸出路徑(這就能夠解釋,不管你的mapreduce一開始是否報錯,輸出路徑都會建立)

Task 任務分配

1、接下來applicationMaster向ResourceManager請求containers用於執行map與reduce的tasks(step 8),這裡map task的優先順序要高於reduce task,當所有的map tasks結束後,隨之進行sort(這裡是shuffle過程後面再說),最後進行reduce task的開始。(這裡有一點,當map tasks執行了百分之5%的時候,將會請求reduce,具體下面再總結)

2、執行tasks的是需要消耗記憶體與CPU資源的, 預設情況下,map和reduce的task資源分配為1024MB與一個核 ,(可修改執行的最小與最大引數配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

Task 任務執行

1、這時一個task已經被ResourceManager分配到一個container中,由applicationMaster告知nodemanager啟動container,這個task將會被一個 主函式為YarnChild 的Java application執行,但在執行task之前, 首先定位task需要的jar包、配置檔案以及載入在快取中的檔案 。

2、YarnChild運行於一個專屬的JVM中,所以 任何一個map或reduce任務出現問題,都不會影響整個nodemanager的crash或者hang 。

3、每個task都可以在相同的JVM task中完成,隨之將完成的處理資料寫入臨時檔案中。

Mapreduce資料流

執行進度與狀態更新

1、MapReduce是一個較長執行時間的批處理過程,可以是一小時、幾小時甚至幾天,那麼Job的執行狀態監控就非常重要。每個job以及 每個task都有一個包含job(running,successfully completed,failed)的狀態 ,以及value的計數器,狀態資訊及描述資訊(描述資訊一般都是在程式碼中加的列印資訊),那麼,這些資訊是如何與客戶端進行通訊的呢?

2、當一個task開始執行,它將會保持執行記錄,記錄task完成的比例,對於map的任務,將會記錄其執行的百分比,對於reduce來說可能複雜點,但系統依舊會估計reduce的完成比例。當一個map或reduce任務執行時, 子程序會持續每三秒鐘與applicationMaster進行互動 。

Job 完成

最終,applicationMaster會收到一個job完成的通知,隨後改變job的狀態為successful。最終,applicationMaster與task containers被清空。

Shuffle與Sort

從map到reduce的過程,被稱之為shuffle過程,MapReduce使到reduce的資料一定是經過key的排序的,那麼shuffle是如何運作的呢?

當map任務將資料output時, 不僅僅是將結果輸出到磁碟,它是將其寫入記憶體緩衝區域,並進行一些預分類 。

1、The Map Side

首先map任務的 output過程是一個環狀的記憶體緩衝區,緩衝區的大小預設為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體的大小到達一定比例 ,預設為80% (可通過mapreduce.map.sort.spill.percent配置項修改),便開始寫入磁碟。

在寫入磁碟之前,執行緒將會指定資料寫入與reduce相應的patitions中,最終傳送給reduce.在每個partition中 ,後臺執行緒將會在記憶體中進行Key的排序 ,( 如果程式碼中有combiner方法,則會在output時就進行sort排序 ,這裡,如果只有少於3個寫入磁碟的檔案,combiner將會在outputfile前啟動,如果只有一個或兩個,那麼將不會呼叫)

這裡 將map輸出的結果進行壓縮會大大減少磁碟IO與網路傳輸的開銷 (配置引數mapreduce.map .output.compress 設定為true,如果使用第三方壓縮jar,可通過mapreduce.map.output.compress.codec進行設定)

隨後這些paritions輸出檔案將會通過HTTP傳送至reducers,傳送的最大啟動執行緒通過mapreduce.shuffle.max.threads進行配置。

2、The Reduce Side

首先上面每個節點的map都將結果寫入了本地磁碟中,現在reduce需要將map的結果通過叢集拉取過來,這裡要注意的是, 需要等到所有map任務結束後,reduce才會對map的結果進行拷貝 ,由於reduce函式有少數幾個複製執行緒,以至於它 可以同時拉取多個map的輸出結果。預設的為5個執行緒 (可通過修改配置mapreduce.reduce.shuffle.parallelcopies來修改其個數)

這裡有個問題,那麼reducers怎麼知道從哪些機器拉取資料呢?

當所有map的任務結束後, applicationMaster通過心跳機制(heartbeat mechanism),由它知道mapping的輸出結果與機器host ,所以 reducer會定時的通過一個執行緒訪問applicationmaster請求map的輸出結果 。

Map的結果將會被拷貝到reduce task的JVM的記憶體中(記憶體大小可在mapreduce.reduce.shuffle.input.buffer.percent中設定)如果不夠用,則會寫入磁碟。當記憶體緩衝區的大小到達一定比例時(可通過mapreduce.reduce.shuffle.merge.percent設定)或map的輸出結果檔案過多時(可通過配置mapreduce.reduce.merge.inmen.threshold),將會除法合併(merged)隨之寫入磁碟。

這時要注意, 所有的map結果這時都是被壓縮過的,需要先在記憶體中進行解壓縮,以便後續合併它們 。(合併最終檔案的數量可通過mapreduce.task.io.sort.factor進行配置) 最終reduce進行運算進行輸出。

這裡附帶的整理了下Parquet儲存結構與SequenceFile儲存結構的特點

Parquet

Parquet是面向分析型業務的列式儲存格式,由Twitter和Cloudera合作開發,2015年5月從Apache的孵化器裡畢業成為Apache頂級專案,那麼這裡就總結下Parquet資料結構到底是什麼樣的呢?

一個Parquet檔案是 由一個header以及一個或多個block塊組成,以一個footer結尾。header中只包含一個4個位元組的數字PAR1用來識別整個Parquet檔案格式。檔案中所有的metadata都存在於footer中 。footer中的metadata包含了格式的版本資訊,schema資訊、key-value paris以及所有block中的metadata資訊。footer中最後兩個欄位為一個以4個位元組長度的footer的metadata,以及同header中包含的一樣的PAR1。

讀取一個Parquet檔案時,需要完全讀取Footer的meatadata,Parquet格式檔案不需要讀取sync markers這樣的標記分割查詢,因為所有block的邊界都儲存於footer的metadata中(因為metadata的寫入是在所有blocks塊寫入完成之後的,所以吸入操作包含的所有block的位置資訊都是存在於記憶體直到檔案close)

這裡注意,不像sequence files以及Avro資料格式檔案的header以及sync markers是用來分割blocks。Parquet格式檔案不需要sync markers,因此block的邊界儲存與footer的meatada中。

在Parquet檔案中,每一個block都具有一組Row group,她們是由一組Column chunk組成的列資料。繼續往下,每一個column chunk中又包含了它具有的pages。每個page就包含了來自於相同列的值.Parquet同時使用更緊湊形式的編碼,當寫入Parquet檔案時,它會自動基於column的型別適配一個合適的編碼,比如,一個boolean形式的值將會被用於run-length encoding。

另一方面,Parquet檔案對於每個page支援標準的壓縮演算法比如支援Snappy,gzip以及LZO壓縮格式,也支援不壓縮。

Parquet格式的資料型別:

Hadoop SequenceFile

在一些應用中,我們需要一種特殊的資料結構來儲存資料,並進行讀取,這裡就分析下為什麼用SequenceFile格式檔案。

Hadoop提供的SequenceFile檔案格式提供一對key,value形式的不可變的資料結構。同時,HDFS和MapReduce job使用SequenceFile檔案可以使檔案的讀取更加效率。

SequenceFile的格式

SequenceFile的格式是 由一個header 跟隨一個或多個記錄組成 。前三個位元組是一個Bytes SEQ代表著版本號,同時header也包括key的名稱,value class , 壓縮細節,metadata,以及Sync markers。Sync markers的作用在於可以讀取任意位置的資料。

在recourds中,又分為是否壓縮格式。當沒有被壓縮時,key與value使用Serialization序列化寫入SequenceFile。當選擇壓縮格式時,record的壓縮格式與沒有壓縮其實不盡相同,除了value的bytes被壓縮,key是不被壓縮的。

在Block中,它使所有的資訊進行壓縮,壓縮的最小大小由配置檔案中,io.seqfile.compress.blocksize配置項決定。

SequenceFile的MapFile

一個MapFile可以通過SequenceFile的地址,進行分類查詢的格式。使用這個格式的優點在於,首先會將SequenceFile中的地址都載入入記憶體,並且進行了key值排序,從而提供更快的資料查詢。

寫SequenceFile檔案:

將key按100-1以IntWritable object進行倒敘寫入sequence file,value為Text objects格式。在將key和value寫入Sequence File前,首先將每行所在的位置寫入(writer.getLength())

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;

import java.io.IOException;import java.net.URI;

public class SequenceFileWriteDemo {

private static final String[] DATA = {

“One, two, buckle my shoe”,

“Three, four, shut the door”,

“Five, six, pick up sticks”,

“Seven, eight, lay them straight”,

“Nine, ten, a big fat hen”

};

public static void main(String[] args) throws IOException {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path path = new Path(uri);

IntWritable key = new IntWritable();

Text value = new Text();

SequenceFile.Writer writer = null;

try {

writer = SequenceFile.createWriter(fs, conf, path,

key.getClass(), value.getClass());

for (int i = 0; i < 100; i++) {

key.set(100 – i);

value.set(DATA[i % DATA.length]);

System.out.printf(“[%s]\t%s\t%s\n”, writer.getLength(), key, value);

writer.append(key, value);

}

} finally {

IOUtils.closeStream(writer);

}

}

}

讀取SequenceFile檔案:

首先需要建立SequenceFile.Reader例項,隨後通過呼叫next()函式進行每行結果集的迭代(需要依賴序列化).

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;import java.net.URI;

public class SequenceFileReadDemo {

public static void main(String[] args) throws IOException {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path path = new Path(uri);

SequenceFile.Reader reader = null;

try {

reader = new SequenceFile.Reader(fs, path, conf);

Writable key = (Writable)

ReflectionUtils.newInstance(reader.getKeyClass(), conf);

Writable value = (Writable)

ReflectionUtils.newInstance(reader.getValueClass(), conf);

long position = reader.getPosition();

while (reader.next(key, value)) {//同步記錄的邊界

String syncSeen = reader.syncSeen() ? “*” : “”;

System.out.printf(“[%s%s]\t%s\t%s\n”, position, syncSeen, key, value);

position = reader.getPosition(); // beginning of next record

}

} finally {

IOUtils.closeStream(reader);

}

}

}