1. 程式人生 > >讀HDFS書筆記---5.3 檔案短路讀操作

讀HDFS書筆記---5.3 檔案短路讀操作

這一節的目錄:

5.3 檔案短路讀操作

        5.3.1 短路讀共享記憶體

        5.3.2 DataTransferProtocol

        5.3.3 DFSClient短路讀操作流程

        5.3.4 Datanode短路讀操作流程

在HDFS早期版本中,本地讀取和遠端讀取的實現是一樣的,客戶端通過TCP套接字連線Datanode,並通過DataTransferProtocol傳輸資料。這種方式很簡單,但是有一些不好的地方,例如Datanode需要為每個讀取資料塊的客戶端都維持一個執行緒和TCP套接字。核心中TCP協議是有開銷的,DataTransferProtocol本身也有開銷,因此這種實現方式有值得優化的地方,早起版本讀操作流程圖如下:

網路讀操作示意圖
標題網路讀操作示意圖

 

既然客戶端和Datanode在同一臺機器上,那麼DFSClient可以跳過Datanode,直接讀取磁碟上的資料,目前HDFS的實現方案有兩種

<1>、FS-2246

Datanode將所有的資料路徑權開發給客戶端,當執行一個本地讀取時,客戶端直接從本地磁碟的資料路徑讀取資料。但這種實現方式帶來了安全問題,客戶端使用者可以直接瀏覽所有資料,這會存在一定的不安全性,不是一個好選擇,下圖是相應的方案流程圖

HDFS-2246短路讀操作示意圖
HDFS-2246短路讀操作示意圖

<2> HDFS-347

UNIX提供了一種UNIX Domain Socket程序間通訊方式,它使得同一臺機器上的兩個程序能以Socket的方式通訊,並且還可以在程序間傳遞檔案描述符。

HDFS-347使用該機制實現了安全的本地短路讀取,如下圖所示:

HDFS-347短路讀操作示意圖
HDFS-347短路讀操作示意圖

客戶端向Datanode請求資料時,Datanode會開啟塊檔案和校驗和檔案,將這兩個檔案的檔案描述符直接傳遞給客戶端,而不是將路徑傳遞給客戶端。客戶端接收到這兩個檔案的檔案描述符之後,就可以直接開啟檔案讀取資料了,也就是繞過了Datanode程序的轉發,提高了讀取效率。因為檔案描述符是隻讀的,所以客戶端不能修改該檔案,同時,由於客戶端自身無法訪問資料塊檔案所在的目錄,所以它也就不能訪問其他不該訪問的資料了,保證了讀取的安全性。HDFS 2.x採取的就是HDFS-347的設計實現短路讀取功能的。

 

5.3.1 短路讀共享記憶體

瞭解了短路讀取的概念之後,我們看一下HDFS是如何實現這種模式的,在DFSClient中,使用ShortCircuitReplica類封裝可以進行短路讀取的副本。ShortCircuitReplica物件中包含了短路讀取副本的資料塊檔案輸入流、校驗檔案輸入流、短路讀取副本在共享記憶體中的槽位(slot)以及副本的引用次數等資訊。DFSClient會持有一個ShortCircuitCache物件快取並管理所有的ShortCircuitReplica物件,DFSClient從ShortCircuitCache獲得了ShortCircuitReplica的引用之後,就可以構造BlockReaderLocal物件進行本地讀取操作了。

如圖5-15所示,當DFSClient和Datanode在同一臺機器上時,需要一個共享記憶體段來維護所有短路讀取副本的狀態,共享記憶體段中會有很多槽位,每個槽位都記錄了一個短路讀取副本的資訊,例如當前副本是否有效、錨(anchor)的次數等。當Datanode將一個數據塊副本快取到記憶體中時,會將這個資料塊副本設定為可錨(anchorable)狀態,也就是在共享記憶體中該副本對應的槽位上設定可錨狀態位。當一個副本被設定為可錨狀態之後,DFSClient的BlockReaderLocal物件讀取該副本時就不需要校驗了(因為快取中的副本已經執行過校驗操作),並且輸入流可以通過零拷貝模式讀取這個副本。每當客戶端進行這兩種讀取操作時,都需要在副本對應的槽位上新增一個錨計數,只有副本的錨計數為零時,Datanode才可以從快取中刪除這個副本。可以看到共享記憶體以及槽位機制很好地在Datanode程序和DFSClient程序間同步了副本的狀態,保證了Datanode快取操作以及DFSClient讀取副本操作的正確性。

圖5-15 共享記憶體段的結構
圖5-15 共享記憶體段的結構

如圖5-16所示,共享記憶體機制是由DFSClient和Datanode對同一個檔案執行記憶體對映操作實現的。因為MappedByteBuffer物件能讓記憶體與物理檔案的資料實時同步,所以DFSClient和Datanode程序會通過中間檔案來交換資料,中間檔案使得兩個程序的記憶體區域得到及時的同步。DFSClient和Datanode之間可能會有多段共享記憶體,所以DFSClient定義了DFSClientShm類抽象DFSClient側的一段共享記憶體,定義了DFSClientShmManager類管理所有的DFSClientShm物件,而Datanode則定義了RegisteredShm類抽象Datanode側的一段共享記憶體,同時定義了ShortCircuitRegistry類管理所有Datanode側的共享記憶體。

圖5-16 共享記憶體機制示意圖
圖5-16 共享記憶體機制示意圖

DFSClient會呼叫DataTransferProtocol.requestShortCircuitShm介面與Datanode協商建立一段共享記憶體,共享記憶體建立成功後,DFSClient和Datanode會分別構造DFSClientShm以及RegisteredShm物件維護這段共享記憶體。如圖5-17所示,共享記憶體中的檔案對映資料是實時同步的,它儲存了所有槽位二進位制資訊。但是對映資料中二進位制的槽位資訊並不便於操作,所以DFSClientShm和RegisteredShm會構造一個Slot物件操作對映資料中的一個槽位,同時各自定義了集合欄位儲存所有的Slot物件。這裡需要特別注意的是,Slot物件會由DFSClientShm和RegisteredShm分別構造並儲存在各自的集合欄位中,所以DFSClientShm和RegisteredShm之間需要同步Slot物件的建立和刪除操作,以保證DFSClientShm和RegisteredShm儲存的Slot物件資訊是完全同步的。DataTransferProtocol介面就提供了requestShortCircuitFds()以及releaseShortCircuitFds()方法同步Slot物件的建立和刪除操作。

圖5-17 Slot狀態同步示意圖
圖5-17 Slot狀態同步示意圖

 

5.3.2 DataTransferProtocol

DataTransferProtocol定義了requestShortCircuitShm()、requestShortCircuitFds()以及releaseShortCircuitFds()三個介面方法同步Datanode和DFSClient對共享記憶體的操作。

需要注意的是DataTransferProtocol底層是基於Socket流的,而當DFSClient和Datanode在同一臺物理機器上時,DataTransferProtocol底層的Socket將會是DomainSocket,使用DomainSocket的DataTransferProtocol可以在Socket流中傳遞檔案描述符。

接下來我們依次看一下requestShortCircuitShm()、requestShortCircuitFds()以及releaseShortCircuitFds()方法的實現。

<1>、requestShortCircuitShm()

程式碼如下:

@Override
  public void requestShortCircuitShm(String clientName) throws IOException {
    NewShmInfo shmInfo = null;
    boolean success = false;
    DomainSocket sock = peer.getDomainSocket();//獲取底層DomainSocket物件
    try {
      if (sock == null) {//如果DataTransferProtocol底層不是DomainSocket,則發回異常
        sendShmErrorResponse(ERROR_INVALID, "Bad request from " +
            peer + ": must request a shared " +
            "memory segment over a UNIX domain socket.");
        return;
      }
      try {
        //呼叫ShortCircuitRegistry.createNewMemorySegment()方法建立共享記憶體段
        shmInfo = datanode.shortCircuitRegistry.
            createNewMemorySegment(clientName, sock);
        // After calling #{ShortCircuitRegistry#createNewMemorySegment}, the
        // socket is managed by the DomainSocketWatcher, not the DataXceiver.
        releaseSocket();
      } catch (UnsupportedOperationException e) {//丟擲異常,則響應異常訊息
        sendShmErrorResponse(ERROR_UNSUPPORTED, 
            "This datanode has not been configured to support " +
            "short-circuit shared memory segments.");
        return;
      } catch (IOException e) {//丟擲異常,則響應異常訊息
        sendShmErrorResponse(ERROR,
            "Failed to create shared file descriptor: " + e.getMessage());
        return;
      }
      //呼叫sendShmSuccessResponse()方法將共享記憶體檔案的檔案描述符發回客戶端
      sendShmSuccessResponse(sock, shmInfo);
      success = true;
    } finally {
      if (ClientTraceLog.isInfoEnabled()) {
        if (success) {
          BlockSender.ClientTraceLog.info(String.format(
              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
              "op: REQUEST_SHORT_CIRCUIT_SHM," +
              " shmId: %016x%016x, srvID: %s, success: true",
              clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(),
              datanode.getDatanodeUuid()));
        } else {
          BlockSender.ClientTraceLog.info(String.format(
              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
              "op: REQUEST_SHORT_CIRCUIT_SHM, " +
              "shmId: n/a, srvID: %s, success: false",
              clientName, datanode.getDatanodeUuid()));
        }
      }
      if ((!success) && (peer == null)) {
        // If we failed to pass the shared memory segment to the client,
        // close the UNIX domain socket now.  This will trigger the 
        // DomainSocketWatcher callback, cleaning up the segment.
        IOUtils.cleanup(null, sock);
      }
      IOUtils.cleanup(null, shmInfo);
    }
  }

DFSClient在執行任何短路讀取操作之前,需要先申請一段共享記憶體儲存短路讀取副本的狀態。DFSClient會呼叫DataTransferProtocol.requestShortCircuitShm()方法向Datanode發起申請共享記憶體的請求,Datanode的DataXceiver.requestShortCircuitShm()方法會響應這個請求。

如上面的程式碼所示,DataXceiver.requestShortCircuitShm()會呼叫ShortCircuitRegistry.createNewMemorySegment()方法建立共享記憶體段,createNewMemorySegment()方法會將共享記憶體檔案對映到Datanode的記憶體中,然後構造RegisteredShm類管理這段共享記憶體(請參考ShortCircuitRegistry類小節中的分析)。之後DataXceiver.requestShortCircuitShm()方法會呼叫sendShmSuccessResponse()方法將共享記憶體檔案的檔案描述符通過domainSocket發回客戶端。

DFSClient的DfsClientShmManager物件從domainSocket接收了共享記憶體檔案的檔案描述符後,會開啟共享記憶體檔案並將該檔案對映到DFSClient的記憶體中,之後建立DFSClientShm物件管理這段共享記憶體(DFSClientShmManager類小節中的分析),並將這個DFSClientShm物件儲存在DFSClientShmManager的對應欄位中。

<2>、requestShortCircuitFds()

程式碼如下:

@Override
  public void requestShortCircuitFds(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> token,
      SlotId slotId, int maxVersion) throws IOException {
    updateCurrentThreadName("Passing file descriptors for block " + blk);
    BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
    FileInputStream fis[] = null;
    try {
      if (peer.getDomainSocket() == null) {//如果底層不是DomainSocket,則丟擲異常
        throw new IOException("You cannot pass file descriptors over " +
            "anything but a UNIX domain socket.");
      }
      if (slotId != null) {
        boolean isCached = datanode.data.
            isCached(blk.getBlockPoolId(), blk.getBlockId());
        //呼叫ShortCircuitRegistry.registerSlot()方法在Datanode的共享記憶體中註冊這個Slot物件
        datanode.shortCircuitRegistry.registerSlot(
            ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
      }
      try {
        //獲取資料塊檔案以及校驗和檔案的檔案描述符
        fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
      } finally {
        if ((fis == null) && (slotId != null)) {
          datanode.shortCircuitRegistry.unregisterSlot(slotId);
        }
      }
      //構造響應訊息
      bld.setStatus(SUCCESS);
      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
    } catch (ShortCircuitFdsVersionException e) {
      bld.setStatus(ERROR_UNSUPPORTED);
      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
      bld.setMessage(e.getMessage());
    } catch (ShortCircuitFdsUnsupportedException e) {
      bld.setStatus(ERROR_UNSUPPORTED);
      bld.setMessage(e.getMessage());
    } catch (InvalidToken e) {
      bld.setStatus(ERROR_ACCESS_TOKEN);
      bld.setMessage(e.getMessage());
    } catch (IOException e) {
      bld.setStatus(ERROR);
      bld.setMessage(e.getMessage());
    }
    try {
      //發回成功的響應訊息
      bld.build().writeDelimitedTo(socketOut);
      if (fis != null) {
        FileDescriptor fds[] = new FileDescriptor[fis.length];
        for (int i = 0; i < fds.length; i++) {
          fds[i] = fis[i].getFD();
        }
        byte buf[] = new byte[] { (byte)0 };
        //通過DomainSocket將資料塊檔案和校驗和檔案的檔案描述符傳送給客戶端
        peer.getDomainSocket().
          sendFileDescriptors(fds, buf, 0, buf.length);
      }
    } finally {
      if (ClientTraceLog.isInfoEnabled()) {
        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
            .getBlockPoolId());
        BlockSender.ClientTraceLog.info(String.format(
            "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
            " blockid: %s, srvID: %s, success: %b",
            blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
          ));
      }
      if (fis != null) {
        IOUtils.cleanup(LOG, fis);
      }
    }
  }

<3>、releaseShortCircuitFds()

程式碼如下:

@Override
  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
    boolean success = false;
    try {
      String error;
      Status status;
      try {
        //釋放共享記憶體中的槽位
        datanode.shortCircuitRegistry.unregisterSlot(slotId);
        error = null;
        status = Status.SUCCESS;
      } catch (UnsupportedOperationException e) {
        error = "unsupported operation";
        status = Status.ERROR_UNSUPPORTED;
      } catch (Throwable e) {
        error = e.getMessage();
        status = Status.ERROR_INVALID;
      }
      //構造響應訊息
      ReleaseShortCircuitAccessResponseProto.Builder bld =
          ReleaseShortCircuitAccessResponseProto.newBuilder();
      bld.setStatus(status);
      if (error != null) {
        bld.setError(error);
      }
      //發回響應訊息
      bld.build().writeDelimitedTo(socketOut);
      success = true;
    } finally {
      if (ClientTraceLog.isInfoEnabled()) {
        BlockSender.ClientTraceLog.info(String.format(
            "src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," +
            " shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b",
            slotId.getShmId().getHi(), slotId.getShmId().getLo(),
            slotId.getSlotIdx(), datanode.getDatanodeUuid(), success));
      }
    }
  }