大資料之路離港(一)——HDFS檔案系統
摘要:當資料集超出一臺物理計算機的儲存能力量時,便有必要將它分佈到多個獨立的計算機。管理著跨計算機網路儲存的檔案系統稱為分散式檔案系統。HDFS全稱為Hadoop Distributed Filesystem,是Hadoop的分散式檔案系統,也是當下開源社群最受歡迎的分散式檔案系統。本文主要介紹了HDFS的設計、概念、檔案讀寫原理等基礎知識,並給出了HDFS各類檔案操作的Java實現。
1. HDFS的設計
理念一:超大檔案
HDFS面向儲存幾百MB,幾百GB甚至幾百TB大小的大檔案,這是傳統的本地檔案系統所不及的。
理念二:流式資料訪問
流式資料訪問,就是一次寫入,多次讀取,不可以修改資料,只能刪除資料。那麼,為什麼寫入是一次性的,不可以再修改呢?在學完HDFS檔案讀寫原理後,我們再來探討其原因。
理念三:商用硬體
HDFS設計理念之一就是讓它能執行在普通的硬體之上,即便硬體出現故障,也可以通過容錯策略來保證資料的高可用。
不適合一:低延遲讀寫
由於HDFS主要是為達到高的資料吞吐量而設計的,所以其延遲較高,對於那些有低延時要求的應用程式,HBase是一個更好的選擇。
不適合二:儲存大量小檔案
因為namenode把檔案系統的元資料放置在記憶體中,所以檔案系統所能容納的檔案數目是由namenode的記憶體大小來決定。一般來說,每一個檔案、資料夾和Block需要佔據150位元組左右的空間,所以,如果你有100萬個檔案,每一個佔據一個Block,你就至少需要300MB記憶體。當前來說,數百萬的檔案還是可行的,當擴充套件到數十億時,對於當前的硬體水平來說就沒法實現了。HBase利用SequenceFile、MapFile、Har等方式歸檔小檔案,同時,儲存歸檔檔案的對映關係,從而解決了HDFS不適合儲存大量小檔案的難題。
不適合三:多使用者寫入和任意修改檔案
在HDFS的一個檔案中只有一個寫入者,而且寫操作只能在檔案末尾完成,即只能執行追加操作。目前HDFS還不支援多個使用者對同一檔案的寫操作,以及在檔案任意位置進行修改。其原因,我們會在後面進行探討。
2. HDFS的概念
2.1 塊
在《作業系統原理》中就有磁碟塊的概念,它是作業系統中最小的邏輯儲存單位,可以遮蔽底層硬體儲存結構的複雜性,為上層應用提供統一的儲存結構和程式設計規範。而HDFS引入了塊的概念,且預設塊的大小為64MB,它比磁碟塊大的多,其原因主要是為了減小塊定址操作在整個塊操作中的時間比。
在分散式檔案系統中使用抽象塊可以帶來很多好處:
好處一:可儲存超大檔案
對於一個超大檔案,可以通過將檔案分成多個塊,每個分塊可以儲存到叢集的任意一個磁碟,最後,通過整合檔案的分塊來恢復檔案。
好處二:簡化儲存子系統
因為塊的大小固定,計算一個磁碟能存多少塊就相對容易。另外,塊只是一部分儲存的資料,對於檔案的元資料,不需要與塊一同儲存,其他系統可以正交地管理元資料。
好處三:利於資料的備份
塊很適合於為提供容錯和實用性而做的複製操作。如果一個塊損壞了,系統可以在其它地方讀取另一個副本(在Hadoop搭建時可以設定塊備份數量,搭建方法可參考文章 ofollow,noindex">大資料之路起航——大資料環境搭建 ),從而保證了資料的完整性。
2.2 名稱節點和資料節點
HDFS叢集有兩種節點,以管理者-工作者的模式執行,即一個名稱節點(管理者)和多個數據節點(工作者)。
名稱節點(NameNode)
NameNode負責管理檔案目錄、檔案和Block的對應關係以及Block和資料節點(DataNode)的對應關係。
資料節點(DataNode)
DataNode負責儲存資料塊,同時,在被使用者或NameNode呼叫時提供塊定位服務。DataNode還定時向NameNode傳送它們儲存的塊的列表。
二級名稱節點(SecondryNameNode)
沒有NameNode,檔案系統將無法使用。因為,我們不知道如何通過塊來重建檔案。所以,Hadoop叢集中運行了一個二級名稱節點(SecondryNameNode),它負責定期備份NameNode中的元資料防止NameNode的元資料全部丟失。
3. HDFS資料流
3.1 檔案讀取剖析
為了瞭解客戶端及與之互動NameNode和DataNode之間的資料流是怎樣的,我們可以參考圖3-1,其中顯示了在讀取檔案時一些事件的主要順序。

圖3-1 HDFS讀檔案過程
- 客戶端通過呼叫FileSystem物件的open()來讀取希望開啟的檔案。對於HDFS來說,這個物件是分散式檔案系統的一個例項。
- DistributedFileSystem通過RPC(Remote Procedure Call)通訊協議來遠端呼叫namenode,以確定檔案的開頭部分的塊位置。對於每一塊,namenode返回具有該塊副本的datanode地址。此外,這些datanode根據他們與client的距離來排序(根據網路叢集的拓撲)。如果該client本身就是一個datanode,便從本地datanode中讀取。DistributedFileSystem 返回一個FSDataInputStream物件給client讀取資料,FSDataInputStream轉而包裝了一個DFSInputStream物件。
- 接著client對這個輸入流呼叫read()。儲存著檔案開頭部分塊的資料節點地址的DFSInputStream隨即與這些塊最近的datanode相連線。
- 通過在資料流中反覆呼叫read(),資料會從datanode返回client。
- 到達塊的末端時,DFSInputStream會關閉與datanode間的聯絡,然後為下一個塊找到最佳的datanode。client端只需要讀取一個連續的流,這些對於client來說都是透明的。
另外,在讀取的時候,如果client與datanode通訊時遇到一個錯誤,那麼它就會去嘗試對這個塊來說下一個最近的塊。它也會記住那個故障節點的datanode,以保證不會再對之後的塊進行徒勞無益的嘗試。client也會確認datanode發來的資料的校驗和。如果發現一個損壞的塊,它就會在client試圖從別的datanode中讀取一個塊的副本之前報告給namenode。這個設計的一個重點是,client直接聯絡datanode去檢索資料,並被namenode指引到塊中最好的datanode。因為資料流在此叢集中是在所有datanode分散進行的。所以這種設計能使HDFS可擴充套件到最大的併發client數量。同時,namenode只不過提供塊的位置請求(儲存在記憶體中,十分高效),不是提供資料。否則如果客戶端數量增長,namenode會快速的成為一個“瓶頸”。
3.2 檔案寫入剖析
客戶端要向HDFS寫資料,首先要跟namenode通訊以確認可以寫檔案並獲得接收檔案block的datanode,然後,客戶端按順序將檔案block逐個傳遞給相應datanode,並由接收到block的datanode負責向其他datanode複製block的副本。

圖3-2 HDFS寫檔案過程
- 客戶端通過在DistributedFileSystem中呼叫create()來建立檔案。
- DistributedFileSystem 使用RPC去呼叫namenode,在檔案系統的名稱空間創一個新的檔案,沒有塊與之相聯絡。namenode執行各種不同的檢查以確保這個檔案不會已經存在,並且在client有可以建立檔案的適當的許可。如果檢查通過,namenode就會生成一個新的檔案記錄;否則,檔案建立失敗並向client丟擲一個IOException異常。分散式檔案系統返回一個檔案系統資料輸出流,讓client開始寫入資料。就像讀取事件一樣,檔案系統資料輸出流控制一個DFSOutputStream,負責處理datanode和namenode之間的通訊。
- 在client寫入資料時,DFSOutputStream將它分成一個個的包,寫入內部佇列,稱為資料佇列。資料流處理資料佇列,資料流的責任是根據適合的datanode的列表要求namenode分配適合的新塊來儲存資料副本。這一組datanode列表形成一個管線————假設副本數是3,所以有3個節點在管線中。
- 資料流將包分流給管線中第一個的datanode,這個節點會儲存包並且傳送給管線中的第二個datanode。同樣地,第二個datanode儲存包並且傳給管線中的第三個資料節點。
- DFSOutputStream也有一個內部的資料包佇列來等待datanode收到確認,稱為確認佇列。一個包只有在被管線中所有的節點確認後才會被移除出確認佇列。如果在有資料寫入期間,datanode發生故障, 則會執行下面的操作,當然這對寫入資料的client而言是透明的。首先管線被關閉,確認佇列中的任何包都會被添加回資料佇列的前面,以確保故障節點下游的datanode不會漏掉任意一個包。為儲存在另一正常datanode的當前資料塊制定一個新的標識,並將該標識傳給namenode,以便故障節點datanode在恢復後可以刪除儲存的部分資料塊。從管線中刪除故障資料節點並且把餘下的資料塊寫入管線中的兩個正常的datanode。namenode注意到塊複本量不足時,會在另一個節點上建立一個新的複本。後續的資料塊繼續正常接收處理。只要dfs.replication.min的副本(預設是1)被寫入,寫操作就是成功的,並且這個塊會在叢集中被非同步複製,直到其滿足目標副本數(dfs.replication 預設值為3)。
- client完成資料的寫入後,就會在流中呼叫close()。
- 在向namenode節點發送完訊息之前,此方法會將餘下的所有包放入datanode管線並等待確認。namenode節點已經知道檔案由哪些塊組成(通過Data streamer 詢問塊分配),所以它只需在返回成功前等待塊進行最小量的複製。
複本的佈局需要對可靠性、寫入頻寬和讀取頻寬進行權衡。Hadoop的預設佈局策略是在執行客戶端的節點上放第1個複本(如果客戶端執行在叢集之外,就隨機選擇一個節點,不過系統會避免挑選那些儲存太滿或太忙的節點。)第2個複本放在與第1個複本不同且隨機另外選擇的機架的節點上(離架)。第3個複本與第2個複本放在相同的機架,且隨機選擇另一個節點。其他複本放在叢集中隨機的節點上,不過系統會盡量避免相同的機架放太多複本。總的來說,這一方法不僅提供了很好的穩定性(資料塊儲存在兩個機架中)並實現很好的負載均衡,包括寫入頻寬(寫入操作只需要遍歷一個交換機)、讀取效能(可以從兩個機架中選擇讀取)和叢集中塊的均勻分佈(客戶端只在本地機架上寫入一個塊)。
4. Java應用程式程式碼
首先,需要在maven中引入Hadoop的依賴,版本需要根據安裝的hadoop版本而定,作者的hadoop版本是2.7.6。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.6</version> </dependency>
然後,建立HDFSConfig類,該類負責配置連線資訊,並連線HDFS。
package com.whut.config; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import java.io.IOException; public class HDFSConfig { private static FileSystem fileSystem = null; static { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://localhost:9000"); conf.setBoolean("dfs.support.append", true); conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true"); try { fileSystem =FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } } public static FileSystem getFileSystem() { return fileSystem; } public static void close() { try { fileSystem.close(); } catch (IOException e) { e.printStackTrace(); } } }
最後,建立HDFS檔案操作工具類,具體包括:建立資料夾、列出資料夾、檔案上傳、檔案下載、寫檔案(追加和覆蓋模式)、列出檔案、刪除檔案、讀檔案等。
package com.whut.util; import com.whut.config.HDFSConfig; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; /** * Created by 楊贇 on 2018-07-18. */ public class HDFSUtil { /** * @since 2018-07-18 * @author 楊贇 * @describe 上傳本地檔案到HDFS * @param local 本地路徑 * @param hdfs DFS路徑 * @return void */ public static void upload(String local, String hdfs){ FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; try { fs.copyFromLocalFile(new Path(local),new Path(hdfs)); } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 將hdfs上檔案下載到本地 * @param hdfs HDFS路徑 * @param local 本地路徑 * @return void */ public static void download(String hdfs, String local){ FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; try { fs.copyToLocalFile(new Path(hdfs), new Path(local)); } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 在hdfs目錄下面建立資料夾 * @param name HDFS資料夾名 * @return void */ public static void createDir(String name){ FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; try { fs.mkdirs(new Path(name)); } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 建立新的檔案 * @param name 檔名 * @param content 內容 * @return void */ public static void createFile(String name, String content) { FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; FSDataOutputStream os = null; try { os = fs.create(new Path(name)); os.write(content.getBytes()); os.close(); } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 向檔案寫入新的資料 * @param name 檔名 * @param content 內容 * @return void */ public static void append(String name, String content){ FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; try { if (fs.exists(new Path(name))) { try { InputStream in = new ByteArrayInputStream(content.getBytes()); OutputStream out = fs.append(new Path(name)); IOUtils.copyBytes(in, out, 4096, true); out.close(); in.close(); } catch (Exception e) { e.printStackTrace(); } } else { createFile(name, content); } } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 刪除hdfs上的檔案或資料夾 * @param name 檔案或資料夾名 * @return void */ public static void remove(String name) { FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return; try { fs.delete(new Path(name),true); } catch (IOException e) { e.printStackTrace(); } } /** * @since 2018-07-18 * @author 楊贇 * @describe 列出檔案 * @param path HDFS資料夾名 * @return 檔案列表 */ public static List<String> listFile(String path) { List<String> list = new ArrayList<>(); FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return list; FileStatus[] fileStatuses = new FileStatus[0]; try { fileStatuses = fs.listStatus(new Path(path)); } catch (IOException e) { e.printStackTrace(); } for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isFile()) list.add(path); } return list; } /** * @since 2018-07-18 * @author 楊贇 * @describe 列出資料夾 * @param path HDFS資料夾名 * @return 資料夾列表 */ public static List<String> listDir(String path){ List<String> list = new ArrayList<>(); FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return list; FileStatus[] fileStatuses = new FileStatus[0]; try { fileStatuses = fs.listStatus(new Path(path)); } catch (IOException e) { e.printStackTrace(); } for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()) list.add(path); } return list; } /** * @since 2018-07-18 * @author 楊贇 * @describe 讀檔案 * @param name 檔名 * @return byte[] */ public static byte[] readFile(String name){ FileSystem fs = HDFSConfig.getFileSystem(); if(fs == null) return null; Path path = new Path(name); try { if (fs.exists(path)) { FSDataInputStream is = fs.open(path); FileStatus stat = fs.getFileStatus(path); byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); return buffer; } } catch (IOException e) { e.printStackTrace(); } return null; } /** * @since 2018-07-18 * @author 楊贇 * @describe 檔案操作測試 */ public static void main(String[] args) { HDFSUtil.download("/data/output.csv","/home/ubuntu/桌面"); } }
5. Hadoop採用流資料訪問模式的原因
現在,我們可以來總結一下Hadoop採用流資料訪問模式的原因了。
原因一:寫效能低於讀效能
因為hadoop在寫資料的時候要在namenode上進行meta元素的寫入,與此同時還需要將資料寫入到datanode上,如果是大型的資料,檔案會分成很多個區塊。這樣在meta資料和塊資料的寫入時會產生大量的消耗。相對於讀取操作來說,就會簡單很多,只需要在namenode上查詢到資料的meta資訊,接著就不關namenode的事了。從而為namenode省下大量資源來進行其他的操作。
原因二:互斥鎖影響效能
可以試想一下,當有多個使用者同時寫資料時,那麼就要給寫操作加互斥鎖,而鎖會大幅度降低效能。所以為了簡單,就不能修改了,這是非常暴力但是卻很有效的手段。
原因三:大檔案修改意義不大
我們進行大資料分析時,並不會經常性的修改原始資料,而且對於大檔案來說,想要在檔案中定位位置是非常耗費時間的,還不如直接刪除重新寫入。
6. 總結
本文主要參考了《Hadoop權威指南》,對HDFS中相對基礎的知識點進行了歸納總結。並通過Java程式設計實現了HDFS的檔案操作工具類。