1. 程式人生 > >Flink原始碼系列——JobManager處理SubmitJob的過程

Flink原始碼系列——JobManager處理SubmitJob的過程

接《Flink原始碼系列——獲取JobGraph的過程》,在獲取到JobGraph後,客戶端會封裝一個SubmitJob訊息,並將其提交給JobManager,本文就接著分析,JobManager在收到SubmitJob訊息後,對其處理邏輯。JobManager是一個Actor,其對接受到的各種訊息的處理入口是handleMessage這個方法,其中對SubmitJob的處理入口如下:

override def handleMessage: Receive = {
    …

    case SubmitJob(jobGraph, listeningBehaviour) =>
        val client = sender()
        val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), jobGraph.getSessionTimeout)

        submitJob(jobGraph, jobInfo)

    …
}

這裡構造了一個JobInfo例項,其是用來儲存job的相關資訊的,如提交job的客戶端、客戶端監聽模式、任務提交的開始時間、會話超時時間、以及結束時間、耗時等資訊。 
其中監聽模式有三種,三種模型下關心的訊息內容依次增加,解釋如下:

a、DETACHED —— 只關心job提交的確認訊息 
b、EXECUTION_RESULT —— 還關心job的執行結果 
c、EXECUTION_RESULT_AND_STATE_CHANGES —— 還關心job的狀態變化
然後就進入了真正的處理邏輯subminJob()方法中了,這個方法的程式碼稍微有點長,這裡就分段進行分析,另外submitJob這個方法除了上述的jobGraph和jobInfo兩個入參外,還有一個isRecovery的布林變數,預設值是false,用來標識當前處理的是否是一個job的恢復操作。這個邏輯根據jobGraph是否為null分為兩個大的分支,先看下jobGraph為null的情況,處理邏輯就是構造一個job提交異常的訊息,然後通知客戶端,告訴客戶端jobGraph不能為null。

jobInfo.notifyClients(
  decorateMessage(JobResultFailure(
    new SerializedThrowable(
      new JobSubmissionException(null, "JobGraph must not be null.")))))

重點還是分析jobGraph不為null的情況下的處理邏輯,這部分的邏輯也可以分為兩大部分。

a、根據jobGraph構建ExecutionGraph 
b、對構建好的ExecutionGraph進行排程執行
在構建ExecutionGraph這部分,會進行一些初始化的工作,如果在這過程中,發生異常,會將初始化過程做的操作進行回滾操作。

1、構建ExecutionGraph

1.1、總覽

在開始ExecutionGraph的構建之前,會先獲取構建所需的引數,如下:

/** 將job所需jar相關資訊註冊到library管理器中,如果註冊失敗,則丟擲異常 */
try {
  libraryCacheManager.registerJob(
    jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
}
catch {
  case t: Throwable =>
    throw new JobSubmissionException(jobId,
      "Cannot set up the user code libraries: " + t.getMessage, t)
}

/** 獲取使用者類載入器,如果獲取的類載入器為null,則丟擲異常 */
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
if (userCodeLoader == null) {
  throw new JobSubmissionException(jobId,
    "The user code class loader could not be initialized.")
}

/** 判斷{@code JobGraph}中的{@code StreamNode}的個數, 如果為0, 則說明是個空任務,丟擲異常 */
if (jobGraph.getNumberOfVertices == 0) {
  throw new JobSubmissionException(jobId, "The given job is empty")
}

/** 優先採用JobGraph配置的重啟策略,如果沒有配置,則採用JobManager中配置的重啟策略 */
val restartStrategy =
  Option(jobGraph.getSerializedExecutionConfig()
    .deserializeValue(userCodeLoader)
    .getRestartStrategy())
    .map(RestartStrategyFactory.createRestartStrategy)
    .filter(p => p != null) match {
    case Some(strategy) => strategy
    case None => restartStrategyFactory.createRestartStrategy()
  }

log.info(s"Using restart strategy $restartStrategy for $jobId.")

val jobMetrics = jobManagerMetricGroup.addJob(jobGraph)

/** 獲取註冊在排程器上的所有TaskManager例項的總的slot數量 */
val numSlots = scheduler.getTotalNumberOfSlots()

/** 針對jobID,看是否已經存在 ExecutionGraph,如果有,則直接獲取已有的,並將registerNewGraph標識為false */
val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
  case Some((graph, currentJobInfo)) =>
    executionGraph = graph
    currentJobInfo.setLastActive()
    false
  case None =>
    true
}

上面這段邏輯主要做一些準備工作,如jar包註冊,類載入器,重啟策略等,這些準備好之後,就可以開始ExecutionGraph的構建,呼叫如下:

/** 通過{@link JobGraph}構建出{@link ExecutionGraph} */
executionGraph = ExecutionGraphBuilder.buildGraph(
  executionGraph,
  jobGraph,
  flinkConfiguration,
  futureExecutor,
  ioExecutor,
  scheduler,
  userCodeLoader,
  checkpointRecoveryFactory,
  Time.of(timeout.length, timeout.unit),
  restartStrategy,
  jobMetrics,
  numSlots,
  blobServer,
  log.logger)

/** 如果還沒有註冊過, 則進行註冊 */
if (registerNewGraph) {
  currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
}

/** 註冊job狀態變化監聽器 */
executionGraph.registerJobStatusListener(
  new StatusListenerMessenger(self, leaderSessionID.orNull))

jobInfo.clients foreach {
  /** 如果客戶端關心執行結果和狀態變化,則為客戶端在executiongraph中註冊相應的監聽器 */
  case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
    val listener  = new StatusListenerMessenger(client, leaderSessionID.orNull)
    executionGraph.registerExecutionListener(listener)
    executionGraph.registerJobStatusListener(listener)
  case _ => // 如果不關心,則什麼都不做
}

在ExecutionGraph構建好只有,就會設定相應的監聽器,用來監聽其後續的排程執行情況。 
另外這段程式碼的執行會被整個的進行了try…catch,看下catch中的邏輯。

/** 如果異常, 則進行回收操作 */
case t: Throwable =>
  log.error(s"Failed to submit job $jobId ($jobName)", t)
  /** 進行jar包的註冊回滾 */
  libraryCacheManager.unregisterJob(jobId)
  blobServer.cleanupJob(jobId)
  /** 移除上面註冊的graph */
  currentJobs.remove(jobId)
  /** 如果executionGraph不為null,還需要執行failGlobal操作 */
  if (executionGraph != null) {
    executionGraph.failGlobal(t)
  }
  /** 構建JobExecutionException移除 */
  val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
    t
  } else {
    new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
  }
  /** 通知客戶端,job失敗了 */
  jobInfo.notifyClients(
    decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
  /** 退出submitJob方法 */
  return

可見catch中,主要進行一些回滾操作,這樣可以確保在出現異常的情況下,可以讓已經上傳的jar等被刪除掉。

1.2、ExecutionGraph

ExecutionGraph是JobGraph的並行模式,是基於JobGraph構建出來的,主要構建邏輯都在ExecutionGraphBuilder這個類中,而且該方法的建構函式是private的,且該類只有兩個static方法,buildGraph()和idToVertex(),而ExecutionGraph的構造邏輯都在buildGraph()方法中。 
在buildGraph()方法中,先是對executionGraph進行一些基礎的設定,如果有需要,則對各個JobVertex進行初始化操作,然後就是將JobVertex轉化成ExecutionGraph中的元件,轉化成功後,則開始設定checkpoint相關的配置。 
這裡主要JobVertex轉化的邏輯,程式碼如下:

/** 1、構建有序拓撲列表 */
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
   log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
/** 2、轉化JobVertex */
executionGraph.attachJobGraph(sortedTopology);

主要的轉換程式碼就兩行,先是將jobGraph中的所有的JobVertex,從資料來源開始的有序拓撲節點列表,然後就是將這個有序集合轉化到executionGraph中。

1.2.1 構建有序拓撲列表

有序拓撲列表的構建邏輯在JobGraph類中,如下:

public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
   /** 節點集合為空時,可以快速退出 */
   if (this.taskVertices.isEmpty()) {
      return Collections.emptyList();
   }

   /** 從source開始的,排好序的JobVertex列表 */
   List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
   /** 還沒有進入sorted集合,等待排序的JobVertex集合,初始值就是JobGraph中所有JobVertex的集合 */
   Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

   /** 找出資料來源節點,也就是那些沒有輸入的JobVertex,以及指向獨立資料集的JobVertex */
   {
      Iterator<JobVertex> iter = remaining.iterator();
      while (iter.hasNext()) {
         JobVertex vertex = iter.next();
         /** 如果該節點沒有任何輸入,則表示該節點是資料來源,新增到sorted集合,同時從remaining集合中移除 */
         if (vertex.hasNoConnectedInputs()) {
            sorted.add(vertex);
            iter.remove();
         }
      }
   }
   /** sorted集合中開始遍歷的起始位置,也就是從第一個元素開始遍歷 */
   int startNodePos = 0;

   /** 遍歷已經新增的節點,直到找出所有元素 */
   while (!remaining.isEmpty()) {
      /** 
       * 處理一個節點後,startNodePos就會加1,
       * 如果startNodePos大於sorted的集合中元素個數,
       * 則說明經過一次處理,並沒有找到新的JobVertex新增到sorted集合中,這表明在graph中存在了迴圈,這是不允許的
       */
      if (startNodePos >= sorted.size()) {
         throw new InvalidProgramException("The job graph is cyclic.");
      }
      /** 獲取當前要處理的JobVertex */
      JobVertex current = sorted.get(startNodePos++);
      /** 遍歷當前JobVertex的下游節點 */
      addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
   }
   return sorted;
}

上述邏輯就是首先從JobGraph的所有JobVertex集合中,找出所有的source節點,然後在從這些source節點開始,依次遍歷其下游節點,當一個節點的所有輸入都已經被新增到sorted集合中時,它自身就可以新增到sorted集合中了,同時從remining集合中移除。

private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
   /** 遍歷start節點的所有輸出中間資料集合 */
   for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
      /** 對於每個中間資料集合,遍歷其所有的輸出JobEdge */
      for (JobEdge edge : dataSet.getConsumers()) {
         /** 如果一個節點的所有輸入節點都不在"remaining"集合中,則將這個節點新增到target集合中 */

         /** 如果目標節點已經不在remaining集合中,則continue */
         JobVertex v = edge.getTarget();
         if (!remaining.contains(v)) {
            continue;
         }
         /** 一個JobVertex是否還有輸入節點在remaining集合中的標識 */
         boolean hasNewPredecessors = false;
         /**
          * 如果節點v,其所有輸入節點都已經不在remaining集合中,
          * 則說明其輸入節點都已經被新增到sorted列表,則hasNewPredecessors為false,
          * 否則hasNewPredecessors的值為true,表示節點v還有輸入節點在remaining集合中。
          */
         for (JobEdge e : v.getInputs()) {
            /** 跳過上層迴圈中遍歷到的JobEdge,也就是edge變數 */
            if (e == edge) {
               continue;
            }
            /** 只要有一個輸入還在remaining集合中,說明當前它還不能新增到target集合,直接結束這層內迴圈 */
            IntermediateDataSet source = e.getSource();
            if (remaining.contains(source.getProducer())) {
               hasNewPredecessors = true;
               break;
            }
         }
         /**
          * 如果節點v已經沒有輸入節點還在remaining集合中,則將節點v新增到sorted列表中,
          * 同時從remaining集合中刪除,
          * 然後開始遞迴遍歷節點v的下游節點。
          */
         if (!hasNewPredecessors) {
            target.add(v);
            remaining.remove(v);
            addNodesThatHaveNoNewPredecessors(v, target, remaining);
         }
      }
   }
}

對於具體的某個JobVertex的遍歷邏輯如上,詳見註釋。

1.2.2 JobVertex的轉化

在獲取了排序後的拓撲的JobVertex集合後,就可以開始將其轉換成ExecutionGraph中的ExecutionJobVertex。

public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
   LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
         "vertices and {} intermediate results.",
         topologiallySorted.size(), tasks.size(), intermediateResults.size());

   final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
   final long createTimestamp = System.currentTimeMillis();

   /** 依次順序遍歷排好序的JobVertex集合 */
   for (JobVertex jobVertex : topologiallySorted) {
      /** 對於ExecutionGraph來說,只要有一個不能停止的輸入源JobVertex,那ExecutionGraph就是不可停止的 */
      if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
         this.isStoppable = false;
      }
      /** 建立jobVertex對應的ExecutionJobVertex,其中的第三個構造引數1,就是預設的並行度 */
      ExecutionJobVertex ejv = new ExecutionJobVertex(
         this,
         jobVertex,
         1,
         rpcCallTimeout,
         globalModVersion,
         createTimestamp);

      /** 將新建的ExecutionJobVertex例項, 與其前置處理器建立連線 */
      ejv.connectToPredecessors(this.intermediateResults);

      /** 將構建好的ejv,記錄下來,如果發現對一個的jobVertexID已經存在一個ExecutionJobVertex,則需要拋異常 */
      ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
      if (previousTask != null) {
         throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
               jobVertex.getID(), ejv, previousTask));
      }

      /** 將這個ExecutionGraph中所有臨時結果IntermediateResult, 都儲存到intermediateResults這個map */
      for (IntermediateResult res : ejv.getProducedDataSets()) {
         IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
         if (previousDataSet != null) {
            throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                  res.getId(), res, previousDataSet));
         }
      }
      /** 將ejv按建立順序記錄下來 */
      this.verticesInCreationOrder.add(ejv);
      /** 統計所有ejv的並行度 */
      this.numVerticesTotal += ejv.getParallelism();
      newExecJobVertices.add(ejv);
   }
   terminationFuture = new CompletableFuture<>();
   failoverStrategy.notifyNewVertices(newExecJobVertices);
}

上述的邏輯是比較清晰的,就是依次遍歷排好序的JobVertex集合,並構建相應的ExecutionJobVertex例項,並設定ExecutionGraph中的部分屬性。 
在ExecutionJobVertex的建構函式中,會根據並行度,構造相應的ExecutionVertex陣列,該陣列的索引就是子任務的索引號;而在ExecutionVertex的建構函式中,會構造出一個Execution例項。

2、ExecutionGraph的排程執行

在前面的準備工作都完成,ExecutionGraph也構建好之後,接下來就可以對ExecutionGraph進行排程執行。這部分的操作是比較耗時的,所以整個被包在一個futrue中進行非同步執行。

a、如果isRecovery為true,則先進行恢復操作; 
b、如果isRecovery為false,則進行checkpoint設定,並將jobGraph的相關資訊進備份操作。 
上述兩步完成之後,則會通知客戶端,job已經提交成功了。
jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
1
接下來就是判斷當前JobManager是否是leader,如果是,則開始對executionGraph進行排程執行,如果不是leader,則告訴JobManager自身,去進行remove操作,邏輯如下:

/** 根據當前JobManager是否是leader,執行不同的操作 */
if (leaderElectionService.hasLeadership) {
  log.info(s"Scheduling job $jobId ($jobName).")
  /** executionGraph進行排程執行 */
  executionGraph.scheduleForExecution()
} else {
  /** 移除這個job */
  self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
  log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
    "this. I am not scheduling the job for execution.")
}

接下里就看下executionGraph的排程執行邏輯。

public void scheduleForExecution() throws JobException {
   /** 將狀態從'CREATED’轉換為’RUNNING' */
   if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
      /** 根據排程模式,執行不同的排程策略 */
      switch (scheduleMode) {
         case LAZY_FROM_SOURCES:
            scheduleLazy(slotProvider);
            break;
         case EAGER:
            scheduleEager(slotProvider, scheduleAllocationTimeout);
            break;
         default:
            throw new JobException("Schedule mode is invalid.");
      }
   }
   else {
      throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
   }
}

上述邏輯就是先將ExecutionGraph的狀態從’CREATED’轉換為’RUNNING’,狀態轉換成功,會給狀態監聽者傳送狀態變化的訊息,然後就根據排程的不同模式,進行不同的排程。排程模式分為兩種:

a、LAZY_FROM_SORUCES —— 該模式下,從source節點開始部署執行,成功後,在部署其下游節點,以此類推; 
b、EAGER —— 該模式下,所有節點同時部署執行;
這裡繼續分析’EAGER’模式下的排程。

private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
   /** 走到這裡了,需要再次確認下當前的狀態是否是'RUNNING' */
   checkState(state == JobStatus.RUNNING, "job is not running currently");
   /** 標識在無法立即獲取部署資源時,是否可以將部署任務入佇列 */
   final boolean queued = allowQueuedScheduling;

   /** 用來維護所有槽位申請的future */
   final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

   /** 獲取每個ExecutionJobGraph申請槽位的future */
   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
      Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
         slotProvider,
         queued,
         LocationPreferenceConstraint.ALL);
      allAllocationFutures.addAll(allocationFutures);
   }

   /** 將上面的所有future連線成一個future,只有所有的future都成功,才算成功,否則就是失敗的 */
   final ConjunctFuture<Collection<Execution>> allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures);

   /** 構建一個定時任務,用來檢查槽位分配是否超時 */
   final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
      @Override
      public void run() {
         int numTotal = allAllocationsComplete.getNumFuturesTotal();
         int numComplete = allAllocationsComplete.getNumFuturesCompleted();
         String message = "Could not allocate all requires slots within timeout of " +
               timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
         /** 如果超時,則以異常的方式結束分配 */
         allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
      }
   }, timeout.getSize(), timeout.getUnit());

   /** 根據槽位分配,進行非同步呼叫執行 */
   allAllocationsComplete.handleAsync(
      (Collection<Execution> executions, Throwable throwable) -> {
         try {
            /** 取消上面的超時檢查任務 */
            timeoutCancelHandle.cancel(false);
            if (throwable == null) {
               /** 成功後去所需槽位, 現在開始部署 */
               for (Execution execution : executions) {
                  execution.deploy();
               }
            }
            else {
               /** 丟擲異常, 讓異常控制代碼處理這個 */
               throw throwable;
            }
         }
         catch (Throwable t) {
            failGlobal(ExceptionUtils.stripCompletionException(t));
         }
         return null;
      },
      futureExecutor);
}

整個處理邏輯分為兩大步驟:

a、先進行槽位的分配,獲取分配的future; 
b、成功獲取槽位之後,進行部署,這步也是非同步的;
另外,在槽位分配上,加上了超時機制,如果達到設定時間,槽位還沒有分配好,則進行fail操作。

2.1、槽位的申請分配

槽位的申請分配邏輯如下:

public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
      SlotProvider resourceProvider,
      boolean queued,
      LocationPreferenceConstraint locationPreferenceConstraint) {
   final ExecutionVertex[] vertices = this.taskVertices;
   final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
   /** 為ExecutionJobVertex中的每個Execution嘗試申請一個slot,並返回future */
   for (int i = 0; i < vertices.length; i++) {
      final Execution exec = vertices[i].getCurrentExecutionAttempt();
      final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
         resourceProvider,
         queued,
         locationPreferenceConstraint);
      slots[i] = allocationFuture;
   }
   /** 很好, 我們請求到了所有的slots */
   return Arrays.asList(slots);
}

上述邏輯就是為ExecutionJobVertex中的每個Execution申請一個slot,然後具體的申請邏輯,是放在Execution中的,繼續向下看。

public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
      SlotProvider slotProvider,
      boolean queued,
      LocationPreferenceConstraint locationPreferenceConstraint) throws IllegalExecutionStateException {
   checkNotNull(slotProvider);
   /** 獲取在構建JobVertex時已經賦值好的SlotSharingGroup例項和CoLocationConstraint例項,如果有的話 */
   final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
   final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

   /** 位置約束不為null, 而共享組為null, 這種情況是不可能出現的, 出現了肯定就是異常了 */
   if (locationConstraint != null && sharingGroup == null) {
      throw new IllegalStateException(
            "Trying to schedule with co-location constraint but without slot sharing allowed.");
   }

   /** 只有狀態是 'CREATED' 時, 這個方法才能正常工作 */
   if (transitionState(CREATED, SCHEDULED)) {
      /** ScheduleUnit 例項就是在這裡構造出來的 */
      ScheduledUnit toSchedule = locationConstraint == null ?
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

      /** 獲取當前任務分配槽位所在節點的"偏好位置集合",也就是分配時,優先考慮分配在這些節點上 */
      final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);

      return preferredLocationsFuture
         .thenCompose(
            (Collection<TaskManagerLocation> preferredLocations) ->
               /** 在獲取輸入節點的位置之後,將其作為偏好位置集合,基於這些偏好位置,申請分配一個slot */
               slotProvider.allocateSlot(
                  toSchedule,
                  queued,
                  preferredLocations))
         .thenApply(
            (SimpleSlot slot) -> {
               if (tryAssignResource(slot)) {
                  /** 如果slot分配成功,則返回這個future */
                  return this;
               } else {
                  /** 釋放slot */
                  slot.releaseSlot();

                  throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
               }
            });
   }
   else {
      throw new IllegalExecutionStateException(this, CREATED, state);
   }
}

上述的邏輯還是很清晰的,

a、將狀態從’CREATED’成功轉換成’SCHEDULED’; 
b、根據LocationPreferenceConstraint的設定,為這個Execution指定優先分配槽位所在的TaskManager; 
c、基於上述步驟獲取的偏好位置,進行slot分配; 
d、在slot分配成功後,將slot設定給當前Execution,如果設定成功,則返回相應的slot,否則是否slot,然後丟擲異常。
其中LocationPreferenceConstraint有兩種取值:

a、ALL —— 需要確認其所有的輸入都已經分配好slot,然後基於其輸入所在的TaskManager,作為其偏好位置集合; 
b、ANY —— 只考慮那些slot已經分配好的輸入所在的TaskManager,作為偏好位置集合;
某個Execution的偏好位置的計算邏輯,是先由其對應的ExecutionVertex基於所有輸入,獲取偏好位置集合,然後根據LocationPreferenceConstraint的策略不同,刪選出一個子集,作為這個Execution的偏好位置集合。 
這裡就只看下ExecutionVertex基於輸入獲取偏好集合的邏輯。

public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
   // 如果沒有輸入,則返回空集合,否則,基於輸入連線確定偏好位置
   if (inputEdges == null) {
      return Collections.emptySet();
   }
   else {
      Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
      Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());

      // 遍歷所有inputs
      for (int i = 0; i < inputEdges.length; i++) {
         inputLocations.clear();
         ExecutionEdge[] sources = inputEdges[i];
         if (sources != null) {
            // 遍歷所有輸入源
            for (int k = 0; k < sources.length; k++) {
               // 查詢輸入源的分配slot
               CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
               inputLocations.add(locationFuture);
               // 如果某個輸入源有太多的節點分佈,則不考慮這個輸入源的節點位置了
               if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                  inputLocations.clear();
                  break;
               }
            }
         }
         // 保留具有最少分佈位置的輸入的位置
         if (locations.isEmpty() || // 當前還沒有分配的位置
               (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
            // 當前的輸入具有更少的偏好位置
            locations.clear();
            locations.addAll(inputLocations);
         }
      }

      return locations.isEmpty() ? Collections.emptyList() : locations;
   }
}

邏輯拆分如下:

a、如果沒有輸入源,則返回空集合,對於資料來源節點來說,就是返回空集合; 
b、如果有輸入源,則對每個輸入源,都找出其所有分割槽所在的TaskManager的位置,如果某個輸入源的分割槽所在位置超過MAX_DISTINCT_LOCATIONS_TO_CONSIDER(預設值為8),則不考慮這個輸入源,直接跳過,然後將滿足條件的輸入源中,分割槽位置分佈做少的那個資料來源對應的TaskManager的位置集合,作為計算結果返回。
2.2、部署

在槽位分配成功後,就開始各個Execution的部署操作。

public void deploy() throws JobException {
   final SimpleSlot slot  = assignedResource;
   checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
   /** 檢查slot是否alive */
   if (!slot.isAlive()) {
      throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
   }

   /**
    * 確保在正確的狀態的情況下進行部署呼叫
    * 注意:從 CREATED to DEPLOYING 只是用來測試的
    */
   ExecutionState previous = this.state;
   if (previous == SCHEDULED || previous == CREATED) {
      if (!transitionState(previous, DEPLOYING)) {
         /**
          * 競態條件,有人在部署呼叫上擊中我們了(其實就是衝突了)
          * 這個在真實情況下不該發生,如果發生,則說明有地方發生衝突了
          */
         throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
      }
   }
   else {
      // vertex 可能已經被取消了,或者已經被排程了
      throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
   }

   try {
      // 很好,走到這裡,說明我們被允許部署了
      if (!slot.setExecutedVertex(this)) {
         throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
      }

      // 雙重校驗,是我們 失敗/取消 ? 我們需要釋放這個slot?
      if (this.state != DEPLOYING) {
         slot.releaseSlot();
         return;
      }

      if (LOG.isInfoEnabled()) {
         LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
               attemptNumber, getAssignedResourceLocation().getHostname()));
      }

      final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
         attemptId,
         slot,
         taskState,
         attemptNumber);

      final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

      /** 這裡就是將task提交到{@code TaskManager}的地方 */
      final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

      /** 根據提交結果進行處理,如果提交失敗,則進行fail處理 */
      submitResultFuture.whenCompleteAsync(
         (ack, failure) -> {
            // 只處理失敗響應
            if (failure != null) {
               if (failure instanceof TimeoutException) {
                  String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                  markFailed(new Exception(
                     "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                        + ") not responding after a timeout of " + timeout, failure));
               } else {
                  markFailed(failure);
               }
            }
         },
         executor);
   }
   catch (Throwable t) {
      markFailed(t);
      ExceptionUtils.rethrow(t);
   }
}

上述程式碼雖然很長,但是邏輯很簡明,先是做一系列的校驗工作,然後將狀態轉換為’DEPLOYING’,然後就是TaskDeploymentDescriptor例項,然後提交給相應的TaskManager例項,這裡是非同步的,如果執行失敗,則進行fail處理。 
其中提交到TaskManager的訊息結構如下: 
JobManagerMessages.LeaderSessionMessage[TaskMessages.SubmitTask[TaskDeploymentDescriptor]]。


--------------------- 
作者:混混fly 
來源:CSDN 
原文:https://blog.csdn.net/qq_21653785/article/details/79582489 
版權宣告:本文為博主原創文章,轉載請附上博文連結!