Flink Source Code Analysis (Standalone): TaskManager Startup

1. Straight to the point: the flink-daemon.sh script shows that the TaskManager's entry class is org.apache.flink.runtime.taskmanager.TaskManager.
2. In its main method, the key step is starting the TaskManager:

try {
  SecurityUtils.getInstalledContext.runSecured(new Callable[Unit] {
    override def call(): Unit = {
      // Run the TaskManager. Note classOf[TaskManager]: it is the class used to start
      // the TaskManager actor, and the actor's lifecycle methods are defined in it.
      selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
    }
  })
}

3. selectNetworkInterfaceAndRunTaskManager does three main things:
a. Creates the high-availability services
b. Selects the TaskManager's hostname and port range
c. Starts the TaskManager

def selectNetworkInterfaceAndRunTaskManager(
    configuration: Configuration,
    resourceID: ResourceID,
    taskManagerClass: Class[_ <: TaskManager])
  : Unit = {

  val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
    configuration,
    Executors.directExecutor(),
    AddressResolution.TRY_ADDRESS_RESOLUTION)

  // Select the network interface (hostname) and the actor system's port range
  val (taskManagerHostname, actorSystemPortRange) = selectNetworkInterfaceAndPortRange(
    configuration,
    highAvailabilityServices)

  try {
    // Start the TaskManager
    runTaskManager(
      taskManagerHostname,
      resourceID,
      actorSystemPortRange,
      configuration,
      highAvailabilityServices,
      taskManagerClass)
  } finally {
    try {
      highAvailabilityServices.close()
    } catch {
      case t: Throwable => LOG.warn("Could not properly stop the high availability services.", t)
    }
  }
}
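The try/finally above closes the high-availability services whether runTaskManager returns normally or throws, and a failure while closing is only logged, so it never hides the original error. A minimal Java sketch of that pattern, assuming a hypothetical HaServices interface in place of Flink's HighAvailabilityServices and a list in place of the logger:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's HighAvailabilityServices: only close() matters here.
interface HaServices extends AutoCloseable {
    @Override
    void close() throws Exception;
}

final class RunWithHaServices {
    // Collected warnings; stands in for LOG.warn in the Scala excerpt.
    static final List<String> warnings = new ArrayList<>();

    // Runs the TaskManager, then always closes the services.
    // An exception from the run propagates; an exception from close() is only recorded.
    static void run(HaServices services, Runnable runTaskManager) {
        try {
            runTaskManager.run();
        } finally {
            try {
                services.close();
            } catch (Throwable t) {
                warnings.add("Could not properly stop the high availability services: " + t.getMessage());
            }
        }
    }
}
```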

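4. runTaskManager walks through the port range selected above until it finds a port that can be bound, uses that port for TaskManager communication, and then calls the overloaded runTaskManager method to start the TaskManager. The probe socket is closed again before the actor system binds the port, so another process may take the port in between; this is why the whole step runs inside AkkaUtils.retryOnBindException, which retries on a BindException until the start succeeds or the port range is exhausted (the stop condition !actorSystemPortRange.hasNext).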

def runTaskManager(
    taskManagerHostname: String,
    resourceID: ResourceID,
    actorSystemPortRange: java.util.Iterator[Integer],
    configuration: Configuration,
    highAvailabilityServices: HighAvailabilityServices,
    taskManagerClass: Class[_ <: TaskManager])
  : Unit = {
  // Find a free port by trying to open a server socket on each candidate port
  val result = AkkaUtils.retryOnBindException({
    // Try all ports in the range until successful
    val socket = NetUtils.createSocketFromPorts(
      actorSystemPortRange,
      new NetUtils.SocketFactory {
        override def createSocket(port: Int): ServerSocket = new ServerSocket(
          // Use the correct listening address, bound ports will only be
          // detected later by Akka.
          port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
      })

    val port =
      if (socket == null) {
        throw new BindException(s"Unable to allocate port for TaskManager.")
      } else {
        try {
          socket.getLocalPort()
        } finally {
          socket.close()
        }
      }

    runTaskManager(
      taskManagerHostname,
      resourceID,
      port,
      configuration,
      highAvailabilityServices,
      taskManagerClass)
  }, { !actorSystemPortRange.hasNext }, 5000)

  result match {
    case scala.util.Failure(f) => throw f
    case _ =>
  }
}
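The probe-then-bind idea can be sketched in plain Java. PortProbe, firstFreePort, runOnFreePort and PortStarter are hypothetical names: firstFreePort mirrors what the socket factory passed to NetUtils.createSocketFromPorts does (try each port, keep the first ServerSocket that binds, read its port, close it), and runOnFreePort is a simplified stand-in for AkkaUtils.retryOnBindException, not its actual implementation:

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Iterator;

final class PortProbe {
    // Something that starts a service on a port and may fail with BindException
    // (in the TaskManager this would be starting the actor system).
    interface PortStarter<T> {
        T start(int port) throws BindException;
    }

    // Returns the first port in the range that can be bound on the wildcard address,
    // or -1 if none can. Port 0 lets the OS choose any free port.
    static int firstFreePort(Iterator<Integer> ports) {
        while (ports.hasNext()) {
            int port = ports.next();
            try (ServerSocket socket = new ServerSocket(port, 0, InetAddress.getByName("0.0.0.0"))) {
                return socket.getLocalPort(); // the probe socket is closed when we return
            } catch (IOException e) {
                // Port already in use or not bindable: try the next candidate.
            }
        }
        return -1;
    }

    // Simplified retry loop: probe a port, start on it, and retry on BindException
    // (another process may grab the port after the probe closed it) while candidates remain.
    static <T> T runOnFreePort(Iterator<Integer> ports, PortStarter<T> starter) throws BindException {
        while (true) {
            int port = firstFreePort(ports);
            if (port < 0) {
                throw new BindException("Unable to allocate port for TaskManager.");
            }
            try {
                return starter.start(port);
            } catch (BindException e) {
                if (!ports.hasNext()) {
                    throw e; // range exhausted: same stop condition as !actorSystemPortRange.hasNext
                }
            }
        }
    }
}
```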

5. Inside the overloaded runTaskManager
5.1 Create the TaskManager actor system (taskManagerSystem)

    val taskManagerSystem = BootstrapTools.startActorSystem(
      configuration,
      taskManagerHostname,
      actorSystemPort,
      LOG.logger)

5.2 Create a MetricRegistry and start its metric query service

val metricRegistry = new MetricRegistryImpl(
  MetricRegistryConfiguration.fromConfiguration(configuration))

metricRegistry.startQueryService(taskManagerSystem, resourceID)

5.3 Start the TaskManager components and the TaskManager actor

val taskManager = startTaskManagerComponentsAndActor(
  configuration,
  resourceID,
  taskManagerSystem,
  highAvailabilityServices,
  metricRegistry,
  taskManagerHostname,
  Some(TaskExecutor.TASK_MANAGER_NAME),
  localTaskManagerCommunication = false,
  taskManagerClass)

5.3.1 Once the TaskManager actor has started, its lifecycle method preStart runs. Its main job there is to start a leader retrieval service that locates the leader JobManager. In standalone mode the JobManager address is fixed by the configuration, so the service reports it to the listener right away:

leaderRetrievalService.start(this)

// StandaloneLeaderRetrievalService.start
public void start(LeaderRetrievalListener listener) {
    checkNotNull(listener, "Listener must not be null.");

    synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;

        // Notify the listener immediately with the leader JobManager's address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}
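Because the leader is fixed in standalone mode, start() has nothing to wait for: it hands the configured address to the listener at once and rejects a second start. A self-contained sketch of the same pattern; LeaderListener and FixedLeaderRetrieval are hypothetical simplifications of LeaderRetrievalListener and StandaloneLeaderRetrievalService, with plain JDK checks instead of Flink's checkNotNull/checkState:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical simplification of Flink's LeaderRetrievalListener.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical simplification of StandaloneLeaderRetrievalService: the leader never changes.
final class FixedLeaderRetrieval {
    private final Object startStopLock = new Object();
    private final String leaderAddress;
    private final UUID leaderId;
    private boolean started;

    FixedLeaderRetrieval(String leaderAddress, UUID leaderId) {
        this.leaderAddress = leaderAddress;
        this.leaderId = leaderId;
    }

    void start(LeaderListener listener) {
        Objects.requireNonNull(listener, "Listener must not be null.");
        synchronized (startStopLock) {
            if (started) {
                throw new IllegalStateException("Service can only be started once.");
            }
            started = true;
            // No election to wait for: report the known leader immediately.
            listener.notifyLeaderAddress(leaderAddress, leaderId);
        }
    }
}
```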

5.3.2 The TaskManager's notifyLeaderAddress callback then sends a JobManagerLeaderAddress message to the TaskManager actor itself:

override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
  self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
}
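The callback may be invoked from a thread other than the actor's, so instead of changing the actor's state directly it sends itself a message, and the state change happens later inside the actor's own message loop, one message at a time. A rough Java analogy, assuming a single-threaded executor as the mailbox; MailboxTaskManager is hypothetical, and this only illustrates the ordering guarantee, not how Akka is implemented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical mini "actor": every message is processed on a single mailbox thread.
final class MailboxTaskManager {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Written only on the mailbox thread; a thread-safe list so it can be read afterwards.
    private final List<String> handledLeaders = new CopyOnWriteArrayList<>();

    // Listener callback that may run on any thread: it only enqueues a message,
    // like self ! JobManagerLeaderAddress(...) in the Scala excerpt.
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        mailbox.execute(() -> handleJobManagerLeaderAddress(leaderAddress, leaderSessionId));
    }

    // Runs on the mailbox thread, one message at a time, in arrival order.
    private void handleJobManagerLeaderAddress(String address, UUID sessionId) {
        handledLeaders.add(address);
    }

    // Drains pending messages, stops the mailbox and returns the handled leader addresses.
    List<String> shutdownAndGetHandled() throws InterruptedException {
        mailbox.shutdown();
        mailbox.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(handledLeaders);
    }
}
```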

5.3.3 In the TaskManager actor's handleMessage method, the JobManagerLeaderAddress case is handled as follows:
1. If currentJobManager is set (the TaskManager is connected to a leader JobManager), it first disconnects from that old leader.
2. It stores the new leader's address and session ID and, if a leader session ID is present, triggers registration of the TaskManager with the new JobManager.

case JobManagerLeaderAddress(address, newLeaderSessionID) =>
  handleJobManagerLeaderAddress(address, newLeaderSessionID)

private def handleJobManagerLeaderAddress(
    newJobManagerAkkaURL: String,
    leaderSessionID: UUID)
  : Unit = {

  currentJobManager match {
    case Some(jm) =>
      Option(newJobManagerAkkaURL) match {
        case Some(newJMAkkaURL) =>
          // Disconnect from the old leader JobManager
          handleJobManagerDisconnect(s"JobManager $newJMAkkaURL was elected as leader.")
        case None =>
          handleJobManagerDisconnect(s"Old JobManager lost its leadership.")
      }
    case None =>
  }

  this.jobManagerAkkaURL = Option(newJobManagerAkkaURL)
  this.leaderSessionID = Option(leaderSessionID)

  if (this.leaderSessionID.isDefined) {
    // Trigger TaskManager registration
    triggerTaskManagerRegistration()
  }
}
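The decision logic of handleJobManagerLeaderAddress restated as a small Java sketch. LeaderChangeHandler is hypothetical; its disconnect hook and registration trigger only record what they would do in an actions list, so the sequence can be checked:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Hypothetical restatement of handleJobManagerLeaderAddress; side effects are recorded
// in actions instead of being sent to real actors.
final class LeaderChangeHandler {
    Optional<String> currentJobManager = Optional.empty(); // set once registered with a JobManager
    Optional<String> jobManagerAkkaUrl = Optional.empty();
    Optional<UUID> leaderSessionId = Optional.empty();
    final List<String> actions = new ArrayList<>();

    void handleJobManagerLeaderAddress(String newJobManagerAkkaUrl, UUID newLeaderSessionId) {
        // 1. If connected to a JobManager, disconnect from the old leader first.
        if (currentJobManager.isPresent()) {
            if (newJobManagerAkkaUrl != null) {
                disconnect("JobManager " + newJobManagerAkkaUrl + " was elected as leader.");
            } else {
                disconnect("Old JobManager lost its leadership.");
            }
        }
        // 2. Remember the new leader; null values mean "no leader at the moment".
        jobManagerAkkaUrl = Optional.ofNullable(newJobManagerAkkaUrl);
        leaderSessionId = Optional.ofNullable(newLeaderSessionId);
        // 3. Registration is only triggered when a leader session is known.
        if (leaderSessionId.isPresent()) {
            actions.add("trigger registration");
        }
    }

    // Hypothetical hook for handleJobManagerDisconnect: here it just forgets the old leader.
    private void disconnect(String reason) {
        actions.add("disconnect: " + reason);
        currentJobManager = Optional.empty();
    }
}
```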

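5.3.4 triggerTaskManagerRegistration sends the TaskManager actor a TriggerTaskManagerRegistration message. The message carries the JobManager URL, the initial registration pause as the first timeout, the registration deadline, the attempt number (1), and the current registration run: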

      self ! decorateMessage(
        TriggerTaskManagerRegistration(
          jobManagerAkkaURL.get,
          new FiniteDuration(
            config.getInitialRegistrationPause().getSize(),
            config.getInitialRegistrationPause().getUnit()),
          deadline,
          1,
          currentRegistrationRun)
      )

5.3.5 Registration logic, dispatched to handleRegistrationMessage:

case message: RegistrationMessage => handleRegistrationMessage(message)


5.3.5.1 If the TaskManager is already registered, it only logs a debug message:

if (isConnected) {
  // this may be the case, if we queue another attempt and
  // in the meantime, the registration is acknowledged
  log.debug(
    "TaskManager was triggered to register at JobManager, but is already registered")
}

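5.3.5.2 If registration has not succeeded before the deadline, the TaskManager gives up and stops itself by sending itself a PoisonPill. Stopping the actor in turn lets the process reaper described in 5.4 shut down the JVM:

else if (deadline.exists(_.isOverdue())) {
  // we failed to register in time. that means we should quit
  log.error("Failed to register at the JobManager within the defined maximum " +
    "connect time. Shutting down ...")

  // terminate ourselves (hasta la vista)
  self ! decorateMessage(PoisonPill)
}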

5.3.5.3 Otherwise, it sends a RegisterTaskManager message to the JobManager actor:

val jobManager = context.actorSelection(jobManagerURL)

jobManager ! decorateMessage(
  RegisterTaskManager(
    resourceID,
    location,
    resources,
    numberOfSlots)
)
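The three branches of 5.3.5.1 to 5.3.5.3 amount to a small decision, sketched below. RegistrationStep and its Action enum are hypothetical names; java.time.Instant replaces Scala's Deadline so the check can be tested with fixed times, and an empty Optional models "no deadline", in which case deadline.exists(_.isOverdue()) is false and the TaskManager keeps trying:

```java
import java.time.Instant;
import java.util.Optional;

final class RegistrationStep {
    enum Action { ALREADY_REGISTERED, GIVE_UP, SEND_REGISTRATION }

    // Decides what one TriggerTaskManagerRegistration message leads to.
    static Action decide(boolean isConnected, Optional<Instant> deadline, Instant now) {
        if (isConnected) {
            return Action.ALREADY_REGISTERED; // only a debug log
        } else if (deadline.isPresent() && now.isAfter(deadline.get())) {
            return Action.GIVE_UP;            // the actor sends itself a PoisonPill
        } else {
            return Action.SEND_REGISTRATION;  // send RegisterTaskManager and schedule a retry
        }
    }
}
```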

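5.3.5.3.1 The JobManager actor receives the registration message in JobManager.handleMessage. If a ResourceManager is registered with the JobManager, the JobManager notifies it (NotifyResourceStarted) that a TaskManager has started in the resource identified by msg.resourceId. This is an ask (?) with a timeout, so it returns a future instead of blocking. Only the failure path is handled explicitly: on a timeout or any other error, the JobManager logs the problem and sends itself a ReconnectResourceManager message to reconnect to the ResourceManager. If no ResourceManager is connected, it only logs this fact:

currentResourceManager match {
  case Some(rm) =>
    val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout)
    future.onFailure {
      case t: Throwable =>
        t match {
          case _: TimeoutException =>
            log.info("Attempt to register resource at ResourceManager timed out. Retrying")
          case _ =>
            log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
        }
        self ! decorateMessage(
          new ReconnectResourceManager(
            rm,
            currentResourceManagerConnectionId))
    }(context.dispatcher)

  case None =>
    log.info("Task Manager Registration but not connected to ResourceManager")
}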
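Since ? (ask) returns a future, the JobManager never blocks while waiting for the ResourceManager. A Java analogy using CompletableFuture (orTimeout requires Java 9+); askOrReconnect is a hypothetical helper, the reply future stands in for the Akka ask, and the reconnect action stands in for sending ReconnectResourceManager:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class AskWithTimeout {
    // Attaches a timeout to a pending reply and, on timeout or any other failure,
    // logs and runs the reconnect action. The caller is never blocked.
    static CompletableFuture<Object> askOrReconnect(
            CompletableFuture<Object> reply, long timeoutMillis, Consumer<String> log, Runnable reconnect) {
        return reply
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // Java 9+
            .whenComplete((result, failure) -> {
                if (failure == null) {
                    return; // success is not handled explicitly, as in the excerpt
                }
                Throwable cause = failure instanceof CompletionException ? failure.getCause() : failure;
                if (cause instanceof TimeoutException) {
                    log.accept("Attempt to register resource at ResourceManager timed out. Retrying");
                } else {
                    log.accept("Failure while asking ResourceManager for RegisterResource. Retrying: " + cause);
                }
                reconnect.run(); // stands in for self ! ReconnectResourceManager(...)
            });
    }
}
```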

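5.3.5.3.2 If this TaskManager is already registered, the JobManager replies with AlreadyRegistered, telling the TaskManager actor that it is already known (the reply carries the existing instance ID and the BLOB server port):

if (instanceManager.isRegistered(resourceId)) {
  val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

  taskManager ! decorateMessage(
    AlreadyRegistered(
      instanceID,
      blobServer.getPort))
}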

5.3.5.3.3 Otherwise, the JobManager registers the TaskManager and replies with AcknowledgeRegistration:

taskManager ! decorateMessage(
  AcknowledgeRegistration(instanceID, blobServer.getPort))
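Both branches boil down to a lookup keyed by the TaskManager's resource ID: a repeated registration (for example, a queued retry arriving after an earlier attempt succeeded) gets the existing instance ID back instead of creating a second instance. A much-simplified sketch; SimpleInstanceManager is hypothetical, UUID stands in for Flink's InstanceID, and the replies are plain strings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, much-simplified InstanceManager: resource ID -> instance ID.
final class SimpleInstanceManager {
    private final Map<String, UUID> registered = new HashMap<>();

    // Returns the reply the JobManager would send back to the TaskManager actor.
    String register(String resourceId, int blobServerPort) {
        UUID existing = registered.get(resourceId);
        if (existing != null) {
            // Known resource: reuse its instance ID.
            return "AlreadyRegistered(" + existing + ", " + blobServerPort + ")";
        }
        UUID instanceId = UUID.randomUUID();
        registered.put(resourceId, instanceId);
        return "AcknowledgeRegistration(" + instanceId + ", " + blobServerPort + ")";
    }
}
```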

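5.3.5.3.3.1 When the TaskManager actor receives this reply, it mainly does the following:
1. Starts the BLOB cache
2. Watches the JobManager, so that it notices promptly when the JobManager dies
3. Starts the heartbeat between the TaskManager and the JobManager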

5.3.5.3.4 The JobManager watches the newly registered TaskManager actor, so that it notices promptly when the TaskManager dies:

context.watch(taskManager)

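5.3.5.4 Back in the TaskManager, after sending RegisterTaskManager (step 5.3.5.3) it schedules a one-off timer that triggers the next registration attempt once the current timeout elapses, in case this attempt is lost (for example, due to network problems). Each new attempt goes through the same steps, much like recursion, until registration succeeds or the registration deadline passes. The timeout doubles with every attempt, capped at the configured maximum registration pause: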

            val nextTimeout = (timeout * 2).min(new FiniteDuration(
              config.getMaxRegistrationPause().toMilliseconds,
              TimeUnit.MILLISECONDS))

            // schedule a check to trigger a new registration attempt if not registered
            // by the timeout
            scheduledTaskManagerRegistration = Option(context.system.scheduler.scheduleOnce(
              timeout,
              self,
              decorateMessage(TriggerTaskManagerRegistration(
                jobManagerURL,
                nextTimeout,
                deadline,
                attempt + 1,
                registrationRun)
              ))(context.dispatcher))
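The timeout sequence produced by (timeout * 2).min(maxRegistrationPause) can be checked with a short Java sketch. RegistrationBackoff is a hypothetical helper, and the 500 ms initial pause and 30 s maximum used below are illustrative values, not necessarily Flink's defaults:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class RegistrationBackoff {
    // Timeout used by each of the first `attempts` registration attempts:
    // attempt 1 uses the initial pause, and each following one uses min(2 * previous, maxPause).
    static List<Duration> timeouts(Duration initialPause, Duration maxPause, int attempts) {
        List<Duration> result = new ArrayList<>();
        Duration timeout = initialPause;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            result.add(timeout);
            Duration doubled = timeout.multipliedBy(2);
            timeout = doubled.compareTo(maxPause) < 0 ? doubled : maxPause;
        }
        return result;
    }
}
```

For example, with an initial pause of 500 ms and a maximum of 30 s, the first eight timeouts are 500 ms, 1 s, 2 s, 4 s, 8 s, 16 s, 30 s and 30 s: the doubling stops at the cap, so a TaskManager that cannot reach the JobManager keeps retrying at a bounded rate until the deadline check in 5.3.5.2 stops it.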

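5.4 Start a process reaper actor that watches the TaskManager actor and, when the TaskManager actor dies, shuts down the JVM process with RUNTIME_FAILURE_RETURN_CODE:

taskManagerSystem.actorOf(
  Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
  "TaskManager_Process_Reaper")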
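The reaper makes sure a dead TaskManager actor never leaves a half-alive JVM behind: once the watched actor terminates, the whole process exits with a failure code. A thread-based analogy; ProcessReaperSketch is hypothetical, Thread.join stands in for Akka's death watch (the Terminated message), and the exit action is injected so the sketch can be exercised without calling System.exit. The exit code value is a parameter here, not an assumption about RUNTIME_FAILURE_RETURN_CODE:

```java
import java.util.function.IntConsumer;

// Hypothetical sketch of the ProcessReaper idea using threads instead of actors.
final class ProcessReaperSketch {
    // Starts a watcher that waits until `watched` has terminated (normally or because of
    // an exception) and then calls exitProcess with the given code. In a real process
    // exitProcess would be System::exit; it is injected here so the sketch is testable.
    static Thread watch(Thread watched, int exitCode, IntConsumer exitProcess) {
        Thread reaper = new Thread(() -> {
            try {
                watched.join(); // plays the role of the death watch notification
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // the reaper itself was stopped: leave the process alone
            }
            System.err.println(watched.getName() + " terminated. Shutting down the process.");
            exitProcess.accept(exitCode);
        }, "TaskManager_Process_Reaper");
        reaper.setDaemon(true);
        reaper.start();
        return reaper;
    }
}
```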