1. 程式人生 > >【任務排程系統第三篇】:Azkaban原理介紹

【任務排程系統第三篇】:Azkaban原理介紹

寫在前面

Azkaban官網:https://azkaban.github.io/

1. azkaban簡單介紹

Azkaban是由Linkedin公司推出的一個批量工作流任務排程器,主要用於在一個工作流內以一個特定的順序執行一組工作和流程。Azkaban使用job配置檔案建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。 其Web UI介面如下圖所示。
在這裡插入圖片描述
由於我們團隊內部使用Java作為主流開發語言,並且Spark運算元之間確實存在著依賴關係。我們選擇Azkaban的原因基於以下幾點:

  1. 提供功能清晰,簡單易用的Web UI介面
  2. 提供job配置檔案快速建立任務和任務之間的依賴關係
  3. 提供模組化和可插拔的外掛機制,原生支援command、Java、Hive、Pig、Hadoop
  4. 基於Java開發,程式碼結構清晰,易於二次開發
  5. 提供了Restful介面,方面我們平臺定製化。

2. Azkaban的適用場景

實際專案中經常有這些場景:每天有一個大任務,這個大任務可以分成A,B,C,D四個小任務,A,B任務之間沒有依賴關係,C任務依賴A,B任務的結果,D任務依賴C任務的結果。一般的做法是,開兩個終端同時執行A,B,兩個都執行完了再執行C,最後再執行D。這樣的話,整個的執行過程都需要人工參加,並且得盯著各任務的進度。但是我們的很多工都是在深更半夜執行的,通過寫指令碼設定crontab執行。這樣子很不好維維護。

其實,整個過程類似於一個有向無環圖(DAG)。每個子任務相當於大任務中的一個流,任務的起點可以從沒有度的節點開始執行,任何沒有通路的節點之間可以同時執行,比如上述的A,B。總結起來的話,我們需要的就是一個工作流的排程器,而Azkaban就是能解決上述問題的一個排程器。
在這裡插入圖片描述

3. Azkaban架構

Azkaban在LinkedIn上實施,以解決Hadoop作業依賴問題。我們有工作需要按順序執行,Spark各個運算元之間有執行依賴關係,比如下一個運算元執行的資料來源依賴於上一個運算元執行產生的結果資料。最初是單一伺服器解決方案,隨著多年來Hadoop使用者數量的增加,Azkaban 已經發展成為一個更強大的解決方案。
Azkaban由三個關鍵元件構成:

  1. 關係型資料庫(MySQL)
  2. AzkabanWebServer
  3. AzkabanExecutorServer

在這裡插入圖片描述

3.1 關係型資料庫(MySQL)

Azkaban使用資料庫儲存大部分狀態,AzkabanWebServer和AzkabanExecutorServer都需要訪問資料庫。

AzkabanWebServer使用資料庫的原因如下:

  1. 專案管理:專案、專案許可權以及上傳的檔案。
  2. 執行流狀態:跟蹤執行流程以及執行程式正在執行的流程。
  3. 以前的流程/作業:通過以前的作業和流程執行以及訪問其日誌檔案進行搜尋。
  4. 計劃程式:保留計劃作業的狀態。
  5. SLA:保持所有的SLA規則

AzkabanExecutorServer使用資料庫的原因如下:

  1. 訪問專案:從資料庫檢索專案檔案。
  2. 執行流程/作業:檢索和更新正在執行的作業流的資料
  3. 日誌:將作業和工作流的輸出日誌儲存到資料庫中。
  4. 互動依賴關係:如果一個工作流在不同的執行器上執行,它將從資料庫中獲取狀態。

3.2 AzkabanWebServer

AzkabanWebServer是整個Azkaban工作流系統的主要管理者,它負責project管理、使用者登入認證、定時執行工作流、跟蹤工作流執行進度等一系列任務。同時,它還提供Web服務操作的介面,利用該介面,使用者可以使用curl或其他ajax的方式,來執行azkaban的相關操作。操作包括:使用者登入、建立project、上傳workflow、執行workflow、查詢workflow的執行進度、殺掉workflow等一系列操作,且這些操作的返回結果均是json的格式。並且Azkaban使用方便,Azkaban使用以.job為字尾名的鍵值屬性檔案來定義工作流中的各個任務,以及使用dependencies屬性來定義作業間的依賴關係鏈。這些作業檔案和關聯的程式碼最終以*.zip的方式通過Azkaban UI上傳到Web伺服器上。

3.3 AzkabanExecutorServer

以前版本的Azkaban在單個服務中具有AzkabanWebServer和AzkabanExecutorServer功能,目前Azkaban已將AzkabanExecutorServer分離成獨立的伺服器,拆分AzkabanExecutorServer的原因有如下幾點:

  1. 某個任務流失敗後,可以更方便的將其重新執行
  2. 便於Azkaban升級

AzkabanExecutorServer主要負責具體的工作流的提交、執行,可以啟動多個執行伺服器,它們通過mysql資料庫來協調任務的執行以及實現高可用性。

4. Azkaban作業流執行過程

Webserver根據記憶體中快取的各Executor的資源狀態(Webserver有一個執行緒會遍歷各個active executor,去傳送http請求獲取其資源狀態資訊快取到記憶體中),按照選擇策略(包括executor資源狀態、最近執行流個數等)選擇一個executor下發作業流;executor判斷是否設定作業粒度分配,如果未設定作業粒度分配,則在當前executor執行所有作業;如果設定了作業粒度分配,則當前節點會成為作業分配的決策者,即分配節點;分配節點從zookeeper獲取各個executor的資源狀態資訊,然後根據策略選擇一個executor分配作業;被分配到作業的executor即成為執行節點,執行作業,然後更新資料庫。

5. Azkaban架構的三種執行模式

在版本3.0中,Azkaban提供了以下三種模式:

  1. solo server mode:最簡單的模式,資料庫內建的H2資料庫,AzkabanWebServer和AzkabanExecutorServer都在一個程序中執行,任務量不大專案可以採用此模式。
  2. two server mode:資料庫為MySQL,管理伺服器和執行伺服器在不同程序,這種模式下,AzkabanWebServer和AzkabanExecutorServer互不影響。
  3. multiple executor mode:該模式下,AzkabanWebServer和AzkabanExecutorServer執行在不同主機上,且AzkabanExecutorServer可以有多個。
    大資料平臺要求其具有高可用性,所以目前我們採用的是multiple executor mode方式,分別在不同的主機上部署多個AzkabanExecutorServer以應對高併發定時任務執行的情況,從而減輕單個伺服器的壓力。 下面是叢集架構圖:
    在這裡插入圖片描述

6.核心排程概述

Azkaban WebServer需要根據Executor Server的執行狀態資訊,選擇一個合適的Executor Server來執行WorkFlow,然後會將提交到佇列中的WorkFlow排程到選定的Executor Server上執行。我們整理了與核心排程相關的各個元件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他們之間的關係如下圖所示:
在這裡插入圖片描述
其實,從排程層面來看,Azkaban WebServer與Executor Server之間的互動方式非常簡單,是通過REST API的方式來進行互動,基本的模式是,Azkaban WebServer根據排程的需要,主動呼叫Executor Server暴露的REST API來獲取相應的資源資訊,比如Executor Server的狀態資訊、分配WorkFlow到指定Executor Server上執行,等等。

我們可以在QueueProcessorThread.selectExecutorAndDispatchFlow()方法中看到,選擇Executor Server並進行排程的實現,程式碼片段如下所示:

final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
if (selectedExecutor != null) {
  try {
    dispatch(reference, exflow, selectedExecutor);
    ExecutorManager.this.commonMetrics.markDispatchSuccess();
  } catch (final ExecutorManagerException e) {
    ExecutorManager.this.commonMetrics.markDispatchFail();
    logger.warn(String.format(
        "Executor %s responded with exception for exec: %d",
        selectedExecutor, exflow.getExecutionId()), e);
    handleDispatchExceptionCase(reference, exflow, selectedExecutor,
        availableExecutors);
  }
}

QueueProcessorThread是執行在Azkaban WebServer端的一個執行緒,它在ExecutorManager中定義,是內部排程中最核心的執行緒。

selectExecutor()方法處理如何選擇一個合適的Executor Server,然後通過dispatch()方法將需要執行的WorkFlow排程到該Executor Server上執行。

選擇Executor Server

Azkaban WebServer選擇Executor,呼叫selectExecutor()方法,實現如下所示:

private Executor selectExecutor(final ExecutableFlow exflow,
    final Set<Executor> availableExecutors) {
  Executor choosenExecutor =
      getUserSpecifiedExecutor(exflow.getExecutionOptions(),
          exflow.getExecutionId());
 
  // If no executor was specified by admin
  if (choosenExecutor == null) {
    logger.info("Using dispatcher for execution id :"
        + exflow.getExecutionId());
    final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
        ExecutorManager.this.comparatorWeightsMap);
    choosenExecutor = selector.getBest(availableExecutors, exflow);
  }
  return choosenExecutor;
}

首先,檢視當前exflow的配置中,是否要求將該exflow排程到指定的Executor Server上執行,如果是的話,則會返回該指定的Executor Server的資訊,後續直接排程到該Executor Server上;否則會按照一定的計算規則去選出一個Executor Server。

在建立ExecutorSelector時,傳入引數值ExecutorManager.this.filterList,該filterList是從azkanban.properties檔案中讀取azkaban.executorselector.filters的配置值,並建立了一個ExecutorFilter物件,而該物件中包含了一組FactorFilter,後面我們會說明。

使用ExecutorSelector來選出一個Executor Server,具體選擇的邏輯,我們可以檢視ExecutorSelector.getBest()方法。

首先通過定義的CandidateFilter(它是一個抽象類,具體實現類為ExecutorFilter)進行預篩選:

for (final K candidateInfo : candidateList) {
  if (this.filter.filterTarget(candidateInfo, dispatchingObject)) {
    filteredList.add(candidateInfo);
  }
}

上面的filter就是FactorFilter類的例項,Azkaban內部定義瞭如下3種:

private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";

目前3.40.0版本不支援自定義,只能使用內建實現的,如果需要增加新的FactorFilter,可以在此基礎上做一個簡單改造,配置使用自己定義的FactorFilter實現。FactorFilter是一個泛型類:FactorFilter<Executor, ExecutableFlow>,根據上面定義的3種指標對Executor Server進行一個預過濾,滿足要求的會進行後面的比較,加入到排程WorkFlow執行的Executor Server的候選集中。

然後,通過如下方式進行比較排序,選擇合適的Executor Server:

// final work - find the best candidate from the filtered list.
final K executor = Collections.max(filteredList, this.comparator);
logger.debug(String.format("candidate selected %s",
    null == executor ? "(null)" : executor.toString()));
return executor;

這裡關鍵的就是this.comparator,它有一個實現類ExecutorComparator,該類中給出了需要對兩個Executor Server的哪些指標進行綜合比較,亦即一組比較器的定義,可以看到目前考慮了4種比較器:

private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";

通過上面程式碼可以看出,在選擇排程一個WorkFlow到Azkaban叢集中的某個Executor Server時,需要比較Executor Server的如下4個指標:

  1. 能夠執行WorkFlow的剩餘容量,數值越大越優先
  2. 剩餘記憶體用量,數值越大越優先
  3. 最近分配Flow的時間,數值越大越優先
  4. CPU使用用量,數值越小越優先

基於上面4個指標,建立了4個比較器,使用FactorComparator來表示,對需要比較的一組Executor Server,使用這4個比較器進行比較,通過加權後得到一個得分值,根據該得分值選定Executor Server,核心邏輯如下所示:

final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
for (final FactorComparator<T> comparator : comparatorList) {
  final int result = comparator.compare(object1, object2);
  result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
  result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
  logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
      comparator.getFactorName(), result, result1, result2));
}

上面選取了待比較的兩個Executor Server都不為空的情況,分別遍歷每個FactorComparator進行比較,在分別對每個Executor Server的比較結果值進行累加求和,加權得到一個分數值。從一組Executor Server中,根據最終比較的分數值,分數值最大的Executor Server為最終選定的Executor Server。

獲取Executor Server的執行統計資訊

在Azkaban WebServer內部,會維護叢集中每個Executor Server的執行狀態資訊,該資訊的獲取是在QueueProcessorThread執行緒中實現的,定期去更新所維護的Executor Server的執行狀態資訊,如下所示:

if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
    || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
  // Refresh executorInfo for all activeExecutors
  refreshExecutors();
  lastExecutorRefreshTime = currentTime;
  currentContinuousFlowProcessed = 0;
}

上面refreshExecutors()方法遍歷記憶體中維護的所有的Executor Server,呼叫每個Executor Server的/serverStatistics介面,拉取Executor Server的執行狀態資訊。

另外,Azkaban WebServer還需要能夠獲取到各個Executor Server上執行的WorkFlow的狀態資訊,可以在ExecutorManager.ExecutingManagerUpdaterThread中看到實現,程式碼片段如下所示:

results =
    ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
        executor.getPort(), ConnectorParams.UPDATE_ACTION,
        null, null, executionIds, updateTimes);

上面呼叫Executor Server的/executor?action=update介面來拉取WorkFlow的狀態資訊,然後更新記憶體中維護的狀態資訊資料結構。其中,有些WorkFlow可能已經執行完成,需要釋放資源;有些WorkFlow狀態發生變更,也需要更新Azkaban WebServer端記憶體中維護的資料結構。

排程WorkFlow到Executor Server上執行

上面已經選定Executor Server,結合前面程式碼,是通過呼叫ExecutorManager.dispatch()方法來實現,排程WorkFlow到該選定的Executor Server上執行,程式碼片段如下所示:

try {
  this.apiGateway.callWithExecutable(exflow, choosenExecutor,
      ConnectorParams.EXECUTE_ACTION);
} catch (final ExecutorManagerException ex) {
  logger.error("Rolling back executor assignment for execution id:"
      + exflow.getExecutionId(), ex);
  this.executorLoader.unassignExecutor(exflow.getExecutionId());
  throw new ExecutorManagerException(ex);
}

通過跟蹤檢視apiGateway.callWithExecutable()實現,可以看到,最終是呼叫了Executor Server端的一個REST API介面:/executor,然後帶上相關的請求引數,如action=execute、execId等。

Executor Server執行WorkFlow

很顯然,Azkaban WebServer排程WorkFlow後,Executor Server在ExecutorServlet中接收到對應的請求,核心方法如下所示:

private void handleAjaxExecute(final HttpServletRequest req,
    final Map<String, Object> respMap, final int execId) throws ServletException {
  try {
    this.flowRunnerManager.submitFlow(execId);
  } catch (final ExecutorManagerException e) {
    e.printStackTrace();
    logger.error(e.getMessage(), e);
    respMap.put(RESPONSE_ERROR, e.getMessage());
  }
}

在收到Azkaban WebServer的排程請求後,Executor Server使用內部的FlowRunnerManager來提交WorkFlow執行。在這個過程中,首先使用ExecutorLoader從資料庫中讀取WorkFlow對應的資訊;然後使用FlowPreparer進行初始化,建立對應的資料目錄等;最後建立FlowRunner來執行WorkFlow,並跟蹤其執行狀態。

參考

  1. https://azkaban.github.io/
  2. http://shiyanjun.cn/archives/1820.html