Don't Be Misled by the Official Docs: A Detailed Look at YARN Fair Scheduling

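FairScheduler is a commonly used YARN scheduler. The official documentation, however, leaves many parameters and concepts unexplained, even though these parameters clearly affect whether a cluster runs properly. This article walks through the code to clarify what the key parameters do. The parameters most often referenced in the official documentation are listed below: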

yarn.scheduler.fair.preemption.cluster-utilization-threshold: The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f.
yarn.scheduler.fair.update-interval-ms: The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms.
maxAMShare: limit the fraction of the queue’s fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.
minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.
fairSharePreemptionTimeout: number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.
fairSharePreemptionThreshold: If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.

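The descriptions above give no default values for the timeout parameters and do not say what happens when they are left unset. Concepts such as minShare and fairShare are not explained clearly either, which easily leaves readers confused. The analysis below explains these parameters and concepts one by one.

FairScheduler Overall Structure

Figure 1: FairScheduler execution flow

In the fair scheduler's execution flow, the ResourceManager (RM) starts two services, FairScheduler and SchedulerEventDispatcher, which are responsible for the update thread and the handle thread respectively.

The update thread has two tasks: (1) update each queue's resources (Instantaneous Fair Share), and (2) decide whether each leaf queue needs to preempt resources (if preemption is enabled).

The handle thread mainly responds to events, such as a node being added to the cluster, an app being added to or removed from a queue, or an app updating its containers.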

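FairScheduler Class Diagram

Figure 2: FairScheduler class diagram

Queue inheritance module: YARN manages queues in a tree structure. From a resource-management perspective, the root queue (FSParentQueue), non-root parent nodes (FSParentQueue), leaf nodes (FSLeafQueue), and app tasks (FSAppAttempt, the app as seen by the fair scheduler) are all abstract resources. They all implement the Schedulable interface, so each one is a schedulable resource object. Each has a fair share (the amount of resources for the queue; note that the fair share concept appears again here), a weight, a minShare (minimum resources), a maxShare (maximum resources), a priority, a resourceUsage (resources in use), and a demand (resources requested). Each also implements preemptContainer, which preempts resources, and assignContainer, which allocates a container (for example, the AM container for an app in the ACCEPTED state).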

public interface Schedulable {
  /**
   * Name of job/queue, used for debugging as well as for breaking ties in
   * scheduling order deterministically.
   */
  public String getName();

  /**
   * Maximum number of resources required by this Schedulable. This is defined as
   * number of currently utilized resources + number of unlaunched resources (that
   * are either not yet launched or need to be speculated).
   */
  public Resource getDemand();

  /** Get the aggregate amount of resources consumed by the schedulable. */
  public Resource getResourceUsage();

  /** Minimum Resource share assigned to the schedulable. */
  public Resource getMinShare();

  /** Maximum Resource share assigned to the schedulable. */
  public Resource getMaxShare();

  /** Job/queue weight in fair sharing. */
  public ResourceWeights getWeights();

  /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
  public long getStartTime();

  /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
  public Priority getPriority();

  /** Refresh the Schedulable's demand and those of its children if any. */
  public void updateDemand();

  /**
   * Assign a container on this node if possible, and return the amount of
   * resources assigned.
   */
  public Resource assignContainer(FSSchedulerNode node);

  /**
   * Preempt a container from this Schedulable if possible.
   */
  public RMContainer preemptContainer();

  /** Get the fair share assigned to this Schedulable. */
  public Resource getFairShare();

  /** Assign a fair share to this Schedulable. */
  public void setFairShare(Resource fairShare);
}
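
To make the "everything in the tree is a Schedulable" idea concrete, here is a minimal sketch with memory as the only resource. The types SimpleSchedulable, SimpleApp, and SimpleQueue are hypothetical stand-ins, not YARN classes, and capping a queue's demand at its maxShare is an assumption of this sketch rather than something shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, memory-only stand-in for the Schedulable interface (values in MB). */
interface SimpleSchedulable {
    String getName();
    long getDemand();    // resources wanted: in use plus still pending
    long getMaxShare();  // upper bound on resources
}

/** Plays the role of an app attempt: its demand is given directly. */
final class SimpleApp implements SimpleSchedulable {
    private final String name;
    private final long demand;

    SimpleApp(String name, long demand) {
        this.name = name;
        this.demand = demand;
    }

    public String getName() { return name; }
    public long getDemand() { return demand; }
    public long getMaxShare() { return Long.MAX_VALUE; }
}

/** Plays the role of a leaf or parent queue: demand is the capped sum of its children. */
final class SimpleQueue implements SimpleSchedulable {
    private final String name;
    private final long maxShare;
    private final List<SimpleSchedulable> children = new ArrayList<>();

    SimpleQueue(String name, long maxShare) {
        this.name = name;
        this.maxShare = maxShare;
    }

    SimpleQueue add(SimpleSchedulable child) {
        children.add(child);
        return this;
    }

    public String getName() { return name; }
    public long getMaxShare() { return maxShare; }

    public long getDemand() {
        long sum = 0;
        for (SimpleSchedulable child : children) {
            sum += child.getDemand();
        }
        // Assumption of this sketch: a queue never asks for more than its maxShare.
        return Math.min(sum, maxShare);
    }
}

final class SchedulableTreeDemo {
    static SimpleQueue buildTree() {
        SimpleQueue leaf1 = new SimpleQueue("root.leaf1", 4096)
            .add(new SimpleApp("app1", 3000))
            .add(new SimpleApp("app2", 2000));
        SimpleQueue leaf2 = new SimpleQueue("root.leaf2", Long.MAX_VALUE)
            .add(new SimpleApp("app3", 1000));
        return new SimpleQueue("root", Long.MAX_VALUE).add(leaf1).add(leaf2);
    }

    public static void main(String[] args) {
        // leaf1 is capped at 4096, leaf2 asks for 1000, so root asks for 5096.
        System.out.println("root demand = " + buildTree().getDemand());
    }
}
```

Because apps and queues share one interface, the parent never needs to know whether a child is an app, a leaf queue, or another parent.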

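Queue execution module: this part of the class diagram shows how fair scheduling works. The SchedulerEventDispatcher class manages the handle thread. The FairScheduler class manages the update thread and obtains information about all queues through QueueManager.

We start the code analysis from two basic YARN concepts: Instantaneous Fair Share and Steady Fair Share.

Instantaneous Fair Share & Steady Fair Share

In both cases, Fair Share refers to the maximum resources that can be allocated to a queue, which YARN computes from each queue's weight and its minimum and maximum resources. This article covers fair scheduling, whose default policy, FairSharePolicy, is single-resource: it considers memory only.

Steady Fair Share: the fixed, theoretical amount of memory for each queue. After the RM's initial work, the Steady Fair Share rarely changes; it is recomputed only when a node is later added (addNode). The RM's initial work itself consists of the handle thread adding each cluster node to the scheduler (addNode).

Instantaneous Fair Share: the actual amount of memory for each queue, which changes dynamically. Unless stated otherwise, "fair share" in YARN refers to the Instantaneous Fair Share.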
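
The difference between the two shares is easiest to see with numbers. The sketch below is a simplification that splits resources purely by weight and ignores minResources and maxResources; the class and method names are made up for illustration. The steady calculation counts every queue, while the instantaneous one skips queues with no running apps.

```java
import java.util.Arrays;

final class SteadyVsInstantaneousSketch {
    /**
     * Splits total among the queues in proportion to weight.
     * With steady == true every queue is counted; otherwise inactive queues get 0.
     */
    static double[] shares(double total, double[] weights, boolean[] active, boolean steady) {
        double weightSum = 0;
        for (int i = 0; i < weights.length; i++) {
            if (steady || active[i]) {
                weightSum += weights[i];
            }
        }
        double[] result = new double[weights.length];
        for (int i = 0; i < weights.length; i++) {
            boolean counted = steady || active[i];
            result[i] = (counted && weightSum > 0) ? total * weights[i] / weightSum : 0.0;
        }
        return result;
    }

    public static void main(String[] args) {
        double[] weights = {1, 1, 1};
        boolean[] active = {true, true, false}; // the third queue has no running apps
        // Steady: all three queues share 600G, so 200G each.
        System.out.println(Arrays.toString(shares(600, weights, active, true)));
        // Instantaneous: only the two active queues share 600G, so 300G each.
        System.out.println(Arrays.toString(shares(600, weights, active, false)));
    }
}
```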

1 Steady Fair Share calculation

Figure 3: Steady Fair Share calculation flow

When the handle thread receives a NODE_ADDED event, it calls the addNode method.

  private synchronized void addNode(RMNode node) {
    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
    nodes.put(node.getNodeID(), schedulerNode);
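    // Add this node's memory to the cluster's total resources
    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
    // Update the available resources
    updateRootQueueMetrics();
    // Update the maximum allocation for a single container, i.e. the MAX shown in the web UI (if I remember correctly)
    updateMaximumAllocation(schedulerNode, true);

    // Set the root queue's steady fair share to the cluster's total resources (clusterResource)
    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
    // Recompute the steady fair shares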
    queueMgr.getRootQueue().recomputeSteadyShares();
    LOG.info("Added node " + node.getNodeAddress() +
        " cluster capacity: " + clusterResource);
  }

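recomputeSteadyShares walks the queue tree from the root down. At each parent it computes the memory for all direct children, then recurses into every child that is itself a parent, until the leaf nodes are reached.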

 public void recomputeSteadyShares() {
    // Walk the whole queue tree top-down, one level of children at a time
    // For the root queue, getSteadyFairShare() is clusterResource at this point
    policy.computeSteadyShares(childQueues, getSteadyFairShare());
    for (FSQueue childQueue : childQueues) {
      childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
      if (childQueue instanceof FSParentQueue) {
        ((FSParentQueue) childQueue).recomputeSteadyShares();
      }
    }
  }
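
The addNode and recomputeSteadyShares flow can be sketched in a few lines. This is a simplified illustration, not the YARN code: QueueNode and SteadyShareTreeSketch are hypothetical names, the split is purely by weight (no minResources or maxResources), and the weights are chosen for the example rather than taken from the article's figures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical queue node; shares are in GB. */
final class QueueNode {
    final String name;
    final double weight;
    final List<QueueNode> children = new ArrayList<>();
    double steadyFairShare;

    QueueNode(String name, double weight) {
        this.name = name;
        this.weight = weight;
    }

    QueueNode add(QueueNode child) {
        children.add(child);
        return this;
    }

    /** Split this queue's steady share among its children by weight, then recurse. */
    void recomputeSteadyShares() {
        double weightSum = 0;
        for (QueueNode child : children) {
            weightSum += child.weight;
        }
        for (QueueNode child : children) {
            child.steadyFairShare =
                weightSum > 0 ? steadyFairShare * child.weight / weightSum : 0;
            child.recomputeSteadyShares();
        }
    }
}

final class SteadyShareTreeSketch {
    static QueueNode buildTree() {
        QueueNode parent = new QueueNode("root.parent", 1)
            .add(new QueueNode("root.parent.leaf2", 3))
            .add(new QueueNode("root.parent.leaf3", 1));
        return new QueueNode("root", 1)
            .add(parent)
            .add(new QueueNode("root.leaf1", 1));
    }

    /** Mirrors the idea of addNode: grow the cluster total, then recompute from the root. */
    static void addNode(QueueNode root, double nodeMemoryGb) {
        root.steadyFairShare += nodeMemoryGb;
        root.recomputeSteadyShares();
    }

    public static void main(String[] args) {
        QueueNode root = buildTree();
        addNode(root, 300);
        addNode(root, 300);
        for (QueueNode q : root.children) {
            System.out.println(q.name + " = " + q.steadyFairShare);
            for (QueueNode c : q.children) {
                System.out.println("  " + c.name + " = " + c.steadyFairShare);
            }
        }
    }
}
```

With two 300G nodes, the root holds 600G, parent and leaf1 get 300G each, and parent's 300G is split 3:1 into 225G and 75G.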

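The computeSteadyShares method computes how much memory each queue should be allocated. Broadly, resources are distributed according to queue weight: a queue with a larger weight gets more, and a queue with a smaller weight gets less. The details depend on other factors as well, because each queue has minResources and maxResources parameters that bound its resources from below and above. computeSteadyShares ultimately calls the computeSharesInternal method. Take the figure below as an example:

The numbers in the figure are weights. Suppose the total resources are 600G; then parent=300G, leaf1=300G, leaf2=210G, leaf3=70G.

Figure 4: YARN queue weights

In short, computeSharesInternal uses binary search to find a resource ratio R (weight-to-slots) and uses this R to allocate resources to each queue (inside this method the queues are typed as Schedulable, which again shows that a queue is a resource object). The formula is steadyFairShare = R * QueueWeights.

computeSharesInternal is shared by the Steady Fair Share and Instantaneous Fair Share calculations; the isSteadyShare parameter selects which one is computed.

The reason for this complexity is that queues are not allocated resources purely in proportion to their weights. A pure weight split happens only when neither maxR nor minR is set (maxR defaults to 0x7fffffff and minR defaults to 0). When maxR and minR are set, a queue whose proportional share is below minR must still receive minR, and a queue whose proportional share is above maxR is capped at maxR. The goal is therefore to find an R (weight-to-slots) that satisfies the following as closely as possible:

  • R*(Queue1Weights + Queue2Weights+...+QueueNWeights) <=totalResource
  • R*QueueWeights >= minShare
  • R*QueueWeights <= maxShare

Note: QueueNWeights is each queue's own weight; minShare and maxShare are each queue's minResources and maxResources.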
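
The three conditions can be folded into one expression, assuming (as the computeShare code later in this article does) that a queue's share is R times its weight, clamped to its [minShare, maxShare] range. The symbols w_i and T are shorthand introduced here for queue i's weight and the total available resources:

```latex
\[
\mathrm{share}_i(R) = \min\bigl(\max(R \cdot w_i,\ \mathrm{minShare}_i),\ \mathrm{maxShare}_i\bigr),
\qquad
\sum_{i=1}^{N} \mathrm{share}_i(R) \le T
\]
```

For example, with T = 100 and two queues of weight 1, where the first queue has minShare = 70, a pure weight split would give 50 and 50. With R = 30 the clamped shares are 70 and 30, which uses exactly T.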

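In detail, computeSharesInternal has four steps:

  1. Determine the available resources: totalResources = min(totalResources - takenResources (fixedShare), totalMaxShare)
  2. Determine the upper and lower bounds of R
  3. Approach R by binary search
  4. Use R to set the fair share
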
  private static void computeSharesInternal(
      Collection<? extends Schedulable> allSchedulables,
      Resource totalResources, ResourceType type, boolean isSteadyShare) {

    Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
    // Step 1
    // Exclude the queues whose share is fixed, and total up their fixed memory
    int takenResources = handleFixedFairShares(
        allSchedulables, schedulables, isSteadyShare, type);

    if (schedulables.isEmpty()) {
      return;
    }
    // Find an upper bound on R that we can use in our binary search. We start
    // at R = 1 and double it until we have either used all the resources or we
    // have met all Schedulables' max shares.
    int totalMaxShare = 0;
    // Iterate over schedulables (the non-fixed queues) and sum each queue's maxShare into totalMaxShare
    for (Schedulable sched : schedulables) {
      int maxShare = getResourceValue(sched.getMaxShare(), type);
      totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
          Integer.MAX_VALUE);
      if (totalMaxShare == Integer.MAX_VALUE) {
        break;
      }
    }
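    // Subtract the fixed shares from the total resources
    int totalResource = Math.max((getResourceValue(totalResources, type) -
        takenResources), 0);
    // What the queues can receive is bounded both by the cluster total and by the sum of the queues' maxResources
    totalResource = Math.min(totalMaxShare, totalResource);
    // Step 2: set the upper and lower bounds of R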
    double rMax = 1.0;
    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
        < totalResource) {
      rMax *= 2.0;
    }

    // Step 3: approach a suitable R by binary search
    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
          mid, schedulables, type);
      if (plannedResourceUsed == totalResource) {
        right = mid;
        break;
      } else if (plannedResourceUsed < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }
    // Step 4: use R to set the fair share of each non-fixed queue, so only active queues receive a share
    // Set the fair shares based on the value of R we've converged to
    for (Schedulable sched : schedulables) {
      if (isSteadyShare) {
        setResourceValue(computeShare(sched, right, type),
            ((FSQueue) sched).getSteadyFairShare(), type);
      } else {
        setResourceValue(
            computeShare(sched, right, type), sched.getFairShare(), type);
      }
    }
  }
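
To experiment with the four steps outside of YARN, the method can be condensed into a self-contained sketch. It is a simplification under stated assumptions: memory is the only resource, queues are plain arrays instead of Schedulable objects, fixed queues are assumed to be filtered out already, and every weight is assumed to be greater than 0 (otherwise the doubling loop might never finish). FairShareSearchSketch and its method names are made up for illustration.

```java
import java.util.Arrays;

final class FairShareSearchSketch {
    static final int ITERATIONS = 25; // same value as COMPUTE_FAIR_SHARES_ITERATIONS

    /** share = R * weight, clamped to [minShare, maxShare] and truncated to an int. */
    static int computeShare(double weight, int minShare, int maxShare, double r) {
        double share = weight * r;
        share = Math.max(share, minShare);
        share = Math.min(share, maxShare);
        return (int) share;
    }

    /** Total resources handed out if every queue used ratio r. */
    static int resourceUsed(double r, double[] weights, int[] min, int[] max) {
        int used = 0;
        for (int i = 0; i < weights.length; i++) {
            used += computeShare(weights[i], min[i], max[i], r);
        }
        return used;
    }

    static int[] computeShares(int totalResources, double[] weights, int[] min, int[] max) {
        // Step 1: the target is bounded by the cluster total and by the sum of max shares.
        long totalMaxShare = 0;
        for (int m : max) {
            totalMaxShare = Math.min(totalMaxShare + m, Integer.MAX_VALUE);
        }
        int target = (int) Math.min(totalMaxShare, totalResources);

        // Step 2: start at R = 1 and double it until the target is covered.
        double rMax = 1.0;
        while (resourceUsed(rMax, weights, min, max) < target) {
            rMax *= 2.0;
        }

        // Step 3: binary search between 0 and rMax.
        double left = 0;
        double right = rMax;
        for (int i = 0; i < ITERATIONS; i++) {
            double mid = (left + right) / 2.0;
            int planned = resourceUsed(mid, weights, min, max);
            if (planned == target) {
                right = mid;
                break;
            } else if (planned < target) {
                left = mid;
            } else {
                right = mid;
            }
        }

        // Step 4: derive each queue's share from the R found.
        int[] shares = new int[weights.length];
        for (int i = 0; i < weights.length; i++) {
            shares[i] = computeShare(weights[i], min[i], max[i], right);
        }
        return shares;
    }

    public static void main(String[] args) {
        int unlimited = Integer.MAX_VALUE;
        // Equal weights, but the first queue has minShare = 70: prints [70, 30]
        System.out.println(Arrays.toString(computeShares(
            100, new double[]{1, 1}, new int[]{70, 0}, new int[]{unlimited, unlimited})));
        // Equal weights, but the third queue has maxShare = 100: prints [250, 250, 100]
        System.out.println(Arrays.toString(computeShares(
            600, new double[]{1, 1, 1}, new int[]{0, 0, 0}, new int[]{unlimited, unlimited, 100})));
    }
}
```

Both examples show why a plain weight split is not enough: the min and max bounds move resources between queues, and R is whatever ratio makes the clamped shares add up to the target.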

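(1) Determine the available resources

The handleFixedFairShares method sums the fixed memory (fixedShare) of all fixed queues and excludes those queues from the division of the remaining cluster resources. YARN decides whether a queue is fixed as follows: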

  private static int getFairShareIfFixed(Schedulable sched,
      boolean isSteadyShare, ResourceType type) {

    // If the queue's maxShare <= 0, it is a fixed queue with fixedShare = 0
    if (getResourceValue(sched.getMaxShare(), type) <= 0) {
      return 0;
    }

    // When computing the Instantaneous Fair Share, a queue with no running apps
    // is a fixed queue with fixedShare = 0
    if (!isSteadyShare &&
        (sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
      return 0;
    }

    // If the queue's weight <= 0, it is a fixed queue:
    // fixedShare = 0 if its minShare <= 0, otherwise fixedShare = minShare
    if (sched.getWeights().getWeight(type) <= 0) {
      int minShare = getResourceValue(sched.getMinShare(), type);
      return (minShare <= 0) ? 0 : minShare;
    }

    return -1;
  }
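
The sketch below replays these rules on a handful of queues. The Q class is a hypothetical stand-in for a queue, and the body of handleFixedFairShares is an assumption, since the article does not show that method: it is written here as "collect the non-fixed queues and add up the fixed shares".

```java
import java.util.ArrayList;
import java.util.List;

final class FixedShareSketch {
    /** Hypothetical stand-in for a queue; memory only. */
    static final class Q {
        final String name;
        final int maxShare;
        final int minShare;
        final double weight;
        final boolean active; // true if the queue has running apps

        Q(String name, int maxShare, int minShare, double weight, boolean active) {
            this.name = name;
            this.maxShare = maxShare;
            this.minShare = minShare;
            this.weight = weight;
            this.active = active;
        }
    }

    /** Returns the fixed share, or -1 if the queue takes part in the weighted split. */
    static int fairShareIfFixed(Q q, boolean isSteadyShare) {
        if (q.maxShare <= 0) {
            return 0;
        }
        if (!isSteadyShare && !q.active) {
            return 0;
        }
        if (q.weight <= 0) {
            return Math.max(q.minShare, 0);
        }
        return -1;
    }

    /** Assumed shape of handleFixedFairShares: fill nonFixed, return the sum of fixed shares. */
    static int handleFixedFairShares(List<Q> all, List<Q> nonFixed, boolean isSteadyShare) {
        long taken = 0;
        for (Q q : all) {
            int fixed = fairShareIfFixed(q, isSteadyShare);
            if (fixed < 0) {
                nonFixed.add(q);
            } else {
                taken = Math.min(taken + fixed, Integer.MAX_VALUE);
            }
        }
        return (int) taken;
    }

    static List<Q> sampleQueues() {
        List<Q> queues = new ArrayList<>();
        queues.add(new Q("a", 1024, 0, 1.0, true));   // normal active queue
        queues.add(new Q("b", 1024, 0, 1.0, false));  // no running apps
        queues.add(new Q("c", 1024, 256, 0.0, true)); // weight 0, keeps its minShare
        queues.add(new Q("d", 0, 0, 1.0, true));      // maxShare 0
        return queues;
    }

    public static void main(String[] args) {
        List<Q> nonFixed = new ArrayList<>();
        int taken = handleFixedFairShares(sampleQueues(), nonFixed, false);
        // Instantaneous: only "a" competes for resources; 256 is already taken by "c".
        System.out.println("taken=" + taken + " nonFixed=" + nonFixed.size());
    }
}
```

Queue "b" is the interesting case: it is fixed for the instantaneous calculation but competes normally in the steady one.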

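(2) Determine the upper and lower bounds of R

The binary search uses 0 as the lower bound of R. The upper bound starts at 1.0 and is doubled until it is large enough. "Large enough" is decided by the resourceUsedWithWeightToResourceRatio method: let W be the resource value it returns for the current R, and T the available resources determined in step 1. The upper bound is fixed as soon as W >= T.

// Compute the resources each queue would receive for a given R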
  private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
      Collection<? extends Schedulable> schedulables, ResourceType type) {
    int resourcesTaken = 0;
    for (Schedulable sched : schedulables) {
      int share = computeShare(sched, w2rRatio, type);
      resourcesTaken += share;
    }
    return resourcesTaken;
  }
  private static int computeShare(Schedulable sched, double w2rRatio,
      ResourceType type) {
    // share = R * weight; type is memory here
    double share = sched.getWeights().getWeight(type) * w2rRatio;
    share = Math.max(share, getResourceValue(sched.getMinShare(), type));
    share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
    return (int) share;
  }
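
A short trace makes the doubling step visible. This is a minimal sketch with hypothetical names (UpperBoundSketch, doublingTrace) that records every candidate upper bound tried, again with memory only and all weights assumed to be greater than 0.

```java
import java.util.ArrayList;
import java.util.List;

final class UpperBoundSketch {
    static int computeShare(double weight, int minShare, int maxShare, double r) {
        double share = weight * r;
        share = Math.max(share, minShare);
        share = Math.min(share, maxShare);
        return (int) share;
    }

    static int resourceUsed(double r, double[] weights, int[] min, int[] max) {
        int used = 0;
        for (int i = 0; i < weights.length; i++) {
            used += computeShare(weights[i], min[i], max[i], r);
        }
        return used;
    }

    /** Returns every candidate tried; the last element is the upper bound that was chosen. */
    static List<Double> doublingTrace(int target, double[] weights, int[] min, int[] max) {
        List<Double> tried = new ArrayList<>();
        double rMax = 1.0;
        tried.add(rMax);
        while (resourceUsed(rMax, weights, min, max) < target) {
            rMax *= 2.0;
            tried.add(rMax);
        }
        return tried;
    }

    public static void main(String[] args) {
        int unlimited = Integer.MAX_VALUE;
        // Two queues with weights 1 and 3, no min or max limits, 600 units to hand out.
        // At R = 128 the queues would use 512 (< 600); at R = 256 they would use 1024.
        System.out.println(doublingTrace(
            600, new double[]{1, 3}, new int[]{0, 0}, new int[]{unlimited, unlimited}));
    }
}
```

The trace ends at 256, so the binary search in the next step only has to look inside [0, 256].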

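(3) Approach R by binary search

The binary search stops as soon as either of the following conditions holds:

  • W == T (W and T as defined in step 2)
  • The iteration limit of 25 (COMPUTE_FAIR_SHARES_ITERATIONS) is reached

(4) Use R to set the fair share

When the fair share is set, the code distinguishes between Steady Fair Share and Instantaneous Fair Share.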

    for (Schedulable sched : schedulables) {
      if (isSteadyShare) {
        setResourceValue(computeShare(sched, right, type),
            ((FSQueue) sched).getSteadyFairShare(), type);
      } else {
        setResourceValue(
            computeShare(sched, right, type), sched.getFairShare(), type);
      }
    }

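2 Instantaneous Fair Share calculation

Figure 5: Instantaneous Fair Share calculation flow

This calculation follows the same call stack as the Steady Fair Share calculation and also ends up in computeSharesInternal. The only difference is when it runs: the Steady Fair Share is recomputed only on addNode, whereas the Instantaneous Fair Share is refreshed periodically by the update thread.

One point is worth emphasizing. As analyzed above, when the Instantaneous Fair Share is being computed and a queue is empty, that queue is treated as a fixed queue, that is, an inactive queue, and it takes no part in dividing the cluster's memory when fair shares are computed.

The refresh frequency of the update thread is determined by yarn.scheduler.fair.update-interval-ms.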

private class UpdateThread extends Thread {

    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          //yarn.scheduler.fair.update-interval-ms
          Thread.sleep(updateInterval);
          long start = getClock().getTime();
          // Update the Instantaneous Fair Share
          update();
          // Preempt resources if necessary
          preemptTasksIfNecessary();
          long duration = getClock().getTime() - start;
          fsOpDurations.addUpdateThreadRunDuration(duration);
        } catch (InterruptedException ie) {
          LOG.warn("Update thread interrupted. Exiting.");
          return;
        } catch (Exception e) {
          LOG.error("Exception in fair scheduler UpdateThread", e);
        }
      }
    }
  }
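
The same sleep, work, repeat pattern can be reproduced with plain JDK classes. The sketch below is not the scheduler's code: UpdateLoopSketch is a hypothetical name, the Runnable stands in for update() plus the preemption check, and the latch exists only so the demo can stop after a known number of rounds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class UpdateLoopSketch {
    /** Starts a thread that runs `update` every intervalMs until it is interrupted. */
    static Thread start(long intervalMs, Runnable update) {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(intervalMs); // plays the role of update-interval-ms
                    update.run();
                } catch (InterruptedException ie) {
                    return; // interrupted while sleeping: exit the loop
                } catch (RuntimeException e) {
                    // A failed round is reported, and the loop keeps going.
                    System.err.println("update failed: " + e);
                }
            }
        }, "update-loop-sketch");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Lets the loop run for at least `rounds` updates, stops it, and returns the count. */
    static int runRounds(int rounds, long intervalMs) {
        AtomicInteger updates = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);
        Thread thread = start(intervalMs, () -> {
            updates.incrementAndGet();
            done.countDown();
        });
        try {
            boolean finished = done.await(10, TimeUnit.SECONDS);
            thread.interrupt();
            thread.join();
            return finished ? updates.get() : -1;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            thread.interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("updates performed: " + runRounds(3, 5));
    }
}
```

One consequence of this design is that a slow update round delays the next one: the interval is the pause between rounds, not a fixed schedule.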

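3 The meaning of maxAMShare

When the handle thread receives a NODE_UPDATE event, and (1) the node has enough memory and (2) there is an application in the ACCEPTED state, it allocates a container for the AM of that pending app so that the app can start running in its queue. Before the allocation, however, one more check is required: canRunAppAM. Whether an app passes canRunAppAM is governed by the maxAMShare parameter.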

  public boolean canRunAppAM(Resource amResource) {
    // Defaults to 0.5f
    float maxAMShare =
        scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
    if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
      return true;
    }
    // The queue's maxAMResource = maxAMShare * fair share (Instantaneous Fair Share)
    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
    // amResourceUsage is the total resources used by the AMs of apps already running in this queue
    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
    // Check whether ifRunAMResource would exceed maxAMResource
    return !policy
        .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
  }

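The code above can be expressed as formulas:

  • The apps running in the queue are A1 ... An, and the AM of each app Ai uses resources Ai.R
  • The AM of the app in the ACCEPTED state (waiting to run) requires R1
  • The queue's fair share is QueFS
  • The queue's maxAMResource = maxAMShare * QueFS
  • ifRunAMResource = A1.R + A2.R + ... + An.R + R1
  • If ifRunAMResource > maxAMResource, the queue cannot admit the pending app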
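
These formulas translate directly into a small memory-only check. The sketch below uses plain doubles instead of YARN's Resource type, and AmShareCheckSketch is a hypothetical name; the comparison follows the last bullet above, where an app is rejected only when the total would be strictly greater than the limit.

```java
final class AmShareCheckSketch {
    /**
     * @param fairShare       the queue's instantaneous fair share (QueFS)
     * @param maxAMShare      the queue's maxAMShare setting; -1 disables the check
     * @param amResourceUsage resources already used by running AMs (A1.R + ... + An.R)
     * @param amResource      resources needed by the pending app's AM (R1)
     */
    static boolean canRunAppAM(double fairShare, double maxAMShare,
                               double amResourceUsage, double amResource) {
        if (Math.abs(maxAMShare - (-1.0)) < 0.0001) {
            return true; // the feature is switched off
        }
        double maxAMResource = fairShare * maxAMShare;
        double ifRunAMResource = amResourceUsage + amResource;
        return ifRunAMResource <= maxAMResource;
    }

    public static void main(String[] args) {
        // Fair share 100G, maxAMShare 0.5: the AM budget is 50G.
        System.out.println(canRunAppAM(100, 0.5, 40, 10));   // true: 50 does not exceed 50
        System.out.println(canRunAppAM(100, 0.5, 45, 10));   // false: 55 exceeds 50
        System.out.println(canRunAppAM(100, -1.0, 1000, 10)); // true: check disabled
    }
}
```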

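This parameter deserves attention because many EMR customers who use fair queues report that the cluster's total resources are not fully used, yet some apps are still queued and not running, as shown in the figure below:

Figure 6: Example of a blocked app

The default fair scheduling policy ignores cores and considers only memory. In the figure, 292G of memory is in use and 53.6G is still free, yet an app is blocked. The cause is the maxAMShare limit: admitting one more AM would push the total AM resources of the default queue above the cap of 345.6 * 0.5 = 172.8G, so the app stays blocked.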
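
The arithmetic behind this cap is worth spelling out, because it puts a ceiling on the number of concurrent apps regardless of free memory. The sketch below assumes the default queue's instantaneous fair share equals the whole 345.6G and that every AM has the same size; the AM sizes used (2G and 4G) are hypothetical, since the article does not state them.

```java
final class AmLimitExample {
    /** Memory that AMs may occupy in a queue: maxAMShare * fair share. */
    static double amLimit(double fairShareGb, double maxAMShare) {
        return fairShareGb * maxAMShare;
    }

    /** How many AMs of one (assumed uniform) size fit under the limit. */
    static int maxConcurrentAMs(double fairShareGb, double maxAMShare, double amSizeGb) {
        return (int) Math.floor(amLimit(fairShareGb, maxAMShare) / amSizeGb);
    }

    public static void main(String[] args) {
        double fairShareGb = 292 + 53.6; // used + free memory from the figure
        System.out.println("AM limit (GB): " + amLimit(fairShareGb, 0.5));
        System.out.println("2G AMs that fit: " + maxConcurrentAMs(fairShareGb, 0.5, 2.0));
        System.out.println("4G AMs that fit: " + maxConcurrentAMs(fairShareGb, 0.5, 4.0));
    }
}
```

Under these assumptions, once that many apps are running, the next one waits in the ACCEPTED state even though memory is free. Raising maxAMShare for the queue, or setting it to -1 to disable the check, lifts the ceiling.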

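Summary

By analyzing how the fair share is computed, we have clarified some of YARN's basic concepts and parameters. The comparison below also shows that the official documentation's descriptions of these concepts and parameters are rather hard to follow. The remaining parameters are analyzed in the second article, on preemption in fair scheduling.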

Official description versus this article's summary:

Steady Fair Share
  Official description: The queue’s steady fair share of resources. These shares consider all the queues irrespective of whether they are active (have running applications) or not. These are computed less frequently and change only when the configuration or capacity changes. They are meant to provide visibility into resources the user can expect, and hence displayed in the Web UI.
  Summary: The fixed, theoretical amount of memory for each non-fixed queue. After the RM's initial work, the Steady Fair Share rarely changes; it is recomputed only when a node is added or the configuration changes (addNode). The RM's initial work itself consists of the handle thread adding each cluster node to the scheduler (addNode).

Instantaneous Fair Share
  Official description: The queue’s instantaneous fair share of resources. These shares consider only actives queues (those with running applications), and are used for scheduling decisions. Queues may be allocated resources beyond their shares when other queues aren’t using them. A queue whose resource consumption lies at or below its instantaneous fair share will never have its containers preempted.
  Summary: The actual amount of memory for each non-fixed (active) queue. It changes dynamically and is refreshed periodically by the update thread. Unless stated otherwise, "fair share" in YARN refers to the Instantaneous Fair Share.

yarn.scheduler.fair.update-interval-ms
  Official description: The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms.
  Summary: The interval of the update thread, whose work is (1) updating fair shares and (2) checking whether resources need to be preempted.

maxAMShare
  Official description: limit the fraction of the queue’s fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.
  Summary: The total AM resources of all running apps in a queue must not exceed maxAMShare * fair share.

This article is published on the Tencent Cloud+ Community with the author's authorization.
