
Hadoop 2.6.0 Source Code Analysis - Client (Part 2: Opening an HDFS File for Reading)

Before reading an HDFS file, the client must first open it. The call is made against an org.apache.hadoop.fs.FileSystem object, but the object actually created is an org.apache.hadoop.hdfs.DistributedFileSystem, a subclass of FileSystem, so calling FSDataInputStream open(Path f, int bufferSize) through the parent type dispatches to the subclass override. In other words, the call ends up in DistributedFileSystem's open(Path f, int bufferSize), shown below (a minimal usage sketch follows the listing):

@Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    // increment the read-operations counter
    statistics.incrementReadOps(1);
    // get the Path object for the absolute path
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }
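
Before walking through the internals, here is a minimal client-side sketch of the call being traced. This is illustrative only; the cluster URI and file path are placeholders, not taken from the article.

// Illustrative sketch of the open() call analyzed in this article.
// The hdfs:// URI and the file path are placeholders.
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class OpenExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // For an hdfs:// URI this returns a DistributedFileSystem instance
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    InputStream in = null;
    try {
      // this is the open(Path, int) call whose implementation is shown above
      in = fs.open(new Path("/user/test/demo.txt"), 4096);
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      fs.close();
    }
  }
}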

Look at the first line of this method. statistics is a member field of FileSystem, declared as follows:

protected Statistics statistics;

Statistics is an inner class of FileSystem used to record usage statistics (operation counts, bytes read and written, and so on).
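
As an illustration (not from the article), the counters that Statistics maintains can be read back by client code. The sketch below assumes an already-initialized FileSystem instance named fs:

// Illustrative sketch: dumping the counters kept by FileSystem.Statistics.
// "fs" is assumed to be an already-initialized FileSystem (e.g. a DistributedFileSystem).
static void dumpStatistics(FileSystem fs) {
  FileSystem.Statistics stats =
      FileSystem.getStatistics(fs.getUri().getScheme(), fs.getClass());
  System.out.println("read ops:      " + stats.getReadOps());
  System.out.println("bytes read:    " + stats.getBytesRead());
  System.out.println("bytes written: " + stats.getBytesWritten());
}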

It is assigned in public void initialize(URI name, Configuration conf) throws IOException, which is called right after the DistributedFileSystem object is constructed in order to initialize it. The code is:

/** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
   *   for this FileSystem
   * @param conf the configuration
   */
  public void initialize(URI name, Configuration conf) throws IOException {
    statistics = getStatistics(name.getScheme(), getClass());    
    resolveSymlinks = conf.getBoolean(
        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
  }

Stepping into getStatistics, the code is:

/**
   * Get the statistics for a particular file system
   * @param cls the class to lookup
   * @return a statistics object
   */
  // This is a synchronized method; scheme is the protocol name (http, https, hdfs, ...)
  // and cls is the corresponding FileSystem Class object
  public static synchronized 
  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
    /* Recording statistics per a FileSystem class
       // statisticsTable is a static member of FileSystem:
       private static final Map<Class<? extends FileSystem>, Statistics> 
       statisticsTable = new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
   */
    // check whether the map already has an entry keyed by this Class object
    Statistics result = statisticsTable.get(cls);
    // if not,
    if (result == null) {
      // create a new Statistics object
      result = new Statistics(scheme);
      // and cache it under the Class object
      statisticsTable.put(cls, result);
    }
    // return the cached (or newly created) Statistics object
    return result;
  }
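
A small sketch (illustrative, not from the source) of the effect of this per-class caching: because statisticsTable is keyed by the FileSystem class, every DistributedFileSystem instance in the same JVM ends up sharing one Statistics object.

// Illustrative: two lookups for the same FileSystem class return the same cached object.
FileSystem.Statistics a =
    FileSystem.getStatistics("hdfs", DistributedFileSystem.class);
FileSystem.Statistics b =
    FileSystem.getStatistics("hdfs", DistributedFileSystem.class);
System.out.println(a == b);  // true: same cached Statistics instance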

Returning to open, we continue with the rest of the method:

return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);

Here an anonymous inner class is created (an anonymous inner class must extend a class or implement an interface; see the separate article on anonymous inner classes for details). It extends FileSystemLinkResolver<FSDataInputStream>. We now step into resolve; since the anonymous class does not override it, the parent implementation is called:

/**
   * Attempt calling overridden {@link #doCall(Path)} method with
   * specified {@link FileSystem} and {@link Path}. If the call fails with an
   * UnresolvedLinkException, it will try to resolve the path and retry the call
   * by calling {@link #next(FileSystem, Path)}.
   * @param filesys FileSystem with which to try call
   * @param path Path with which to try call
   * @return Generic type determined by implementation
   * @throws IOException
   */
  public T resolve(final FileSystem filesys, final Path path)
      throws IOException {
    int count = 0;
    T in = null;
    Path p = path;
    // Assumes path belongs to this FileSystem.
    // Callers validate this by passing paths through FileSystem#checkPath
    FileSystem fs = filesys;
    for (boolean isLink = true; isLink;) {
      try {
        // call doCall(), which dispatches to the doCall() overridden in the
        // anonymous subclass of FileSystemLinkResolver<FSDataInputStream>
        in = doCall(p);
        isLink = false;
      } catch (UnresolvedLinkException e) { // doCall() threw: a path component could not be resolved
        if (!filesys.resolveSymlinks) { // the client configuration disables symlink resolution,
                                        // so fail: the path to be opened contains a symlink
          throw new IOException("Path " + path + " contains a symlink"
              + " and symlink resolution is disabled ("
              + CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY
              + ").", e);
        }
        if (!FileSystem.areSymlinksEnabled()) {
          throw new IOException("Symlink resolution is disabled in" +
              " this version of Hadoop.");
        }
        // FsConstants.MAX_PATH_LINKS is the maximum number of symlink resolutions allowed
        if (count++ > FsConstants.MAX_PATH_LINKS) {
          throw new IOException("Possible cyclic loop while " +
                                "following symbolic link " + path);
        }
        // Resolve the first unresolved path component
        // resolve the first unresolved path component:
        // fs.getUri() returns the URI of the namenode (its access address),
        // and p is the Path object holding the path of the file being opened
        p = FSLinkResolver.qualifySymlinkTarget(fs.getUri(), p,
            filesys.resolveLink(p));
        fs = FileSystem.getFSofPath(p, filesys.getConf());
        // Have to call next if it's a new FS
        if (!fs.equals(filesys)) {
          return next(fs, p);
        }
        // Else, we keep resolving with this filesystem
      }
    }
    // Successful call, path was fully resolved
    return in;
  }

This relies on HDFS symbolic links and hard links, which work like their Linux counterparts; see the separate article on soft and hard links for the concept, which we will not repeat here. Next let us look at filesys.resolveLink(p). This calls into DistributedFileSystem and resolves a symlink; the code is:

@Override
  protected Path resolveLink(Path f) throws IOException {
    statistics.incrementReadOps(1);
    // getPathName extracts the file path; getLinkTarget resolves the symlink
    String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
    if (target == null) {
      throw new FileNotFoundException("File does not exist: " + f.toString());
    }
    return new Path(target);
  }
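
As an aside, HDFS symlinks are created through FileContext rather than FileSystem. The sketch below is illustrative only (conf is assumed to be an initialized Configuration, and the paths are placeholders); note that in this Hadoop version symlink support is disabled by default, which is exactly what the FileSystem.areSymlinksEnabled() check in resolve() guards against.

// Illustrative sketch: creating an HDFS symlink through FileContext.
FileContext fc = FileContext.getFileContext(conf);
fc.createSymlink(new Path("/user/test/target.txt"),  // existing target (placeholder)
                 new Path("/user/test/link.txt"),    // symlink to create (placeholder)
                 false /* createParent */);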

Stepping into getLinkTarget, the code is:

/**
   * Resolve the *first* symlink, if any, in the path.
   * 
   * @see ClientProtocol#getLinkTarget(String)
   */
  public String getLinkTarget(String path) throws IOException { 
    checkOpen();
    try {
      // this ultimately invokes the corresponding method on the namenode; namenode here is a proxy object
      return namenode.getLinkTarget(path);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class);
    }
  }

How the namenode proxy object comes into being was covered in an earlier part; here is a brief recap:

(Figure: DFSClient.jpeg - how DFSClient builds the ClientProtocol (namenode) proxy)

(Figure: drop.jpeg - detail of the step referenced in the figure above)

That concludes the recap of where the namenode proxy comes from; let us continue the analysis.
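
To make the proxy mechanism concrete, here is a stripped-down, self-contained illustration (not Hadoop code; Greeter and the handler are made up) of how a java.lang.reflect.Proxy routes every interface call into an InvocationHandler, which is the role ProtobufRpcEngine.Invoker plays for the ClientNamenodeProtocolPB proxy.

// Simplified illustration of the dynamic-proxy pattern behind "namenode".
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
  interface Greeter {
    String greet(String name);
  }

  public static void main(String[] args) {
    InvocationHandler handler = new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        // In Hadoop, this is where Invoker.invoke() serializes the call,
        // ships it to the namenode over RPC and deserializes the response.
        System.out.println("intercepted call to " + method.getName());
        return "hello, " + methodArgs[0];
      }
    };
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
    System.out.println(g.greet("hdfs client"));  // routed through the handler
  }
}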

namenode.getLinkTarget ends up calling getLinkTarget in ClientNamenodeProtocolTranslatorPB, which looks like this:

@Override
  public String getLinkTarget(String path) throws AccessControlException,
      FileNotFoundException, IOException {
    GetLinkTargetRequestProto req = GetLinkTargetRequestProto.newBuilder()
        .setPath(path).build();
    try {
      GetLinkTargetResponseProto rsp = rpcProxy.getLinkTarget(null, req);
      return rsp.hasTargetPath() ? rsp.getTargetPath() : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

Here rpcProxy is a dynamic proxy for the ClientNamenodeProtocolPB interface. Whenever an interface method is called on it, the call is routed to the invoke method of org.apache.hadoop.ipc.ProtobufRpcEngine.Invoker. invoke serializes the call information and sends it over RPC to the remote namenode; the namenode deserializes the message, dispatches to the corresponding method, serializes the result of that call, and sends the serialized data back to the client over RPC. The invoke method is shown below:

/**
     * This is the client side invoker of RPC method. It only throws
     * ServiceException, since the invocation proxy expects only
     * ServiceException to be thrown by the method in case protobuf service.
     * 
     * ServiceException has the following causes:
     * <ol>
     * <li>Exceptions encountered on the client side in this method are 
     * set as cause in ServiceException as is.</li>
     * <li>Exceptions from the server are wrapped in RemoteException and are
     * set as cause in ServiceException</li>
     * </ol>
     * 
     * Note that the client calling protobuf RPC methods, must handle
     * ServiceException by getting the cause from the ServiceException. If the
     * cause is RemoteException, then unwrap it to get the exception thrown by
     * the server.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = Time.now();
      }
      
      if (args.length != 2) { // RpcController + Message
        throw new ServiceException("Too many parameters for request. Method: ["
            + method.getName() + "]" + ", Expected: 2, Actual: "
            + args.length);
      }
      if (args[1] == null) {
        throw new ServiceException("null param while calling Method: ["
            + method.getName() + "]");
      }

      TraceScope traceScope = null;
      // if Tracing is on then start a new span for this rpc.
      // guard it in the if statement to make sure there isn't
      // any extra string manipulation.
      if (Trace.isTracing()) {
        traceScope = Trace.startSpan(
            method.getDeclaringClass().getCanonicalName() +
            "." + method.getName());
      }

      // build the request header, identifying which interface and which method are being invoked
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      
      if (LOG.isTraceEnabled()) {
        LOG.trace(Thread.currentThread().getId() + ": Call -> " +
            remoteId + ": " + method.getName() +
            " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
      }

      // the request parameter message, e.g. a RenameRequestProto
      Message theRequest = (Message) args[1];
      final RpcResponseWrapper val;
      try {
        // send the request through RPC via Client.call
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
              remoteId + ": " + method.getName() +
                " {" + e + "}");
        }
        if (Trace.isTracing()) {
          traceScope.getSpan().addTimelineAnnotation(
              "Call got exception: " + e.getMessage());
        }
        throw new ServiceException(e);
      } finally {
        if (traceScope != null) traceScope.close();
      }

      if (LOG.isDebugEnabled()) {
        long callTime = Time.now() - startTime;
        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
      }
      
      Message prototype = null;
      try {
        // get the response prototype, e.g. RenameResponseProto
        prototype = getReturnProtoType(method);
      } catch (Exception e) {
        throw new ServiceException(e);
      }
      Message returnMessage;
      try {
        // deserialize (parse) the response message
        returnMessage = prototype.newBuilderForType()
            .mergeFrom(val.theResponseRead).build();

        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getId() + ": Response <- " +
              remoteId + ": " + method.getName() +
                " {" + TextFormat.shortDebugString(returnMessage) + "}");
        }

      } catch (Throwable e) {
        throw new ServiceException(e);
      }
      // return the result
      return returnMessage;
    }

After the method being invoked and its parameters are wrapped into the corresponding objects, the request is sent to the remote namenode over RPC:

//send the request through RPC via Client.call
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

RPC.RpcKind is an enum whose values identify the serialization engine; RPC.RpcKind.RPC_PROTOCOL_BUFFER means the serialization provided by ProtobufRpcEngine is used. new RpcRequestWrapper(rpcRequestHeader, theRequest) wraps the method header and the parameter message into a single RpcRequestWrapper object. Let us first look at where client comes from: it is an org.apache.hadoop.ipc.Client object, whose constructor is:

/** Construct an IPC client whose values are of the given {@link Writable}
   * class. */
// valueClass is the Class object of a Writable subclass (the response value type)
// conf is the configuration object
// factory is a factory that creates Socket objects
  public Client(Class<? extends Writable> valueClass, Configuration conf, 
      SocketFactory factory) {
    this.valueClass = valueClass;
    this.conf = conf;
    this.socketFactory = factory;
    this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
        CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
    this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.clientId = ClientId.getClientId();
    this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
  }

Following the logic above, factory is produced by NetUtils.getDefaultSocketFactory(conf), whose code is:

/**
   * Get the default socket factory as specified by the configuration
   * parameter <tt>hadoop.rpc.socket.factory.default</tt>
   * 
   * @param conf the configuration
   * @return the default socket factory as specified in the configuration or
   *         the JVM default socket factory if the configuration does not
   *         contain a default socket factory property.
   */
  public static SocketFactory getDefaultSocketFactory(Configuration conf) {

    String propValue = conf.get(
        CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
        CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_DEFAULT);
    if ((propValue == null) || (propValue.length() == 0))
      return SocketFactory.getDefault();

    return getSocketFactoryFromProperty(conf, propValue);
  }
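
For completeness, the socket factory can be selected through configuration. The sketch below is illustrative only; the key string corresponds to HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, and the value shown is the shipped default (StandardSocketFactory), so this particular call changes nothing.

// Illustrative: selecting the RPC socket factory through configuration.
Configuration conf = new Configuration();
conf.set("hadoop.rpc.socket.factory.class.default",
    "org.apache.hadoop.net.StandardSocketFactory");
SocketFactory factory = NetUtils.getDefaultSocketFactory(conf);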

This returns a factory object that creates the Socket objects used for communication between the client and the remote namenode. Here is the code that creates a Socket:

@Override
  public Socket createSocket() throws IOException {
    /*
     * NOTE: This returns an NIO socket so that it has an associated 
     * SocketChannel. As of now, this unfortunately makes streams returned
     * by Socket.getInputStream() and Socket.getOutputStream() unusable
     * (because a blocking read on input stream blocks write on output stream
     * and vice versa).
     * 
     * So users of these socket factories should use 
     * NetUtils.getInputStream(socket) and 
     * NetUtils.getOutputStream(socket) instead.
     * 
     * A solution for hiding from this from user is to write a 
     * 'FilterSocket' on the lines of FilterInputStream and extend it by
     * overriding getInputStream() and getOutputStream().
     */
    return SocketChannel.open().socket();
  }

java.nio.channels.SocketChannel is part of Java NIO, which we will cover later.
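
A tiny sketch (illustrative only; the host, port and timeout are placeholders) of the point made in the comment above: with a channel-backed socket, callers go through the NetUtils stream wrappers rather than the raw socket streams.

// Illustrative: consuming a channel-backed socket the way the comment above suggests.
Socket socket = SocketChannel.open().socket();
socket.connect(new InetSocketAddress("namenode", 8020), 10000); // placeholder address/timeout
// Use the NetUtils wrappers instead of socket.getInputStream()/getOutputStream(),
// because the raw streams of an NIO-backed socket block each other.
InputStream in = NetUtils.getInputStream(socket);
OutputStream out = NetUtils.getOutputStream(socket);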

Continuing on, we step into client.call:

/**
   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the rpc response.
   *
   * @param rpcKind
   * @param rpcRequest -  contains serialized method and method parameters
   * @param remoteId - the target rpc server
   * @param serviceClass - service class for RPC
   * @param fallbackToSimpleAuth - set to true or false during this method to
   *   indicate if a secure client falls back to simple auth
   * @returns the rpc response
   * Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   */
  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
	
    // build a Call object wrapping the serialization kind plus the method and parameter information
    final Call call = createCall(rpcKind, rpcRequest);
    // get a Connection for remoteId: look it up in the cache first and, if absent,
    // create a new one and cache it for later reuse
    Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
    try {
      connection.sendRpcRequest(call);                 // send the rpc request
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("interrupted waiting to send rpc request to server", e);
      throw new IOException(e);
    }

    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      // if the Call recorded an exception, rethrow it
      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else { // the call succeeded
        // the server sent back a response; return the RPC response stored in the Call
        return call.getRpcResponse();
      }
    }
  }
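
The handshake above is a plain wait/notify pattern: the calling thread blocks on the Call object until the connection's receiver thread marks it done. A stripped-down sketch of the same idea (illustrative; PendingCall is a made-up stand-in for org.apache.hadoop.ipc.Client.Call):

// Illustrative: the wait/notify handshake Client.call() performs on a Call object.
class PendingCall {
  private boolean done = false;
  private Object response;

  // caller thread: block until the receiver thread marks the call done
  synchronized Object waitForResponse() throws InterruptedException {
    while (!done) {
      wait();
    }
    return response;
  }

  // receiver thread: store the response and wake up the caller
  synchronized void complete(Object resp) {
    this.response = resp;
    this.done = true;
    notifyAll();
  }
}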

call invokes createCall, whose code is:

Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
    return new Call(rpcKind, rpcRequest);
  }

This creates a Call object; its constructor is:

private Call(RPC.RpcKind rpcKind, Writable param) {
      this.rpcKind = rpcKind;   // serialization kind
      this.rpcRequest = param;  // the object wrapping the method and its parameters
      // callId is declared as:
      // private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
      final Integer id = callId.get();
      if (id == null) {
        this.id = nextCallId();
      } else {
        callId.set(null);
        this.id = id;
      }
      // retryCount is declared as:
      // private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
      final Integer rc = retryCount.get();
      if (rc == null) {
        this.retry = 0;
      } else {
        this.retry = rc;
      }
    }

For a detailed discussion of ThreadLocal, see the separate article on the ThreadLocal class.

For a detailed discussion of references in Java, see the separate article on Java references.
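
For a quick refresher, here is a minimal illustration (not Hadoop code) of the ThreadLocal pattern used for callId and retryCount: each thread sees its own value, and setting it to null clears it for that thread only, which is what the Call constructor relies on.

// Illustrative: per-thread values, the pattern behind Call's callId/retryCount.
class ThreadLocalDemo {
  private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();

  public static void main(String[] args) throws InterruptedException {
    callId.set(42);                        // visible only to the main thread
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println(callId.get()); // prints null: other threads see nothing
      }
    });
    t.start();
    t.join();
    System.out.println(callId.get());     // prints 42
    callId.set(null);                     // clears the value for this thread
  }
}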

Returning to call, the next statement executed is:

connection.sendRpcRequest(call); 

Stepping into sendRpcRequest, the code is:

    /** Initiates a rpc call by sending the rpc request to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * @param call - the rpc request
     */
    public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      // if the connection has already been closed, return immediately
      if (shouldCloseConnection.get()) {
        return;
      }

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,
      
      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      //
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader  - is serialized Delimited hence contains length
      // 2) RpcRequest
      //
      // Items '1' and '2' are prepared here. 
      // first build the RPC request header
      final DataOutputBuffer d = new DataOutputBuffer();
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
          clientId);
      // write the RPC request header to the output buffer
      header.writeDelimitedTo(d);
      // write the RPC request (request metadata plus parameters) to the output buffer
      call.rpcRequest.write(d);

      // a thread pool sends the request, which has three parts:
      // 1) total length, 2) RPC request header, 3) RPC request (request metadata plus parameters)
      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (Connection.this.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                
                if (LOG.isDebugEnabled())
                  LOG.debug(getName() + " sending #" + call.id);
         
                byte[] data = d.getData();
                int totalLength = d.getLength();
                out.writeInt(totalLength); // Total Length
                out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest (metadata + parameters)
                out.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              // a send failure leaves the connection unusable, so close it
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite to
              // close early
              // close the in-memory buffer allocated earlier
              IOUtils.closeStream(d);
            }
          }
        });
      
        // wait for the send task to finish
        try {
          // this blocks until the send task completes
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          
          // rethrow any exception raised by the send task
          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
    }
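
To make the on-wire layout concrete, here is a schematic sketch of the frame the code above produces. It is illustrative only; headerBytes and requestBytes are placeholders for the varint-delimited RpcRequestHeaderProto and the serialized RpcRequest.

// Schematic sketch of one RPC request frame, mirroring sendRpcRequest above.
static void writeFrame(DataOutputStream wire, byte[] headerBytes, byte[] requestBytes)
    throws IOException {
  ByteArrayOutputStream body = new ByteArrayOutputStream();
  body.write(headerBytes);       // 1) RpcRequestHeader (length-delimited)
  body.write(requestBytes);      // 2) RpcRequest (method name + parameters)
  wire.writeInt(body.size());    // 0) total length of 1) + 2)
  body.writeTo(wire);
  wire.flush();
}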

The client sends the request to the namenode over RPC, the namenode performs the corresponding operation, and the result comes back to the client. Back in open, doCall of the anonymous class is eventually invoked:

public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }

This calls dfs.open, which creates a DFSInputStream object; its constructor is:

DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
    this.cachingStrategy =
        dfsClient.getDefaultReadCachingStrategy();
    openInfo();
  }

The constructor ends by calling openInfo:

/**
   * Grab the open-file info from namenode
   */
  synchronized void openInfo() throws IOException, UnresolvedLinkException {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }

This method fetches the information about the file being opened from the remote namenode. Stepping into fetchLocatedBlocksAndGetLastBlockLength(), the code is:

private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }

    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();

    currentNode = null;
    return lastBlockBeingWrittenLength;
  }

dfsClient.getLocatedBlocks(src, 0) fetches the block information of the file being opened, preparing for the read and write operations that follow. To summarize: open obtains the file's block information from the namenode, and with that information the client can send read (and write) requests to the datanodes. The read and write paths themselves are covered in later parts.
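
The block information obtained at open time is also visible through the public API. The sketch below is illustrative only (the FileSystem and path are assumed to exist); it prints, for each block of a file, the offset, length and the hosts that store it.

// Illustrative: inspecting the block locations that drive subsequent reads.
static void showBlocks(FileSystem fs, Path file) throws IOException {
  FileStatus status = fs.getFileStatus(file);
  BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
  for (BlockLocation block : blocks) {
    System.out.println("offset=" + block.getOffset()
        + " length=" + block.getLength()
        + " hosts=" + Arrays.toString(block.getHosts()));
  }
}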