【Hadoop】HDFS分散式檔案系統
HDFS分散式檔案系統
HDFS基本知識
前言
1. 分散式檔案系統是Hadoop兩大核心組成部分之一,提供了在廉價伺服器叢集中進行大規模分散式檔案儲存的能力。HDFS是Google的GFS的開源實現。
2. HDFS具有很好的容錯能力,並且相容廉價的硬體裝置,因此可以以較低的成本利用現有機器實現大流量和大資料量的讀寫。
3.
- “主節點”:Master Node 或 “名稱節點”:NameNode;
- ”從節點”:Slave Node 或 “資料節點”:DataNode;
4. NameNode和DataNode之間的互動是通過RPC進行的。RPC(Remote Procedure Call Protocol)是遠端呼叫協議,其對底層網路協議是透明的,跨越了傳輸層和應用層,是Hadoop是最重要的底層核心協議之一。
目標
HDFS在設計上要實現以下目標:
-
1)相容廉價的硬體裝置
-
2)流資料讀寫
-
3)大資料集
-
4)簡單的檔案模型:採用“一次寫入,多次讀取”模型,檔案一旦完成寫入,關閉後就無法再次寫入,只能被讀取。
-
5)強大的跨平臺相容性
侷限性
-
1)不適合低延遲資料訪問:對於低延時要求的應用,HBase更好
-
2)無法高效儲存大量小檔案:名稱節點來管理檔案系統元資料時,元資料會被儲存在記憶體中使客戶端可以快速獲取,訪問大量小檔案時影響效能
-
3)不支援多使用者寫入及任意修改檔案。
HDFS相關概念
塊(Block)
- 預設一個塊大小是64MB,在HDFS中的檔案會被拆分成多個塊,每個塊作為獨立的單元進行儲存。HDFS在檔案塊大小設定上要遠遠大於普通檔案系統,以期在處理大規模檔案時能得到更好的效能。但是,通常MapReduce中的Map任務一次只處理一個塊中的資料,如果啟動的任務太少會降低作業並行處理速度,所以塊的大小設定也不易過大。
HDFS架構
名稱節點(NameNode)
-
名稱節點主要負責檔案和目錄的建立,刪除和重新命名等,同時管理著資料節點和檔案塊的對映關係。因此客戶端只有訪問名稱節點才能找到請求的檔案塊所在的位置,進而到相應位置讀取所需檔案塊。
-
同時,名稱節點還負責管理分散式檔案系統的名稱空間,儲存了兩個核心的資料結構,即FsImage(元資料映象檔案)和EditLog(日誌檔案)。FsImage用於維護檔案系統樹以及檔案樹中所有檔案和資料夾的元資料(檔案的名稱,位置,副本數,擁有者,組,許可權,儲存塊,各塊在哪些節點上)。操作日誌檔案EditLog中記錄了所有針對檔案的建立,刪除,重新命名等操作。
名稱節點啟動時,會將FsImage的內容載入到記憶體中,然後執行EditLog中的各項操作,使得記憶體中的元資料保持最新,操作完成後,會建立新的FsImage檔案和一個空的EditLog。
資料節點(DataNode)
-
資料節點負責資料的儲存和讀取。在儲存時,由名稱節點分配儲存位置,然後由客戶端把資料直接寫入相應資料節點;在讀取時,客戶端從名稱節點獲得資料節點和檔案塊的對映關係,從而找到相應位置訪問檔案塊。資料節點還要根據名稱節點的命令建立,刪除資料塊和冗餘複製。
-
每個資料節點會週期性向名稱節點發送"心跳"資訊,報告自己的狀態,沒有按時傳送心跳資訊的節點會被標記為"宕機",不會給他分配任何I/O請求。
第二名稱節點(Secondary NameNode)
- 在設計中,HDFS採用第二名稱節點"Secondary NameNode",以解決實際操作中EditLog逐漸變大的問題。
功能:
- 首先,可完成EditLog和FsImage的合併操作,減小EditLog檔案大小,縮短名稱節點重啟時間;
- 其次,作為名稱節點的"檢查點",儲存名稱節點的元資料資訊,起到"冷備份"的作用。
體系結構圖示
HDFS核心設計
Block大小設定
前文我們有提到,在這裡不進行過分的贅述。
HDFS儲存原理
-
1.資料的冗餘儲存
HDFS採用了多副本方式對資料進行冗餘儲存,通常一個數據塊的多個副本會被分佈到不同資料節點上。可加快資料傳輸速度,容易檢查資料錯誤,保證資料可靠性。 -
2.資料副本存取策略(機架感知:就近寫入,就近讀取)
HDFS預設的冗餘複製因子是3,每一個檔案塊會被同時儲存到3個地方。其中,兩份副本在同一機架的不同機器上,第三個副本放在不同機架的機器上面。
資料複製(流水線)
- 當客戶端向HDFS檔案寫資料的時候,一開始是寫入本地的臨時檔案,假設該檔案的複製因子是3,那麼客戶端會從NameNode獲取一張DataNode列表來存放副本。然後客戶端再向第一個DataNode傳輸資料,第一個DadaNode會一小部分一小部分(4KB)地接受資料,將每個部分寫入本地倉庫,同時傳輸給第二個DataNode。其他節點也是這樣,邊接受邊傳輸,直到最後一個副本節點,只接受並存儲。
HDFS 資料讀寫(使用Java API)
讀檔案
1.呼叫java.net.URL(簡單粗暴法)
- (1)呼叫java.net.URL類獲得輸入流
- (2)通過IOUtils操作輸入流對檔案讀取
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
//識別URL路徑
}
InputStream in = new URL("hdfs://bigdata-hadoop.wbc.com/user/wbc/datas/").openStream();
//do something for in
IOUtils.closeStream(in);
2.呼叫FileSystem類
- (1)例項化FileSystem物件
在FileSystem類中有兩種靜態方法可以獲得FileSystem類物件
1. public static FileSystem get(Configuration conf)
//預設載入core-site.xml,並返回預設檔案系統。
2. public static FileSystem get(URI uri, Configuration conf)
//根據傳入的完整的URI來確定返回的檔案系統型別。根據傳入的完整的URI來確定返回的檔案系統型別。
- (2)通過.open()方法開啟檔案,DistributedFileSystem會建立輸入流FSDataInputStream物件(兩種)。
對於HDFS而言,具體的輸入流就是DFSInputStream。1. public FSDataInputStream open (path f) //預設使用4KB的緩衝大小 2. public abstract FSDataInputStream open (Path f, int bufferSize) //自定義快取大小
為了深入瞭解FSDataInputStream,可以看一下它的原始碼
//擷取部分原始碼
1.public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
//可以看到FSDataInputStream實現了Seekable和PositionedReadable介面,因此實現了隨機查詢和讀取的方法
2. public long getPos() throws IOException {
return ((Seekable)in).getPos();
}
//用於查詢當前位置相對於檔案開始處的偏移量
3.public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
return ((PositionedReadable)in).read(position, buffer, offset, length);
}
//從檔案給定位置開始讀取length長度的位元組數到buffer中,並返回讀取到位元組數目,(且是安全函式)
4.public void readFully(long position, byte[] buffer)
throws IOException {
((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
}
//從檔案給定位置開始讀取buffer長度的位元組數到buffer中,並返回讀取到位元組數目,(且是安全函式)
4.public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
((PositionedReadable)in).readFully(position, buffer, offset, length);
}
//readFully過載方法。讀取length長度的位元組陣列到buffer中。(安全)
5.public void seek(long desired) throws IOException {
((Seekable)in).seek(desired);
}
//從檔案的開始搜尋到給定的偏移量,下一個read()函式將從該位置偏移開始讀取
- (3)在DFSInputStream的建構函式中,輸入流通過ClientProtocal.getBlockLocations()遠端呼叫名稱節點,獲得檔案開始部分資料塊存放位置。
對於該資料塊,名稱節點返回儲存該資料塊的所有資料節點的地址,同時根據距離客戶端的遠近對資料節點進行排序,然後,DistributedFileSystem會利用DFSInputStream來例項化FSDataInputStream,返回給客戶端,同時返回了資料塊的資料節點地址。 - (4)獲得輸入流FSDataInputStream後,客戶端呼叫read()函式開始讀取資料。輸入流根據排序結果,選擇距離客戶端最近的資料節點建立連線並讀取資料。
- (5)資料從該資料節點讀到客戶端。當該資料塊讀取完畢後,FSDataInputStream關閉與該資料節點的連線。
- (6)輸入流通過getBlockLocations()方法查詢下一個資料塊(如果客戶端快取中已包含該資料塊的位置資訊,就不需要呼叫該方法)
- (7)找到該資料塊的最佳資料節點,讀取資訊。
- (8)當客戶端讀取完畢資料時,呼叫FSDataInputStream的close()函式,關閉資料流。
3.程式碼實戰練習(讀檔案)
下面通過一個簡單的測試例子來說明如何讀取文字檔案。
package com.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.net.URI;
public class HdfsRead {
public static void main(String[] args) throws Exception{
String uri = args[0];
//讀取args陣列的第一個元素
Configuration conf = new Configuration();
//讀取配置檔案
FileSystem hdfs = FileSystem.get(URI.create(uri),conf);
//例項化FileSystem物件
FSDataInputStream in = null;
try{
in = hdfs.open(new Path(uri));
//相當於 Path path = new Path(uri)-> hdfs.open(path)
//呼叫open()方法獲得輸入流名為in的FSDataInputStream物件
byte buffer[] = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0){
//讀取檔案
System.out.write(buffer, 0, bytesRead);
//列印輸出
}
}finally {
IOUtils.closeStream(in);
//讀取完畢關閉流
}
}
}
將程式打包並提交到hdfs上執行
hadoop jar HdfsRead.jar com.hadoop.hdfs.HdfsRead /user/datas/hdfs_read.txt
//hadoop + jar + jar名 + class名 + 檔案路徑
- 當然,檔案目錄和URI引數也可以由使用者在程式中定義。
寫檔案
-
(1)通過FileSystem類獲得HDFS檔案系統物件。
-
(2)客戶端通過FileSystem.create()建立檔案,相應地,DistributedFileSystem具體實現了FileSystem,因此,呼叫create()方法後,DistributedFileSystem會建立輸出流FSDataOutputStream。
對於HDFS而言,具體的輸出流就是DFSOutputStream。
-
這裡介紹FileSystem中兩個與寫檔案相關的重要方法:create()和append(),通過使用這兩個函式可以得到檔案輸出流FSDataOutputStream的物件。
-
FSDataOutputStream繼承了java.io.DataOutputStream,實現了Syncable()介面,通過write()函式就可以對HDFS上的檔案進行寫入操作
(1)create ()方法
public FSDataOutputStream create (Path f) throws IOException
//如果檔案存在則預設覆蓋
public FSDataOutputStream create (Path f, boolean overwrite) throw IOException
//可以指定是否覆蓋
(2)如果使用者要寫入一個大檔案,通常需要程式反饋寫入進度,這時可以呼叫以下介面:
public FSDataOutputStream create (Path f, Progressable progress) throw IOException
public void progress () { System.out.print ("."); }
//該方法也會覆蓋已存在檔案,需要使用者實現progress()函式。每寫入64KB列印一個點號。
(3)append()方法
1.public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException
//bufferSize:寫入時使用的緩衝buffer大小
//progress:進度報告
2.public FSDataOutputStream append (Path f) throws IOException
//相當於呼叫append(f, getConf().getInt("io.file.buffer.size",4096),null)函式
3.public FSDataOutputStream append (Path f, int bufferSize) throws IOException
//相當於呼叫append(f,bufferSize,null)函式
-
(3)DistributedFileSystem通過RPC遠端呼叫名稱節點,在檔案系統的名稱空間中建立新檔案。遠端方法呼叫結束後,DistributedFileSystem會利用DFSOutputStream來例項化FSDataOutputStream,返回給客戶端,客戶端使用這個輸出流寫入資料。
-
(4)獲得輸出流FSDataOutputStream以後,客戶端呼叫輸出流的write()方法向HDFS中對應的檔案寫入資料。
-
(5)客戶端向輸出流FSDataOutputStream中寫入資料會首先被分成一個個分包,放入DFSOutputStream物件的內部佇列。輸出流FSDataOutputStream會向名稱節點申請儲存檔案和副本資料塊的若干個資料節點,這些資料節點形成一個資料流管道,佇列中的分包被打包成資料包,進行流水線複製傳輸。
-
(6)接收到資料的資料節點向傳送者傳送“確認包”(ACK Packet)。確認包隨資料流管道逆流而上,最終發往客戶端,當客戶端收到應答時,將對應分包從內部佇列移除。不斷執行(3)~(5)步,知道資料全部寫完。
-
(7)客戶端呼叫close()方法關閉輸出流。當DFSOutputStream物件內部佇列中的分包都收到應答以後,可使用ClientProtocol.complete()方法通知名稱節點關閉檔案,完成一次寫檔案過程。
程式碼實戰練習(寫檔案)
下面通過一個簡單的測試例子來說明如何寫入文字檔案。
package com.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
public class HdfsWrite {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
//載入配置檔案
FileSystem local = FileSystem.getLocal(conf);
//獲取本地檔案系統物件
FileSystem hdfs = FileSystem.get(conf);
//獲取hdfs檔案系統物件
Path localdir = new Path(args[0]);
//獲取本地目錄
Path hdfsFile = new Path(args[1]);
//獲取hdfs資料夾目錄
try{
FileStatus[] inputFiles = local.listStatus(localdir);
//得到本地檔案系統目錄下所有檔案資訊
FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
public void progress() {
System.out.print(".");
}
//呼叫反饋進度函式
});
//呼叫create()函式,獲得輸出流
for(int i = 0;i < inputFiles.length; i++){
System.out.println(inputFiles[i].getPath().getName());
//輸出檔名
FSDataInputStream in = local.open(inputFiles[i].getPath());
//得到FSDataInputStream物件
byte buffer[] = new byte[512];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0){
out.write(buffer,0,bytesRead);
//寫入檔案
}
in.close();
}
out.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
提交執行方法與讀檔案相同