
hadoop2.6.0 source code analysis - client (part 2 -- reading (read) HDFS files)

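In the previous article we analyzed the open function, which fetches the block information of the file being opened. Now we turn to the code that actually reads the file.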

Let's start with an example:

package com.hadoop.senior.hdfs;
 
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
public class HDFSApp {
	public static FileSystem getFileSystem() throws IOException{	
		//core-site.xml	hdfs-site.xml log4j.properties
		Configuration configuration=new Configuration();
		//FileSystem
		FileSystem filesystem= FileSystem.get(configuration);
		return filesystem;
	}
	//read a file from HDFS and print its contents to the console
	public static void readFile(String filename) throws IOException{
		
		FileSystem filesystem= getFileSystem();
		//read path
		Path path = new Path(filename);
		//dfInputStream
		FSDataInputStream inStream= filesystem.open(path); 
		try{
			IOUtils.copyBytes(inStream, System.out, 4, false);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			IOUtils.closeStream(inStream);
		}
	}
	//upload a local file to HDFS
	public static void writeFile(String filename) throws IOException{
		FileSystem filesystem=getFileSystem();
		Path path= new Path(filename);
		FSDataOutputStream outStream=filesystem.create(path);
		FileInputStream inStream=new FileInputStream(new File("/opt/modules/hadoop-2.5.0-cdh5.3.6/input.txt"));
		try{
			IOUtils.copyBytes(inStream, outStream, 1024, false);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			IOUtils.closeStream(inStream);
			IOUtils.closeStream(outStream);
		}
	}
	public static void main(String[] args) throws IOException {
	
		//read local file not use core-site.xml hdfs-site.xml
		//String filename="/opt/modules/workspace/senior/pom.xml";
		//read hdfs input.txt
		String filename ="/usr/css/mapreduce/wordcount/input/input.txt";
		String filename2="/usr/css/mapreduce/wordcount/input/input02.txt";
		readFile(filename);
		writeFile(filename2);
	}
}

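The code above both reads and writes a file. We start with reading. The program first calls FileSystem's open function to open the file (open is analyzed in hadoop2.6.0 source code analysis - client (part 2 -- reading (open) HDFS files)). It then executes this line: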

IOUtils.copyBytes(inStream, System.out, 4, false);

This line copies the file contents from inStream to System.out. Here is the function it calls:

/**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer 
   * @param close whether or not close the InputStream and 
   * OutputStream at the end. The streams are closed in the finally clause.  
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
    throws IOException {
    try {
      copyBytes(in, out, buffSize);
      if(close) {
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if(close) {
        closeStream(out);
        closeStream(in);
      }
    }
  }

This function delegates to the three-argument copyBytes overload:

/**
   * Copies from one stream to another.
   * 
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer 
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
    throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }
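The loop ends only when read returns -1. The buffer size passed in by the example (4) therefore just caps how many bytes each read call can return.

The sketch below reproduces the loop using plain java.io streams in place of the HDFS ones, and it counts the read calls. Some assumptions apply:

- CopyLoopSketch, CountingInputStream and countReads are hypothetical names used only for illustration.
- The PrintStream error check from the real copyBytes is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CopyLoopSketch {
    /** Hypothetical wrapper that counts calls to read(byte[], int, int). */
    static class CountingInputStream extends InputStream {
        private final InputStream in;
        int readCalls = 0;

        CountingInputStream(InputStream in) { this.in = in; }

        @Override
        public int read() throws IOException { return in.read(); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            readCalls++;
            return in.read(b, off, len);
        }
    }

    /** Same loop shape as the three-argument IOUtils.copyBytes: read until -1, write what was read. */
    static void copy(InputStream in, OutputStream out, int buffSize) throws IOException {
        byte[] buf = new byte[buffSize];
        int bytesRead = in.read(buf);   // InputStream.read(byte[]) forwards to read(buf, 0, buf.length)
        while (bytesRead >= 0) {
            out.write(buf, 0, bytesRead);
            bytesRead = in.read(buf);
        }
    }

    /** Hypothetical helper: copies text through a buffSize buffer and returns the number of read calls. */
    static int countReads(String text, int buffSize, ByteArrayOutputStream out) {
        CountingInputStream in = new CountingInputStream(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
        try {
            copy(in, out, buffSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return in.readCalls;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // 10 bytes through a 4-byte buffer: the reads return 4, 4, 2, then -1
        int calls = countReads("hello HDFS", 4, out);
        System.out.println(calls + " read calls, copied: "
                + new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

A larger buffer (the writeFile example uses 1024) cuts the number of calls into DFSInputStream accordingly.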

The relevant class hierarchy is:

DFSInputStream extends FSInputStream

FSInputStream extends InputStream

DFSOutputStream extends FSOutputSummer

FSOutputSummer extends OutputStream

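Strictly speaking, in is not itself a DFSInputStream. The FSDataInputStream returned by open wraps a DFSInputStream. FSDataInputStream inherits read(byte[]) from java.io.DataInputStream, which forwards the call to the wrapped stream as read(buf, 0, buf.length). As a result, in.read(buf) ends up in DFSInputStream's read function: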

/**
   * Read the entire buffer.
   */
  @Override
  public synchronized int read(final byte buf[], int off, int len) throws IOException {
    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);

    return readWithStrategy(byteArrayReader, off, len);
  }
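copyBytes calls the one-argument read(buf), yet execution arrives at this three-argument read(buf, off, len). The bridge is java.io.DataInputStream, the superclass of FSDataInputStream: its read(byte[]) forwards to the wrapped stream as read(b, 0, b.length).

The sketch below shows that delegation with plain JDK classes. DelegationSketch, RecordingStream and readOnceThroughWrapper are hypothetical names, and RecordingStream only stands in for DFSInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class DelegationSketch {
    /** Hypothetical stand-in for DFSInputStream: records the arguments of the 3-arg read. */
    static class RecordingStream extends InputStream {
        private final InputStream data;
        int lastOff = -1;
        int lastLen = -1;

        RecordingStream(byte[] bytes) { this.data = new ByteArrayInputStream(bytes); }

        @Override
        public int read() throws IOException { return data.read(); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            lastOff = off;
            lastLen = len;
            return data.read(b, off, len);
        }
    }

    /**
     * Wraps the recording stream in a DataInputStream (the superclass of FSDataInputStream)
     * and calls read(byte[]) the way copyBytes does. Returns {bytesRead, off, len}.
     */
    static int[] readOnceThroughWrapper(byte[] content, int bufSize) {
        RecordingStream inner = new RecordingStream(content);
        try (DataInputStream outer = new DataInputStream(inner)) {
            int n = outer.read(new byte[bufSize]);
            return new int[] {n, inner.lastOff, inner.lastLen};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = readOnceThroughWrapper("abcdefgh".getBytes(), 4);
        System.out.println("read " + r[0] + " bytes via read(buf, " + r[1] + ", " + r[2] + ")");
    }
}
```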

ByteArrayStrategy looks like this:

/**
   * Used to read bytes into a byte[]
   */
  private static class ByteArrayStrategy implements ReaderStrategy {
    final byte[] buf;

    public ByteArrayStrategy(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public int doRead(BlockReader blockReader, int off, int len,
            ReadStatistics readStatistics) throws ChecksumException, IOException {
        int nRead = blockReader.read(buf, off, len);
        updateReadStatistics(readStatistics, nRead, blockReader);
        return nRead;
    }
  }
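ReaderStrategy separates where the bytes go from the seek and retry logic in readWithStrategy. That logic is written once against the interface, and the strategy decides the destination buffer.

The sketch below illustrates the pattern with an in-memory reader. Some assumptions apply:

- FakeBlockReader, StrategySketch and readWith are hypothetical; FakeBlockReader stands in for BlockReader.
- The ByteBufferStrategy shown is an illustrative variant of my own, not Hadoop code.

```java
import java.nio.ByteBuffer;

public class StrategySketch {
    /** Hypothetical stand-in for BlockReader: serves bytes from an in-memory array. */
    static class FakeBlockReader {
        private final byte[] data;
        private int pos = 0;

        FakeBlockReader(byte[] data) { this.data = data; }

        int read(byte[] buf, int off, int len) {
            if (pos >= data.length) return -1;             // end of block
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, buf, off, n);
            pos += n;
            return n;
        }
    }

    /** Plays the role of ReaderStrategy: decides where the bytes read from the reader land. */
    interface ReaderStrategy {
        int doRead(FakeBlockReader reader, int off, int len);
    }

    /** Like ByteArrayStrategy: copy into a caller-supplied byte[] starting at off. */
    static class ByteArrayStrategy implements ReaderStrategy {
        final byte[] buf;
        ByteArrayStrategy(byte[] buf) { this.buf = buf; }

        @Override
        public int doRead(FakeBlockReader reader, int off, int len) {
            return reader.read(buf, off, len);
        }
    }

    /** Hypothetical ByteBuffer variant: off is ignored because the buffer tracks its own position. */
    static class ByteBufferStrategy implements ReaderStrategy {
        final ByteBuffer buf;
        ByteBufferStrategy(ByteBuffer buf) { this.buf = buf; }

        @Override
        public int doRead(FakeBlockReader reader, int off, int len) {
            byte[] tmp = new byte[Math.min(len, buf.remaining())];
            int n = reader.read(tmp, 0, tmp.length);
            if (n > 0) buf.put(tmp, 0, n);
            return n;
        }
    }

    /** Caller logic (readWithStrategy's role) is written once against the interface. */
    static int readWith(ReaderStrategy strategy, FakeBlockReader reader, int off, int len) {
        return strategy.doRead(reader, off, len);
    }

    public static void main(String[] args) {
        byte[] target = new byte[8];
        int n = readWith(new ByteArrayStrategy(target), new FakeBlockReader("abcdef".getBytes()), 2, 4);
        System.out.println(n + " bytes copied into target[2..5]");
    }
}
```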

read calls readWithStrategy, shown below:

private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
    //check that the DFSClient is still running; throws an IOException if it has been closed
    dfsClient.checkOpen();
    if (closed) {
      throw new IOException("Stream closed");
    }
    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap 
      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
    failures = 0;
    //only read if the current position is before the end of the file
    if (pos < getFileLength()) {
      //number of attempts
      int retries = 2;
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
          //If pos has moved past the end of the current block, or there is no current node,
          //the data we need is in another block (or no reader is open), so locate it and pick a DataNode
          if (pos > blockEnd || currentNode == null) {
            currentNode = blockSeekTo(pos);
          }
          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
          if (locatedBlocks.isLastBlockComplete()) {
            realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
          }
          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
          
          if (result >= 0) {
            pos += result;
          } else {
            // got a EOS from reader though we expect more data on it.
            throw new IOException("Unexpected EOS from the reader");
          }
          if (dfsClient.stats != null) {
            dfsClient.stats.incrementBytesRead(result);
          }
          return result;
        } catch (ChecksumException ce) {
          throw ce;            
        } catch (IOException e) {
          if (retries == 1) {
            DFSClient.LOG.warn("DFS Read", e);
          }
          blockEnd = -1;
          if (currentNode != null) { addToDeadNodes(currentNode); }
          if (--retries == 0) {
            throw e;
          }
        } finally {
          // Check if need to report block replicas corruption either read
          // was successful or ChecksumException occured.
          reportCheckSumFailure(corruptedBlockMap, 
              currentLocatedBlock.getLocations().length);
        }
      }
    }
    return -1;
  }
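Two details carry most of this function. First, realLen clamps each request so that it never crosses blockEnd, which is the inclusive offset of the current block's last byte. Second, on an IOException the node is added to the dead list, blockEnd is reset to -1 (which forces blockSeekTo on the next pass), and the read is attempted at most twice.

The sketch below mirrors that control flow. Some assumptions apply:

- RetryingReadSketch, Attempt and pickNode are hypothetical names.
- Node selection is reduced to "first replica not marked dead".
- ChecksumException handling and corrupt-replica reporting are omitted.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetryingReadSketch {
    /** Hypothetical single read attempt against one DataNode. */
    interface Attempt {
        int read(String node, int len) throws IOException;
    }

    /** Clamp the request so it never crosses the end of the current block (blockEnd is inclusive). */
    static int realLen(long pos, long blockEnd, int len) {
        return (int) Math.min(len, blockEnd - pos + 1L);
    }

    /** Hypothetical, much simplified chooseDataNode: first replica not marked dead. */
    static String pickNode(List<String> replicas, Set<String> deadNodes) throws IOException {
        for (String r : replicas) {
            if (!deadNodes.contains(r)) return r;
        }
        throw new IOException("no live replica");
    }

    /** Same shape as readWithStrategy's loop: two attempts, and a failing node goes to deadNodes. */
    static int readWithRetry(List<String> replicas, Set<String> deadNodes, Attempt attempt,
                             long pos, long blockEnd, int len) throws IOException {
        int retries = 2;
        while (true) {
            String node = pickNode(replicas, deadNodes);
            try {
                return attempt.read(node, realLen(pos, blockEnd, len));
            } catch (IOException e) {
                deadNodes.add(node);
                if (--retries == 0) throw e;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Set<String> dead = new HashSet<>();
        long blockEnd = 134217727L;   // last byte of a 128 MiB block starting at offset 0
        int n = readWithRetry(List.of("dn1", "dn2"), dead,
                (node, len) -> {
                    if (node.equals("dn1")) throw new IOException(node + " is down");
                    return len;       // pretend the whole (clamped) request was served
                },
                blockEnd - 7, blockEnd, 4096);
        System.out.println(n + " bytes from dn2, dead nodes: " + dead);
    }
}
```

With pos eight bytes before the end of the block, a 4096-byte request is clamped to 8 bytes, so the reader never reads across a block boundary in a single call.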

When the current position is past the end of the current block, or currentNode is null, the current node must be updated by calling blockSeekTo:

/**
   * Open a DataInputStream to a DataNode so that it can be read from.
   * We get block ID and the IDs of the destinations at startup, from the namenode.
   */
  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
    if (target >= getFileLength()) {
      throw new IOException("Attempted to read past end of file");
    }

    // Will be getting a new BlockReader.
    // A new BlockReader is about to be created, so close the old one and clear the field
    if (blockReader != null) {
      blockReader.close();
      blockReader = null;
    }

    //
    // Connect to best DataNode for desired Block, with potential offset
    //
    DatanodeInfo chosenNode = null;
    int refetchToken = 1; // only need to get a new access token once
    int refetchEncryptionKey = 1; // only need to get a new encryption key once
    
    boolean connectFailedOnce = false;

    while (true) {
      //
      // Compute desired block
      //
      LocatedBlock targetBlock = getBlockAt(target, true);
      assert (target==pos) : "Wrong position " + pos + " expect " + target;
      long offsetIntoBlock = target - targetBlock.getStartOffset();

      //choose a DataNode from which to read this block
      DNAddrPair retval = chooseDataNode(targetBlock, null);
      chosenNode = retval.info;
      InetSocketAddress targetAddr = retval.addr;
      StorageType storageType = retval.storageType;

      try {
        ExtendedBlock blk = targetBlock.getBlock();
        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
        blockReader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setStorageType(storageType).
            setFileName(src).
            setBlock(blk).
            setBlockToken(accessToken).
            setStartOffset(offsetIntoBlock).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(blk.getNumBytes() - offsetIntoBlock).
            setCachingStrategy(cachingStrategy).
            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();//build a BlockReader that reads this block from the chosen DataNode,
                    //using the BlockReaderFactory builder
        if(connectFailedOnce) {
          DFSClient.LOG.info("Successfully connected to " + targetAddr +
                             " for " + blk);
        }
        return chosenNode;
      } catch (IOException ex) {
        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
              + "encryption key was invalid when connecting to " + targetAddr
              + " : " + ex);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
          refetchToken--;
          fetchBlockAt(target);
        } else {
          connectFailedOnce = true;
          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
            + ", add to deadNodes and continue. " + ex, ex);
          // Put chosen node into dead list, continue
          addToDeadNodes(chosenNode);
        }
      }
    }
  }
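blockSeekTo keeps looping until a reader is built, but each kind of recovery has a budget:

- one encryption-key refresh;
- one block-token refetch;
- for any other failure, the node goes into deadNodes.

The reader starts at offsetIntoBlock = target - startOffset and covers the rest of the block (numBytes - offsetIntoBlock).

The sketch below mirrors that control flow and arithmetic. Some assumptions apply:

- BlockSeekSketch, Connector, InvalidKeyException and InvalidTokenException are hypothetical stand-ins, not Hadoop classes.
- When every replica is dead, the sketch simply gives up. The real chooseDataNode refetches locations and retries instead.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BlockSeekSketch {
    /** Hypothetical stand-ins for an invalid encryption key and an invalid/expired block token. */
    static class InvalidKeyException extends IOException {
        InvalidKeyException() { super("invalid encryption key"); }
    }
    static class InvalidTokenException extends IOException {
        InvalidTokenException() { super("invalid block token"); }
    }

    /** Hypothetical attempt to build a block reader on one DataNode. */
    interface Connector {
        void connect(String node, long offsetIntoBlock, long length) throws IOException;
    }

    /** Returns the node a reader was opened on; recovery actions are appended to log. */
    static String seek(long target, long blockStart, long blockNumBytes, List<String> replicas,
                       Set<String> deadNodes, Connector connector, List<String> log) throws IOException {
        long offsetIntoBlock = target - blockStart;      // where inside the block to start
        long length = blockNumBytes - offsetIntoBlock;   // read through to the end of the block
        int refetchToken = 1;                            // each refresh is allowed only once
        int refetchEncryptionKey = 1;
        while (true) {
            String node = null;
            for (String r : replicas) {
                if (!deadNodes.contains(r)) { node = r; break; }
            }
            if (node == null) throw new IOException("no live replica");  // sketch gives up here
            try {
                connector.connect(node, offsetIntoBlock, length);
                return node;
            } catch (IOException ex) {
                if (ex instanceof InvalidKeyException && refetchEncryptionKey > 0) {
                    refetchEncryptionKey--;
                    log.add("new-key");        // Hadoop: dfsClient.clearDataEncryptionKey()
                } else if (ex instanceof InvalidTokenException && refetchToken > 0) {
                    refetchToken--;
                    log.add("new-token");      // Hadoop: fetchBlockAt(target)
                } else {
                    log.add("dead:" + node);   // Hadoop: addToDeadNodes(chosenNode)
                    deadNodes.add(node);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> log = new ArrayList<>();
        long[] seen = new long[2];
        // dn1 always fails, dn2 succeeds; offset 200,000,000 lies in the second 128 MiB block
        String chosen = seek(200_000_000L, 134_217_728L, 134_217_728L, List.of("dn1", "dn2"),
                new HashSet<>(), (node, off, len) -> {
                    if (node.equals("dn1")) throw new IOException("connection refused");
                    seen[0] = off;
                    seen[1] = len;
                }, log);
        System.out.println(chosen + " " + log + " offsetIntoBlock=" + seen[0] + " length=" + seen[1]);
    }
}
```

In the example, 200,000,000 - 134,217,728 gives offsetIntoBlock = 65,782,272. The reader then covers the remaining 68,435,456 bytes of the block.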

blockSeekTo calls getBlockAt to obtain the block that contains the given position:

/**
   * Get block at the specified position.
   * Fetch it from the namenode if not cached.
   * 
   * @param offset block corresponding to this offset in file is returned
   * @param updatePosition whether to update current position
   * @return located block
   * @throws IOException
   */
  private synchronized LocatedBlock getBlockAt(long offset,
      boolean updatePosition) throws IOException {
    assert (locatedBlocks != null) : "locatedBlocks is null";

    final LocatedBlock blk;

    //check offset
    //the offset must be >= 0 and < the file length; otherwise throw an exception
    if (offset < 0 || offset >= getFileLength()) {
      throw new IOException("offset < 0 || offset >= getFileLength(), offset="
          + offset
          + ", updatePosition=" + updatePosition
          + ", locatedBlocks=" + locatedBlocks);
    }//the offset lies beyond the length known to the NameNode, i.e. in a last block still being written
    else if (offset >= locatedBlocks.getFileLength()) {
      // offset to the portion of the last block,
      // which is not known to the name-node yet;
      // getting the last block 
      //use the last block
      blk = locatedBlocks.getLastLocatedBlock();
    }
    else {
      //the offset is within the length known to the NameNode: look it up in the cached block list
      // search cached blocks first
      int targetBlockIdx = locatedBlocks.findBlock(offset);
      if (targetBlockIdx < 0) { // block is not cached
        //no cached block contains this offset; convert the search result into an insertion index
        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
        // fetch more blocks
        //ask the NameNode for block locations starting at this offset
        final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
        assert (newBlocks != null) : "Could not find target position " + offset;
        //merge the newly fetched blocks into the cache (the rules are explained below)
        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
      }
      //take the block at that index
      blk = locatedBlocks.get(targetBlockIdx);
    }

    // update current position
    if (updatePosition) {
      pos = offset;
      blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
      currentLocatedBlock = blk;
    }
    return blk;
  }
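getBlockAt compares the offset against two lengths. The first is DFSInputStream's getFileLength(), which also counts bytes already written to a last block that is still under construction. The second is locatedBlocks.getFileLength(), the length carried in the block list from the NameNode.

The sketch below reduces the three branches to a classification and adds the blockEnd arithmetic. Some assumptions apply:

- GetBlockAtSketch, Lookup, classify and blockEnd are hypothetical names.
- The sizes in main are made-up example values.

```java
public class GetBlockAtSketch {
    enum Lookup { OUT_OF_RANGE, LAST_BLOCK_BEING_WRITTEN, CACHED_OR_FETCHED }

    /**
     * visibleLength plays the role of DFSInputStream.getFileLength();
     * knownLength plays the role of locatedBlocks.getFileLength().
     */
    static Lookup classify(long offset, long visibleLength, long knownLength) {
        if (offset < 0 || offset >= visibleLength) return Lookup.OUT_OF_RANGE;  // getBlockAt throws
        if (offset >= knownLength) return Lookup.LAST_BLOCK_BEING_WRITTEN;      // getLastLocatedBlock()
        return Lookup.CACHED_OR_FETCHED;                                         // findBlock, else NameNode
    }

    /** Inclusive offset of the block's last byte, the value stored in blockEnd. */
    static long blockEnd(long startOffset, long blockSize) {
        return startOffset + blockSize - 1;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long known = 256 * mib;           // two complete 128 MiB blocks reported by the NameNode
        long visible = known + 10 * mib;  // plus 10 MiB written so far to an unfinished third block
        System.out.println(classify(100 * mib, visible, known));  // CACHED_OR_FETCHED
        System.out.println(classify(260 * mib, visible, known));  // LAST_BLOCK_BEING_WRITTEN
        System.out.println(classify(300 * mib, visible, known));  // OUT_OF_RANGE
        System.out.println(blockEnd(128 * mib, 128 * mib));       // 268435455
    }
}
```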

Next, findBlock. It binary-searches the cached block list for the index of the block that contains the given offset:

/**
   * Find block containing specified offset.
   * 
   * @return block if found, or null otherwise.
   */
  public int findBlock(long offset) {
    // create fake block of size 0 as a key
    LocatedBlock key = new LocatedBlock(
        new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
    key.setStartOffset(offset);
    key.getBlock().setNumBytes(1);
    Comparator<LocatedBlock> comp = 
      new Comparator<LocatedBlock>() {
        // Returns 0 iff a is inside b or b is inside a
        @Override
        public int compare(LocatedBlock a, LocatedBlock b) {
          long aBeg = a.getStartOffset();
          long bBeg = b.getStartOffset();
          long aEnd = aBeg + a.getBlockSize();
          long bEnd = bBeg + b.getBlockSize();
          if(aBeg <= bBeg && bEnd <= aEnd 
              || bBeg <= aBeg && aEnd <= bEnd)
            return 0; // one of the blocks is inside the other
          if(aBeg < bBeg)
            return -1; // a's left bound is to the left of the b's
          return 1;
        }
      };
    return Collections.binarySearch(blocks, key, comp);
  }

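A note on findBlock: the values 0, -1 and 1 are returned by its comparator, not by findBlock itself.

The comparator returns 0 when one block lies inside the other. Here that means the one-byte key at offset falls inside a cached block.

It returns -1 when a starts to the left of b.

It returns 1 otherwise.

findBlock itself returns the result of Collections.binarySearch:

- the index of the cached block that contains offset, if one exists;
- otherwise -(insertion point) - 1, which is always negative.

That is why getBlockAt treats targetBlockIdx < 0 as a cache miss and calls LocatedBlocks.getInsertIndex to recover the insertion point. (The Javadoc's "@return block if found, or null otherwise" does not match the int return type.)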
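findBlock is a binary search that uses containment as equality. The sketch below reproduces it on a minimal block type so that hits and misses can be checked directly. FindBlockSketch and Block are hypothetical names. The comparator and the insert-index conversion follow the code quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class FindBlockSketch {
    /** Hypothetical minimal block covering [start, start + size). */
    static final class Block {
        final long start;
        final long size;
        Block(long start, long size) { this.start = start; this.size = size; }
    }

    /** Same rule as findBlock's comparator: 0 iff one range contains the other. */
    static final Comparator<Block> CONTAINS = (a, b) -> {
        long aBeg = a.start, bBeg = b.start;
        long aEnd = aBeg + a.size, bEnd = bBeg + b.size;
        if ((aBeg <= bBeg && bEnd <= aEnd) || (bBeg <= aBeg && aEnd <= bEnd)) return 0;
        return aBeg < bBeg ? -1 : 1;
    };

    /** Index of the cached block containing offset, or -(insertionPoint) - 1 if none does. */
    static int findBlock(List<Block> blocks, long offset) {
        Block key = new Block(offset, 1);   // one-byte probe at the requested offset
        return Collections.binarySearch(blocks, key, CONTAINS);
    }

    /** Same conversion as LocatedBlocks.getInsertIndex. */
    static int getInsertIndex(int binSearchResult) {
        return binSearchResult >= 0 ? binSearchResult : -(binSearchResult + 1);
    }

    public static void main(String[] args) {
        List<Block> cached = new ArrayList<>(List.of(new Block(0, 10), new Block(10, 10), new Block(20, 5)));
        System.out.println(findBlock(cached, 15));                 // 1
        int miss = findBlock(cached, 25);
        System.out.println(miss + " -> " + getInsertIndex(miss));  // -4 -> 3
    }
}
```

Offset 25 lies past every cached block. The search therefore reports insertion point 3, which is exactly the index that getBlockAt passes to insertRange.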

Back in getBlockAt, the next step is insertRange:

public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) {
    int oldIdx = blockIdx;
    int insStart = 0, insEnd = 0;
    for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); 
                                                        newIdx++) {
      long newOff = newBlocks.get(newIdx).getStartOffset();
      long oldOff = blocks.get(oldIdx).getStartOffset();
      if(newOff < oldOff) {
        insEnd++;
      } else if(newOff == oldOff) {
        // replace old cached block by the new one
        blocks.set(oldIdx, newBlocks.get(newIdx));
        if(insStart < insEnd) { // insert new blocks
          blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
          oldIdx += insEnd - insStart;
        }
        insStart = insEnd = newIdx+1;
        oldIdx++;
      } else {  // newOff > oldOff
        assert false : "List of LocatedBlock must be sorted by startOffset";
      }
    }
    insEnd = newBlocks.size();
    if(insStart < insEnd) { // insert new blocks
      blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
    }
  }

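insertRange merges the newly fetched blocks into the cached list. A cached block whose start offset matches a new block is replaced by the new one. New blocks that are not yet cached are inserted at the matching position, so the list stays sorted by start offset. Back in getBlockAt, once the block has been found and updatePosition is true, the function updates three fields before returning the block:

- pos, the current read position;
- blockEnd, the offset of the block's last byte;
- currentLocatedBlock.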
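The merge rules are easiest to see on a small list. The sketch below follows the same steps as insertRange on a plain ArrayList. Some assumptions apply:

- InsertRangeSketch and Block are hypothetical, and each block carries a tag so that replaced entries are visible.
- Where Hadoop uses an assert for unsorted input, this sketch throws an exception instead.

```java
import java.util.ArrayList;
import java.util.List;

public class InsertRangeSketch {
    /** Hypothetical cached block: start offset plus a tag saying where it came from. */
    static final class Block {
        final long start;
        final String tag;
        Block(long start, String tag) { this.start = start; this.tag = tag; }
        @Override public String toString() { return start + ":" + tag; }
    }

    /** Follows the same steps as LocatedBlocks.insertRange on a plain list. */
    static void insertRange(List<Block> blocks, int blockIdx, List<Block> newBlocks) {
        int oldIdx = blockIdx;
        int insStart = 0, insEnd = 0;
        for (int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); newIdx++) {
            long newOff = newBlocks.get(newIdx).start;
            long oldOff = blocks.get(oldIdx).start;
            if (newOff < oldOff) {
                insEnd++;                                   // new block belongs before the cached one
            } else if (newOff == oldOff) {
                blocks.set(oldIdx, newBlocks.get(newIdx));  // refresh the cached block
                if (insStart < insEnd) {                    // flush pending new blocks in front of it
                    blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
                    oldIdx += insEnd - insStart;
                }
                insStart = insEnd = newIdx + 1;
                oldIdx++;
            } else {
                // Hadoop only asserts here; the sketch fails loudly instead.
                throw new IllegalArgumentException("blocks must be sorted by start offset");
            }
        }
        insEnd = newBlocks.size();
        if (insStart < insEnd) {                            // insert whatever is left
            blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
        }
    }

    public static void main(String[] args) {
        List<Block> cache = new ArrayList<>(List.of(
                new Block(0, "old"), new Block(10, "old"), new Block(40, "old")));
        List<Block> fetched = List.of(
                new Block(20, "new"), new Block(30, "new"), new Block(40, "new"), new Block(50, "new"));
        insertRange(cache, 2, fetched);   // offset 20 missed the cache with insertion index 2
        System.out.println(cache);        // [0:old, 10:old, 20:new, 30:new, 40:new, 50:new]
    }
}
```

Blocks 20 and 30 are inserted in front of the cached block 40. Block 40 itself is replaced by the fresh copy, and block 50 is added after it.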

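This completes our walk through the read path. Many of the functions above were not examined in depth; I will cover them in later articles.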