
Hadoop 2.6.0 Source Code Analysis: The Client (Part 2: DFSClient), Section 1 (Non-HA Proxy)

Having finished with DistributedFileSystem, following the code we arrive at the DFSClient constructor, whose code is as follows:

/** 
   * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
   * If HA is enabled and a positive value is set for 
   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
   * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode 
   * must be null.
   */
  @VisibleForTesting
  public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {
    // Copy only the required DFSClient configuration
    this.dfsClientConf = new Conf(conf);
    if (this.dfsClientConf.useLegacyBlockReaderLocal) {
      LOG.debug("Using legacy short-circuit local reads.");
    }
    this.conf = conf;  // keep the HDFS configuration
    this.stats = stats;  // client statistics, e.g. bytes read and written by this client

    // discussed in more detail below
    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);

    // dtpReplaceDatanodeOnFailure is the policy deciding whether, when a DataNode
    // fails while the client is reading or writing, the client replaces it. If
    // false, a failed DataNode is never replaced; if true, the client obtains a
    // replacement DataNode holding a replica and switches over to it.
    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);

    // Get the current user, used for permission checks: e.g. whether the user may
    // read, write or delete an HDFS file, or use the requested resources when
    // submitting a job. For details see
    // https://blog.csdn.net/u011491148/article/details/45868467
    this.ugi = UserGroupInformation.getCurrentUser();
    // get the authority part of the URI (user info, host and port)
    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
    // the client name
    this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
        DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
    // KeyProvider, used for HDFS encryption/decryption
    provider = DFSUtil.createKeyProvider(conf);
    if (LOG.isDebugEnabled()) {
      if (provider == null) {
        LOG.debug("No KeyProvider found.");
      } else {
        LOG.debug("Found KeyProvider: " + provider.toString());
      }
    }
    int numResponseToDrop = conf.getInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
    if (numResponseToDrop > 0) {
      // This case is used for testing.
      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
          + " is set to " + numResponseToDrop
          + ", this hacked client will proactively drop responses");
      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop,
          nnFallbackToSimpleAuth);
    }
    
    if (proxyInfo != null) {
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) {
      // This case is used for testing.
      Preconditions.checkArgument(nameNodeUri == null);
      this.namenode = rpcNamenode;
      dtService = null;
    } else {
      Preconditions.checkArgument(nameNodeUri != null,
          "null URI");

      // create the RPC proxy via NameNodeProxies.createProxy(); described in detail below
      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
          ClientProtocol.class, nnFallbackToSimpleAuth);
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    }

    String localInterfaces[] =
      conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
    localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
      LOG.debug("Using local interfaces [" +
      Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
      Joiner.on(',').join(localInterfaceAddrs) + "]");
    }
    
    Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
        null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
    Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
    this.defaultReadCachingStrategy =
        new CachingStrategy(readDropBehind, readahead);
    this.defaultWriteCachingStrategy =
        new CachingStrategy(writeDropBehind, readahead);
    this.clientContext = ClientContext.get(
        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
        dfsClientConf);
    this.hedgedReadThresholdMillis = conf.getLong(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
    int numThreads = conf.getInt(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
    if (numThreads > 0) {
      this.initThreadsNumForHedgedReads(numThreads);
    }
    this.saslClient = new SaslDataTransferClient(
      conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
  }
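For orientation, here is a minimal sketch of how this constructor is normally reached. Applications go through the FileSystem API rather than instantiating DFSClient directly; the address below is a placeholder:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DfsClientEntryDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // placeholder address; point this at a real NameNode
    conf.set("fs.defaultFS", "hdfs://namenode-host:9000");
    // FileSystem.get() resolves the hdfs scheme to DistributedFileSystem, whose
    // initialize() constructs the DFSClient analyzed above
    FileSystem fs = FileSystem.get(conf);
    System.out.println("scheme: " + fs.getScheme());  // prints "hdfs"
    fs.close();
  }
}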

Execution next reaches this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); let's step into getSocketFactory:

/**
   * Get the socket factory for the given class according to its
   * configuration parameter
   * <tt>hadoop.rpc.socket.factory.class.&lt;ClassName&gt;</tt>. When no
   * such parameter exists then fall back on the default socket factory as
   * configured by <tt>hadoop.rpc.socket.factory.class.default</tt>. If
   * this default socket factory is not configured, then fall back on the JVM
   * default socket factory.
   * 
   * @param conf the configuration
   * @param clazz the class (usually a {@link VersionedProtocol})
   * @return a socket factory
   */
  public static SocketFactory getSocketFactory(Configuration conf,
      Class<?> clazz) {

    SocketFactory factory = null;

    // Since clazz is ClientProtocol.class, the key looked up is
    // hadoop.rpc.socket.factory.class.ClientProtocol. Along the way conf.get()
    // applies two transformations:
    // (1) if the key has been deprecated, the new key replaces it;
    // (2) any ${...} variables in the value are expanded.
    // The result is the fully qualified class name of a SocketFactory.
    String propValue =
        conf.get("hadoop.rpc.socket.factory.class." + clazz.getSimpleName());
    if ((propValue != null) && (propValue.length() > 0))
      // look up the class with that name and instantiate it as a SocketFactory
      factory = getSocketFactoryFromProperty(conf, propValue);

    if (factory == null)
      // if no factory was configured, fall back to the default socket factory
      factory = getDefaultSocketFactory(conf);

    return factory;
  }
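To make the lookup order concrete, here is a small self-contained example; StandardSocketFactory is the stock implementation that ships with Hadoop, and the keys are the ones named in the javadoc above:

import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.net.NetUtils;

public class SocketFactoryDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // per-protocol key, consulted first for ClientProtocol
    conf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
             "org.apache.hadoop.net.StandardSocketFactory");
    // if this key were absent, "hadoop.rpc.socket.factory.class.default" would
    // be consulted, and failing that the JVM default socket factory would be used
    SocketFactory factory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
    System.out.println(factory.getClass().getName());
  }
}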

Next, factory = getSocketFactoryFromProperty(conf, propValue) is executed:

/**
   * Get the socket factory corresponding to the given proxy URI. If the
   * given proxy URI corresponds to an absence of configuration parameter,
   * returns null. If the URI is malformed raises an exception.
   * 
   * @param propValue the property which is the class name of the
   *        SocketFactory to instantiate; assumed non null and non empty.
   * @return a socket factory as defined in the property value.
   */
  public static SocketFactory getSocketFactoryFromProperty(
      Configuration conf, String propValue) {

    try {

      // resolve the Class object for the given class name
      Class<?> theClass = conf.getClassByName(propValue);

      // create an instance of that class via reflection
      return (SocketFactory) ReflectionUtils.newInstance(theClass, conf);

    } catch (ClassNotFoundException cnfe) {
      throw new RuntimeException("Socket Factory class not found: " + cnfe);
    }
  }


We now enter ReflectionUtils.newInstance(), which creates an instance of the given class and initializes it; the code is as follows:

/** Create an object for the given class and initialize it from conf
   * 
   * @param theClass class of which an object is created
   * @param conf Configuration
   * @return a new object
   */
  @SuppressWarnings("unchecked")
  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
    T result;
    try {

      // first look up this class's constructor in the cache
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {

        // cache miss: fetch the class's no-argument constructor
        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);

        // make the constructor callable even if it is not public
        meth.setAccessible(true);

        // store the constructor in the cache for subsequent calls
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }

      // invoke the constructor to create the object
      result = meth.newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    setConf(result, conf);
    return result;
  }
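The caching trick here is worth a second look. Below is a stripped-down sketch of the same pattern; MiniReflection is a made-up name, and unlike the real class it does not wire a Configuration into the new object via setConf:

import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class MiniReflection {
  // cache of no-arg constructors, so the reflection lookup happens once per class
  private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  public static <T> T newInstance(Class<T> theClass) {
    try {
      Constructor<T> ctor = (Constructor<T>) CACHE.get(theClass);
      if (ctor == null) {
        ctor = theClass.getDeclaredConstructor();  // the no-argument constructor
        ctor.setAccessible(true);                  // allow non-public constructors
        CACHE.put(theClass, ctor);
      }
      return ctor.newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}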

We now return to the DFSClient constructor and analyze the following code:

// read the value of dfs.client.test.drop.namenode.response.number from the configuration
int numResponseToDrop = conf.getInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
    // if the value is greater than zero (a testing-only setting)
    if (numResponseToDrop > 0) {
      // This case is used for testing.
      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
          + " is set to " + numResponseToDrop
          + ", this hacked client will proactively drop responses");
      // create a proxy that proactively drops responses, used to simulate failures
      // in tests; this requires an HA setup and is described in detail below
      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop,
          nnFallbackToSimpleAuth);
    }
    // we leave the proxyInfo != null case aside for now and only analyze the final else branch
    if (proxyInfo != null) {
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) {
      // This case is used for testing.
      Preconditions.checkArgument(nameNodeUri == null);
      this.namenode = rpcNamenode;
      dtService = null;
    } else {
      // validate the argument: throw if nameNodeUri is null
      Preconditions.checkArgument(nameNodeUri != null,
          "null URI");
      // create the proxy
      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
          ClientProtocol.class, nnFallbackToSimpleAuth);
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    }
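As the constant names suggest, this branch exists purely for testing. A test could enable it roughly as follows (an illustration only, never something a production client should set):

// make the client proactively drop the first two NameNode responses of each
// RPC call, exercising the LossyRetryInvocationHandler path (requires HA)
conf.setInt("dfs.client.test.drop.namenode.response.number", 2);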

Appended 2018-08-30 23:26 ---- start

Let's analyze proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf, nameNodeUri, ClientProtocol.class, numResponseToDrop, nnFallbackToSimpleAuth); the code of createProxyWithLossyRetryHandler is as follows:

/**
   * Generate a dummy namenode proxy instance that utilizes our hacked
   * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
   * method will proactively drop RPC responses. Currently this method only
   * support HA setup. null will be returned if the given configuration is not 
   * for HA.
   * 
   * @param config the configuration containing the required IPC
   *        properties, client failover configurations, etc.
   * @param nameNodeUri the URI pointing either to a specific NameNode
   *        or to a logical nameservice.
   * @param xface the IPC interface which should be created
   * @param numResponseToDrop The number of responses to drop for each RPC call
   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
   *   a secure client falls back to simple auth
   * @return an object containing both the proxy and the associated
   *         delegation token service it corresponds to. Will return null of the
   *         given configuration does not support HA.
   * @throws IOException if there is an error creating the proxy
   */
  @SuppressWarnings("unchecked")
  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
      Configuration config, URI nameNodeUri, Class<T> xface,
      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    // throw if numResponseToDrop is less than or equal to zero
    Preconditions.checkArgument(numResponseToDrop > 0);
    // create a failover proxy provider: when the active NameNode fails, it
    // automatically switches the client over to the standby NameNode. This
    // safeguard is exactly what HA mode provides.
    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
        createFailoverProxyProvider(config, nameNodeUri, xface, true,
          fallbackToSimpleAuth);
    // a non-null provider means HA is configured
    if (failoverProxyProvider != null) { // HA case
      int delay = config.getInt(
          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
      int maxCap = config.getInt(
          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
      int maxFailoverAttempts = config.getInt(
          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
      int maxRetryAttempts = config.getInt(
          DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
          DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
      // create an object implementing the InvocationHandler interface
      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
              numResponseToDrop, failoverProxyProvider,
              RetryPolicies.failoverOnNetworkException(
                  RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 
                  Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, 
                  maxCap));
      
      T proxy = (T) Proxy.newProxyInstance(
          failoverProxyProvider.getInterface().getClassLoader(),
          new Class[] { xface }, dummyHandler);
      Text dtService;
      if (failoverProxyProvider.useLogicalURI()) {
        dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
            HdfsConstants.HDFS_URI_SCHEME);
      } else {
        dtService = SecurityUtil.buildTokenService(
            NameNode.getAddress(nameNodeUri));
      }
      return new ProxyAndInfo<T>(proxy, dtService,
          NameNode.getAddress(nameNodeUri));
    } else {
      LOG.warn("Currently creating proxy using " +
      		"LossyRetryInvocationHandler requires NN HA setup");
      return null;
    }
  }
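The four values read above come from the following client-side keys. The defaults shown are, to the best of my recollection, the usual 2.6.0 values; treat them as assumptions rather than quotes from the source:

// illustrative failover/retry knobs read by createProxyWithLossyRetryHandler
conf.setInt("dfs.client.failover.sleep.base.millis", 500);   // base backoff
conf.setInt("dfs.client.failover.sleep.max.millis", 15000);  // backoff cap
conf.setInt("dfs.client.failover.max.attempts", 15);         // max failovers
conf.setInt("dfs.client.retry.max.attempts", 10);            // max retries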

Stepping into createFailoverProxyProvider, the code is as follows:

/** Creates the Failover proxy provider instance*/
  @VisibleForTesting
  public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
    AbstractNNFailoverProxyProvider<T> providerNN;
    Preconditions.checkArgument(
        xface.isAssignableFrom(NamenodeProtocols.class),
        "Interface %s is not a NameNode protocol", xface);
    try {
      // Obtain the class of the proxy provider
      // Use the Configuration object and nameNodeUri to look up the Class bound
      // to the key dfs.client.failover.proxy.provider.<host> in the
      // configuration. In practice the fully qualified class is
      // org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
      failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
          nameNodeUri);
      // if no provider class is configured, return null immediately
      if (failoverProxyProviderClass == null) {
        return null;
      }
      // Create a proxy provider instance.
      // otherwise obtain its (Configuration, URI, Class) constructor
      Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
          .getConstructor(Configuration.class, URI.class, Class.class);
      // and instantiate the provider
      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
          xface);

      // If the proxy provider is of an old implementation, wrap it.
      if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
        providerNN = new WrappedFailoverProxyProvider<T>(provider);
      } else {
        providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
      }
    } catch (Exception e) {
      String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
      if (LOG.isDebugEnabled()) {
        LOG.debug(message, e);
      }
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else {
        throw new IOException(message, e);
      }
    }

    // Check the port in the URI, if it is logical.
    if (checkPort && providerNN.useLogicalURI()) {
      int port = nameNodeUri.getPort();
      if (port > 0 && port != NameNode.DEFAULT_PORT) {
        // Throwing here without any cleanup is fine since we have not
        // actually created the underlying proxies yet.
        throw new IOException("Port " + port + " specified in URI "
            + nameNodeUri + " but host '" + nameNodeUri.getHost()
            + "' is a logical (HA) namenode"
            + " and does not use port information.");
      }
    }
    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
    return providerNN;
  }
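Whether getFailoverProxyProviderClass finds anything depends entirely on the client configuration. A typical HA client setup looks roughly like this (the nameservice name mycluster is a placeholder):

Configuration conf = new Configuration();
conf.set("dfs.nameservices", "mycluster");
conf.set("dfs.client.failover.proxy.provider.mycluster",
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// with this key present, createFailoverProxyProvider() returns a non-null
// provider (HA mode); without it, it returns null and the non-HA path is taken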

The call flow is shown below:

Call flow of createProxyWithLossyRetryHandler in DFSClient

The structure of the proxy classes:

Proxy class structure diagram

Appended 2018-08-30 23:26 ---- end

We now go straight into the createProxy call in the else branch; this function creates the NameNode proxy, and its code is as follows:

/**
   * Creates the namenode proxy with the passed protocol. This will handle
   * creation of either HA- or non-HA-enabled proxy objects, depending upon
   * if the provided URI is a configured logical URI.
   *
   * @param conf the configuration containing the required IPC
   *        properties, client failover configurations, etc.
   * @param nameNodeUri the URI pointing either to a specific NameNode
   *        or to a logical nameservice.
   * @param xface the IPC interface which should be created
   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
   *   a secure client falls back to simple auth
   * @return an object containing both the proxy and the associated
   *         delegation token service it corresponds to
   * @throws IOException if there is an error creating the proxy
   **/
  @SuppressWarnings("unchecked")
  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
      URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
        createFailoverProxyProvider(conf, nameNodeUri, xface, true,
          fallbackToSimpleAuth);
  
    if (failoverProxyProvider == null) {
      // Non-HA case
      return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
    } else {
      // HA case
      Conf config = new Conf(conf);
      T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
          RetryPolicies.failoverOnNetworkException(
              RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
              config.maxRetryAttempts, config.failoverSleepBaseMillis,
              config.failoverSleepMaxMillis));

      Text dtService;
      if (failoverProxyProvider.useLogicalURI()) {
        dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
            HdfsConstants.HDFS_URI_SCHEME);
      } else {
        dtService = SecurityUtil.buildTokenService(
            NameNode.getAddress(nameNodeUri));
      }
      return new ProxyAndInfo<T>(proxy, dtService,
          NameNode.getAddress(nameNodeUri));
    }
  }

 

Let's first analyze the first line of createProxy:

AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, xface, true, fallbackToSimpleAuth);

We step into createFailoverProxyProvider (in the org.apache.hadoop.hdfs.NameNodeProxies class). Its code is identical to the createFailoverProxyProvider listing already shown in the addendum above, so it is not repeated here.

Back in createProxy (in the org.apache.hadoop.hdfs.NameNodeProxies class):

Continuing, the value returned by createFailoverProxyProvider determines the mode: if failoverProxyProvider is null the non-HA path is taken, otherwise the HA path.

We analyze the non-HA mode first, which calls createNonHAProxy (also in the org.apache.hadoop.hdfs.NameNodeProxies class):

/**
   * Creates an explicitly non-HA-enabled proxy object. Most of the time you
   * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
   *
   * @param conf the configuration object
   * @param nnAddr address of the remote NN to connect to
   * @param xface the IPC interface which should be created
   * @param ugi the user who is making the calls on the proxy object
   * @param withRetries certain interfaces have a non-standard retry policy
   * @param fallbackToSimpleAuth - set to true or false during this method to
   *   indicate if a secure client falls back to simple auth
   * @return an object containing both the proxy and the associated
   *         delegation token service it corresponds to
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  public static <T> ProxyAndInfo<T> createNonHAProxy(
      Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
      UserGroupInformation ugi, boolean withRetries,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);
  
    T proxy;
    if (xface == ClientProtocol.class) {
      // the proxy through which the client talks to the NameNode
      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
          withRetries, fallbackToSimpleAuth);
    } else if (xface == JournalProtocol.class) {
      // proxy for the JournalProtocol, used to journal edits to a remote node
      // (e.g. from the NameNode to a BackupNode)
      proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
    } else if (xface == NamenodeProtocol.class) {
      // proxy for the NamenodeProtocol, used by the SecondaryNameNode and the
      // Balancer to talk to the NameNode
      proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
          withRetries);
    } else if (xface == GetUserMappingsProtocol.class) {
      proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
    } else if (xface == RefreshUserMappingsProtocol.class) {
      proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
    } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
      proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
          conf, ugi);
    } else if (xface == RefreshCallQueueProtocol.class) {
      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
    } else {
      String message = "Unsupported protocol found when creating the proxy " +
          "connection to NameNode: " +
          ((xface != null) ? xface.getClass().getName() : "null");
      LOG.error(message);
      throw new IllegalStateException(message);
    }

    return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
  }

Here we analyze proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth); the code is as follows:

private static ClientProtocol createNNProxyWithClientProtocol(
      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
	  
    // Record in the configuration that the key rpc.engine +
    // ClientNamenodeProtocolPB.class.getName() maps to
    // ProtobufRpcEngine.class.getName(); a precondition is that ProtobufRpcEngine
    // implements (or is) RpcEngine. After this call succeeds, looking up that key
    // in the configuration object yields ProtobufRpcEngine.class.getName().
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);

    // DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY is dfs.client.retry.policy.enabled
    // DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT is false
    // DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY is dfs.client.retry.policy.spec
    // DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT is 10000,6,60000,10
    // This call builds the retry-policy object from these configuration values;
    // see getDefaultRetryPolicy for details, which we do not expand on here
    final RetryPolicy defaultPolicy = 
        RetryUtils.getDefaultRetryPolicy(
            conf, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
            SafeModeException.class);
    // get the protocol version
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    
    // construct the ClientNamenodeProtocolPB proxy object
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth).getProxy();

    if (withRetries) { // create the proxy with retries

      RetryPolicy createPolicy = RetryPolicies
          .retryUpToMaximumCountWithFixedSleep(5,
              HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
    
      Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap 
                 = new HashMap<Class<? extends Exception>, RetryPolicy>();
      remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
          createPolicy);

      RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException(
          defaultPolicy, remoteExceptionToPolicyMap);
      Map<String, RetryPolicy> methodNameToPolicyMap 
                 = new HashMap<String, RetryPolicy>();
    
      methodNameToPolicyMap.put("create", methodPolicy);

      ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
      return (ClientProtocol) RetryProxy.create(
          ClientProtocol.class,
          new DefaultFailoverProxyProvider<ClientProtocol>(
              ClientProtocol.class, translatorProxy),
          methodNameToPolicyMap,
          defaultPolicy);
    } else {
      // construct and return a ClientNamenodeProtocolTranslatorPB; note that it
      // holds a reference to the ClientNamenodeProtocolPB proxy
      return new ClientNamenodeProtocolTranslatorPB(proxy);
    }
  }
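A note on the retry spec mentioned in the comments above: as far as I recall, the string is parsed as a list of (sleep-millis, retry-count) pairs, so the default "10000,6,60000,10" means six retries about 10 seconds apart, followed by ten more about 60 seconds apart. Enabling the policy is a matter of two keys:

// hedged illustration: turn on the configurable client retry policy
conf.setBoolean("dfs.client.retry.policy.enabled", true);
conf.set("dfs.client.retry.policy.spec", "10000,6,60000,10");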

We continue into the getProtocolProxy function; the code is as follows:

/**
   * Get a protocol proxy that contains a proxy connection to a remote server
   * and a set of methods that are supported by the server
   * (That is, the returned protocol proxy wraps an inner proxy through which
   * the methods provided by the remote server can be invoked as if they were
   * local calls.)
   *
   * @param protocol protocol
   * @param clientVersion client's version
   * @param addr server address
   * @param ticket security ticket
   * @param conf configuration
   * @param factory socket factory
   * @param rpcTimeout max time for each rpc; 0 means no timeout
   * @param connectionRetryPolicy retry policy
   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
   *   a secure client falls back to simple auth
   * @return the proxy
   * @throws IOException if any error occurs
   */
   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
    // check whether security authentication (e.g. Kerberos) is enabled
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    /* First call getProtocolEngine() to obtain the engine registered for this
     * protocol in the configuration (here a ProtobufRpcEngine instance), then
     * call that engine's getProxy function. */
    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
        fallbackToSimpleAuth);
  }

Let's look inside getProtocolEngine(protocol, conf); the code is as follows:

// return the RpcEngine configured to handle a protocol
  static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    // look up protocol in the Map<Class<?>, RpcEngine> PROTOCOL_ENGINES; if it
    // is found, return the cached RpcEngine instance
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    // if no engine has been cached for this protocol
    if (engine == null) {
      // look the class name up in the configuration under the key
      // ENGINE_PROP + "." + protocol.getName(), i.e.
      // rpc.engine.ClientNamenodeProtocolPB (fully qualified). Since
      // createNNProxyWithClientProtocol already called RPC.setProtocolEngine(conf,
      // ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class), conf.getClass
      // returns the ProtobufRpcEngine class here; if the key were absent, the
      // default WritableRpcEngine class would be used instead
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      // instantiate the engine class via reflection
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      // cache the instance in the static PROTOCOL_ENGINES map for fast reuse
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }
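Putting the two halves together, the registration done in createNNProxyWithClientProtocol and the lookup done here form a simple round trip (a sketch; getProtocolEngine itself is package-private, so user code never calls it directly):

// registration, performed earlier by createNNProxyWithClientProtocol:
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
// conf now maps rpc.engine.org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB
// to ProtobufRpcEngine, so the lookup above instantiates a ProtobufRpcEngine,
// caches it in PROTOCOL_ENGINES, and returns it on every later call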

Back in getProtocolProxy: as the analysis of getProtocolEngine showed, the returned engine is a ProtobufRpcEngine instance, so we continue with that class's getProxy function. The code is as follows:

@Override
  @SuppressWarnings("unchecked")
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    // first construct the InvocationHandler object
    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
    
    // then call Proxy.newProxyInstance() to obtain the dynamic proxy object and
    // return it wrapped in a ProtocolProxy
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  }

Let's first look at the Invoker class, an inner class of ProtobufRpcEngine, declared as:

private static class Invoker implements RpcInvocationHandler, where the RpcInvocationHandler interface in turn is declared as:

public interface RpcInvocationHandler extends InvocationHandler, Closeable. In other words, Invoker implements InvocationHandler, the interface JDK dynamic proxies are built on. Next the following code executes:

return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);

This line creates a ProtocolProxy object: the protocol argument is assigned to ProtocolProxy's protocol member, the dynamic proxy to its proxy member, and false to its supportServerMethodCheck member. Here protocol is the Class object of ClientNamenodeProtocolPB, and proxy is a dynamic proxy implementing the ClientNamenodeProtocolPB interface.
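Since everything here hinges on JDK dynamic proxies, a tiny self-contained example may help. Greeter, LoggingHandler and ProxyDemo are made-up names for illustration; the real handler, ProtobufRpcEngine.Invoker, marshals the call into an RPC request instead of answering locally:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface Greeter { String greet(String name); }

class LoggingHandler implements InvocationHandler {
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) {
    // every call on the proxy lands here; an RPC engine would serialize
    // method + args and send them to the server at this point
    System.out.println("invoked: " + method.getName());
    return "hello, " + args[0];
  }
}

public class ProxyDemo {
  public static void main(String[] args) {
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(),
        new Class<?>[] { Greeter.class },
        new LoggingHandler());
    System.out.println(g.greet("hdfs"));  // "invoked: greet", then "hello, hdfs"
  }
}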

We now return to createNNProxyWithClientProtocol; the code under analysis is:

ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth).getProxy();

getProtocolProxy returns a ProtocolProxy object, and calling getProxy() on it yields the proxy object, i.e. the dynamic proxy for the ClientNamenodeProtocolPB interface. With proxy in hand, we continue with the following code:

if (withRetries) { // create the proxy with retries

      RetryPolicy createPolicy = RetryPolicies
          .retryUpToMaximumCountWithFixedSleep(5,
              HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
    
      Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap 
                 = new HashMap<Class<? extends Exception>, RetryPolicy>();
      remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
          createPolicy);

      RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException(
          defaultPolicy, remoteExceptionToPolicyMap);
      Map<String, RetryPolicy> methodNameToPolicyMap 
                 = new HashMap<String, RetryPolicy>();
    
      methodNameToPolicyMap.put("create", methodPolicy);

      ClientProtocol translatorProxy =
        new ClientNamenodeProtocolTranslatorPB(proxy);
      return (ClientProtocol) RetryProxy.create(
          ClientProtocol.class,
          new DefaultFailoverProxyProvider<ClientProtocol>(
              ClientProtocol.class, translatorProxy),
          methodNameToPolicyMap,
          defaultPolicy);
    } else {
      // construct and return a ClientNamenodeProtocolTranslatorPB; note that it
      // holds a reference to the ClientNamenodeProtocolPB proxy
      return new ClientNamenodeProtocolTranslatorPB(proxy);
    }

Here withRetries controls whether failed calls on the proxy object are retried. We analyze the two cases separately below.

Case 1: withRetries is true

The following code executes:

// 5 is the maximum number of attempts, LEASE_SOFTLIMIT_PERIOD is 60*1000, and
// MILLISECONDS is the time unit. These three values end up in the maxRetries,
// sleepTime and timeUnit fields of a RetryUpToMaximumCountWithFixedSleep object,
// which implements the RetryPolicy interface
RetryPolicy createPolicy = RetryPolicies
          .retryUpToMaximumCountWithFixedSleep(5,
              HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);

Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap 
                 = new HashMap<Class<? extends Exception>, RetryPolicy>();
// map the AlreadyBeingCreatedException class (as key) to the RetryPolicy object created above
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
          createPolicy);

// defaultPolicy is the retry-policy object built from the configuration earlier.
// retryByRemoteException returns a RemoteExceptionDependentRetry object, which
// implements RetryPolicy: defaultPolicy is stored in its defaultPolicy member,
// and every entry of remoteExceptionToPolicyMap is stored in its
// Map<String, RetryPolicy> member exceptionNameToPolicyMap, keyed by the
// entry key's getName()
RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException(
          defaultPolicy, remoteExceptionToPolicyMap);

Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
    
methodNameToPolicyMap.put("create", methodPolicy);

// create a ClientNamenodeProtocolTranslatorPB object, storing the
// ClientNamenodeProtocolPB proxy in its rpcProxy member
ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);

Finally we execute:

return (ClientProtocol) RetryProxy.create(
          ClientProtocol.class,
          new DefaultFailoverProxyProvider<ClientProtocol>(
              ClientProtocol.class, translatorProxy),
          methodNameToPolicyMap,
          defaultPolicy);

Stepping into the create function, the code is as follows:

/**
   * Create a proxy for an interface of implementations of that interface using
   * the given {@link FailoverProxyProvider} and the a set of retry policies
   * specified by method name. If no retry policy is defined for a method then a
   * default of {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
   * 
   * @param iface the interface that the retry will implement
   * @param proxyProvider provides implementation instances whose methods should be retried
   * @param methodNameToPolicyMap a map of method names to retry policies
   * @return the retry proxy
   */
  // iface is ClientProtocol.class.
  // proxyProvider is a DefaultFailoverProxyProvider whose proxy member is the
  // ClientNamenodeProtocolTranslatorPB object created above, and whose iface
  // member is the Class object of ClientProtocol. This function creates a
  // dynamic proxy for the iface interface; RetryInvocationHandler implements
  // InvocationHandler, so every call on the proxy object ultimately lands in
  // RetryInvocationHandler's invoke(Object proxy, Method method, Object[] args)
  public static <T> Object create(Class<T> iface,
      FailoverProxyProvider<T> proxyProvider,
      Map<String,RetryPolicy> methodNameToPolicyMap,
      RetryPolicy defaultPolicy) {
    return Proxy.newProxyInstance(
        proxyProvider.getInterface().getClassLoader(),
        new Class<?>[] { iface },
        new RetryInvocationHandler<T>(proxyProvider, defaultPolicy,
            methodNameToPolicyMap)
        );
  }

We step into invoke(Object proxy, Method method, Object[] args) of RetryInvocationHandler; the code is as follows:

@Override
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    // look up the retry policy registered for this method
    RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
    // if no policy is registered for the method, use the default policy
    if (policy == null) {
      policy = defaultPolicy;
    }
    
    // The number of times this method invocation has been failed over.
    int invocationFailoverCount = 0;
    final boolean isRpc = isRpcInvocation(currentProxy.proxy);
    final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
    int retries = 0;
    while (true) {
      // The number of times this invocation handler has ever been failed over,
      // before this method invocation attempt. Used to prevent concurrent
      // failed method invocations from triggering multiple failover attempts.
      long invocationAttemptFailoverCount;
      synchronized (proxyProvider) {
        invocationAttemptFailoverCount = proxyProviderFailoverCount;
      }

      if (isRpc) {
        Client.setCallIdAndRetryCount(callId, retries);
      }
      try {
        Object ret = invokeMethod(method, args);
        hasMadeASuccessfulCall = true;
        return ret;
      } catch (Exception e) {
        boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
            .getMethod(method.getName(), method.getParameterTypes())
            .isAnnotationPresent(Idempotent.class);
        if (!isIdempotentOrAtMostOnce) {
          isIdempotentOrAtMostOnce = proxyProvider.getInterface()
              .getMethod(method.getName(), method.getParameterTypes())
              .isAnnotationPresent(AtMostOnce.class);
        }
        RetryAction action = policy.shouldRetry(e, retries++,
            invocationFailoverCount, isIdempotentOrAtMostOnce);
        if (action.action == RetryAction.RetryDecision.FAIL) {
          if (action.reason != null) {
            LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
                + "." + method.getName() + " over " + currentProxy.proxyInfo
                + ". Not retrying because " + action.reason, e);
          }
          throw e;
        } else { // retry or failover
          // avoid logging the failover if this is the first call on this
          // proxy object, and we successfully achieve the failover without
          // any flip-flopping
          boolean worthLogging = 
            !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
          worthLogging |= LOG.isDebugEnabled();
          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
              worthLogging) {
            String msg = "Exception while invoking " + method.getName()
                + " of class " + currentProxy.proxy.getClass().getSimpleName()
                + " over " + currentProxy.proxyInfo;

            if (invocationFailoverCount > 0) {
              msg += " after " + invocationFailoverCount + " fail over attempts"; 
            }
            msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
            LOG.info(msg, e);
          } else {
            if(LOG.isDebugEnabled()) {
              LOG.debug("Exception while invoking " + method.getName()
                  + " of class " + currentProxy.proxy.getClass().getSimpleName()
                  + " over " + currentProxy.proxyInfo + ". Retrying "
                  + formatSleepMessage(action.delayMillis), e);
            }
          }
          
          if (action.delayMillis > 0) {
            Thread.sleep(action.delayMillis);
          }
          
          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
            // Make sure that concurrent failed method invocations only cause a
            // single actual fail over.
            synchronized (proxyProvider) {
              if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
                proxyProvider.performFailover(currentProxy.proxy);
                proxyProviderFailoverCount++;
              } else {
                LOG.warn("A failover has occurred since the start of this method"
                    + " invocation attempt.");
              }
              currentProxy = proxyProvider.getProxy();
            }
            invocationFailoverCount++;
          }
        }
      }
    }
  }

Case 2, where withRetries is false, is much simpler: as the else branch above shows, it just wraps proxy in a ClientNamenodeProtocolTranslatorPB and returns it. Many details have not been expanded on here; we may describe them further when time permits.

The corresponding call flow chart:

Flow chart

An overview of how the proxy class is produced:

Overview diagram of the proxy-class creation process

Next we continue with: Hadoop 2.6.0 Source Code Analysis: The Client (Part 2: DFSClient), Section 2 (HA Proxy).