1. 程式人生 > >【Hadoop】HDFS分散式檔案系統

【Hadoop】HDFS分散式檔案系統

HDFS分散式檔案系統

HDFS基本知識

前言

1. 分散式檔案系統是Hadoop兩大核心組成部分之一,提供了在廉價伺服器叢集中進行大規模分散式檔案儲存的能力。HDFS是Google的GFS的開源實現。

2. HDFS具有很好的容錯能力,並且相容廉價的硬體裝置,因此可以以較低的成本利用現有機器實現大流量大資料量的讀寫。

3.

分散式檔案系統在物理結構上由計算機叢集中多個節點構成。節點分為兩部分:

  1. 主節點”:Master Node 或 “名稱節點”:NameNode
  2. 從節點”:Slave Node 或 “資料節點”:DataNode;

4. NameNode和DataNode之間的互動是通過RPC進行的。RPC(Remote Procedure Call Protocol)遠端呼叫協議,其對底層網路協議是透明的,跨越了傳輸層和應用層,是Hadoop是最重要的底層核心協議之一

目標

HDFS在設計上要實現以下目標

  • 1)相容廉價的硬體裝置

  • 2)流資料讀寫

    :為了提高資料吞吐率,以流式方式來訪問檔案系統資料

  • 3)大資料集

  • 4)簡單的檔案模型:採用“一次寫入,多次讀取”模型,檔案一旦完成寫入,關閉後就無法再次寫入,只能被讀取。

  • 5)強大的跨平臺相容性

侷限性

  • 1)不適合低延遲資料訪問:對於低延時要求的應用,HBase更好

  • 2)無法高效儲存大量小檔案:名稱節點來管理檔案系統元資料時,元資料會被儲存在記憶體中使客戶端可以快速獲取,訪問大量小檔案時影響效能

  • 3)不支援多使用者寫入及任意修改檔案。

HDFS相關概念

塊(Block)

  • 預設一個塊大小是64MB,在HDFS中的檔案會被拆分成多個塊,每個塊作為獨立的單元進行儲存。HDFS在檔案塊大小設定上要遠遠大於普通檔案系統,以期在處理大規模檔案時能得到更好的效能。但是,通常MapReduce中的Map任務一次只處理一個塊中的資料,如果啟動的任務太少會降低作業並行處理速度,所以塊的大小設定也不易過大。

HDFS架構

名稱節點(NameNode)

  • 名稱節點主要負責檔案和目錄的建立,刪除和重新命名等,同時管理著資料節點和檔案塊的對映關係。因此客戶端只有訪問名稱節點才能找到請求的檔案塊所在的位置,進而到相應位置讀取所需檔案塊。

  • 同時,名稱節點還負責管理分散式檔案系統的名稱空間,儲存了兩個核心的資料結構,即FsImage(元資料映象檔案)和EditLog(日誌檔案)FsImage用於維護檔案系統樹以及檔案樹中所有檔案和資料夾的元資料(檔案的名稱,位置,副本數,擁有者,組,許可權,儲存塊,各塊在哪些節點上)。操作日誌檔案EditLog中記錄了所有針對檔案的建立,刪除,重新命名等操作

名稱節點啟動時,會將FsImage的內容載入到記憶體中,然後執行EditLog中的各項操作,使得記憶體中的元資料保持最新,操作完成後,會建立新的FsImage檔案和一個空的EditLog

FsImage和EditLog展示

資料節點(DataNode)

  • 資料節點負責資料的儲存和讀取。在儲存時,由名稱節點分配儲存位置,然後由客戶端把資料直接寫入相應資料節點;在讀取時,客戶端從名稱節點獲得資料節點和檔案塊的對映關係,從而找到相應位置訪問檔案塊。資料節點還要根據名稱節點的命令建立,刪除資料塊和冗餘複製

  • 每個資料節點會週期性向名稱節點發送"心跳"資訊,報告自己的狀態,沒有按時傳送心跳資訊的節點會被標記為"宕機",不會給他分配任何I/O請求。

第二名稱節點(Secondary NameNode)

  • 在設計中,HDFS採用第二名稱節點"Secondary NameNode",以解決實際操作中EditLog逐漸變大的問題

功能

  • 首先,可完成EditLog和FsImage的合併操作,減小EditLog檔案大小,縮短名稱節點重啟時間
  • 其次,作為名稱節點的"檢查點",儲存名稱節點的元資料資訊,起到"冷備份"的作用

第二名稱節點工作過程

體系結構圖示

HDFS體系結構

HDFS核心設計

Block大小設定

前文我們有提到,在這裡不進行過分的贅述。

HDFS儲存原理

  • 1.資料的冗餘儲存
    HDFS採用了多副本方式對資料進行冗餘儲存,通常一個數據塊的多個副本會被分佈到不同資料節點上。可加快資料傳輸速度,容易檢查資料錯誤,保證資料可靠性。

  • 2.資料副本存取策略(機架感知就近寫入,就近讀取
    HDFS預設的冗餘複製因子是3,每一個檔案塊會被同時儲存到3個地方。其中,兩份副本在同一機架的不同機器上,第三個副本放在不同機架的機器上面

資料副本存放策略

資料複製(流水線)

  • 當客戶端向HDFS檔案寫資料的時候,一開始是寫入本地的臨時檔案,假設該檔案的複製因子是3,那麼客戶端會從NameNode獲取一張DataNode列表來存放副本。然後客戶端再向第一個DataNode傳輸資料,第一個DadaNode會一小部分一小部分(4KB)地接受資料,將每個部分寫入本地倉庫,同時傳輸給第二個DataNode。其他節點也是這樣,邊接受邊傳輸,直到最後一個副本節點,只接受並存儲。

HDFS 資料讀寫(使用Java API)

讀檔案

1.呼叫java.net.URL(簡單粗暴法)

  • (1)呼叫java.net.URL類獲得輸入流
  • (2)通過IOUtils操作輸入流對檔案讀取
static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        //識別URL路徑
    }
    
    InputStream in = new URL("hdfs://bigdata-hadoop.wbc.com/user/wbc/datas/").openStream();
        //do something for in
    IOUtils.closeStream(in);

2.呼叫FileSystem類

  • (1)例項化FileSystem物件

在FileSystem類中有兩種靜態方法可以獲得FileSystem類物件

1. public static FileSystem get(Configuration conf)
//預設載入core-site.xml,並返回預設檔案系統。
2. public static FileSystem get(URI uri, Configuration conf)
//根據傳入的完整的URI來確定返回的檔案系統型別。根據傳入的完整的URI來確定返回的檔案系統型別。
  • (2)通過.open()方法開啟檔案,DistributedFileSystem會建立輸入流FSDataInputStream物件(兩種)。
    對於HDFS而言,具體的輸入流就是DFSInputStream。
     1. public FSDataInputStream open (path f)
       //預設使用4KB的緩衝大小
     2. public abstract FSDataInputStream open (Path f, int bufferSize)
     //自定義快取大小
    
    

為了深入瞭解FSDataInputStream,可以看一下它的原始碼

//擷取部分原始碼
1.public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable, 
      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
//可以看到FSDataInputStream實現了Seekable和PositionedReadable介面,因此實現了隨機查詢和讀取的方法
2.  public long getPos() throws IOException {
    return ((Seekable)in).getPos();
  }
//用於查詢當前位置相對於檔案開始處的偏移量
3.public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    return ((PositionedReadable)in).read(position, buffer, offset, length);
  }
//從檔案給定位置開始讀取length長度的位元組數到buffer中,並返回讀取到位元組數目,(且是安全函式)
4.public void readFully(long position, byte[] buffer)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
  }
//從檔案給定位置開始讀取buffer長度的位元組數到buffer中,並返回讀取到位元組數目,(且是安全函式)  
4.public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, offset, length);
  }
//readFully過載方法。讀取length長度的位元組陣列到buffer中。(安全)
5.public void seek(long desired) throws IOException {
    ((Seekable)in).seek(desired);
  }
//從檔案的開始搜尋到給定的偏移量,下一個read()函式將從該位置偏移開始讀取
  • (3)在DFSInputStream的建構函式中,輸入流通過ClientProtocal.getBlockLocations()遠端呼叫名稱節點,獲得檔案開始部分資料塊存放位置
    對於該資料塊,名稱節點返回儲存該資料塊的所有資料節點的地址,同時根據距離客戶端的遠近對資料節點進行排序,然後,DistributedFileSystem會利用DFSInputStream來例項化FSDataInputStream,返回給客戶端,同時返回了資料塊的資料節點地址
  • (4)獲得輸入流FSDataInputStream後,客戶端呼叫read()函式開始讀取資料。輸入流根據排序結果,選擇距離客戶端最近的資料節點建立連線並讀取資料
  • (5)資料從該資料節點讀到客戶端。當該資料塊讀取完畢後,FSDataInputStream關閉與該資料節點的連線。
  • (6)輸入流通過getBlockLocations()方法查詢下一個資料塊(如果客戶端快取中已包含該資料塊的位置資訊,就不需要呼叫該方法)
  • (7)找到該資料塊的最佳資料節點,讀取資訊。
  • (8)當客戶端讀取完畢資料時,呼叫FSDataInputStream的close()函式,關閉資料流

讀操作

3.程式碼實戰練習(讀檔案)

下面通過一個簡單的測試例子來說明如何讀取文字檔案。

package com.hadoop.hdfs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;


public class HdfsRead {
        public static void main(String[] args) throws Exception{

            String uri = args[0];
            //讀取args陣列的第一個元素
            Configuration conf = new Configuration();
            //讀取配置檔案
            FileSystem hdfs = FileSystem.get(URI.create(uri),conf);
            //例項化FileSystem物件
            FSDataInputStream in = null;

            try{
                in = hdfs.open(new Path(uri));
                //相當於 Path path = new Path(uri)-> hdfs.open(path)
                //呼叫open()方法獲得輸入流名為in的FSDataInputStream物件 
                byte buffer[] = new byte[256];

                int bytesRead = 0;

                while ((bytesRead = in.read(buffer)) > 0){
                //讀取檔案
                    System.out.write(buffer, 0, bytesRead);
                    //列印輸出
                }

            }finally {
                IOUtils.closeStream(in);
                //讀取完畢關閉流
            }
        }
}

將程式打包並提交到hdfs上執行

hadoop jar HdfsRead.jar com.hadoop.hdfs.HdfsRead /user/datas/hdfs_read.txt
//hadoop + jar + jar名 + class名 + 檔案路徑 
  • 當然,檔案目錄和URI引數也可以由使用者在程式中定義。

寫檔案

  • (1)通過FileSystem類獲得HDFS檔案系統物件

  • (2)客戶端通過FileSystem.create()建立檔案,相應地,DistributedFileSystem具體實現了FileSystem,因此,呼叫create()方法後,DistributedFileSystem會建立輸出流FSDataOutputStream。

    對於HDFS而言,具體的輸出流就是DFSOutputStream。

  • 這裡介紹FileSystem中兩個與寫檔案相關的重要方法:create()和append(),通過使用這兩個函式可以得到檔案輸出流FSDataOutputStream的物件。

  • FSDataOutputStream繼承了java.io.DataOutputStream,實現了Syncable()介面,通過write()函式就可以對HDFS上的檔案進行寫入操作

(1)create ()方法
public FSDataOutputStream create (Path f) throws IOException
//如果檔案存在則預設覆蓋
public FSDataOutputStream create (Path f, boolean overwrite) throw IOException
//可以指定是否覆蓋
(2)如果使用者要寫入一個大檔案,通常需要程式反饋寫入進度,這時可以呼叫以下介面:
public FSDataOutputStream create (Path f, Progressable progress) throw IOException
public void progress () { System.out.print ("."); }
  //該方法也會覆蓋已存在檔案,需要使用者實現progress()函式。每寫入64KB列印一個點號。
(3)append()方法
   1.public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException
   //bufferSize:寫入時使用的緩衝buffer大小
   //progress:進度報告
   2.public  FSDataOutputStream append (Path f) throws IOException 
   //相當於呼叫append(f, getConf().getInt("io.file.buffer.size",4096),null)函式
   3.public FSDataOutputStream append (Path f, int bufferSize) throws IOException
   //相當於呼叫append(f,bufferSize,null)函式
  • (3)DistributedFileSystem通過RPC遠端呼叫名稱節點,在檔案系統的名稱空間中建立新檔案。遠端方法呼叫結束後,DistributedFileSystem會利用DFSOutputStream來例項化FSDataOutputStream,返回給客戶端,客戶端使用這個輸出流寫入資料。

  • (4)獲得輸出流FSDataOutputStream以後,客戶端呼叫輸出流的write()方法向HDFS中對應的檔案寫入資料

  • (5)客戶端向輸出流FSDataOutputStream中寫入資料會首先被分成一個個分包放入DFSOutputStream物件的內部佇列。輸出流FSDataOutputStream會向名稱節點申請儲存檔案和副本資料塊的若干個資料節點,這些資料節點形成一個資料流管道,佇列中的分包被打包成資料包,進行流水線複製傳輸

  • (6)接收到資料的資料節點向傳送者傳送“確認包”(ACK Packet)。確認包隨資料流管道逆流而上,最終發往客戶端,當客戶端收到應答時,將對應分包從內部佇列移除。不斷執行(3)~(5)步,知道資料全部寫完。

  • (7)客戶端呼叫close()方法關閉輸出流。當DFSOutputStream物件內部佇列中的分包都收到應答以後,可使用ClientProtocol.complete()方法通知名稱節點關閉檔案,完成一次寫檔案過程。
    寫操作

程式碼實戰練習(寫檔案)

下面通過一個簡單的測試例子來說明如何寫入文字檔案。

package com.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;


public class HdfsWrite {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //載入配置檔案
        FileSystem local = FileSystem.getLocal(conf);
        //獲取本地檔案系統物件
        FileSystem hdfs = FileSystem.get(conf);
        //獲取hdfs檔案系統物件
        Path localdir = new Path(args[0]);
        //獲取本地目錄
        Path hdfsFile = new Path(args[1]);
        //獲取hdfs資料夾目錄
        try{
            FileStatus[] inputFiles = local.listStatus(localdir);
            //得到本地檔案系統目錄下所有檔案資訊
            FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
                public void progress() {
                    System.out.print(".");
                }
                //呼叫反饋進度函式
            });
            //呼叫create()函式,獲得輸出流
            for(int i = 0;i < inputFiles.length; i++){
                System.out.println(inputFiles[i].getPath().getName());
                //輸出檔名
                FSDataInputStream in = local.open(inputFiles[i].getPath());
                //得到FSDataInputStream物件
                byte buffer[] = new byte[512];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0){
                    out.write(buffer,0,bytesRead);
                    //寫入檔案
                }
                in.close();
            }
            out.close();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

提交執行方法與讀檔案相同