1. 程式人生 > >初步瞭解Hadoop平臺

初步瞭解Hadoop平臺

前天阿里電面問到對hadoop平臺的瞭解,雖然以前接觸過一段時間,但幾乎都忘了,所以悲劇了。今天重新來複習下基礎知識吧,畢竟現在不懂點hadoop說不過去。

什麼是Hadoop?

--------------------------------------------

hadoop一個用於在普通硬體構成 的大叢集上執行應用程式的框架。Hadoop框架透明地為應用程式提供可靠性與資料移動保障。Hadoop實現了一個被稱為 mapReduce的 計算模型,在這個計算模型中應用程式被分為很多的小塊,每一塊都能在叢集中的任意節點上執行或重新執行。另外,它還提供了一個分散式檔案系統(HDFS)來在計算節 點上儲存資料,為叢集提供了非常高的聚合頻寬。在本框架中無論是Map/Reduce還是分散式檔案系統都被設計為能夠自動地處理節點上的錯誤。

Hadoop的組成:

  • Hadoop Common – contains libraries and utilities needed by other Hadoop modules
  • Hadoop Distributed File System (HDFS) – a distributed file-system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster.
  • Hadoop YARN – a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users' applications.
  • Hadoop MapReduce – a programming model for large scale data processing.
Hadoop框架中最核心的設計就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡單的一句話解釋MapReduce就是“任務的分解與結果的彙總”。HDFS是Hadoop分散式檔案系統(Hadoop Distributed File System)的縮寫,為分散式計算儲存提供了底層支援。

MapReduce原理:

---------------------------------------------

MapReduce從它名字上來看就大致可以看出個緣由,兩個動詞Map和Reduce,“Map(展開)”就是將一個任務分解成為多個任務,“Reduce”就是將分解後多工處理的結果彙總起來,得出最後的分析結果。這不是什麼新思想,其實在前面提到的多執行緒,多工的設計就可以找到這種思想的影子。不論是現實社會,還是在程式設計中,一項工作往往可以被拆分成為多個任務,任務之間的關係可以分為兩種:一種是不相關的任務,可以並行執行;另一種是任務之間有相互的依賴,先後順序不能夠顛倒,這類任務是無法並行處理的。回到大學時期,教授上課時讓大家去分析關鍵路徑,無非就是找最省時的任務分解執行方式。在分散式系統中,機器叢集就可以看作硬體資源池,將並行的任務拆分,然後交由每一個空閒機器資源去處理,能夠極大地提高計算效率,同時這種資源無關性,對於計算叢集的擴充套件無疑提供了最好的設計保證。任務分解處理以後,那就需要將處理以後的結果再彙總起來,這就是Reduce要做的工作。結構圖如下:

edd15f7e76dd69430c2c76c70da0f9d8 (354×360)

網上有個簡單的比喻來解釋MapReduce原理:

我們要數圖書館中的所有書。你數1號書架,我數2號書架。這就是“Map”。我們人越多,數書就更快。

現在我們到一起,把所有人的統計數加在一起。這就是“Reduce”。


 圖:MapReduce結構示意圖 

上圖就是MapReduce大致的結構圖,在Map前還可能會對輸入的資料有Split(分割)的過程,保證任務並行效率,在Map之後還會有Shuffle(混合)的過程,對於提高Reduce的效率以及減小資料傳輸的壓力有很大的幫助。後面會具體提及這些部分的細節。           

1.Map-Reduce的邏輯過程

假設我們需要處理一批有關天氣的資料,其格式如下:

  • 按照ASCII碼儲存,每行一條記錄
  • 每一行字元從0開始計數,第15個到第18個字元為年
  • 第25個到第29個字元為溫度,其中第25位是符號+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

現在需要統計出每年的最高溫度。

Map-Reduce主要包括兩個步驟:Map和Reduce

每一步都有key-value對作為輸入和輸出:

  • map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的TextInputFormat,則每行作為一個記錄程序處理,其中key為此行的開頭相對於檔案的起始位置,value就是此行的字元文字
  • map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對於上面的例子,在map過程,輸入的key-value對如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map過程中,通過對每一行字串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個列表中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用如下圖表示:

2、編寫Map-Reduce程式

編寫Map-Reduce程式,一般需要實現兩個函式:mapper中的map函式和reducer中的reduce函式。

一般遵循以下格式:

  • map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

  throws IOException;

}

  • reduce: (K2, list(V))  ->  list(K3, V3) 

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  void reduce(K2 key, Iterator<V2> values,

              OutputCollector<K3, V3> output, Reporter reporter)

    throws IOException;

}

對於上面的例子,則實現的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        String line = value.toString();

        String year = line.substring(15, 19);

        int airTemperature;

        if (line.charAt(25) == '+') {

            airTemperature = Integer.parseInt(line.substring(26, 30));

        } else {

            airTemperature = Integer.parseInt(line.substring(25, 30));

        }

        output.collect(new Text(year), new IntWritable(airTemperature));

    }

}

實現的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        int maxValue = Integer.MIN_VALUE;

        while (values.hasNext()) {

            maxValue = Math.max(maxValue, values.next().get());

        }

        output.collect(key, new IntWritable(maxValue));

    }

}

欲執行上面實現的Mapper和Reduce,則需要生成一個Map-Reduce得任務(Job),其基本包括以下三部分:

  • 輸入的資料,也即需要處理的資料
  • Map-Reduce程式,也即上面實現的Mapper和Reducer
  • 此任務的配置項JobConf

欲配置JobConf,需要大致瞭解Hadoop執行job的基本原理:

  • Hadoop將Job分成task進行處理,共兩種task:map task和reduce task
  • Hadoop有兩類的節點控制job的執行:JobTracker和TaskTracker
    • JobTracker協調整個job的執行,將task分配到不同的TaskTracker上
    • TaskTracker負責執行task,並將結果返回給JobTracker
  • Hadoop將輸入資料分成固定大小的塊,我們稱之input split
  • Hadoop為每一個input split建立一個task,在此task中依次處理此split中的一個個記錄(record)
  • Hadoop會盡量讓輸入資料塊所在的DataNode和task所執行的DataNode(每個DataNode上都有一個TaskTracker)為同一個,可以提高執行效率,所以input split的大小也一般是HDFS的block的大小。
  • Reduce task的輸入一般為Map Task的輸出,Reduce Task的輸出為整個job的輸出,儲存在HDFS上。
  • 在reduce中,相同key的所有的記錄一定會到同一個TaskTracker上面執行,然而不同的key可以在不同的TaskTracker上面執行,我們稱之為partition
    • partition的規則為:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具有相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。

public interface Partitioner<K2, V2> extends JobConfigurable {

  int getPartition(K2 key, V2 value, int numPartitions);

}

下圖大概描述了Map-Reduce的Job執行的基本原理:

image

下面我們討論JobConf,其有很多的項可以進行配置:

  • setInputFormat:設定map的輸入格式,預設為TextInputFormat,key為LongWritable, value為Text
  • setNumMapTasks:設定map任務的個數,此設定通常不起作用,map任務的個數取決於輸入的資料所能分成的input split的個數
  • setMapperClass:設定Mapper,預設為IdentityMapper
  • setMapRunnerClass:設定MapRunner, map task是由MapRunner執行的,預設為MapRunnable,其功能為讀取input split的一個個record,依次呼叫Mapper的map函式
  • setMapOutputKeyClass和setMapOutputValueClass:設定Mapper的輸出的key-value對的格式
  • setOutputKeyClass和setOutputValueClass:設定Reducer的輸出的key-value對的格式
  • setPartitionerClass和setNumReduceTasks:設定Partitioner,預設為HashPartitioner,其根據key的hash值來決定進入哪個partition,每個partition被一個reduce task處理,所以partition的個數等於reduce task的個數
  • setReducerClass:設定Reducer,預設為IdentityReducer
  • setOutputFormat:設定任務的輸出格式,預設為TextOutputFormat
  • FileInputFormat.addInputPath:設定輸入檔案的路徑,可以使一個檔案,一個路徑,一個萬用字元。可以被呼叫多次新增多個路徑
  • FileOutputFormat.setOutputPath:設定輸出檔案的路徑,在job執行前此路徑不應該存在

當然不用所有的都設定,由上面的例子,可以編寫Map-Reduce程式如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature <input path> <output path>");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

3、Map-Reduce資料流(data flow)

Map-Reduce的處理過程主要涉及以下四個部分:

  • 客戶端Client:用於提交Map-reduce任務job
  • JobTracker:協調整個job的執行,其為一個Java程序,其main class為JobTracker
  • TaskTracker:執行此job的task,處理input split,其為一個Java程序,其main class為TaskTracker
  • HDFS:hadoop分散式檔案系統,用於在各個程序間共享Job相關的檔案

image

3.1、任務提交

JobClient.runJob()建立一個新的JobClient例項,呼叫其submitJob()函式。

  • 向JobTracker請求一個新的job ID
  • 檢測此job的output配置
  • 計算此job的input splits
  • 將Job執行所需的資源拷貝到JobTracker的檔案系統中的資料夾中,包括job jar檔案,job.xml配置檔案,input splits
  • 通知JobTracker此Job已經可以運行了

提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令列,直到任務執行完畢。

3.2、任務初始化

當JobTracker收到submitJob呼叫的時候,將此任務放到一個佇列中,job排程器將從佇列中獲取任務並初始化任務。

初始化首先建立一個物件來封裝job執行的tasks, status以及progress。

在建立task之前,job排程器首先從共享檔案系統中獲得JobClient計算出的input splits。

其為每個input split建立一個map task。

每個task被分配一個ID。

3.3、任務分配

TaskTracker週期性的向JobTracker傳送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已經準備執行一個新的task,JobTracker將分配給其一個task。

在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優先順序選擇一個Job,在最高優先順序的Job中選擇一個task。

TaskTracker有固定數量的位置來執行map task或者reduce task。

預設的排程器對待map task優先於reduce task

當選擇reduce task的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有資料本地化的概念。

3.4、任務執行

TaskTracker被分配了一個task,下面便要執行此task。

首先,TaskTracker將此job的jar從共享檔案系統中拷貝到TaskTracker的檔案系統中。

TaskTracker從distributed cache中將job執行所需要的檔案拷貝到本地磁碟。

其次,其為每個task建立一個本地的工作目錄,將jar解壓縮到檔案目錄中。

其三,其建立一個TaskRunner來執行task。

TaskRunner建立一個新的JVM來執行task。

被建立的child JVM和TaskTracker通訊來報告執行進度。

3.4.1、Map的過程

MapRunnable從input split中讀取一個個的record,然後依次呼叫Mapper的map函式,將結果輸出。

map的輸出並不是直接寫入硬碟,而是將其寫入快取memory buffer。

當buffer中資料的到達一定的大小,一個背景執行緒將資料開始寫入硬碟。

在寫入硬碟之前,記憶體中的資料通過partitioner分成多個partition。

在同一個partition中,背景執行緒會將資料按照key在記憶體中排序。

每次從記憶體向硬碟flush資料,都生成一個新的spill檔案。

當此task結束之前,所有的spill檔案被合併為一個整的被partition的而且排好序的檔案。

reducer可以通過http協議請求map的輸出檔案,tracker.http.threads可以設定http服務執行緒數。

3.4.2、Reduce的過程

當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。

對於一個job,JobTracker知道TaskTracer和map輸出的對應關係。

reducer中一個執行緒週期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。

reduce task需要其對應的partition的所有的map輸出。

reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。

reduce task中有多個copy執行緒,可以並行拷貝map輸出。

當很多map輸出拷貝到reduce task後,一個背景執行緒將其合併為一個大的排好序的檔案。

當所有的map輸出都拷貝到reduce task後,進入sort過程,將所有的map輸出合併為大的排好序的檔案。

最後進入reduce過程,呼叫reducer的reduce函式,處理排好序的輸出的每個key,最後的結果寫入HDFS。

image

3.5、任務結束

當JobTracker獲得最後一個task的執行成功的報告後,將job得狀態改為成功。

當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶列印訊息,從runJob函式中返回。

HdFS的基本概念

--------------------------------------------------

1.1、資料塊(block)

  • HDFS(Hadoop Distributed File System)預設的最基本的儲存單位是64M的資料塊。
  • 和普通檔案系統相同的是,HDFS中的檔案是被分成64M一塊的資料塊儲存的。
  • 不同於普通檔案系統的是,HDFS中,如果一個檔案小於一個數據塊的大小,並不佔用整個資料塊儲存空間。

1.2、元資料節點(Namenode)和資料節點(datanode)

  • 元資料節點用來管理檔案系統的名稱空間
    • 其將所有的檔案和資料夾的元資料儲存在一個檔案系統樹中。
    • 這些資訊也會在硬碟上儲存成以下檔案:名稱空間映象(namespace image)及修改日誌(edit log)
    • 其還儲存了一個檔案包括哪些資料塊,分佈在哪些資料節點上。然而這些資訊並不儲存在硬碟上,而是在系統啟動的時候從資料節點收集而成的。
  • 資料節點是檔案系統中真正儲存資料的地方。
    • 客戶端(client)或者元資料資訊(namenode)可以向資料節點請求寫入或者讀出資料塊。
    • 其週期性的向元資料節點回報其儲存的資料塊資訊。
  • 從元資料節點(secondary namenode)
    • 從元資料節點並不是元資料節點出現問題時候的備用節點,它和元資料節點負責不同的事情。
    • 其主要功能就是週期性將元資料節點的名稱空間映象檔案和修改日誌合併,以防日誌檔案過大。這點在下面會相信敘述。
    • 合併過後的名稱空間映象檔案也在從元資料節點儲存了一份,以防元資料節點失敗的時候,可以恢復。

1.2.1、元資料節點資料夾結構

image

  • VERSION檔案是java properties檔案,儲存了HDFS的版本號。
    • layoutVersion是一個負整數,儲存了HDFS的持續化在硬碟上的資料結構的格式版本號。
    • namespaceID是檔案系統的唯一識別符號,是在檔案系統初次格式化時生成的。
    • cTime此處為0
    • storageType表示此資料夾中儲存的是元資料節點的資料結構。

namespaceID=1232737062

cTime=0

storageType=NAME_NODE

layoutVersion=-18

1.2.2、檔案系統名稱空間映像檔案及修改日誌

  • 當檔案系統客戶端(client)進行寫操作時,首先把它記錄在修改日誌中(edit log)
  • 元資料節點在記憶體中儲存了檔案系統的元資料資訊。在記錄了修改日誌後,元資料節點則修改記憶體中的資料結構。
  • 每次的寫操作成功之前,修改日誌都會同步(sync)到檔案系統。
  • fsimage檔案,也即名稱空間映像檔案,是記憶體中的元資料在硬碟上的checkpoint,它是一種序列化的格式,並不能夠在硬碟上直接修改。
  • 同資料的機制相似,當元資料節點失敗時,則最新checkpoint的元資料資訊從fsimage載入到記憶體中,然後逐一重新執行修改日誌中的操作。
  • 從元資料節點就是用來幫助元資料節點將記憶體中的元資料資訊checkpoint到硬碟上的
  • checkpoint的過程如下:
    • 從元資料節點通知元資料節點生成新的日誌檔案,以後的日誌都寫到新的日誌檔案中。
    • 從元資料節點用http get從元資料節點獲得fsimage檔案及舊的日誌檔案。
    • 從元資料節點將fsimage檔案載入到記憶體中,並執行日誌檔案中的操作,然後生成新的fsimage檔案。
    • 從元資料節點獎新的fsimage檔案用http post傳回元資料節點
    • 元資料節點可以將舊的fsimage檔案及舊的日誌檔案,換為新的fsimage檔案和新的日誌檔案(第一步生成的),然後更新fstime檔案,寫入此次checkpoint的時間。
    • 這樣元資料節點中的fsimage檔案儲存了最新的checkpoint的元資料資訊,日誌檔案也重新開始,不會變的很大了。

hadoop01

1.2.3、從元資料節點的目錄結構

image

1.2.4、資料節點的目錄結構

image

  • 資料節點的VERSION檔案格式如下:

namespaceID=1232737062

storageID=DS-1640411682-127.0.1.1-50010-1254997319480

cTime=0

storageType=DATA_NODE

layoutVersion=-18

  • blk_<id>儲存的是HDFS的資料塊,其中儲存了具體的二進位制資料。
  • blk_<id>.meta儲存的是資料塊的屬性資訊:版本資訊,型別資訊,和checksum
  • 當一個目錄中的資料塊到達一定數量的時候,則建立子資料夾來儲存資料塊及資料塊屬性資訊。

二、資料流(data flow)

2.1、讀檔案的過程

  • 客戶端(client)用FileSystem的open()函式開啟檔案
  • DistributedFileSystem用RPC呼叫元資料節點,得到檔案的資料塊資訊。
  • 對於每一個數據塊,元資料節點返回儲存資料塊的資料節點的地址。
  • DistributedFileSystem返回FSDataInputStream給客戶端,用來讀取資料。
  • 客戶端呼叫stream的read()函式開始讀取資料。
  • DFSInputStream連線儲存此檔案第一個資料塊的最近的資料節點。
  • Data從資料節點讀到客戶端(client)
  • 當此資料塊讀取完畢時,DFSInputStream關閉和此資料節點的連線,然後連線此檔案下一個資料塊的最近的資料節點。
  • 當客戶端讀取完畢資料的時候,呼叫FSDataInputStream的close函式。
  • 在讀取資料的過程中,如果客戶端在與資料節點通訊出現錯誤,則嘗試連線包含此資料塊的下一個資料節點。
  • 失敗的資料節點將被記錄,以後不再連線。

image

2.2、寫檔案的過程

  • 客戶端呼叫create()來建立檔案
  • DistributedFileSystem用RPC呼叫元資料節點,在檔案系統的名稱空間中建立一個新的檔案。
  • 元資料節點首先確定檔案原來不存在,並且客戶端有建立檔案的許可權,然後建立新檔案。
  • DistributedFileSystem返回DFSOutputStream,客戶端用於寫資料。
  • 客戶端開始寫入資料,DFSOutputStream將資料分成塊,寫入data queue。
  • Data queue由Data Streamer讀取,並通知元資料節點分配資料節點,用來儲存資料塊(每塊預設複製3塊)。分配的資料節點放在一個pipeline裡。
  • Data Streamer將資料塊寫入pipeline中的第一個資料節點。第一個資料節點將資料塊傳送給第二個資料節點。第二個資料節點將資料傳送給第三個資料節點。
  • DFSOutputStream為發出去的資料塊儲存了ack queue,等待pipeline中的資料節點告知資料已經寫入成功。
  • 如果資料節點在寫入的過程中失敗:
    • 關閉pipeline,將ack queue中的資料塊放入data queue的開始。
    • 當前的資料塊在已經寫入的資料節點中被元資料節點賦予新的標示,則錯誤節點重啟後能夠察覺其資料塊是過時的,會被刪除。
    • 失敗的資料節點從pipeline中移除,另外的資料塊則寫入pipeline中的另外兩個資料節點。
    • 元資料節點則被通知此資料塊是複製塊數不足,將來會再建立第三份備份。
  • 當客戶端結束寫入資料,則呼叫stream的close函式。此操作將所有的資料塊寫入pipeline中的資料節點,並等待ack queue返回成功。最後通知元資料節點寫入完畢。

image

                                                                                                                                                                                    

如果你想深入理解hadoop平臺的話我覺得研究其原始碼是少不了的,Hadoop原始碼網址。再結合這本書Hadoop原始碼分析(完整版)去看。

有時間的話我也會仔細去研究,不過現在還是先補基礎,基礎好了看這些東西要來的些。這裡就簡單概述一下Hadoop平臺的東西吧。