HDFS Source Code Analysis (Part 3): Basic Structures of Block Relationships

Preface

As I wrote in earlier articles in this series, HDFS has two major relationship modules. The first is the mapping between files and blocks, which has a fairly clean structure. The second is nowhere near as simple: it concerns the datanodes, namely the mapping between blocks and the datanodes that store them, and some of its inner workings are genuinely intricate. That is easy to understand: there are many blocks to begin with, each block is replicated, and as the cluster grows so does the number of datanodes, so maintaining the block-to-datanode mapping is no trivial matter. One point is particularly worth noting: the list of blocks on each datanode is built up dynamically as the system runs, so the namenode has to learn it through heartbeats and continually update its metadata. This second relationship also contains many clever designs, which I hope readers will savor in what follows.

Classes Involved

The basic classes involved in the block relationship are actually not numerous:

1. BlocksMap -- the block map class. It stores the mapping from each block to the list of datanodes holding it and to the INode file it belongs to; a very important class.

2. BlocksMap.BlockInfo -- an inner class of BlocksMap that holds a block's metadata.

3. DatanodeDescriptor -- the datanode class, the namenode's abstraction of a datanode, containing many fields describing a datanode's state.

4. DatanodeID and DatanodeInfo -- the datanode base classes, parents of DatanodeDescriptor, which define a node's most basic information.

5. GSet and LightWeightGSet -- chained-set classes that store the BlockInfo objects, using hash-based storage for block information.

6. PendingReplicationBlocks and UnderReplicatedBlocks -- replication-related classes. Strictly speaking they are not part of the basic block-relationship structure, but since they are closely tied to blocks they are included here as a worked example.

7. FSNamesystem -- the namesystem class. It is enormous, more than five thousand lines of code, and coordinates the interactions of many modules; part of the replication handling also happens here.

OK, on to the main content.

BlocksMap

Being a map class, it surely offers map-like operations such as put(key, value) and get(key), and indeed it does. But instead of reusing a ready-made class like HashMap, it implements its own lightweight set class. What does lightweight mean here, and how does it differ from what we usually see? First, the main field definitions of BlocksMap:

/**
 * This class maintains the map from a block to its metadata.
 * block's metadata currently includes INode it belongs to and
 * the datanodes that store the block.
 */
class BlocksMap {
/**
   * Internal class for block metadata.
   * (BlocksMap's inner class that stores a block's metadata)
   */
  static class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
...
}
/** Constant {@link LightWeightGSet} capacity. */
  private final int capacity;
  
  private GSet<Block, BlockInfo> blocks;

  BlocksMap(int initialCapacity, float loadFactor) {
    this.capacity = computeCapacity();
    //use the lightweight GSet to store the Block-to-BlockInfo mapping
    this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
  }
....
Right above the constructor sits the map structure itself, declared with the newly defined collection type GSet. The field makes the relationship plain: a mapping from Block to BlockInfo, and note that the latter is a subclass of the former, so HDFS builds a mapping from a parent class to its own subclass. That sends us to the GSet class to see its declaration:
/**
 * A {@link GSet} is a set,
 * which supports the {@link #get(Object)} operation.
 * The {@link #get(Object)} operation uses a key to lookup an element.
 * 
 * Null element is not supported.
 * 
 * @param <K> The type of the keys.
 * @param <E> The type of the elements, which must be a subclass of the keys.
 * (a special set class that can look up, and remove, an element by key; the element type is itself a subclass of the key type)
 */
public interface GSet<K, E extends K> extends Iterable<E> {
  /**
   * @return The size of this set.
   */
  int size();
  boolean contains(K key);
  E get(K key);
  E put(E element);
  E remove(K key);
}
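To make the K/E relationship concrete, here is a minimal usage sketch of my own, assuming the package-private Block and BlockInfo constructors are reachable; it is illustrative only, not code from the source tree:

// Sketch: store a BlockInfo element, then look it up with a plain Block key.
// This works because BlockInfo extends Block and equality/hashing are defined
// on the Block fields, so a bare Block can act as the lookup key.
GSet<Block, BlockInfo> set = new LightWeightGSet<Block, BlockInfo>(1 << 10);
BlockInfo info = new BlockInfo(new Block(42L), 3);  //hypothetical block id 42, replication 3
set.put(info);                                      //E put(E element)
BlockInfo found = set.get(new Block(42L));          //E get(K key)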
The methods, then, are indeed map-like. But GSet is only the interface: the constructor uses the implementation LightWeightGSet, whose class comment describes a lightweight set that implements element lookup at a low memory cost:
/**
 * A low memory footprint {@link GSet} implementation,
 * which uses an array for storing the elements
 * and linked lists for collision resolution.
 *
 * No rehash will be performed.
 * Therefore, the internal array will never be resized.
 *
 * This class does not support null element.
 *
 * This class is not thread safe.
 *
 * @param <K> Key type for looking up the elements
 * @param <E> Element type, which must be
 *       (1) a subclass of K, and
 *       (2) implementing {@link LinkedElement} interface.
 * (a lightweight, low-memory set implementation for looking up elements; null is not allowed, and the stored type must be a subclass of the key type.
 * Hashing places the elements, but there is no rehashing, so the table never resizes.)
 */
public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
Intriguing. Next, the field definitions:
public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
  /**
   * Elements of {@link LightWeightGSet}.
   */
  public static interface LinkedElement {
    /** Set the next element. */
    public void setNext(LinkedElement next);

    /** Get the next element. */
    public LinkedElement getNext();
  }

  public static final Log LOG = LogFactory.getLog(GSet.class);
  //maximum array length: 2 to the power of 30
  static final int MAX_ARRAY_LENGTH = 1 << 30; //prevent int overflow problem
  static final int MIN_ARRAY_LENGTH = 1;

  /**
   * An internal array of entries, which are the rows of the hash table.
   * The size must be a power of two.
   * (the entry array, i.e. the rows of the hash table; its size must be a power of two)
   */
  private final LinkedElement[] entries;
  /** A mask for computing the array index from the hash value of an element. */
  //mask used when computing hash bucket indices
  private final int hash_mask;
  /** The size of the set (not the entry array). */
  private int size = 0;
  /** Modification version for fail-fast.
   * @see ConcurrentModificationException
   */
  private volatile int modification = 0;
The LinkedElement interface at the top has setNext/getNext methods, so we can already foresee a chained structure, with the entries array holding the chains; the maximum array length is enormous, 2 to the power of 30. The slightly odd field is hash_mask, the hash mask. What is it for? A reasonable guess is that it is used when computing an element's bucket index. Now the constructor of this collection:
/**
   * @param recommended_length Recommended size of the internal array.
   */
  public LightWeightGSet(final int recommended_length) {
  	//the requested initial length is first validated and adjusted
    final int actual = actualArrayLength(recommended_length);
    LOG.info("recommended=" + recommended_length + ", actual=" + actual);

    entries = new LinkedElement[actual];
    //the array length minus one becomes the mask
    hash_mask = entries.length - 1;
  }
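The validation step is worth a quick sketch. I have not quoted the real actualArrayLength here; the following is my guess at its essential behavior, clamping the request into the allowed range and rounding it up to a power of two so that length - 1 works as a bit mask:

  //hedged sketch: clamp the recommended length into [MIN, MAX] and round it
  //up to the next power of two; the real method may differ in detail
  private static int actualArrayLength(int recommended) {
    if (recommended > MAX_ARRAY_LENGTH) {
      return MAX_ARRAY_LENGTH;
    } else if (recommended < MIN_ARRAY_LENGTH) {
      return MIN_ARRAY_LENGTH;
    } else {
      final int a = Integer.highestOneBit(recommended);
      return a == recommended ? a : a << 1;  //round up to a power of two
    }
  }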
So the array length handed to the constructor is validated to fall within a sensible range, and the mask is set to the length minus one; why that works becomes clear shortly. Among this map-like class's methods, two are must-reads: put and get.
 @Override
  public E get(final K key) {
    //validate key
    if (key == null) {
      throw new NullPointerException("key == null");
    }

    //find element
    final int index = getIndex(key);
    for(LinkedElement e = entries[index]; e != null; e = e.getNext()) {
      if (e.equals(key)) {
        return convert(e);
      }
    }
    //element not found
    return null;
  }
The approach is to compute the bucket index first; as long as the element has not been removed, the first match on the chain is the target. The interesting part is how the index is computed. If I were writing this, I would normally take the hash modulo the table size, i.e. hashCode() % N, but LightWeightGSet does something slightly different:
//compute the bucket index: AND the hash with the mask instead of taking a remainder
  //the mask zeroes the high-order bits, so the index always lands within the array bounds
  private int getIndex(final K key) {
    return key.hashCode() & hash_mask;
  }
Intuitively, write the hash value out in binary; the AND keeps only the low-order bits, because the mask turns all the higher bits into zero. The appeal of the bit operation is that it replaces a division with a single AND. I have not carefully benchmarked how it compares with the modulo approach in practice, but it is a technique worth remembering for chained hash tables. The remaining methods are standard linked-list manipulations; remove, for example:
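To see the mask trick in isolation, here is a small self-contained demonstration of my own (not from the source) comparing the AND-based index with the usual modulo for a power-of-two table:

//Standalone demo: for a table length that is a power of two, hash & (length - 1)
//selects the same bucket as hash % length for non-negative hashes, with a single
//AND instruction and no division.
public class HashMaskDemo {
  public static void main(String[] args) {
    final int length = 1 << 4;    //16 entries, a power of two
    final int mask = length - 1;  //binary 1111

    int[] hashes = {7, 16, 23, 0x7fffffff};
    for (int h : hashes) {
      System.out.println(h + " -> and=" + (h & mask) + ", mod=" + (h % length));
    }
  }
}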
/**
   * Remove the element corresponding to the key,
   * given key.hashCode() == index.
   *
   * @return If such element exists, return it.
   *         Otherwise, return null.
   */
  private E remove(final int index, final K key) {
    if (entries[index] == null) {
      return null;
    } else if (entries[index].equals(key)) {
      //remove the head of the linked list
      modification++;
      size--;
      final LinkedElement e = entries[index];
      entries[index] = e.getNext();
      e.setNext(null);
      return convert(e);
    } else {
      //head != null and key is not equal to head
      //search the element
      LinkedElement prev = entries[index];
      for(LinkedElement curr = prev.getNext(); curr != null; ) {
      	 //what follows is standard linked-list unlinking
        if (curr.equals(key)) {
          //found the element, remove it
          modification++;
          size--;
          prev.setNext(curr.getNext());
          curr.setNext(null);
          return convert(curr);
        } else {
          prev = curr;
          curr = curr.getNext();
        }
      }
      //element not found
      return null;
    }
  }

I will not go further into it here. Turning back to the inside of BlockInfo, there is an Object array:

static class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
    //the INode file this block belongs to
    private INodeFile          inode;

    /** For implementing {@link LightWeightGSet.LinkedElement} interface */
    private LightWeightGSet.LinkedElement nextLinkedElement;

    /**
     * This array contains triplets of references.
     * For each i-th data-node the block belongs to
     * triplets[3*i] is the reference to the DatanodeDescriptor
     * and triplets[3*i+1] and triplets[3*i+2] are references 
     * to the previous and the next blocks, respectively, in the 
     * list of blocks belonging to this data-node.
     * (for the i-th datanode holding this block, triplets[3*i] references that DatanodeDescriptor,
     * while triplets[3*i+1] and triplets[3*i+2] reference the previous and the next block in that datanode's block list)
     */
    private Object[] triplets;
Each triplet in this array stores three references: first the datanode holding the block, then the previous block and the next block on that same datanode, forming a doubly linked list. In other words, starting from one block and following the links, you can enumerate every block stored on a given datanode. The length of triplets is bounded by the block's replication factor, since different replicas normally live on different datanodes.
BlockInfo(Block blk, int replication) {
      super(blk);
      //multiplied by 3 because the previous and next block references are stored alongside each datanode
      this.triplets = new Object[3*replication];
      this.inode = null;
    }

Because three different kinds of objects share the array, it is declared as Object[] rather than with a concrete element class. The casts back from Object live in a handful of small accessors, sketched below, and many of the operations built on them are plain linked-list manipulations.
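Here is my sketch of the index arithmetic behind those accessors (simplified; the real methods also carry bounds asserts). The removeNode() shown next is built almost entirely from them:

    //hedged sketch of the triplet accessors inside BlockInfo
    DatanodeDescriptor getDatanode(int index) {
      return (DatanodeDescriptor) triplets[index * 3];      //slot 0: the datanode
    }

    BlockInfo getPrevious(int index) {
      return (BlockInfo) triplets[index * 3 + 1];           //slot 1: previous block on that datanode
    }

    BlockInfo getNext(int index) {
      return (BlockInfo) triplets[index * 3 + 2];           //slot 2: next block on that datanode
    }

    void setDatanode(int index, DatanodeDescriptor node) {
      triplets[index * 3] = node;
    }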

/**
     * Remove data-node from the block.
     * (remove a datanode from the block: the vacated triplet is overwritten with the last one, which is then cleared)
     */
    boolean removeNode(DatanodeDescriptor node) {
      int dnIndex = findDatanode(node);
      if(dnIndex < 0) // the node is not found
        return false;
      assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
        "Block is still in the list and must be removed first.";
      // find the last not null node
      int lastNode = numNodes()-1; 
      // replace current node triplet by the lastNode one 
      //overwrite the current triplet with the last one
      setDatanode(dnIndex, getDatanode(lastNode));
      setNext(dnIndex, getNext(lastNode)); 
      setPrevious(dnIndex, getPrevious(lastNode)); 
      //then clear the last triplet
      // set the last triplet to null
      setDatanode(lastNode, null);
      setNext(lastNode, null); 
      setPrevious(lastNode, null); 
      return true;
    }

DatanodeDescriptor

Now for the other key class, DatanodeDescriptor, the namenode's abstraction of a datanode. The first thing it defines is a class pairing a block with its target datanodes:

/**************************************************
 * DatanodeDescriptor tracks stats on a given DataNode,
 * such as available storage capacity, last update time, etc.,
 * and maintains a set of blocks stored on the datanode. 
 *
 * This data structure is a data structure that is internal
 * to the namenode. It is *not* sent over-the-wire to the Client
 * or the Datnodes. Neither is it stored persistently in the
 * fsImage.
 (DatanodeDescriptor tracks the state information of a single datanode)
 **************************************************/
public class DatanodeDescriptor extends DatanodeInfo {
  
  // Stores status of decommissioning.
  // If node is not decommissioning, do not use this object for anything.
  //the object below is only relevant to decommissioning
  DecommissioningStatus decommissioningStatus = new DecommissioningStatus();

  /** Block and targets pair */
  //資料塊以及目標資料節點列表對映類
  public static class BlockTargetPair {
  	//the block in question
    public final Block block;
    //the target datanodes for this block
    public final DatanodeDescriptor[] targets;    

    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
      this.block = block;
      this.targets = targets;
    }
  }
It then defines a queue of such pairs:
/** A BlockTargetPair queue. */
  //a queue of block-to-target-datanodes pairs
  private static class BlockQueue {
  	//the class maintains a queue of BlockTargetPair objects
    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
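The rest of BlockQueue is elided above; roughly speaking, it wraps the queue with a synchronized enqueue and a batched drain, something along these lines (a sketch, not verbatim source):

    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
      return blockq.offer(new BlockTargetPair(block, targets));
    }

    //drain up to numBlocks pairs in one call, e.g. when building a heartbeat reply
    synchronized List<BlockTargetPair> poll(int numBlocks) {
      if (blockq.isEmpty()) {
        return null;
      }
      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
      for (; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
        results.add(blockq.poll());
      }
      return results;
    }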
Below that, a series of block-related queues and sets are declared:
  /** A queue of blocks to be replicated by this datanode */
  //blocks waiting to be replicated from this datanode
  private BlockQueue replicateBlocks = new BlockQueue();
  /** A queue of blocks to be recovered by this datanode */
  //blocks on this datanode waiting for lease recovery
  private BlockQueue recoverBlocks = new BlockQueue();
  /** A set of blocks to be invalidated by this datanode */
  //invalidated blocks waiting to be deleted on this datanode
  private Set<Block> invalidateBlocks = new TreeSet<Block>();

  /* Variables for maintaning number of blocks scheduled to be written to
   * this datanode. This count is approximate and might be slightly higger
   * in case of errors (e.g. datanode does not report if an error occurs 
   * while writing the block).
   */
  //approximate counters for blocks scheduled to be written to this datanode
  private int currApproxBlocksScheduled = 0;
  private int prevApproxBlocksScheduled = 0;
  private long lastBlocksScheduledRollTime = 0;
  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
Next, a typical method for adding a block. The addition is really two steps: first the current datanode is added to the datanode list managed by the block; then, in the reverse direction, the block is inserted into the block list of the datanode. The code:
 /**
   * Add data-node to the block.
   * Add block to the head of the list of blocks belonging to the data-node.
   * (add this datanode to the block's datanode list, and the block to the head of this datanode's block list)
   */
  boolean addBlock(BlockInfo b) {
  	//register this datanode in the block's triplets
    if(!b.addNode(this))
      return false;
    // add to the head of the data-node list
    //insert the block at the head of the list of blocks managed by this datanode
    blockList = b.listInsert(blockList, this);
    return true;
  }
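The listInsert call is the head insertion into the doubly linked list that the triplets encode. A simplified sketch of my own of the idea (the real method also asserts that the block is not already linked):

  //hedged sketch: insert this block at the head of datanode dn's block list;
  //the prev/next pointers live in this block's own triplet slot for dn
  BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
    int dnIndex = this.findDatanode(dn);   //which triplet belongs to dn
    this.setPrevious(dnIndex, null);       //a new head has no predecessor
    this.setNext(dnIndex, head);           //the old head becomes the successor
    if (head != null) {
      head.setPrevious(head.findDatanode(dn), this);
    }
    return this;                           //the caller stores the new head
  }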

A Block Operation Example: Replication

Here is a sample operation flow involving blocks: the replica request path. Much of the coordination around replication happens in the huge FSNamesystem class, which accordingly defines many related fields:

public class FSNamesystem implements FSConstants, FSNamesystemMBean,
    NameNodeMXBean, MetricsSource {
  ...
  
  //number of blocks waiting to be replicated
  volatile long pendingReplicationBlocksCount = 0L;
  //number of blocks with corrupt replicas
  volatile long corruptReplicaBlocksCount = 0L;
  //number of under-replicated blocks
  volatile long underReplicatedBlocksCount = 0L;
  //number of blocks currently scheduled for replication
  volatile long scheduledReplicationBlocksCount = 0L;
  //number of blocks with excess replicas
  volatile long excessBlocksCount = 0L;
  //number of blocks waiting to be deleted
  volatile long pendingDeletionBlocksCount = 0L;
These count replicas in their various states; the corresponding collections follow:
//
  // Keeps a Collection for every named machine containing
  // blocks that have recently been invalidated and are thought to live
  // on the machine in question.
  // Mapping: StorageID -> ArrayList<Block>
  //
  //recently invalidated blocks, a map from datanode (storage ID) to block list
  private Map<String, Collection<Block>> recentInvalidateSets = 
    new TreeMap<String, Collection<Block>>();

  //
  // Keeps a TreeSet for every named node.  Each treeset contains
  // a list of the blocks that are "extra" at that location.  We'll
  // eventually remove these extras.
  // Mapping: StorageID -> TreeSet<Block>
  //
  // map of excess block replicas, again keyed by datanode
  Map<String, Collection<Block>> excessReplicateMap = 
    new TreeMap<String, Collection<Block>>();


/**
 * Store set of Blocks that need to be replicated 1 or more times.
 * Set of: Block
 */
  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
  // We also store pending replication-orders.
  //replication orders already issued and pending
  private PendingReplicationBlocks pendingReplications;
Two classes matter most in replica state transitions: UnderReplicatedBlocks and PendingReplicationBlocks. The former is where replication requests are generated; the latter holds the requests pending execution. The crucial job inside UnderReplicatedBlocks is deciding each request's priority: based on how many replicas remain, and on whether the remaining ones sit on decommissioning nodes, it assigns a priority level, so its fields begin with the priority queues:
/* Class for keeping track of under replication blocks
 * Blocks have replication priority, with priority 0 indicating the highest
 * Blocks have only one replicas has the highest
 * (this class tracks blocks whose replication is pending,
 * bucketing them into priority queues; a single queue holds the highest-priority blocks)
 */
class UnderReplicatedBlocks implements Iterable<BlockInfo> {
  //four priority levels are defined
  static final int LEVEL = 4;
  //corrupt replicas go into the last queue
  static public final int QUEUE_WITH_CORRUPT_BLOCKS = LEVEL-1;
  //the priority queues themselves
  private List<LightWeightLinkedSet<BlockInfo>> priorityQueues
      = new ArrayList<LightWeightLinkedSet<BlockInfo>>();
  
  private final RaidMissingBlocks raidQueue;
Broadly, then, there are two kinds of queues: one for corrupt replicas, and the rest for normal blocks at varying priorities, where a smaller level number means a higher priority. The core function that decides the priority:
/* Return the priority of a block
   * 
   * If this is a Raided block and still has 1 replica left, not assign the highest priority.
   * 
   * @param block a under replication block
   * @param curReplicas current number of replicas of the block
   * @param expectedReplicas expected number of replicas of the block
   */
  private int getPriority(BlockInfo block, 
                          int curReplicas, 
                          int decommissionedReplicas,
                          int expectedReplicas) {
    //a negative count, or one already at or above expectation, needs no replication (LEVEL means it is not queued at all)
    if (curReplicas<0 || curReplicas>=expectedReplicas) {
      return LEVEL; // no need to replicate
    } else if(curReplicas==0) {
      // If there are zero non-decommissioned replica but there are
      // some decommissioned replicas, then assign them highest priority
      //no live replicas, but some still on decommissioning nodes: give the highest priority
      if (decommissionedReplicas > 0) {
        return 0;
      }
      return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
    } else if(curReplicas==1) {
      //a single remaining replica likewise gets the highest priority (unless the block is RAID-protected)
      return isRaidedBlock(block) ? 1 : 0; // highest priority
    } else if(curReplicas*3<expectedReplicas) {
      return 1;
    } else {
      //everything else gets normal priority
      return 2;
    }
  }
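To make the thresholds concrete, here is a standalone recreation of the non-RAID rules above, my own snippet and purely illustrative:

//Illustrative recreation of getPriority for the non-RAID case.
public class PriorityDemo {
  static final int LEVEL = 4;

  static int priority(int cur, int decommissioned, int expected) {
    if (cur < 0 || cur >= expected) return LEVEL;            //no replication needed
    if (cur == 0) return decommissioned > 0 ? 0 : LEVEL - 1; //corrupt-blocks queue
    if (cur == 1) return 0;                                  //highest priority
    if (cur * 3 < expected) return 1;
    return 2;
  }

  public static void main(String[] args) {
    System.out.println(priority(1, 0, 3));   //expected 3, 1 live  -> 0 (highest)
    System.out.println(priority(2, 0, 3));   //expected 3, 2 live  -> 2 (normal)
    System.out.println(priority(3, 0, 10));  //expected 10, 3 live -> 1
  }
}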
Once these replication requests are generated, they are added to PendingReplicationBlocks, where dedicated fields manage the request information:
/***************************************************
 * PendingReplicationBlocks does the bookkeeping of all
 * blocks that are getting replicated.
 *
 * It does the following:
 * 1)  record blocks that are getting replicated at this instant.
 * 2)  a coarse grain timer to track age of replication request
 * 3)  a thread that periodically identifies replication-requests
 *     that never made it.
 *
 ***************************************************/
//this class does the bookkeeping for pending replication requests
class PendingReplicationBlocks {
  //map from a block to its pending replication record
  private Map<Block, PendingBlockInfo> pendingReplications;
  //list of requests that have timed out
  private ArrayList<Block> timedOutItems;
  Daemon timerThread = null;
  private volatile boolean fsRunning = true;
That record is the PendingBlockInfo above, which declares when the request was created and how many replication requests for this block are in flight:
  /**
   * An object that contains information about a block that 
   * is being replicated. It records the timestamp when the 
   * system started replicating the most recent copy of this
   * block. It also records the number of replication
   * requests that are in progress.
   * (the inner class holds a request's information: when replication of this block began, and how many replica requests for it are in progress)
   */
  static class PendingBlockInfo {
  	//when the request was created
    private long timeStamp;
    //number of replication requests currently in progress
    private int numReplicasInProgress;
The creation time is used for timeout detection, and the request count is compared against the expected replica count. The class runs a timeout check so that replication requests that never finish are eventually noticed; by default that takes five to ten minutes, because the timeout is five minutes and the checker itself only wakes every five minutes, so a request can age up to one full check interval past its timeout before being caught:
//
  // It might take anywhere between 5 to 10 minutes before
  // a request is timed out.
  //
  //by default a timed-out request is detected within 5 to 10 minutes
  private long timeout = 5 * 60 * 1000;
  private long defaultRecheckInterval = 5 * 60 * 1000;
The monitoring thread:
/*
   * A periodic thread that scans for blocks that never finished
   * their replication request.
   * (a periodic thread that watches for replication requests that never finish)
   */
  class PendingReplicationMonitor implements Runnable {
    public void run() {
      while (fsRunning) {
        long period = Math.min(defaultRecheckInterval, timeout);
        try {
          //invoke the check below
          pendingReplicationCheck();
          Thread.sleep(period);
        } catch (InterruptedException ie) {
          FSNamesystem.LOG.debug(
                "PendingReplicationMonitor thread received exception. " + ie);
        }
      }
    }

    /**
     * Iterate through all items and detect timed-out items
     */
    void pendingReplicationCheck() {
      synchronized (pendingReplications) {
        Iterator iter = pendingReplications.entrySet().iterator();
        long now = FSNamesystem.now();
        FSNamesystem.LOG.debug("PendingReplicationMonitor checking Q");
        while (iter.hasNext()) {
          Map.Entry entry = (Map.Entry) iter.next();
          PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
          //compare the request's start time against the timeout
          if (now > pendingBlock.getTimeStamp() + timeout) {
            Block block = (Block) entry.getKey();
            synchronized (timedOutItems) {
              //move it onto the timed-out list
              timedOutItems.add(block);
            }
            FSNamesystem.LOG.warn(
                "PendingReplicationMonitor timed out block " + block);
            iter.remove();
          }
        }
      }
    }
  }
Requests that time out are added to timedOutItems, and in the end they are generally re-inserted into UnderReplicatedBlocks to be tried again. And that, in brief, is the flow of block and replica relationships.
