1. 程式人生 > >第3章:Hadoop分散式檔案系統(2)

第3章:Hadoop分散式檔案系統(2)

資料流

讀取檔案資料的剖析

為了知道客戶端與HDFS,NameNode,DataNode互動過程中資料的流向,請看圖3-2,這張圖顯示了讀取檔案過程中主要的事件順序。圖3-2 客戶端從HDFS讀取資料
客戶端通過呼叫FileSystem物件的open()方法開啟一個希望從中讀取資料的檔案,對於HDFS來說,FileSystem是一個DistributedFileSystem的例項物件(圖3-2 步驟1)。DistributedFileSystem遠端呼叫名稱節點(NameNode)得到檔案開頭幾個塊的位置。對於每一個塊,名稱節點返回包含這個塊複本的所有資料節點(DataNode)的地址。進一步,這些資料節點會根據叢集的網路拓撲結構按照距離客戶端的遠近進行排序。如果客戶端本身是一個數據節點(例如一個MapReduce任務),而這個資料節點包含要讀取的塊的複本,則客戶端會直接從本地讀取。

DistributedFileSystem返回一個FSDataInputStream物件給客戶端,用於從檔案中讀取資料。FSDataInputStream是一個輸入流,支援檔案尋位(seek)。FSDataInputStream裡包裝了一個DFSInputStream類,這個類支援資料節點和名稱節點的I/O操作。

客戶端呼叫read()方法從流中讀取資料。DFSInputStream儲存了檔案中開頭幾個塊所在的資料節點的地址。首先連線第一個塊所在的最近的資料節點,資料從資料節點被讀取到客戶端,然後不斷地從這個流中讀取(步驟4)直接這個塊資料被讀完,然後DFSInputStream將會關閉到這個資料節點的連線,尋找下一個塊所在的最近的資料節點(步驟5)。這一系列操作對客戶端來說是透明的,它不用管。從客戶端的角度來看,它僅僅是在讀取一個連續的資料流。

塊按順序依次被讀取。當客戶端從資料流中讀數的時候,DFSInputStream依次建立和關閉和資料節點的連線。如果需要,DistributedFileSystem將再次呼叫名稱節點得到下一批塊所有資料節點的位置。當客戶端完成了所有資料的讀取,它會呼叫FSDataInputStream的close()方法關閉流(步驟6)。

在讀取的過程中,如果DFSInputStream在與資料節點互動的過程中出現了錯誤,它將會嘗試當前塊所在的最近的下一個資料節點。它也會記住那些互動失敗的資料節點以便讀取其它塊時不再在這些失敗的資料節點中讀取。DFSInputStream也會校驗從資料節點傳過來的資料,如果塊中資料損壞了,它將嘗試從另一個包含這個塊複本的資料節點中讀取。它也會向名稱節點報告這個損壞的塊。

這樣設計一個重要的方面是客戶端直接與資料節點互動,並通過名稱節點的引導,找到每一個塊所在的最好的資料節點。這樣設計可以讓HDFS響應大量同時併發請求的客戶端。因為資料分佈在叢集中所有的資料節點中。而且,名稱節點僅僅需要響應獲取塊所有位置的請求(這個位置資訊儲存在記憶體中,所以非常高效)而不需要響應獲取檔案資料的請求。如果名稱節點還響應讀取檔案資料的請求,那麼隨著客戶端資料增多,很快會出現瓶頸。

Hadoop網路拓撲結構
本地網路的兩個節點對彼此"關閉"是什麼意思呢?在大批量資料處理環境中,限制速度的因素是節點之前傳輸的速率,頻寬幾乎對速度沒有一點貢獻,所以可以用節點間的頻寬做為衡量節點間距離的尺碼。但在實踐中並不直接去測試兩個節點間的頻寬,因為這很困難。Hadoop採取了一個簡單的途徑,網路以樹的形式表示,兩個節點的距離等於各自距離他們共同上層節點的距離之和。樹中的層級並不是預先設定好的,通常層級中有資料中心,機架(Rack)和正在執行程序的節點。下面場景中頻寬依次遞減:

  • 相同節點上的處理
  • 同一機架不同節點上的處理
  • 同一資料中心不同機架中節點上的處理
  • 不同資料中心中節點上的處理
    例如:節點n1,在機架r1上,機架在資料中心d1上。用/d1/r1/n1,以這為列,來看看下面四個場景中節點間距離:
  • distance(/d1/r1/n1,/d1/r1/n1)=0(相同節點上的處理)
  • distance(/d1/r1/n1,/d1/r1/n2)=2(相同機架上不同節點)
  • distance(/d1/r1/n1,/d1/r2/n3)=4(相同資料中心不同節點)
  • distance(/d1/r1/n1,/d2/r3/n4)=6(不同資料中心節點)

圖3-3更加形象顯示了上面示例:圖3-3:hadoop中節點間距離最後,你要知道hadoop並不知道你的網路拓撲圖,需要你進行配置。然而,預設的情況下,hadoop會假設所有節點在同一資料中心中一機架上。對於小型叢集,確實是這種情況,這樣的話,就不需要進行額外的配置。

寫入資料到檔案的剖析

下一步,我們將看看資料怎麼寫入到HDFS中的。雖然這是很細節的東西,但它有助於理解HDFS模型如何保證資料一致。

我們考慮這一種情況,在HDFS中建立一個新檔案,寫入資料,然後關閉檔案。如圖3-4所示:圖3-4:客戶端向HDFS寫入資料

客戶端通過呼叫DistributedFileSystem類的create()方法建立檔案(圖3-4步驟1)。DistributedFileSystem遠端呼叫名稱節點在檔案系統的名稱空間中建立一個新檔案,沒有塊與這個新檔案關聯(步驟2)。名稱節點做各種各樣的檢查確保檔案之前沒有被建立過,而且客戶端有許可權建立這個檔案。如果檢查通過,名稱節點將會記錄這個新檔案,否則將建立失敗,拋給客戶端一個IOException異常。如果成功建立,則DistributedFileSystem返回一個FSDataOutputStream物件給客戶端,以便客戶端寫入資料。正如讀資料那樣,FSDataOutputStream封閉了DFSOutputStream類,用此類來與資料節點與名稱節點互動。

當客戶端寫資料的時候(步驟3),DFSOutputStream首先將資料拆分成多個包,寫入"資料佇列"中。然後,DataStreamer過來消費這個資料佇列,它會向名稱節點請求一些合適的新塊用於儲存複本資料。名稱節點會返回包含這些新塊的資料節點列表。這些資料節點形成了一個通道,這裡,我們假設複製級別是3,所以在這個通道中有三個節點。DataStreamer首先向這個通道中第一個資料節點寫入之前被拆分的包資料。第一個資料節點寫完後,會前進到第二個資料節點,第二個資料節點儲存包資料後繼續前進到第三個也是最後一個數據節點(步驟4)。

DFSOutStream也會在內部維護一個"包佇列"。只有當某一個包被所有節點儲存後,這個包才會從包佇列中刪除(步驟5)。

如果在資料寫入過程中,任何一個數據節點寫入失敗了,那麼將麼執行如下操作(這些操作對客戶端來說是透明的)。首先,通道關閉,包佇列中的所有包都將會放到資料佇列前面。這樣,失敗資料節點的下游資料節點不會錯過任何一個包。在好的資料節點上的當前塊被給予一個新的身份標識,將它傳送給名稱節點,以便以後當失敗的資料節點恢復後,它上面已經儲存的部分塊資料將會被刪除。失敗的資料節點從通道中移除,再基於剩下兩個好的資料節點建立一個新通道。資料塊中剩餘的資料寫到管道中剩下好的資料節點中。名稱節點知道這個塊還需要複製,所以它會把它複製到另外一個節點中.餘下的塊照常處理。

雖然不太可能,但在寫入資料的時候仍有可能幾個資料節點同時失敗,只要dfs.namenode.replication.min複本數(預設是1)有值,就會寫入成功。塊將會在叢集中非同步複製直到達到設定的複本複製數(dfs.replication預設是3)。

當客戶端寫入資料完成後,將會呼叫close()方法關閉流(步驟6)。這個方法將會清除資料節點通道中剩下的包,並等待所有包資料寫入完成,然後通知名稱節點,整個檔案已經寫入完成(步驟7)。名稱節點知道這個檔案由哪些塊組成(因為DataStreamer是向名稱節點請求得到塊的位置的),所以它僅需要等待塊完成了最小複製就可以成功返回了。

複本儲存

名稱節點是怎麼知道選擇哪些資料節點儲存複本呢?這是在綜合權衡了可靠性,寫入資料頻寬和讀取資料頻寬之後得到的結果。例如:如果將所有複本放在一個節點上將會造成最小的寫入頻寬(因為複製通道執行在一個節點上),而且,這不是真正的冗餘,因為如果這個節點損壞了,塊資料就會丟失。但是讀資料的頻寬會很高。另一種極端的情況,將複本放在不同的資料中心,這樣或許能最大化冗餘度,但是卻很消耗頻寬。即使在相同的資料中心中,也會有很多種不同的儲存策略。
Hadoop預設的策略是將第一個複本存放在客戶機所在的節點中(對於執行在叢集外的客戶端來說,將會隨機選擇一個節點,系統儘量不會選擇已經儲存很滿或工作太忙的節點)。第二個複本儲存時將會選擇與第一個節點不在同一個硬碟陣列的另外一個機架,隨機選擇一個節點儲存。第三個複本將會放在與第二個節點相同的機架中,但是儲存在隨機選擇的另外一個節點中。其它的複本將會儲存在叢集中隨機選擇的節點中,系統儘量避免將太量複本放到相同的機架中。
一旦複本的儲存位置確定了,就會建立一個通道,結合考慮hadoop的網路拓撲結構之後進行資料的寫入。對於複本個數為3的情況,通道也許如圖3-5所示:圖3-5:一個典型的複製通道
總之,這個策略在可靠性(塊被儲存在兩個機架中),寫入頻寬(寫資料時僅需要通過一個網路交換機),讀取效能(可以選擇兩個機架中任意一個讀取),塊的分佈性(客戶端僅在本地機架中寫入一個塊)這些因素之間做了比較好的權衡。

一致性模型

檔案系統的一致性模型描述了讀取檔案中的資料或向檔案寫入資料的可見性。HDFS為了效能犧牲了一些POSIX標準的要求,導致一些操作可能與你期望的不一樣。

在建立一個檔案後,正如所期望的那樣,在檔案系統名稱空間中看見了這個檔案。

Path p=new Path("p");
fs.create(p);
assertThat(fs.exists(p),is(true));

然而,任何寫入到這個檔案的資料不一定可見,即使輸出流被flush重新整理了。這個檔案的長度仍為0。

Path p=new Path("p");
OutputStream out=fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(),is(0L));

一旦超過一個hadoop塊的資料寫入了,第一個塊將對讀取器可見。對於後續的塊也是如此。當前正在被寫入資料的塊總是對新來的讀取器不可見。

HDFS通過FSDataOutputStream的hflush()方法可以強迫快取中的資料flush進資料節點。在hflush()方法成功返回後,HDFS確保已經寫入檔案的資料都存進了寫資料管道中的資料節點中,並且對新來的讀取器可見。

Path p=new Path("p");
FSDataOutputStream out=fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(),is((long)"contents".length()));

注意hflush()不能確保資料節點已經將資料寫入磁碟中,僅僅確保資料儲存在資料節點的記憶體中(所以如果資料中心斷電了,資料將會丟失)。如果需要確保資料能寫入磁碟,請使用hsync()。

hsync()方法內部的操作與POSIX標準中fsync()標準命令相似,都會提交快取中的資料到磁碟。例如,使用標準的JAVA API將資料寫入本地檔案,在flush資料流和同步資料到磁碟後,就可以確保能看見已經寫入檔案的內容。

FileOutputStream out=new FileOutputStream(localFile);
out.write("contents".getBytes("UTF-8"));
out.flush();//flush作業系統
out.getFD().sync();//同步進磁碟
assertThat(localFile.length(),is((long)"contents".length()));

關閉HDFS的檔案流時內部也會執行hflush()方法。

Path p=new Path("p");
OutputStream out=fs.create(p);
out.write("contents".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus().getLen(),is((long)"contents".length()));

應用設計的重要性

一致性模型已經蘊涵了設計應用的方法。如果不呼叫hflush()和hsync(),當客戶端或系統故障時,你將會丟失大量資料。對很多應用來說,這是不可接受的。所以你應該在合適的時機呼叫hflush(),例如在寫入相當一部分資料記錄或位元組之後。雖然hflush()這個方法在設計時考慮到不對HDFS造成太大負擔,但是它確實對效能有一些影響(hsync()有更多影響)。所以在資料健壯性與傳輸率之間要有一個權衡。一個可接受的平衡點是當以不同頻率呼叫hflush(),並在考量應用效能前提下,那些依賴應用,合適的資料都能被讀取到時。

使用distcp併發複製

到目前為止我們看到的HDFS獲取資料的形式都是單執行緒的。例如,通過指定檔案萬用字元的方法,我們可以同時操作大量檔案。但要想有效地併發處理這些檔案 ,你必須自己程式設計。Hadoop提供了一個有用的程式,叫做distcp,用於併發地將資料複製到hadoop或從Hadoop複製資料。

distcp其中的一個用途是替代hadoop fs -cp命令。例如,你可以複製一個檔案到另一個檔案中通過使用

% hadoop distcp file1 file2

你也可以複製目錄:

% hadoop distcp dir1 dir2

如果目錄dir2不存在,hadoop將會建立它。並且目錄1中的內容將複製到目錄dir2中。你可以指定多個源路徑,所有這些源路徑下的檔案都將會複製到目的目錄中。

如果目錄dir2已經存在了,dir1目錄將複製到它下一級,建立目錄結構dir2/dir1。如果這不是你所想要的,你可以通過使用-overwrite選項,將資料以覆蓋的形式複製到dir2目錄下。你也可以只更新那些已經改變的檔案,使用-update選項。我們通過一個示例說明。如果我們在目錄dir1中修改了一個檔案,我們將會使用如下命令將dir1目錄的修改同步進dir2中。

% hadoop distcp -update dir1 dir2

distcp使用MapReduce作業方式實現,在叢集中併發執行多個map來進行復制工作,沒有reducer。每一個file使用一個map複製。Distcp粗略地將所有檔案等分成幾份,以便給每一個map分配近似相等的資料量。預設情況下,最多使用20個map。但是這個值可以通過在distcp中指定-m引數改變。

使用distcp一個非常常用的用途是在兩個HDFS叢集間傳輸資料。例如,下面命令在第二個叢集中建立了第一個叢集/foo目錄下檔案的備份。

% hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenod2/foo

-delete引數使用distcp刪除目的目錄下有而源目錄沒有的檔案或目錄。-p引數意思是檔案的狀態屬性像許可權,塊大小和複本個數都保留。你可以不帶任何引數執行distcp命令來檢視引數的詳細使用說明。

如果這兩個叢集執行不同版本的HDFS,那麼你可以使用webhdfs協議在兩個叢集間複製。

% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

另一種變通的方法可以使用HTTPFS代理做為distcp的源或目的地(它也使用了webhdfs協議,可以設定防火牆和控制頻寬,參看"HTTP章節")。

保持HDFS叢集平衡

當將資料複製到HDFS中時,考慮叢集的平衡性很重要。當檔案塊在叢集中均勻連續儲存時,HDFS能夠表現地最好。所以你使用distcp時也要確保不打破這個規則。例如,如果你如果指定-m 1,將會有一個map進行復制工作,先不考慮這樣做效率很低,沒有充分有效地利用叢集資源,這樣做就意味著每一個塊的第一個複本將位於執行map任務的節點上(直到磁碟滿了)。第二個和第三個複本將會在叢集其它節點上。但是這樣就達不到平衡,如果使叢集中map任務數比節點數多,就可以避免這個問題。所以,當執行distcp命令時,最好使用預設的每一個節點20個map任務。

然而,不可能一直保持叢集平衡。也許你想要限制map任務的個數,以便節點上資源能夠被其它作業使用。這種情況下,你可以使用平衡工具(可參看"平衡器章節")使叢集中的塊分佈地更加均衡。

本文是筆者翻譯自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第3章,後續將繼續翻譯其它章節。雖盡力翻譯,但奈何水平有限,錯誤再所難免,如果有問題,請不吝指出!希望本文對你有所幫助。
本文轉自我的簡書部落格