Summary of Reading Files in HDFS

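This summary is based mainly on the Hadoop 1.0.3 source code.

I. Overview of the HDFS read process

1. Opening a file

1.1 Client

When a file is opened in HDFS, DistributedFileSystem.open(Path f, int bufferSize) is called. The method is at lines 152-156 of DistributedFileSystem.java: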
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }

Here dfs is a member variable of DistributedFileSystem, of type DFSClient. The method dfs.open(String src, int buffersize, boolean verifyChecksum, FileSystem.Statistics stats) is at lines 573-579 of DFSClient.java:
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                      FileSystem.Statistics stats
      ) throws IOException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
  }

A DFSInputStream is created and returned. Its constructor is at lines 1828-1835 of DFSClient.java:
DFSInputStream(String src, int buffersize, boolean verifyChecksum
                   ) throws IOException {
      this.verifyChecksum = verifyChecksum;
      this.buffersize = buffersize;
      this.src = src;
      prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
      openInfo();
    }


The constructor calls openInfo, which is at lines 1840-1861 of DFSClient.java:
synchronized void openInfo() throws IOException {
      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
      if (newInfo == null) {
        throw new FileNotFoundException("File does not exist: " + src);
      }

      // I think this check is not correct. A file could have been appended to
      // between two calls to openInfo().
      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
          !newInfo.isUnderConstruction()) {
        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
            throw new IOException("Blocklist for " + src + " has changed!");
          }
        }
      }
      updateBlockInfo(newInfo);
      this.locatedBlocks = newInfo;
      this.currentNode = null;
    }
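
The pairwise comparison in openInfo can be illustrated in isolation. The following is a minimal sketch, not Hadoop code: the class BlockListCheck and its use of plain long block IDs are assumptions made for illustration. It shows that the loop only compares the common prefix of the two lists, so blocks added at the end of the new list are not seen as a change.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the pairwise block-list check in openInfo(). */
final class BlockListCheck {
    /** Returns true when the common prefix of both lists holds identical block IDs. */
    static boolean samePrefix(List<Long> oldBlocks, List<Long> newBlocks) {
        Iterator<Long> oldIter = oldBlocks.iterator();
        Iterator<Long> newIter = newBlocks.iterator();
        // Stop at the end of the shorter list, as the loop in openInfo does.
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                return false; // openInfo throws "Blocklist ... has changed!" here
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A block appended at the end passes the check.
        System.out.println(samePrefix(Arrays.asList(1L, 2L), Arrays.asList(1L, 2L, 3L)));
        // A replaced block fails it.
        System.out.println(samePrefix(Arrays.asList(1L, 2L), Arrays.asList(1L, 9L)));
    }
}
```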


updateBlockInfo does the following: for files under construction, it updates the size of the last block based on the length of the block reported by the datanode.
private void updateBlockInfo(LocatedBlocks newInfo) {
      if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
          || !(newInfo.locatedBlockCount() > 0)) {
        return;
      }

      LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
      boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
          .getFileLength());
      if (!lastBlockInFile || last.getLocations().length <= 0) {
        return;
      }
      ClientDatanodeProtocol primary = null;
      DatanodeInfo primaryNode = last.getLocations()[0];
      try {
        primary = createClientDatanodeProtocolProxy(primaryNode, conf,
            last.getBlock(), last.getBlockToken(), socketTimeout);
        Block newBlock = primary.getBlockInfo(last.getBlock());
        long newBlockSize = newBlock.getNumBytes();
        long delta = newBlockSize - last.getBlockSize();
        // if the size of the block on the datanode is different
        // from what the NN knows about, the datanode wins!
        last.getBlock().setNumBytes(newBlockSize);
        long newlength = newInfo.getFileLength() + delta;
        newInfo.setFileLength(newlength);
        LOG.debug("DFSClient setting last block " + last + " to length "
            + newBlockSize + " filesize is now " + newInfo.getFileLength());
      } catch (IOException e) {
        if (e.getMessage().startsWith(
            "java.io.IOException: java.lang.NoSuchMethodException: "
                + "org.apache.hadoop.hdfs.protocol"
                + ".ClientDatanodeProtocol.getBlockInfo")) {
          // We're talking to a server that doesn't implement HDFS-200.
          serverSupportsHdfs200 = false;
        } else {
          LOG.debug("DFSClient file " + src
              + " is being concurrently append to" + " but datanode "
              + primaryNode.getHostName() + " probably does not have block "
              + last.getBlock());
        }
      }
    }
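
The length arithmetic in updateBlockInfo is small enough to work through by hand. The sketch below is an illustration only; the class LastBlockLength and its parameter names are hypothetical. It reproduces the delta computation: the file length moves by the difference between the block size the datanode reports and the size the namenode knows.

```java
/** Hypothetical sketch of the length arithmetic in updateBlockInfo. */
final class LastBlockLength {
    /** File length after the datanode reports the real size of the last block. */
    static long adjustedFileLength(long fileLength, long sizeOnNameNode, long sizeOnDataNode) {
        // The datanode wins: shift the file length by the difference.
        long delta = sizeOnDataNode - sizeOnNameNode;
        return fileLength + delta;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // The namenode believes the file is 74 MB with a 10 MB last block;
        // the datanode reports that the last block already holds 12 MB.
        System.out.println(adjustedFileLength(74 * mb, 10 * mb, 12 * mb) / mb + " MB");
    }
}
```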


callGetBlockLocations is used to fetch the block information:
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                    FileNotFoundException.class);
    }
  }


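The return value is a LocatedBlocks, which holds a list, List<LocatedBlock> blocks. Each LocatedBlock contains:
  • Block b: the block information
  • long offset: the offset of the block within the file
  • DatanodeInfo[] locs: the datanodes that hold the block
The namenode.getBlockLocations call above is an RPC that ends up in the getBlockLocations function of the NameNode class.

1.2 namenode

That completes the client side. On the namenode side, namenode.getBlockLocations invokes the NameNode method, which is implemented as follows: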
public LocatedBlocks   getBlockLocations(String src,
                                          long offset,
                                          long length) throws IOException {
    myMetrics.incrNumGetBlockLocations();
    return namesystem.getBlockLocations(getClientMachine(),
                                        src, offset, length);
  }

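namesystem is a member variable of NameNode, of type FSNamesystem. The method shown next takes different parameters from the call above, so the call reaches it through another overload. Here is what it does: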
public LocatedBlocks getBlockLocations(String src, long offset, long length,
      boolean doAccessTime, boolean needBlockToken) throws IOException {
    if (isPermissionEnabled) {
      checkPathAccess(src, FsAction.READ);
    }

    if (offset < 0) {
      throw new IOException("Negative offset is not supported. File: " + src );
    }
    if (length < 0) {
      throw new IOException("Negative length is not supported. File: " + src );
    }
    final LocatedBlocks ret = getBlockLocationsInternal(src,
        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken); 
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "open", src, null, null);
    }
    return ret;
  }


The key part is getBlockLocationsInternal, implemented as follows:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                       long offset,
                                                       long length,
                                                       int nrBlocksToReturn,
                                                       boolean doAccessTime,
                                                       boolean needBlockToken)
                                                       throws IOException {
    INodeFile inode = dir.getFileINode(src);
    if(inode == null) {
      return null;
    }
    if (doAccessTime && isAccessTimeSupported()) {
      dir.setTimes(src, inode, -1, now(), false);
    }
    // Get the blocks of this file
    Block[] blocks = inode.getBlocks();
    if (blocks == null) {
      return null;
    }
    if (blocks.length == 0) {
      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
    }
    List<LocatedBlock> results;
    results = new ArrayList<LocatedBlock>(blocks.length);
    // Find the blocks that cover the range offset to offset+length
    int curBlk = 0;
    long curPos = 0, blkSize = 0;
    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
      blkSize = blocks[curBlk].getNumBytes();
      assert blkSize > 0 : "Block of size 0";
      if (curPos + blkSize > offset) {
        // offset falls inside this block; curPos stays at the start of the current block
        break;
      }
      curPos += blkSize;
    }
   
    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
      return null;
   
    long endOff = offset + length;
   
    do {
      // get block locations
      int numNodes = blocksMap.numNodes(blocks[curBlk]);
      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
      if (numCorruptNodes != numCorruptReplicas) {
        LOG.warn("Inconsistent number of corrupt replicas for " +
            blocks[curBlk] + "blockMap has " + numCorruptNodes +
            " but corrupt replicas map has " + numCorruptReplicas);
      }
      // Find the datanodes that hold this block
      DatanodeDescriptor[] machineSet = null;
      boolean blockCorrupt = false;
      if (inode.isUnderConstruction() && curBlk == blocks.length - 1
          && blocksMap.numNodes(blocks[curBlk]) == 0) {
        // get unfinished block locations
        INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
        machineSet = cons.getTargets();
        blockCorrupt = false;
      } else {
        blockCorrupt = (numCorruptNodes == numNodes);
        int numMachineSet = blockCorrupt ? numNodes :
                            (numNodes - numCorruptNodes);
        machineSet = new DatanodeDescriptor[numMachineSet];
        if (numMachineSet > 0) {
          numNodes = 0;
          for(Iterator<DatanodeDescriptor> it =
              blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
            DatanodeDescriptor dn = it.next();
            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
              machineSet[numNodes++] = dn;
          }
        }
      }
      // Build a LocatedBlock
      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
          blockCorrupt);
      if(isAccessTokenEnabled && needBlockToken) {
        b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
      }
      // Add it to results
      results.add(b);
      curPos += blocks[curBlk].getNumBytes();
      curBlk++;
    } while (curPos < endOff
          && curBlk < blocks.length
          && results.size() < nrBlocksToReturn);
    // Return the result
    return inode.createLocatedBlocks(results);
  }
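
The two loops in getBlockLocationsInternal can be sketched without any Hadoop types. This is a simplified illustration under stated assumptions: the class BlockSelection is hypothetical, blocks are reduced to an array of sizes, and replica, corruption, and token handling are left out. The first loop skips blocks that end at or before offset; the second collects blocks until the end of the requested range.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified version of the block selection on the namenode. */
final class BlockSelection {
    /** Zero-based indexes of blocks overlapping [offset, offset + length), or null past EOF. */
    static List<Integer> select(long[] blockSizes, long offset, long length) {
        List<Integer> result = new ArrayList<Integer>();
        if (blockSizes.length == 0) {
            return result; // empty file: empty block list
        }
        int curBlk = 0;
        long curPos = 0;
        // Skip every block that ends at or before offset.
        for (; curBlk < blockSizes.length; curBlk++) {
            if (curPos + blockSizes[curBlk] > offset) {
                break; // offset lies inside this block
            }
            curPos += blockSizes[curBlk];
        }
        if (curBlk == blockSizes.length) {
            return null; // offset >= end of file
        }
        long endOff = offset + length;
        // Collect blocks until the requested range is covered.
        do {
            result.add(curBlk);
            curPos += blockSizes[curBlk];
            curBlk++;
        } while (curPos < endOff && curBlk < blockSizes.length);
        return result;
    }

    public static void main(String[] args) {
        long[] sizes = {64, 64, 64, 64, 64}; // five blocks of 64 units each
        System.out.println(select(sizes, 100, 128));
    }
}
```

With five 64-unit blocks, a request for 128 units starting at 100 selects zero-based indexes 1, 2, and 3.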


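1.3 Summary

What is finally returned to the user is an FSDataInputStream. The client completes the whole "open file" process by contacting the namenode. As I understand it, opening a file really just means collecting from the namenode the datanodes that hold each block, along with basic block information, for use by the subsequent read.

II. Reading a file

2.1 Client

Reading a file calls DFSDataInputStream.read directly: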
public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
      // sanity checks
      checkOpen();
      if (closed) {
        throw new IOException("Stream closed");
      }
      failures = 0;
      long filelen = getFileLength();
      if ((position < 0) || (position >= filelen)) {
        return -1;
      }
      int realLen = length;
      if ((position + length) > filelen) {
        realLen = (int)(filelen - position);
      }
     
      // determine the block and byte range within the block
      // corresponding to position and realLen
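      // Get the list of blocks covering position to position+realLen.
      // For example, reading 128M starting at 100M,
      // with block boundaries at 64M, 128M, 192M, 256M, 320M, ...,
      // means reading blocks 2, 3, and 4.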
      List<LocatedBlock> blockRange = getBlockRange(position, realLen);
      int remaining = realLen;
      // In that example, block 2 is read from 36M (100M-64M), all 64M of block 3 is read, and block 4 is read up to 36M (228M-192M)
      for (LocatedBlock blk : blockRange) {
        long targetStart = position - blk.getStartOffset();
        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
        fetchBlockByteRange(blk, targetStart,
                            targetStart + bytesToRead - 1, buffer, offset);
        remaining -= bytesToRead;
        position += bytesToRead;
        offset += bytesToRead;
      }
      assert remaining == 0 : "Wrong number of bytes read.";
      if (stats != null) {
        stats.incrementBytesRead(realLen);
      }
      return realLen;
    }
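
The per-block arithmetic in read can be checked with the 100M/128M example from the comments. The following is a sketch under assumptions: the class ReadPlan and its Slice type are hypothetical, and every block is assumed to have the same size, which the real code does not require. It computes, for each block, the inclusive byte range that fetchBlockByteRange would be asked for.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how read() splits a request into per-block ranges. */
final class ReadPlan {
    /** One slice: zero-based block index plus the inclusive byte range inside that block. */
    static final class Slice {
        final int blockIndex;
        final long start;
        final long end;

        Slice(int blockIndex, long start, long end) {
            this.blockIndex = blockIndex;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "block " + blockIndex + " [" + start + ".." + end + "]";
        }
    }

    /** Assumes uniform block sizes, unlike the real code, which reads them per block. */
    static List<Slice> plan(long blockSize, long position, long length) {
        List<Slice> slices = new ArrayList<Slice>();
        long remaining = length;
        while (remaining > 0) {
            int blockIndex = (int) (position / blockSize);
            long targetStart = position - blockIndex * blockSize;
            long bytesToRead = Math.min(remaining, blockSize - targetStart);
            slices.add(new Slice(blockIndex, targetStart, targetStart + bytesToRead - 1));
            remaining -= bytesToRead;
            position += bytesToRead;
        }
        return slices;
    }

    public static void main(String[] args) {
        // Units are megabytes here purely to mirror the example in the comments.
        for (Slice s : plan(64, 100, 128)) {
            System.out.println(s);
        }
    }
}
```

For a block size of 64, position 100, and length 128, the plan has three slices: block 1 from 36 to 63, block 2 from 0 to 63, and block 3 from 0 to 35 (zero-based block indexes, so these are the second, third, and fourth blocks).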


The blockRange is obtained by calling getBlockRange(position, realLen), implemented at line 1991 of DFSClient.java:
private synchronized List<LocatedBlock> getBlockRange(long offset,
                                                          long length)
                                                        throws IOException {
      assert (locatedBlocks != null) : "locatedBlocks is null";
      List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
      // search cached blocks first
      // First look up, in the cached locatedBlocks, the index of the block that contains offset
      int blockIdx = locatedBlocks.findBlock(offset);
      if (blockIdx < 0) { // block is not cached
        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
      }
      long remaining = length;
      long curOff = offset;
      while(remaining > 0) {
        LocatedBlock blk = null;
        // Fetch the block at blockIdx
        if(blockIdx < locatedBlocks.locatedBlockCount())
          blk = locatedBlocks.get(blockIdx);
        // If blk is null, the block is not cached: query the NameNode for it and add the result to the cache
        if (blk == null || curOff < blk.getStartOffset()) {
          LocatedBlocks newBlocks;
          newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
          continue;
        }
        assert curOff >= blk.getStartOffset() : "Block not found";
        // The block was found; add it to the result list
        blockRange.add(blk);
        long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
        remaining -= bytesRead;
        curOff += bytesRead;
        // Move on to the next block
        blockIdx++;
      }
      return blockRange;
    }
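
The cache lookup at the top of getBlockRange relies on a negative result meaning "not cached" and on that result being convertible to an insert position. The sketch below assumes a binary-search convention like that of java.util.Collections.binarySearch, where a miss returns -(insertionPoint + 1). The class BlockCache and its array-based representation are hypothetical and are not the LocatedBlocks implementation.

```java
/** Hypothetical sketch of the findBlock / getInsertIndex pair used by getBlockRange. */
final class BlockCache {
    /**
     * Index of the cached block containing offset, or -(insertionPoint + 1) when
     * no cached block covers it. starts must be sorted in ascending order.
     */
    static int findBlock(long[] starts, long[] sizes, long offset) {
        int lo = 0;
        int hi = starts.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offset < starts[mid]) {
                hi = mid - 1;
            } else if (offset >= starts[mid] + sizes[mid]) {
                lo = mid + 1;
            } else {
                return mid; // offset lies inside block mid
            }
        }
        return -(lo + 1);
    }

    /** Converts a search result into the index where a missing block belongs. */
    static int insertIndex(int searchResult) {
        return searchResult >= 0 ? searchResult : -(searchResult + 1);
    }

    public static void main(String[] args) {
        // Cached blocks cover [0, 64), [64, 128), and [192, 256); [128, 192) is missing.
        long[] starts = {0, 64, 192};
        long[] sizes = {64, 64, 64};
        System.out.println(findBlock(starts, sizes, 100));
        System.out.println(insertIndex(findBlock(starts, sizes, 150)));
    }
}
```

In the example, offset 150 falls into the gap, so the lookup misses and the insert index points at position 2, where the block fetched from the namenode would be placed.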


Another key call inside read is fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset); its implementation is at line 2291:
private void fetchBlockByteRange(LocatedBlock block, long start,
                                     long end, byte[] buf, int offset) throws IOException {
      //
      // Connect to best DataNode for desired Block, with potential offset
      //
      Socket dn = null;
      int refetchToken = 1; // only need to get a new access token once
     
      while (true) {
        // cached block locations may have been updated by chooseDataNode()
        // or fetchBlockAt(). Always get the latest list of locations at the
        // start of the loop.
        block = getBlockAt(block.getStartOffset(), false);
        // Choose a datanode to read from
        DNAddrPair retval = chooseDataNode(block);
        DatanodeInfo chosenNode = retval.info;
        InetSocketAddress targetAddr = retval.addr;
        BlockReader reader = null;

        int len = (int) (end - start + 1);
        try {
          Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
          // first try reading the block locally.
          if (shouldTryShortCircuitRead(targetAddr)) {
            try {
              reader = getLocalBlockReader(conf, src, block.getBlock(),
                  accessToken, chosenNode, DFSClient.this.socketTimeout, start);
            } catch (AccessControlException ex) {
              LOG.warn("Short circuit access failed ", ex);
              //Disable short circuit reads
              shortCircuitLocalReads = false;
              continue;
            }
          } else {
            // Otherwise (no local short-circuit read), go to the datanode
            // and open a socket connection
            dn = socketFactory.createSocket();
            NetUtils.connect(dn, targetAddr, socketTimeout);
            dn.setSoTimeout(socketTimeout);
            // Use the socket to create a reader that reads data from the DataNode
            reader = BlockReader.newBlockReader(dn, src,
                block.getBlock().getBlockId(), accessToken,
                block.getBlock().getGenerationStamp(), start, len, buffersize,
                verifyChecksum, clientName);
          }
          // Read the data
          int nread = reader.readAll(buf, offset, len);
          if (nread != len) {
            throw new IOException("truncated return from reader.read(): " +
                                  "excpected " + len + ", got " + nread);
          }
          return;
        } catch (ChecksumException e) {
          LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                   src + " at " + block.getBlock() + ":" +
                   e.getPos() + " from " + chosenNode.getName());
          reportChecksumFailure(src, block.getBlock(), chosenNode);
        } catch (IOException e) {
          if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
            refetchToken--;
            fetchBlockAt(block.getStartOffset());
            continue;
          } else {
            LOG.warn("Failed to connect to " + targetAddr + " for file " + src
                + " for block " + block.getBlock() + ":" + e);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Connection failure ", e);
            }
          }
        } finally {
          IOUtils.closeStream(reader);
          IOUtils.closeSocket(dn);
        }
        // Put chosen node into dead list, continue
        //如果讀取失敗,則將此DataNode標記為失敗節點
        addToDeadNodes(chosenNode);
      }
    }
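
The retry structure of fetchBlockByteRange, which marks a failed node as dead and tries again, can be shown with a small stand-alone sketch. Everything here is an assumption made for illustration: the class ReplicaRetry, the Reader interface, and the rule of picking the first live replica. The real chooseDataNode is more involved and is not reproduced; this sketch simply gives up when no live replica remains.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the "try a replica, mark it dead on failure" loop. */
final class ReplicaRetry {
    /** Hypothetical reader: returns the data or throws when the node cannot serve it. */
    interface Reader {
        String read(String node) throws IOException;
    }

    static String readFromAnyReplica(List<String> replicas, Reader reader) {
        Set<String> deadNodes = new HashSet<String>();
        while (true) {
            // Pick the first replica that has not failed yet.
            String chosen = null;
            for (String node : replicas) {
                if (!deadNodes.contains(node)) {
                    chosen = node;
                    break;
                }
            }
            if (chosen == null) {
                throw new IllegalStateException("No live replica left");
            }
            try {
                return reader.read(chosen);
            } catch (IOException e) {
                // Same idea as addToDeadNodes(chosenNode): skip this node next time.
                deadNodes.add(chosen);
            }
        }
    }

    public static void main(String[] args) {
        String data = readFromAnyReplica(Arrays.asList("dn1", "dn2"), node -> {
            if (node.equals("dn1")) {
                throw new IOException("connection refused");
            }
            return "data from " + node;
        });
        System.out.println(data);
    }
}
```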


This statement,
reader = BlockReader.newBlockReader(dn, src,
                block.getBlock().getBlockId(), accessToken,
                block.getBlock().getGenerationStamp(), start, len, buffersize,
                verifyChecksum, clientName);


calls BlockReader.newBlockReader, at line 1689 of DFSClient.java:
public static BlockReader newBlockReader( Socket sock, String file,
                                       long blockId,
                                       Token<BlockTokenIdentifier> accessToken,
                                       long genStamp,
                                       long startOffset, long len,
                                       int bufferSize, boolean verifyChecksum,
                                       String clientName)
                                       throws IOException {
      // in and out will be closed when sock is closed (by the caller)
      // Create an output stream on the socket to send the read request to the DataNode
      DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));

      //write the header.
      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
      out.write( DataTransferProtocol.OP_READ_BLOCK );
      out.writeLong( blockId );
      out.writeLong( genStamp );
      out.writeLong( startOffset );
      out.writeLong( len );
      Text.writeString(out, clientName);
      accessToken.write(out);
      out.flush();
     
      //
      // Get bytes in block, set streams
      //
      // Create an input stream on the socket to read data from the DataNode
      DataInputStream in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(sock),
                                  bufferSize));
     
      short status = in.readShort();
      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
          throw new InvalidBlockTokenException(
              "Got access token error for OP_READ_BLOCK, self="
                  + sock.getLocalSocketAddress() + ", remote="
                  + sock.getRemoteSocketAddress() + ", for file " + file
                  + ", for block " + blockId + "_" + genStamp);
        } else {
          throw new IOException("Got error for OP_READ_BLOCK, self="
              + sock.getLocalSocketAddress() + ", remote="
              + sock.getRemoteSocketAddress() + ", for file " + file
              + ", for block " + blockId + "_" + genStamp);
        }
      }
      DataChecksum checksum = DataChecksum.newDataChecksum( in );
      //Warning when we get CHECKSUM_NULL?
     
      // Read the first chunk offset.
      long firstChunkOffset = in.readLong();
     
      if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
          firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
        throw new IOException("BlockReader: error in first chunk offset (" +
                              firstChunkOffset + ") startOffset is " +
                              startOffset + " for file " + file);
      }
      // Create a reader, which mainly wraps the input stream used to read the data
      return new BlockReader( file, blockId, in, checksum, verifyChecksum,
                              startOffset, firstChunkOffset, sock );
    }
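
The request header written by newBlockReader has a fixed field order: version, opcode, block ID, generation stamp, start offset, length, client name, access token. The sketch below serializes the same fields into a byte array so that the layout can be inspected. It makes several assumptions: the class ReadRequestHeader is hypothetical, the two constant values are placeholders rather than values taken from DataTransferProtocol, writeUTF stands in for Text.writeString, and the access token is omitted.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of the read-request header layout. */
final class ReadRequestHeader {
    // Placeholder values for illustration; the real constants live in DataTransferProtocol.
    static final short VERSION = 17;
    static final byte OP_READ_BLOCK = 81;

    static byte[] encode(long blockId, long genStamp, long startOffset, long len,
                         String clientName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(VERSION);     // 2 bytes
            out.write(OP_READ_BLOCK);    // 1 byte
            out.writeLong(blockId);      // 8 bytes
            out.writeLong(genStamp);     // 8 bytes
            out.writeLong(startOffset);  // 8 bytes
            out.writeLong(len);          // 8 bytes
            out.writeUTF(clientName);    // stand-in for Text.writeString
            // The access token would be written here; it is omitted in this sketch.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] header = encode(1L, 2L, 3L, 4L, "client");
        System.out.println(header.length + " bytes, opcode " + header[2]);
    }
}
```

The fixed part of the header is 35 bytes: a 2-byte version, a 1-byte opcode, and four 8-byte longs. The client name follows it.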


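2.2 DataNode

In the DataNode source, startDataNode is implemented starting at line 320. The part to focus on is lines 399-419, where the socket server is initialized: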
void startDataNode(Configuration conf,
                     AbstractList<File> dataDirs, SecureResources resources
                     ) throws IOException 
{
     ........
    // find free port or use privileged port provide
    ServerSocket ss;
    if(secureResources == null) {
      // Create the ServerSocket
      ss = (socketWriteTimeout > 0) ?
        ServerSocketChannel.open().socket() : new ServerSocket();
      Server.bind(ss, socAddr, 0);
    } else {
      ss = resources.getStreamingSocket();
    }
    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
    // adjust machine name with the actual port
    tmpPort = ss.getLocalPort();
    selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                     tmpPort);
    this.dnRegistration.setName(machineName + ":" + tmpPort);
    LOG.info("Opened info server at " + tmpPort);
     
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    this.dataXceiverServer = new Daemon(threadGroup,
        new DataXceiverServer(ss, conf, this));
    this.threadGroup.setDaemon(true); // auto destroy when empty
     ......
}

The run() method of DataXceiverServer starts at line 128:
public void run() {
    while (datanode.shouldRun) {
      try {
        // Accept a client connection
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        // Start a DataXceiver thread to serve the new connection
        new Daemon(datanode.threadGroup,
            new DataXceiver(s, datanode, this)).start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (AsynchronousCloseException ace) {
          LOG.warn(datanode.dnRegistration + ":DataXceiveServer:"
                  + StringUtils.stringifyException(ace));
          datanode.shouldRun = false;
      } catch (IOException ie) {
        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: IOException due to:"
                                 + StringUtils.stringifyException(ie));
      } catch (Throwable te) {
        LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
                                 + StringUtils.stringifyException(te));
        datanode.shouldRun = false;
      }
    }
    try {
      ss.close();
    } catch (IOException ie) {
      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: Close exception due to: "
                               + StringUtils.stringifyException(ie));
    }
    LOG.info("Exiting DataXceiveServer");
  }


In DataXceiver, note run(), at line 77:
public void run() {
    DataInputStream in=null;
    try {
      // Create an input stream to read the request sent by the client
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s),
                                  SMALL_BUFFER_SIZE));
      short version = in.readShort();
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
      switch ( op ) {
      // Read request
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );
        datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.incrReadsFromLocalClient();
        else
          datanode.myMetrics.incrReadsFromRemoteClient();
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK:
        writeBlock( in );
        datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.incrWritesFromLocalClient();
        else
          datanode.myMetrics.incrWritesFromRemoteClient();
        break;
      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
        replaceBlock(in);
        datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_COPY_BLOCK:
            // for balancing purpose; send to a proxy source
        copyBlock(in);
        datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
        getBlockChecksum(in);
        datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
        break;
      default:
        throw new IOException("Unknown opcode " + op + " in data stream");
      }
    } catch (Throwable t) {
      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
    } finally {
      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
                               + datanode.getXceiverCount());
      IOUtils.closeStream(in);
      IOUtils.closeSocket(s);
      dataXceiverServer.childSockets.remove(s);
    }
  }
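
The first steps of DataXceiver.run(), reading the version, checking it, reading the opcode, and dispatching, mirror what the client wrote at the start of its header. The following sketch replays them over an in-memory byte array. It is an illustration under assumptions: the class OpDispatch is hypothetical, the constant values are placeholders, and the handlers are reduced to returning the name of the method that would be called.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of the version check and opcode dispatch in DataXceiver.run(). */
final class OpDispatch {
    // Placeholder values for illustration; the real constants live in DataTransferProtocol.
    static final short VERSION = 17;
    static final byte OP_WRITE_BLOCK = 80;
    static final byte OP_READ_BLOCK = 81;

    /** Returns the name of the handler that would serve this request. */
    static String dispatch(byte[] request) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(request))) {
            short version = in.readShort();
            if (version != VERSION) {
                throw new IOException("Version Mismatch");
            }
            byte op = in.readByte();
            switch (op) {
                case OP_READ_BLOCK:
                    return "readBlock";
                case OP_WRITE_BLOCK:
                    return "writeBlock";
                default:
                    throw new IOException("Unknown opcode " + op + " in data stream");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two version bytes (big-endian 17) followed by the opcode byte.
        System.out.println(dispatch(new byte[] {0, 17, 81}));
    }
}
```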

 
private void readBlock(DataInputStream in) throws IOException {
    //
    // Read in the header
    //
    long blockId = in.readLong();         
    Block block = new Block( blockId, 0 , in.readLong());

    long startOffset = in.readLong();
    long length = in.readLong();
    String clientName = Text.readString(in);
    // Read the access token, then create an output stream for writing to the client
    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
    accessToken.readFields(in);
    OutputStream baseStream = NetUtils.getOutputStream(s,
        datanode.socketWriteTimeout);
    DataOutputStream out = new DataOutputStream(
                 new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
    if (datanode.isBlockTokenEnabled) {
      try {
        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
            BlockTokenSecretManager.AccessMode.READ);
      } catch (InvalidToken e) {
        try {

          out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
          out.flush();
          throw new IOException("Access token verification failed, for client "
              + remoteAddress + " for OP_READ_BLOCK for block " + block);
        } finally {
          IOUtils.closeStream(out);
        }
      }
    }
    // send the block
    BlockSender blockSender = null;
    final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d",
            datanode.dnRegistration.getStorageID(), block, "%d")
        : datanode.dnRegistration + " Served block " + block + " to " +
            s.getInetAddress();
    try {
      try {
        // Create a BlockSender, which reads the local block data and sends it to the client.
        // BlockSender has a member InputStream blockIn that reads the local block data.
        blockSender = new BlockSender(block, startOffset, length,
            true, true, false, datanode, clientTraceFmt);
      } catch(IOException e) {
        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
        throw e;
      }
      // Write the data to the client
      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
      long read = blockSender.sendBlock(out, baseStream, null); // send data

      if (blockSender.isBlockReadFully()) {
        // See if client verification succeeded.
        // This is an optional response from client.
        try {
          if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK  &&
              datanode.blockScanner != null) {
            datanode.blockScanner.verifiedByClient(block);
          }
        } catch (IOException ignored) {}
      }
     
      datanode.myMetrics.incrBytesRead((int) read);
      datanode.myMetrics.incrBlocksRead();
    } catch ( SocketException ignored ) {
      // Its ok for remote side to close the connection anytime.
      datanode.myMetrics.incrBlocksRead();
    } catch ( IOException ioe ) {
      /* What exactly should we do here?
       * Earlier version shutdown() datanode if there is disk error.
       */
      LOG.warn(datanode.dnRegistration +  ":Got exception while serving " +
          block + " to " +
                s.getInetAddress() + ":\n" +
                StringUtils.stringifyException(ioe) );
      throw ioe;
    } finally {
      IOUtils.closeStream(out);
      IOUtils.closeStream(blockSender);
    }
  }

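III. UML sequence diagrams

[Figure: sequence diagram of the open process]
[Figure: sequence diagram of the read process]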