1. 程式人生 > >Flink原始碼系列——TaskManager處理SubmitTask的過程

Flink原始碼系列——TaskManager處理SubmitTask的過程

《Flink原始碼系列——JobManager處理SubmitJob的過程》,在從JobManager中,將SubmitTask提交到TaskManager後,繼續分析TaskManager的處理邏輯。
TaskManager是個Actor,混入了LeaderSessionMessageFilter這個trait,所以在從JobManager接收到JobManagerMessages.LeaderSessionMessage[TaskMessages.SubmitTask[TaskDeploymentDescriptor]]這樣的一個封裝訊息後,會先在LeaderSessionMessageFilter這個trait的receive方法中,進行訊息的過濾,過濾邏輯如下:

abstract override def receive: Receive = {
  case leaderMessage @ LeaderSessionMessage(msgID, msg) =>
    leaderSessionID match {
      case Some(leaderId) =>
        if (leaderId.equals(msgID)) {
          super.receive(msg)
        } else {
          handleDiscardedMessage(leaderId, leaderMessage)
        }
      case
None => handleNoLeaderId(leaderMessage) } case msg: RequiresLeaderSessionID => throw new Exception(s"Received a message $msg without a leader session ID, even though" + s" the message requires a leader session ID.") case msg => super.receive(msg) }

邏輯拆分如下:

a、接收到的是一個LeaderSessionMessage訊息

a.1、當前TaskManager中有leaderSessionID

a.1.1、TaskManager所屬的JobManager的sessionID和訊息中的sessionID相同,則呼叫父類的receive方法
a.1.2、兩個sessionID不同,則說明是一個過期訊息,忽視該訊息

a.2、當前TaskManager沒有leaderSessionID,則列印個日誌,不做任何處理

b、接收到的是一個RequiresLeaderSessionID訊息,說明訊息需要leaderSessionID,但其又沒有封裝在LeaderSessionMessage中,屬於異常情況,丟擲異常

c、其他訊息,呼叫父類的receive方法

對於從JobManager接收到的上述訊息,經過上述處理邏輯後,就變成TaskMessages.SubmitTask[TaskDeploymentDescriptor],並作為handleMessage方法的入參,SubmitTask是TaskMessage的子類,所以在handleMessage中的處理邏輯如下:

override def handleMessage: Receive = {
  ...

  case message: TaskMessage => handleTaskMessage(message)

  ...
}

然後會就進入handleTaskMessage方法,如下:

private def handleTaskMessage(message: TaskMessage): Unit = {
    ...

    case SubmitTask(tdd) => submitTask(tdd)

    ...
}

經過上述兩步轉化後,就會進入submitTask方法中,且入參就是TaskDeploymentDescriptor。

submitTask()方法的程式碼很長,但是邏輯不復雜,分塊說明如下:

/** 獲取當前JobManager的actor */
val jobManagerActor = currentJobManager match {
  case Some(jm) => jm
  case None =>
    throw new IllegalStateException("TaskManager is not associated with a JobManager.")
}

/** 獲取library快取管理器 */
val libCache = libraryCacheManager match {
  case Some(manager) => manager
  case None => throw new IllegalStateException("There is no valid library cache manager.")
}

/** 獲取blobCache */
val blobCache = this.blobCache match {
  case Some(manager) => manager
  case None => throw new IllegalStateException("There is no valid BLOB cache.")
}

/** 槽位編號校驗 */
val slot = tdd.getTargetSlotNumber
if (slot < 0 || slot >= numberOfSlots) {
  throw new IllegalArgumentException(s"Target slot $slot does not exist on TaskManager.")
}

/** 獲取一些連結相關 */
val (checkpointResponder,
  partitionStateChecker,
  resultPartitionConsumableNotifier,
  taskManagerConnection) = connectionUtils match {
  case Some(x) => x
  case None => throw new IllegalStateException("The connection utils have not been " +
                                                 "initialized.")
}

這部分邏輯就是獲取一些處理控制代碼,如果獲取不到,則丟擲異常,並校驗當前任務的槽位編號是否在有效範圍,以及一些連結資訊。

/** 構建JobManager的gateway */
val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)

/** 部分資料可能由於量較大,不方便通過rpc傳輸,會先持久化,然後在這裡再載入回來 */
try {
  tdd.loadBigData(blobCache.getPermanentBlobService);
} catch {
  case e @ (_: IOException | _: ClassNotFoundException) =>
    throw new IOException("Could not deserialize the job information.", e)
}

/** 獲取jobInformation */
val jobInformation = try {
  tdd.getSerializedJobInformation.deserializeValue(getClass.getClassLoader)
} catch {
  case e @ (_: IOException | _: ClassNotFoundException) =>
    throw new IOException("Could not deserialize the job information.", e)
}

/** 校驗jobID資訊 */
if (tdd.getJobId != jobInformation.getJobId) {
  throw new IOException(
    "Inconsistent job ID information inside TaskDeploymentDescriptor (" +
    tdd.getJobId + " vs. " + jobInformation.getJobId + ")")
}

/** 獲取taskInformation */
val taskInformation = try {
  tdd.getSerializedTaskInformation.deserializeValue(getClass.getClassLoader)
} catch {
  case [email protected](_: IOException | _: ClassNotFoundException) =>
    throw new IOException("Could not deserialize the job vertex information.", e)
}

/** 統計相關 */
val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
  jobInformation.getJobId,
  jobInformation.getJobName,
  taskInformation.getJobVertexId,
  tdd.getExecutionAttemptId,
  taskInformation.getTaskName,
  tdd.getSubtaskIndex,
  tdd.getAttemptNumber)

val inputSplitProvider = new TaskInputSplitProvider(
  jobManagerGateway,
  jobInformation.getJobId,
  taskInformation.getJobVertexId,
  tdd.getExecutionAttemptId,
  new FiniteDuration(
    config.getTimeout().getSize(),
    config.getTimeout().getUnit()))

/** 構建task */
val task = new Task(
  jobInformation,
  taskInformation,
  tdd.getExecutionAttemptId,
  tdd.getAllocationId,
  tdd.getSubtaskIndex,
  tdd.getAttemptNumber,
  tdd.getProducedPartitions,
  tdd.getInputGates,
  tdd.getTargetSlotNumber,
  tdd.getTaskStateHandles,
  memoryManager,
  ioManager,
  network,
  bcVarManager,
  taskManagerConnection,
  inputSplitProvider,
  checkpointResponder,
  blobCache,
  libCache,
  fileCache,
  config,
  taskMetricGroup,
  resultPartitionConsumableNotifier,
  partitionStateChecker,
  context.dispatcher)

log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")

上述邏輯還是在獲取各種資料,主要的目的根據以上獲取的變數,構建一個Task例項。

val execId = tdd.getExecutionAttemptId
// 將task新增到map
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
  // 對於ID已經存在一個task,則恢復回來,並報告一個錯誤
  runningTasks.put(execId, prevTask)
  throw new IllegalStateException("TaskManager already contains a task for id " + execId)
}

// 一切都好,我們啟動task,讓它開始自己的初始化
task.startTaskThread()

sender ! decorateMessage(Acknowledge.get())

這裡的邏輯就是將新建的task加入到runningTasks這個map中,如果發現相同execID,已經存在執行的task,則先回滾,然後丟擲異常。
一切都執行順利的話,則啟動task,並給sender傳送一個ack訊息。

task的啟動,就是執行Task例項中的executingThread這個變量表示的執行緒。

public void startTaskThread() {
   executingThread.start();
}

而executingThread這個變數的初始化是在Task的建構函式的最後進行的。

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

並且將Task例項自身作為其執行物件,而Task實現了Runnable介面,所以最後就是執行Task中的run()方法。
run方法的邏輯,先是進行狀態的初始化,就是進入一個while迴圈,根據當前狀態,執行不同的操作,有可能正常退出迴圈,進行向下執行,有可能直接reture,有可能丟擲異常,邏輯如下:

while (true) {
   ExecutionState current = this.executionState;
   if (current == ExecutionState.CREATED) {
      /** 如果是CREATED狀態, 則先將狀態轉換為DEPLOYING, 然後退出迴圈 */
      if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
         /** 如果成功, 則說明我們可以開始啟動我們的work了 */
         break;
      }
   }
   else if (current == ExecutionState.FAILED) {
      /** 如果當前狀態是FAILED, 則立即執行失敗操作, 告訴TaskManager, 我們已經到達最終狀態了, 然後直接返回 */
      notifyFinalState();
      if (metrics != null) {
         metrics.close();
      }
      return;
   }
   else if (current == ExecutionState.CANCELING) {
      if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
         /** 如果是CANCELING狀態, 則告訴TaskManager, 我們到達最終狀態了, 然後直接返回 */
         notifyFinalState();
         if (metrics != null) {
            metrics.close();
         }
         return;
      }
   }
   else {
      /** 如果是其他狀態, 則丟擲異常 */
      if (metrics != null) {
         metrics.close();
      }
      throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
   }
}

當從這個while迴圈正常退出後,繼續向下執行,就是一個try-catch-finally的結構。

這裡主要分析一下try塊中的邏輯。

1、任務引導

// activate safety net for task thread
LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();

blobService.getPermanentBlobService().registerJob(jobId);

/**
 * 首先, 獲取一個 user-code 類載入器
 * 這可能涉及下載作業的JAR檔案和/或類。
 */
LOG.info("Loading JAR files for task {}.", this);

userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

if (executionConfig.getTaskCancellationInterval() >= 0) {
   /** 嘗試取消task時, 兩次嘗試之間的時間間隔, 單位毫秒 */
   taskCancellationInterval = executionConfig.getTaskCancellationInterval();
}

if (executionConfig.getTaskCancellationTimeout() >= 0) {
   /** 取消任務的超時時間, 可以在flink的配置中覆蓋 */
   taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
}

/**
 * 例項化AbstractInvokable的具體子類
 * {@see StreamGraph#addOperator}
 * {@see StoppableSourceStreamTask}
 * {@see SourceStreamTask}
 * {@see OneInputStreamTask}
 */
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

/** 如果當前狀態'CANCELING'、'CANCELED'、'FAILED', 則丟擲異常 */
if (isCanceledOrFailed()) {
   throw new CancelTaskException();
}

這部分就是載入jar包,超時時間等獲取,然後例項化AbstractInvokable的一個具體子類,目前主要是StoppableSourceStreamTask、SourceStreamTask、OneInputStreamTask 這三個子類。
並且會對狀態進行檢查,如果處於’CANCELING’、’CANCELED’、’FAILED’其中的一個狀態,則丟擲CancelTaskException異常。

2、相關注冊

LOG.info("Registering task at network: {}.", this);

network.registerTask(this);

// add metrics for buffers
this.metrics.getIOMetricGroup().initializeBufferMetrics(this);

// register detailed network metrics, if configured
if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
   // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
   MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
   MetricGroup outputGroup = networkGroup.addGroup("Output");
   MetricGroup inputGroup = networkGroup.addGroup("Input");

   // output metrics
   for (int i = 0; i < producedPartitions.length; i++) {
      ResultPartitionMetrics.registerQueueLengthMetrics(
         outputGroup.addGroup(i), producedPartitions[i]);
   }

   for (int i = 0; i < inputGates.length; i++) {
      InputGateMetrics.registerQueueLengthMetrics(
         inputGroup.addGroup(i), inputGates[i]);
   }
}

/** 接下來, 啟動為分散式快取進行檔案的後臺拷貝 */
try {
   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
         DistributedCache.readFileInfoFromConfig(jobConfiguration))
   {
      LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
      Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
      distributedCacheEntries.put(entry.getKey(), cp);
   }
}
catch (Exception e) {
   throw new Exception(
      String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
      e);
}

/** 再次校驗狀態 */
if (isCanceledOrFailed()) {
   throw new CancelTaskException();
}

這裡最後,也會進行狀態校驗,以便可以快速執行取消操作。

3、使用者程式碼初始化

TaskKvStateRegistry kvStateRegistry = network
      .createKvStateTaskRegistry(jobId, getJobVertexId());

Environment env = new RuntimeEnvironment(
   jobId, vertexId, executionId, executionConfig, taskInfo,
   jobConfiguration, taskConfiguration, userCodeClassLoader,
   memoryManager, ioManager, broadcastVariableManager,
   accumulatorRegistry, kvStateRegistry, inputSplitProvider,
   distributedCacheEntries, writers, inputGates,
   checkpointResponder, taskManagerConfig, metrics, this);

/** 讓task程式碼建立它的readers和writers */
invokable.setEnvironment(env);

// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint

if (null != taskStateHandles) {
   if (invokable instanceof StatefulTask) {
      StatefulTask op = (StatefulTask) invokable;
      op.setInitialState(taskStateHandles);
   } else {
      throw new IllegalStateException("Found operator state for a non-stateful task invokable");
   }
   // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
   // we clear the reference to the state handle
   //noinspection UnusedAssignment
   taskStateHandles = null;
}

4、真正執行

/** 在我們將狀態切換到'RUNNING'狀態時, 我們可以方法cancel方法 */
this.invokable = invokable;

/** 將狀態從'DEPLOYING'切換到'RUNNING', 如果失敗, 已經是在同一時間, 發生了 canceled/failed 操作。 */
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
   throw new CancelTaskException();
}

/** 告訴每個人, 我們切換到'RUNNING'狀態了 */
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

/** 設定執行緒上下文類載入器 */
executingThread.setContextClassLoader(userCodeClassLoader);

/** run,這裡就是真正開始執行處理邏輯的地方 */
invokable.invoke();

/** 確保, 如果task由於被取消而退出了invoke()方法, 我們可以進入catch邏輯塊 */
if (isCanceledOrFailed()) {
   throw new CancelTaskException();
}

其中的 invokable.invoke() 這句程式碼就是真正邏輯開始執行的地方,且一般會阻塞在這裡,直至任務執行完成,或者被取消,發生異常等。

5、結尾

/** 完成生產資料分割槽。如果這裡失敗, 我們也任務執行失敗 */
for (ResultPartition partition : producedPartitions) {
   if (partition != null) {
      partition.finish();
   }
}

/**
 * 嘗試將狀態從'RUNNING'修改為'FINISHED'
 * 如果失敗, 那麼task是同一時間被執行了 canceled/failed 操作
 */
if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
   notifyObservers(ExecutionState.FINISHED, null);
}
else {
   throw new CancelTaskException();
}

這裡就是做收尾操作,以及把狀態從’RUNNING’轉換為’FINISHED’,並通知相關觀察者。

相關推薦

Flink原始碼系列——TaskManager處理SubmitTask過程

接《Flink原始碼系列——JobManager處理SubmitJob的過程》,在從JobManager中,將SubmitTask提交到TaskManager後,繼續分析TaskManager的處理邏輯。 TaskManager是個Actor,混入了Leade

Flink原始碼系列——JobManager處理SubmitJob的過程

接《Flink原始碼系列——獲取JobGraph的過程》,在獲取到JobGraph後,客戶端會封裝一個SubmitJob訊息,並將其提交給JobManager,本文就接著分析,JobManager在收到SubmitJob訊息後,對其處理邏輯。JobManager是一個Acto

Flink 原始碼解析 —— TaskManager 處理 SubmitJob 的過程

TaskManager 處理 SubmitJob 的過程 https://t.zsxq.com/eu7mQZj 部落格 1、Flink 從0到1學習 —— Apache Flink 介紹 2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門 3、Flink

Flink原始碼系列——獲取StreamGraph的過程

接著《Flink原始碼系列——一個簡單的資料處理功能的實現過程》一文的結尾繼續分析,在完成對資料流的轉換操作之後,需要執行任務,這時會呼叫如下程式碼: env.execute("Socket Window WordCount"); 在StreamExecutionEnvir

Flink原始碼系列——獲取JobGraph的過程

接《Flink原始碼系列——獲取StreamGraph的過程》獲取到StreamGraph後,繼續分析,如果通過獲取到的StreamGraph來轉化為JobGraph。轉化邏輯在StreamingJobGraphGenerator這個類中,入口是createJobGraph(

Flink原始碼系列——Flink中一個簡單的資料處理功能的實現過程

在Flink中,實現從指定主機名和埠接收字串訊息,對接收到的字串中出現的各個單詞,每隔1秒鐘就輸出最近5秒內出現的各個單詞的統計次數。 程式碼實現如下: public class SocketWindowWordCount {     public static void

SpringMVC原始碼--控制器Handler處理請求過程

      DispatcherServlet類的doDispatch()方法中,真正去處理請求的關鍵步驟是:            HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());  

Flink原始碼系列——指標監測

1、Metric簡介 Flink對於指標監測有一套自己的實現,指標的統計方式有四種,這些指標都實現了Metric這個介面,而Metric這個介面只是一個標識,本身並沒有定義如何方法介面,部分子類的繼承關係如下所示。 從圖中可以看出,Metric這個介面有

Flink原始碼解析(standalone)之taskmanager啟動

1、簡單粗暴,flink-daemon.sh指令碼可知taskmanager執行類為:org.apache.flink.runtime.taskmanager.TaskManager 2、main方法裡面,最主要的就是啟動taskmanager try {

Spark原始碼系列(九)Spark SQL初體驗之解析過程詳解

首先宣告一下這個版本的程式碼是1.1的,之前講的都是1.0的。 Spark支援兩種模式,一種是在spark裡面直接寫sql,可以通過sql來查詢物件,類似.net的LINQ一樣,另外一種支援hive的HQL。不管是哪種方式,下面提到的步驟都會有,不同的是具體的執行過程。下面

雲星資料---Apache Flink實戰系列(精品版)】:Flink處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002

三、基於socket的wordcount 1.傳送資料 1.傳送資料命令 nc -lk 9999 2.傳送資料內容 good good study day day

【雲星資料---Apache Flink實戰系列(精品版)】:Apache Flink實戰基礎002--flink特性:流處理特性介紹

第二部分:flink的特性 一、流處理特性 1.高吞吐,低延時 有圖有真相,有比較有差距。且看下圖: 1.flink的吞吐量大 2.flink的延時低 3.flink的配置少

zookeeper原始碼 — 五、處理寫請求過程

目錄 處理寫請求總體過程 客戶端發起寫請求 follower和leader互動過程 follower傳送請求給客戶端 處理寫請求總體過程 zk為了保證分散式資料一致性,使用ZAB協議,在客戶端發起一次寫請求的時候時候,假設該請求請求到的是follower,follower不會直接處理這個請求,而是轉發給l

Flink 原始碼解析 —— Flink TaskManager 有什麼作用?

TaskManager 有什麼作用 <!--more--> https://t.zsxq.com/RZbu7yN 部落

【Yii系列處理請求

入口 實現 官方 cookie this sender att 只需要 ota 緣起 這一章是Yii系列的第三章,前兩章給大夥講解了Yii2.0的安裝與Yii2.0的基本框架及基礎概念,傳送門: 【Yii2.0的安裝與調試】:http://www.cnblogs.com/r

SQL系列學習 存儲過程&事物語法

bsp ima unique reat 學習 tab soft 很多 存儲 /*學習事物基本語法*/ /*增加課室名的唯一索引*/ALTER table class add constraint uni_ClassName unique(name) /*創建存儲過程,其

Linux發行版CentOS系列系統的安裝過程

CentOS系列系統安裝步驟Linux系統CentOS發行版的安裝流程: 內核空間的引導啟動過程:POST(加電自檢) --> BootSequence(BIOS) 【MBR引導,順序啟動階段BootSequence】--> BootLoader(GRUB(stage1--stage1_5--st

Http請求處理整個過程

admin 轉發 速度 客戶端 OS 有效 施工 功能實現 。net 一,服務器接受http請求的實際處理過程 二,當客戶端將請求通過網絡傳送到服務器時,HTTP.SYS會在內核模式下實時監聽當前的http請求。Http.sys功能如下描述:

一次服務器被挖礦的處理解決過程

amp 命令 刪除 root密碼 pos 服務器 exc 感染 oot 內網一臺服務器cpu爆滿,第6感猜測中了挖礦病毒,以下為cpu爆滿監控圖表趕緊ssh進系統,top了下,一個./x3e536747 進程占用了大量的cpu,cpu load average超過了cpu內

Spark原始碼系列:RDD repartition、coalesce 對比

在上一篇文章中 Spark原始碼系列:DataFrame repartition、coalesce 對比 對DataFrame的repartition、coalesce進行了對比,在這篇文章中,將會對RDD的repartition、coalesce進行對比。 RDD重新分割槽的手段與Da