1. 程式人生 > >kerberos體系下的應用(yarn,spark on yarn)

kerberos體系下的應用(yarn,spark on yarn)

kerberos 介紹

閱讀本文之前建議先預讀下面這篇部落格
kerberos認證原理---講的非常細緻,易懂

Kerberos實際上一個基於Ticket的認證方式。Client想要獲取Server端的資源,先得通過Server的認證;而認證的先決條件是ClientServer提供從KDC獲得的一個有ServerMaster Key進行加密的Session Ticket(Session Key + Client Info)

image.png

 

大體邏輯流程如下:

  • Client向KDC申請TGT(Ticket Granting Ticket)。
  • Client通過獲得TGT向KDC申請用於訪問Server的Ticket。
  • Client最終向為了Server對自己的認證向其提交Ticket。

kerberos 中的幾個概念

  • Principals

簡單的說, principals 類似於多使用者系統中的使用者名稱,每個server都對應一個 principals

principals由幾個部分的字串構成。
例如:

component1 / component2 @ REALM

  1. @ 後面是一個 principals 必不可少的部分 REALM,為大寫英文字元。
  2. @ 前面是一個 principals 的具體身份,它可能由多個部分組成,使用/ 分割。

[email protected]
代表的是一個屬於EXAMPLE.COM領域的使用者reborn
這類principals 我們稱之為 User Principals。
還有一類我們稱之為 Service Principals。 它代表的不是具體的user,而是服務:
yarn/[email protected]
比如上面的這個, / 前面的部分為 yarn,說明它代表的是 yarn的服務,/ 後面的部分則是DNS域名,@後面的則是每個principals都必須有的 REALM

上面所提及的 Client通過獲得TGT向KDC申請用於訪問Server的Ticket 就是通過 Service Principals 來向KDC 來申請Ticket的。

  • Keys 和 KeyTab

每個 principals 都有自己的 Master key 用來證明自己就是 principals的擁有者。同時 在 ClientKDCServerTGTTicket加密。具體方式可才考開篇的 部落格連結。
一般來說,User Principals的 key是使用者密碼,Service Principals的key是隨機數串,他們都分別被存放在 KDC中一份,keytab 檔案中一份。

keytab檔案就是一個密碼本,除非對該使用者重新生成keytab,否則這個檔案是不會過期的,使用該keytab即可以登入相應的principals

獲取TGT

從上面的概念上大家可以看出,為了訪問有kerberos認證的服務,作為Client首先要先向KDC發起請求獲取TGT 得到 KDC的授權,才繼而才能申請對 service 的Ticket。

  • kerberos client 的安裝
    Client 所在的機器環境必須是 kerberos client 環境,具體的安裝操作,網上有很多 �Installing Kerberos ,在安裝的過程中,最普遍出現的問題就是預設的加解密方式 jce不支援,解決方式網上也有java-jce-hadoop-kerberos 要麼改變加解密方式,要麼給jre打補丁

  • 使用命令列來獲取TGT環境
    這裡列出幾個簡單的常用的命令:

    • kinit: 用來獲取TGT的命令, 可以使用密碼來向KDC申請,也可以直接使用keytab
    kinit wanghuan70
    Password for [email protected]:
    
    kinit -kt wanghuan70.keytab wanghuan70
    
    • kdestroy: 用來銷燬當前的tgt情況
    • klist: 用來展示當前的tgt情況

    如果當前還沒有申請TGT:

    klist
    klist: Credentials cache file '/tmp/krb5cc_2124' not found
    

    如果已經通過 kinit 申請過了TGT:

    -sh-4.2$ klist
    Ticket cache: FILE:/tmp/krb5cc_2124
    Default principal: [email protected]
    
    Valid starting       Expires              Service principal
    08/03/2017 09:31:52  08/11/2017 09:31:52  krbtgt/IDC.XXX-    [email protected]
          renew until 08/10/2017 09:31:52
    

    klist 中的資訊展示的很詳細了,標明Client principal[email protected]
    Service principal為 krbtgt/IDC.XXX- [email protected]
    這個 Service principal 實際上是 上圖中的 Tickt Granting Service(TGS)的principal。
    TGT是有時效性的,超過過期日期就不可以再使用,但是可以在 renew時間之前 使用

     klist -r 
    

    來重新整理。

  • 在程式碼中登入
    首先要保證的是執行程式碼的機器上是有kerberos client 環境

    /**
     * User and group information for Hadoop.
     * This class wraps around a JAAS Subject and provides methods   to determine the
     * user's username and groups. It supports both the Windows, Unix   and Kerberos 
     * login modules.
     */
    @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce",     "HBase", "Hive", "Oozie"})
    @InterfaceStability.Evolving
    public class UserGroupInformation {
    

    hadoop-common 的工程下提供瞭如上的 UserGroupInformation 用於使用者認證。我們在程式碼中只需要呼叫 其中的api即可,簡單舉例子,我們想用 [email protected] 這個 principal 來執行後續的程式碼, 只需要呼叫如下api:

        UserGroupInformation.setConfiguration(configuration);
        System.setProperty("sun.security.krb5.debug", "true");
        UserGroupInformation.loginUserFromKeytab("wanghuan70", "/home/wanghuan70/wanghuan70.keytab");
    

    該api會改變當前環境下的tgt。
    如果我們想只對部分程式碼使用另一個principal來執行,那麼可以使用如下api,然後呼叫doAs執行:

    ugi =   UserGroupInformation.loginUserFromKeytabAndReturnUGI("hbase"  , "hbase.keytab");
      ugi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
           try {
             Connection connection =   ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             HTableDescriptor[] tables = admin.listTables();
             for (HTableDescriptor descriptor : tables) {
               HTable hTable = new HTable();
               hTable.setTableName(descriptor.getTableName().getNameAsString(  ));
    
               Collection<HColumnDescriptor> families =   descriptor.getFamilies();
               for (HColumnDescriptor family : families) {
                 hTable.addFamily(family.getNameAsString());
               }
    
             hTables.addhTable(hTable);
           }
         } catch (Exception ex) {
           logger.info("list hbase table internal failed: %s", ex);
           throw new Exception(ex);
         }
         return null;
       }
     });
    

在業務系統中訪問需要kerberos認證的服務

這個比較簡單,如上節例子中可以看到,只需要改變當前程序環境下的tgt即可,可以使用 命令列也可以在程式碼中實現。該部分暫時不討論 tgt的過期問題,後續會擴充套件分析。

編寫yarn application提交到kerberos認證的叢集中

這類業務可能比較少,因為各種框架都自行實現了 xxx on yarn的程式碼,比如 spark on yarn、flink on yarn。但是也有一些熱門的框架還沒有來得及實現on yarn。 如 tf on yarn,storm on datax on yarn ,datax on yarn或者apache twill。我們可以自己動手去完成一個 yarn application的工程,繼而可以推測 其他框架的on yarn是怎麼去實現的。
官網的參考檔案如下:
Hadoop: Writing YARN Applications
YARN應用開發流程
上述文章已經很詳細的講解了如何編寫 yarn application,我們再這裡不再累述,而我們的關注點在於提交到kerberos認證的叢集

image.png

在上面這個圖大概的描述了我們的 yarn application的邏輯流程,這裡需要注意的是:

  • Client Node 需要使用 ApplicationClientProtocol(Client-RM之間的協議) 將應用提交給 RM。
  • AM 需要使用 ApplicationMasterProtocol(AM-RM之間的協議)向RM申請資源。
  • AM需要使用 ContainerManagementProtocol(AM-NM之間的協議)向NM發起啟動container的命令

也就是說這三次的rpc通訊,我們的應用是需要與Yarn進行通訊的,在kerberos認證的系統中,換句話說,我們需要與yarn service 進行通訊的Ticket

  • Client Node 需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將應用需要的資源上傳到HDFS上;
  • AM (可能的操作)需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將資源下載下來;
  • Container 需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將資源下載下來;

也就是說這三次的rpc通訊,我們的應用是需要與HDFS進行通訊的,在kerberos認證的系統中,換句話說,我們需要與hdfs service 進行通訊的Ticket

還有一個問題需要注意的是,在應用中,我們發起RPC通訊 可能在不同的機器上這個時候如何進行構造相同的環境是我們這裡需要表述的東西;

  • 從上面的連結我們可以知道,Client是如何提交Application到RM,程式碼可如下:

     ApplicationId submitApplication(
              YarnClientApplication app,
              String appName,
              ContainerLaunchContext launchContext,
              Resource resource,
              String queue) throws Exception {
          ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
          appContext.setApplicationName(appName);
          appContext.setApplicationTags(new HashSet<String>());
          appContext.setAMContainerSpec(launchContext);
          appContext.setResource(resource);
          appContext.setQueue(queue);
    
          return yarnClient.submitApplication(appContext);
      }
    

    Client呼叫 YarnClientApplication 向RM提交 ApplicationSubmissionContext
    這裡包含了

    • 應用的名稱
    • 所依賴的資源
    • 提交的佇列
    • 還有一個重要的東西 ContainerLaunchContext 它是什麼東西呢。
    /**
    * <p><code>ContainerLaunchContext</code> represents all of   the information
    * needed by the <code>NodeManager</code> to launch a   container.</p>
    * 
    * <p>It includes details such as:
    *   <ul>
    *     <li>{@link ContainerId} of the container.</li>
    *     <li>{@link Resource} allocated to the container.</li>
    *     <li>User to whom the container is allocated.</li>
    *     <li>Security tokens (if security is enabled).</li>
    *     <li>
    *       {@link LocalResource} necessary for running the container   such
    *       as binaries, jar, shared-objects, side-files etc. 
    *     </li>
    *     <li>Optional, application-specific binary service data.</li>
    *     <li>Environment variables for the launched process.</li>
    *     <li>Command to launch the container.</li>
    *   </ul>
    * </p>
    * 
    * @see   ContainerManagementProtocol#startContainers(org.apache.hadoop  .yarn.api.protocolrecords.StartContainersRequest)
    */
    

    我們的ApplucationMaster 本身上也是在Container裡面執行的,所以也有這個上下文,建構函式如下:

    public static ContainerLaunchContext newInstance(
        Map<String, LocalResource> localResources,
        Map<String, String> environment, List<String> commands,
        Map<String, ByteBuffer> serviceData,  ByteBuffer tokens,
        Map<ApplicationAccessType, String> acls) {
      ContainerLaunchContext container =
          Records.newRecord(ContainerLaunchContext.class);
      container.setLocalResources(localResources);
      container.setEnvironment(environment);
      container.setCommands(commands);
      container.setServiceData(serviceData);
      container.setTokens(tokens);
      container.setApplicationACLs(acls);
      return container;
    }
    

    可以從建構函式來看到我們在設定Container中的環境、資源、執行命令等之外,還添加了 ByteBuffer tokens

     * Set security tokens needed by this container.
     * @param tokens security tokens 
     */
    @Public
    @Stable
    public abstract void setTokens(ByteBuffer tokens);
    

    沒錯! 這個tokens就是我們傳遞給container裡面的安全資訊。

    kerberos 和 Delegation token的關係需要說明一下,我們使用kerberos通過認證後,可以獲取一個帶有時效的委託token,如果我們把這個資訊儲存起來,在token沒過期之前,使用這個token就可以直接連線服務,而無需再走kerberos那一套授權流程了。

    那這個值,我們Client是從哪裡獲取並賦予給container的呢?

     /**
       * setup security token given current user
       * @return the ByeBuffer containing the security tokens
       * @throws IOException
       */
      private ByteBuffer setupTokens(FileSystem fs) throws IOException {
          DataOutputBuffer buffer = new DataOutputBuffer();
          String loc = System.getenv().get("HADOOP_TOKEN_FILE_LOCATION");
          if ((loc != null && loc.trim().length() > 0)
                  ||  (!UserGroupInformation.isSecurityEnabled())) {
              this.credentials.writeTokenStorageToStream(buffer);
          } else {
              // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
              Credentials credentials = new Credentials();
              String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
              if (tokenRenewer == null || tokenRenewer.length() == 0) {
                  throw new IOException(
                          "Can't get Master Kerberos principal for the RM to use as renewer");
              }
              // For now, only getting tokens for the default file-system.
              final org.apache.hadoop.security.token.Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
              if (tokens != null) {
                  for (org.apache.hadoop.security.token.Token<?> token : tokens) {
                      LOG.info("Got dt for " + fs.getUri() + "; " + token);
                  }
              }
              credentials.writeTokenStorageToStream(buffer);
          }
          return ByteBuffer.wrap(buffer.getData(), 0, buffer.getLength());
      }
    

    不同的 xx on yarn可能程式碼寫法不同,但是,思路都是一致的:

     /**
     * Obtain all delegation tokens used by this FileSystem that are not
     * already present in the given Credentials.  Existing tokens will neither
     * be verified as valid nor having the given renewer.  Missing tokens will
     * be acquired and added to the given Credentials.
     * 
     * Default Impl: works for simple fs with its own token
     * and also for an embedded fs whose tokens are those of its
     * children file system (i.e. the embedded fs has not tokens of its
     * own).
     * 
     * @param renewer the user allowed to renew the delegation tokens
     * @param credentials cache in which to add new delegation tokens
     * @return list of new delegation tokens
     * @throws IOException
     */
    @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
    public Token<?>[] addDelegationTokens(
    
    17/08/03 15:48:49 INFO client.LaunchCluster: tokenRenewer is    yarn/[email protected]
    17/08/03 15:48:49 INFO hdfs.DFSClient: Created   HDFS_DELEGATION_TOKEN token 762341 for wanghuan70 on   ha-hdfs:nn-idc
    17/08/03 15:48:49 INFO client.LaunchCluster: Got dt for hdfs://nn-  idc; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:nn-idc,   Ident: (HDFS_DELEGATION_TOKEN token 762341 for   wanghuan70)
    17/08/03 15:48:49 WARN token.Token: Cannot find class for token   kind kms-dt
    17/08/03 15:48:49 INFO client.LaunchCluster: Got dt for hdfs://nn-  idc; Kind: kms-dt, Service: 10.214.129.150:16000, Ident: 00 0a 77   61 6e 67 68 75 61 6e 37 30 04 79 61 72 6e 00 8a 01 5d a7 14 e1   22 8a 01 5d cb 21 65 22 8d 0d d1 8e 8f d7
    

    我們這裡是生成了訪問hdfs的Token HDFS_DELEGATION_TOKEN 以及 在hdfs上的 KMS的token,
    這裡我們可以注意到,在上面的分析中,我們的AM也要去連線RM和NM,但是為什麼這裡沒有去生成Token呢。我們可以看一下AM裡面的 ** UserGroupInformation**的狀態,我們通過在我們的 ApplicationMaster的啟動類中,加入如下程式碼:

          LOG.info("isSecurityEnabled: {}", UserGroupInformation.getCurrentUser().isSecurityEnabled());
          LOG.info("isLoginKeytabBased: {}", UserGroupInformation.getCurrentUser().isLoginKeytabBased());
          LOG.info("isLoginTicketBased: {}", UserGroupInformation.getCurrentUser().isLoginTicketBased());
          LOG.info("userName: {}", UserGroupInformation.getCurrentUser().getUserName());
    
          for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {
              LOG.info("Token kind is " + token.getKind().toString()
                      + " and the token's service name is " + token.getService());
          }
    

    讓我們來看下AM端的日誌:

image.png

可以看到 AM端的 初始UserGroupInformation是不帶要tgt的, 也就是說,沒辦法進行kerberos認證流程,AM端不管是與yarn還是 hdfs的通訊都應該是使用Token的。在圖片中Token列表中,我們看到出現了一個 名字叫 YARN_AM_RM_TOKEN ,這個並不是我們Client加進去的,但是可以確信的是AM使用該token與RM進行通訊,這個token哪裡來的呢?

帶著這個疑問,我們需要從Client開始扒拉一下程式碼了,在client端我們使用 YarnClient 將我們的啟動的資訊提交給了RM,這個YarnClient是經過kerberos認證的連線,那麼我們可以看下RM端是怎麼來處理這個 啟動ApplicationMaster請求的。我們提交給RM的是一個名叫ApplicationSubmissionContext, RM要從中創建出ContainerLaunchContext

image.png

這RM端的createAMContainerLaunchContext中,我們查到了我們的疑問之處,這裡出現了

  // Finalize the container
  setupTokens(container, containerID);

進去看看這個方法做了什麼?:

image.png

image.png

我們看到了我們想要的東西,container中新的tokens除了我們老的ContainerLaunchContext中我們從client傳遞過來的tokens,還額外添加了AMRMToken,到了這裡我們解決了我們上面的一個疑問:

AM和RM通訊是使用Token來認證的,這個AMRMToken是RM端啟動am的container的時候加塞進來的。

現在整理一下我們邏輯,啟動之後AM使用** YARN_AM_RM_TOKEN來和RM通訊,使用 HDFS_DELEGATION_TOKEN**來和hdfs filesystem通訊,那麼,AM是怎麼通知NN來啟動自己的 excutor的呢?不妨再看一下程式碼。

image.png

上面的圖很明瞭了,nmTokens由RM提供給AM,在AM建立NMClientAsync的時候,

image.png


從單例 NMTokenCache 中獲取到 nmTokens來進行連線NN。

 

到此,AM中的認證問題,我們已經整明白了,那邊由AM,啟動的其他的container的認證呢?,其實套路是一樣的!

          LOG.info("Launching a new container."
                  + ", containerId=" + container.getId()
                  + ", containerNode=" + container.getNodeId().getHost()
                  + ":" + container.getNodeId().getPort()
                  + ", containerNodeURI=" + container.getNodeHttpAddress()
                  + ", containerResourceMemory="
                  + container.getResource().getMemory()
                  + ", containerResourceVirtualCores="
                  + container.getResource().getVirtualCores()
                  + ", command: " + command);
          ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
                  localResources, env, Lists.newArrayList(command), null, setupTokens(), null);
          appMaster.addContainer(container);
          appMaster.getNMClientAsync().startContainerAsync(container, ctx);

只需要把AM中的token做傳遞即可。

長任務在kerberos系統中執行,以spark為例子

什麼是長任務? 就是long-running services,長時間執行的任務,可能是流也可以不是。
那麼為什麼,要把長任務單獨來說呢,因為從上述的yarn應用的描述,我們知道,am和excutor中使用的是token來訪問hdfs和rm 的,token是有時效性的我們是知道的,那麼,長時間執行,token肯定會失效,如果token失效的話,肯定就不能訪問hdfs了。所以這個就是 long-running 任務需要考慮的東西。
spark on yarn模式,分為兩種: 一種是 yarn client模式,一種是yarn cluster模式。一般來說業務上都會使用yarn cluster模式來執行,但是隨著分析類工具的推出,比如zeppelin,jupter的使用, 常駐的yarn client 所以這兩種模式都很重要。為了把事情講清楚,我們兩種方式分類來說明,本章節原始碼(1.6.0)通讀可以較多。

yarn clientyarn cluter 說到底都是yarn application,那麼client 和 cluster的區別到底區別在哪呢?-- Spark Driver是在本地執行還是在AM中來執行

擴充套件閱讀
過往記憶
kerberos_and_hadoop

yarn cluster 模式

image.png

  • spark 使用SparkSubmitAction來提交作業

    image.png

    • prepareSubmitEnvironment 根據 master(YARN/STANDALONE/MESOS/LOCAL)和deployMode(CLIENT/CLUSTER)來得到我們需要執行的Class入口
    • runMain 通過反射執行childMainClass中的main函式,因為這裡是 cluster模式,所在這裡執行的並不是使用者的程式碼,而是org.apache.spark.deploy.yarn.Client
  • Client裡面執行的是編譯一個yarn application必要的步驟:

    image.png

    • 建立 yarnClient 用於和RM通訊
    • 向RM申請一個newApp
    • 建立am需要的containerContext
    • 建立ApplicationSubmissionContext並提交,amClass為
    org.apache.spark.deploy.yarn.ApplicationMaster
    
    • Client完成;
  • ApplicationMaster

    • 啟動使用者的程式碼執行緒:

    image.png

    • 當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager註冊ApplicationMaster

    image.png

    • 上面的邏輯是yarn application必須的步驟,我們注意來看看spark 如何來處理 token失效的:
      // If the credentials file config is present, we must periodically renew tokens. So create
        // a new AMDelegationTokenRenewer
        if (sparkConf.contains("spark.yarn.credentials.file")) {
          delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
          // If a principal and keytab have been set, use that to create new credentials for executors
          // periodically
          delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
        }
    

    1.如果使用者在提交應用的過程中,使用 --keytab 引數上傳了kerberos認證檔案的話,AM裡面會啟動一個執行緒專門用來處理,我們可以看看 AMDelegationTokenRenewer 裡面都做了什麼:

    private[spark] def scheduleLoginFromKeytab(): Unit = {
     val principal = sparkConf.get("spark.yarn.principal")
     val keytab = sparkConf.get("spark.yarn.keytab")
    
     /**
      * Schedule re-login and creation of new tokens. If tokens have already expired, this method
      * will synchronously create new ones.
      */
     def scheduleRenewal(runnable: Runnable): Unit = {
       val credentials = UserGroupInformation.getCurrentUser.getCredentials
       val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
       // Run now!
       if (renewalInterval <= 0) {
         logInfo("HDFS tokens have expired, creating new tokens now.")
         runnable.run()
       } else {
         logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
         delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
       }
     }
    
     // This thread periodically runs on the driver to update the delegation tokens on HDFS.
     val driverTokenRenewerRunnable =
       new Runnable {
         override def run(): Unit = {
           try {
             writeNewTokensToHDFS(principal, keytab)
             cleanupOldFiles()
           } catch {
             case e: Exception =>
               // Log the error and try to write new tokens back in an hour
               logWarning("Failed to write out new credentials to HDFS, will try again in an " +
                 "hour! If this happens too often tasks will fail.", e)
               delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
               return
           }
           scheduleRenewal(this)
         }
       }
     // Schedule update of credentials. This handles the case of updating the tokens right now
     // as well, since the renenwal interval will be 0, and the thread will get scheduled
     // immediately.
     scheduleRenewal(driverTokenRenewerRunnable)
    }
    
    private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
     // Keytab is copied by YARN to the working directory of the AM, so full path is
     // not needed.
    
     // HACK:
     // HDFS will not issue new delegation tokens, if the Credentials object
     // passed in already has tokens for that FS even if the tokens are expired (it really only
     // checks if there are tokens for the service, and not if they are valid). So the only real
     // way to get new tokens is to make sure a different Credentials object is used each time to
     // get new tokens and then the new tokens are copied over the the current user's Credentials.
     // So:
     // - we login as a different user and get the UGI
     // - use that UGI to get the tokens (see doAs block below)
     // - copy the tokens over to the current user's credentials (this will overwrite the tokens
     // in the current user's Credentials object for this FS).
     // The login to KDC happens each time new tokens are required, but this is rare enough to not
     // have to worry about (like once every day or so). This makes this code clearer than having
     // to login and then relogin every time (the HDFS API may not relogin since we don't use this
     // UGI directly for HDFS communication.
     logInfo(s"Attempting to login to KDC using principal: $principal")
     val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
     logInfo("Successfully logged into KDC.")
     val tempCreds = keytabLoggedInUGI.getCredentials
     val credentialsPath = new Path(credentialsFile)
     val dst = credentialsPath.getParent
     keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
         hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
         null
       }
     })
     // Add the temp credentials back to the original ones.
     UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
     val remoteFs = FileSystem.get(freshHadoopConf)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
     if (lastCredentialsFileSuffix == 0) {
       hadoopUtil.listFilesSorted(
         remoteFs, credentialsPath.getParent,
         credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
         .lastOption.foreach { status =>
         lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
       }
     }
     val nextSuffix = lastCredentialsFileSuffix + 1
     val tokenPathStr =
       credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
     val tokenPath = new Path(tokenPathStr)
     val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
     logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
     credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
     logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
     remoteFs.rename(tempTokenPath, tokenPath)
     logInfo("Delegation token file rename complete.")
     lastCredentialsFileSuffix = nextSuffix
    }
    

    程式碼很長,邏輯可以概括為如下:
    1.根據token時效判斷是否需要進行token重新整理行為;
    2.使用hdfs上的keytab獲取新的tgt -- keytabLoggedInUGI
    3.在新的UserGroupInformation下,重新獲取新的 HDFS_DELEGATION_TOKEN 加到當前的 UserGroupInformation中,這裡大家留意一下
    freshHadoopConf

    image.png


    我們後面緊接著會具體講 如何與hdfs通訊的時候分析一下https://issues.apache.org/jira/browse/HDFS-9276
    4.將新的token資訊更新到hdfs目錄下。

     

    • Excutor的啟動的類為org.apache.spark.executor.org.apache.spark.executor

      image.png

       

      如果需要重新整理token,excutor會啟動一個更新token程式

    def updateCredentialsIfRequired(): Unit = {
      try {
        val credentialsFilePath = new Path(credentialsFile)
        val remoteFs = FileSystem.get(freshHadoopConf)
        SparkHadoopUtil.get.listFilesSorted(
          remoteFs, credentialsFilePath.getParent,
          credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
          .lastOption.foreach { credentialsStatus =>
          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
          if (suffix > lastCredentialsFileSuffix) {
            logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
            lastCredentialsFileSuffix = suffix
            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
            logInfo("Tokens updated from credentials file.")
          } else {
            // Check every hour to see if new credentials arrived.
            logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
              "tokens yet, will check again in an hour.")
            delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
            return
          }
        }
        val timeFromNowToRenewal =
          SparkHadoopUtil.get.getTimeFromNowToRenewal(
            sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
        if (timeFromNowToRenewal <= 0) {
          executorUpdaterRunnable.run()
        } else {
          logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
          delegationTokenRenewer.schedule(
            executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
        }
      } catch {
        // Since the file may get deleted while we are reading it, catch the Exception and come
        // back in an hour to try again
        case NonFatal(e) =>
          logWarning("Error while trying to update credentials, will try again in 1 hour", e)
          delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
      }
    }
    

    邏輯也很明瞭:

    1. 從 hdfs相應目錄讀取由AM寫入的token檔案資訊;
    2. 更新到自己的ugi中;
      這裡也需要 對

     

    image.png


    也和上述https://issues.apache.org/jira/browse/HDFS-9276有關。

     


至此,實際上啟動的過程大概就是這樣,那麼現在我們需要對我們關心的問題來具體分析:

  • 我們的應用是怎麼連線到hdfs的?
    在hadoop api中提供 FileSystem 介面用於與各種檔案系統進行連線,HDFS也不除外,其具體類為DistributedFileSystem,進入這個類,可以看到連線hdfs的客戶端

    image.png

    DEBUG [2017-07-28 13:24:46,255] ({main}   DFSClient.java[<init>]:455) - dfs.client.use.legacy.blockreader.local   = false
    DEBUG [2017-07-28 13:24:46,255] ({main}   DFSClient.java[<init>]:458) - dfs.client.read.shortcircuit = false
    DEBUG [2017-07-28 13:24:46,256] ({main}   DFSClient.java[<init>]:461) - dfs.client.domain.socket.data.traffic =   false
    DEBUG [2017-07-28 13:24:46,256] ({main}   DFSClient.java[<init>]:464) - dfs.domain.socket.path = /var/run/hdfs-  sockets/dn
    DEBUG [2017-07-28 13:24:46,282] ({main}   HAUtil.java[cloneDelegationTokenForLogicalUri]:329) - No HA   service delegation token found for logical URI hdfs://nn-idc
    DEBUG [2017-07-28 13:24:46,282] ({main}   DFSClient.java[<init>]:455) - dfs.client.use.legacy.blockreader.local   = false
    DEBUG [2017-07-28 13:24:46,282] ({main}   DFSClient.java[<init>]:458) - dfs.client.read.shortcircuit = false
    DEBUG [2017-07-28 13:24:46,283] ({main}   DFSClient.java[<init>]:461) - dfs.client.domain.socket.data.traffic =   false
    DEBUG [2017-07-28 13:24:46,283] ({main}   DFSClient.java[<init>]:464) - dfs.domain.socket.path = /var/run/hdfs-  sockets/dn
    DEBUG [2017-07-28 13:24:46,285] ({main}   RetryUtils.java[getDefaultRetryPolicy]:75) -   multipleLinearRandomRetry = null
    DEBUG [2017-07-28 13:24:46,290] ({main}   ClientCache.java[getClient]:63) - getting client out of cache:   [email protected]
    DEBUG [2017-07-28 13:24:46,514] ({main}   NativeCodeLoader.java[<clinit>]:46) - Trying to load the custom-built   native-hadoop library...
    DEBUG [2017-07-28 13:24:46,515] ({main}   NativeCodeLoader.java[<clinit>]:50) - Loaded the native-hadoop   library
    DEBUG [2017-07-28 13:24:46,520] ({Thread-36}   DomainSocketWatcher.java[run]:453) -   [email protected]:   starting with interruptCheckPeriodMs = 60000
    DEBUG [2017-07-28 13:24:46,524] ({main}   DomainSocketFactory.java[<init>]:110) - Both short-circuit local   reads and UNIX domain socket are disabled.
    DEBUG [2017-07-28 13:24:46,530] ({main}   DataTransferSaslUtil.java[getSaslPropertiesResolver]:183) -   DataTransferProtocol not using SaslPropertiesResolver, no QOP   found in configuration for dfs.data.transfer.protection
    DEBUG [2017-07-28 13:24:46,534] ({main}   Logging.scala[logDebug]:62) - delegation token renewer is:   yarn/[email protected]
     INFO [2017-07-28 13:24:46,535] ({main}   Logging.scala[logInfo]:58) - getting token for namenode: hdfs://nn-  idc/user/wanghuan70/.sparkStaging/application_1499341382704_7  8490
    DEBUG [2017-07-28 13:24:46,537] ({main} Client.java[<init>]:434)   - The ping interval is 60000 ms.
    DEBUG [2017-07-28 13:24:46,537] ({main}   Client.java[setupIOstreams]:704) - Connecting to   ctum2f0302002.idc.xxx-group.net/10.214.128.51:8020
    DEBUG [2017-07-28 13:24:46,538] ({main}   UserGroupInformation.java[logPrivilegedAction]:1715) -   PrivilegedAction as:[email protected]   (auth:KERBEROS)   from:org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Cli  ent.java:725)
    DEBUG [2017-07-28 13:24:46,539] ({main}   SaslRpcClient.java[sendSaslMessage]:457) - Sending sasl   message state: NEGOTIATE
    
    DEBUG [2017-07-28 13:24:46,541] ({main}   SaslRpcClient.java[saslConnect]:389) - Received SASL message   state: NEGOTIATE
    auths {
      method: "TOKEN"
      mechanism: "DIGEST-MD5"
      protocol: ""
      serverId: "default"
      challenge:   "realm=\"default\",nonce=\"FsxK1F2sX0QvIYFTYdwpNFYlB+uCuXr  x7se1tCAa\",qop=\"auth\",charset=utf-8,algorithm=md5-sess"
    }
    auths {
      method: "KERBEROS"
      mechanism: "GSSAPI"
      protocol: "hdfs"
      serverId: "ctum2f0302002.idc.xxx-group.net"
    }
    
    DEBUG [2017-07-28 13:24:46,541] ({main}   SaslRpcClient.java[getServerToken]:264) - Get token info   proto:interface   org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB   info:@org.apache.hadoop.security.token.TokenInfo(value=class   org.apache.hadoop.hdfs.security.token.delegation.DelegationToken  Selector)
    DEBUG [2017-07-28 13:24:46,542] ({main}   SaslRpcClient.java[getServerPrincipal]:291) - Get kerberos info   proto:interface   org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB   info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=,   serverPrincipal=dfs.namenode.kerberos.principal)
    DEBUG [2017-07-28 13:24:46,545] ({main}   SaslRpcClient.java[createSaslClient]:236) - RPC Server's Kerberos   principal name for   protocol=org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProto  colPB is hdfs/[email protected]  GROUP.NET
      DEBUG [2017-07-28 13:24:46,546] ({main}   SaslRpcClient.java[createSaslClient]:247) - Creating SASL   GSSAPI(KERBEROS)  client to authenticate to service at   ctum2f0302002.idc.wanda-group.net
    DEBUG [2017-07-28 13:24:46,547] ({main}   SaslRpcClient.java[selectSaslClient]:176) - Use KERBEROS   authentication for protocol ClientNamenodeProtocolPB
    DEBUG [2017-07-28 13:24:46,564] ({main}   SaslRpcClient.java[sendSaslMessage]:457) - Sending sasl   message state: INITIATE
    

    這裡摘錄了部分debug日誌,這樣就很好的邏輯描述清楚了

    1. DFSClient 通過 ClientNamenodeProtocolPB協議來和namenode建立聯絡。底層RPC在簡歷連線的時候如果有token則使用token進行建立連線,如果沒有token再進行kerberos認證後建立連線。

     

    image.png


    在dfsclient中使用 DelegationTokenSelector來選取即id為 HDFS_DELEGATION_TOKEN的token。在我們沒有使用

     

    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
    

    當前的UGI中是不能使用token進行連線的。
    在初始化 DFSClient 中,使用的 dfs.client.failover.proxy.provider 是 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider 在構造過程中會呼叫

      // The client may have a delegation token set for the logical
        // URI of the cluster. Clone this token to apply to each of the
        // underlying IPC addresses so that the IPC code can find it.
        HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
    

    這裡的作用在 HA mode下很重要,在HA mode的形式下,我們使用 obtainTokensForNamenodes 生成的 token的 service name 為 ha-hdfs:nn-idc

    DFSClient.java[getDelegationToken]:1066) - Created HDFS_DELEGATION_TOKEN token 735040 for wanghuan70 on ha-hdfs:nn-idc

    但是呢,在rpc連線的時候,使用的host或者ip加port的 service name來尋找 token的,換句話說,即時我們獲取了token,saslRPC在連線的時候也找不到,這裡就是使用 HAUtil.java[cloneDelegationTokenForLogicalUri]:329) 將 service name為ha-hdfs:nn-idc 拷貝成和 ip對應的token,這樣
    saslRPC才可以順利使用token。但是要注意的是 只有在DFSClient初始化過程中,才會進行這個token的拷貝。 可是呢,

     

    image.png


    在獲取 FileSystem的時候,預設的情況下,這個例項會被cache的,也就是說,DFSClient就不會初始化了,我們更新的token就不會使用 HAUtil.java[cloneDelegationTokenForLogicalUri]:329) 將 service name為ha-hdfs:nn-idc 拷貝成和 ip對應的token,這樣即使這樣
    saslRPC使用仍然是老token,就會過期掉,這就是 https://issues.apache.org/jira/browse/HDFS-9276的描述的問題。針對這個問題,hadoop版本升級後可以修復,還有一個方法就是,如果不cache的話,就會呼叫 DFSClient 初始化方法,所以,我們可以設定這個預設引數為 true

     

  • spark的excutor並不一定一開始就是給定的,是動態的增加的,也就是說一個長應用的AM可能在很長的一段時間內都會和 RM通訊,我們回顧一下上面的內容,我們知道AMRMToken是RM在啟動AM的時候下發的,而且,我們在重新整理機制中,僅僅重新整理了HDFS_DELEGATION_TOKEN,那邊怎麼來處理AMRMToken過期呢,這spark裡面其實並沒有在對此做處理,為什麼呢?

建立的saslRPC連線只有空閒時間超過10s中,連線才會被關閉,如果我們的AM保持著對RM的心跳,也就不需要重新與RM建立連線(讀者可以推演一下RM發生準備切換的情景)。

yarn client 模式

image.png

這裡只講一下和 yarn cluster的不同之處:

  • 因為Spark Driver是在本地執行,所以在使用SparkSubmit提交的時候 runMain 通過反射執行childMainClass中的main函式,這裡的childMainClass 是使用者的程式碼。

  • SparkContext生成的過程,根據提交方式,使用YarnClientSchedulerBackend來排程

    image.png

  • 因為使用者的程式碼已經本地啟動了,那麼啟動的AM裡面執行什麼呢?
    什麼業務程式碼都不執行,只負責向RM申請資源。

  • Driver 因為需要獲悉application的執行情況,啟動了一個監控執行緒,每1s鍾向RM諮詢一次狀態,也不需要重新整理token

    image.png


我們上面所說的 hdfs的token重新整理都是在使用者使用 --keytab的方式提交的,如果不是以這種方式提交的長任務,token肯定會失效,會報錯。



作者:PunyGod
連結:https://www.jianshu.com/p/ae5a3f39a9af
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。