一、Flow 2.0 簡介

1.1 Flow 2.0 的產生

Azkaban 目前同時支援 Flow 1.0 和 Flow2.0 ,但是官方文件上更推薦使用 Flow 2.0,因為 Flow 1.0 會在將來的版本被移除。Flow 2.0 的主要設計思想是提供 1.0 所沒有的流級定義。使用者可以將屬於給定流的所有 job / properties 檔案合併到單個流定義檔案中,其內容採用 YAML 語法進行定義,同時還支援在流中再定義流,稱為為嵌入流或子流。

1.2 基本結構

專案 zip 將包含多個流 YAML 檔案,一個專案 YAML 檔案以及可選庫和原始碼。Flow YAML 檔案的基本結構如下:

  • 每個 Flow 都在單個 YAML 檔案中定義;
  • 流檔案以流名稱命名,如:my-flow-name.flow
  • 包含 DAG 中的所有節點;
  • 每個節點可以是作業或流程;
  • 每個節點 可以擁有 name, type, config, dependsOn 和 nodes sections 等屬性;
  • 通過列出 dependsOn 列表中的父節點來指定節點依賴性;
  • 包含與流相關的其他配置;
  • 當前 properties 檔案中流的所有常見屬性都將遷移到每個流 YAML 檔案中的 config 部分。

官方提供了一個比較完善的配置樣例,如下:

config:
  user.to.proxy: azktest
  param.hadoopOutData: /tmp/wordcounthadoopout
  param.inData: /tmp/wordcountpigin
  param.outData: /tmp/wordcountpigout

# This section defines the list of jobs
# A node can be a job or a flow
# In this example, all nodes are jobs
nodes:
 # Job definition
 # The job definition is like a YAMLified version of properties file
 # with one major difference. All custom properties are now clubbed together
 # in a config section in the definition.
 # The first line describes the name of the job
 - name: AZTest
   type: noop
   # The dependsOn section contains the list of parent nodes the current
   # node depends on
   dependsOn:
     - hadoopWC1
     - NoOpTest1
     - hive2
     - java1
     - jobCommand2

 - name: pigWordCount1
   type: pig
   # The config section contains custom arguments or parameters which are
   # required by the job
   config:
     pig.script: src/main/pig/wordCountText.pig

 - name: hadoopWC1
   type: hadoopJava
   dependsOn:
     - pigWordCount1
   config:
     classpath: ./*
     force.output.overwrite: true
     input.path: ${param.inData}
     job.class: com.linkedin.wordcount.WordCount
     main.args: ${param.inData} ${param.hadoopOutData}
     output.path: ${param.hadoopOutData}

 - name: hive1
   type: hive
   config:
     hive.script: src/main/hive/showdb.q

 - name: NoOpTest1
   type: noop

 - name: hive2
   type: hive
   dependsOn:
     - hive1
   config:
     hive.script: src/main/hive/showTables.sql

 - name: java1
   type: javaprocess
   config:
     Xms: 96M
     java.class: com.linkedin.foo.HelloJavaProcessJob

 - name: jobCommand1
   type: command
   config:
     command: echo "hello world from job_command_1"

 - name: jobCommand2
   type: command
   dependsOn:
     - jobCommand1
   config:
     command: echo "hello world from job_command_2"

二、YAML語法

想要使用 Flow 2.0 進行工作流的配置,首先需要了解 YAML 。YAML 是一種簡潔的非標記語言,有著嚴格的格式要求的,如果你的格式配置失敗,上傳到 Azkaban 的時候就會丟擲解析異常。

2.1 基本規則

  1. 大小寫敏感 ;
  2. 使用縮排表示層級關係 ;
  3. 縮排長度沒有限制,只要元素對齊就表示這些元素屬於一個層級;
  4. 使用#表示註釋 ;
  5. 字串預設不用加單雙引號,但單引號和雙引號都可以使用,雙引號表示不需要對特殊字元進行轉義;
  6. YAML 中提供了多種常量結構,包括:整數,浮點數,字串,NULL,日期,布林,時間。

2.2 物件的寫法

# value 與 : 符號之間必須要有一個空格
key: value

2.3 map的寫法

# 寫法一 同一縮排的所有鍵值對屬於一個map
key: 
    key1: value1
    key2: value2

# 寫法二
{key1: value1, key2: value2}

2.3 陣列的寫法

# 寫法一 使用一個短橫線加一個空格代表一個數組項
- a
- b
- c

# 寫法二
[a,b,c]

2.5 單雙引號

支援單引號和雙引號,但雙引號不會對特殊字元進行轉義:

s1: '內容\n 字串'
s2: "內容\n 字串"

轉換後:
{ s1: '內容\\n 字串', s2: '內容\n 字串' }

2.6 特殊符號

一個 YAML 檔案中可以包括多個文件,使用 --- 進行分割。

2.7 配置引用

Flow 2.0 建議將公共引數定義在 config 下,並通過 ${} 進行引用。

三、簡單任務排程

3.1 任務配置

新建 flow 配置檔案:

nodes:
  - name: jobA
    type: command
    config:
      command: echo "Hello Azkaban Flow 2.0."

在當前的版本中,Azkaban 同時支援 Flow 1.0 和 Flow 2.0,如果你希望以 2.0 的方式執行,則需要新建一個 project 檔案,指明是使用的是 Flow 2.0:

azkaban-flow-version: 2.0

3.2 打包上傳

3.3 執行結果

由於在 1.0 版本中已經介紹過 Web UI 的使用,這裡就不再贅述。對於 1.0 和 2.0 版本,只有配置方式有所不同,其他上傳執行的方式都是相同的。執行結果如下:

四、多工排程

和 1.0 給出的案例一樣,這裡假設我們有五個任務(jobA——jobE), D 任務需要在 A,B,C 任務執行完成後才能執行,而 E 任務則需要在 D 任務執行完成後才能執行,相關配置檔案應如下。可以看到在 1.0 中我們需要分別定義五個配置檔案,而在 2.0 中我們只需要一個配置檔案即可完成配置。

nodes:
  - name: jobE
    type: command
    config:
      command: echo "This is job E"
    # jobE depends on jobD
    dependsOn: 
      - jobD
    
  - name: jobD
    type: command
    config:
      command: echo "This is job D"
    # jobD depends on jobA、jobB、jobC
    dependsOn:
      - jobA
      - jobB
      - jobC

  - name: jobA
    type: command
    config:
      command: echo "This is job A"

  - name: jobB
    type: command
    config:
      command: echo "This is job B"

  - name: jobC
    type: command
    config:
      command: echo "This is job C"

五、內嵌流

Flow2.0 支援在一個 Flow 中定義另一個 Flow,稱為內嵌流或者子流。這裡給出一個內嵌流的示例,其 Flow 配置如下:

nodes:
  - name: jobC
    type: command
    config:
      command: echo "This is job C"
    dependsOn:
      - embedded_flow

  - name: embedded_flow
    type: flow
    config:
      prop: value
    nodes:
      - name: jobB
        type: command
        config:
          command: echo "This is job B"
        dependsOn:
          - jobA

      - name: jobA
        type: command
        config:
          command: echo "This is job A"

內嵌流的 DAG 圖如下:

執行情況如下:

參考資料

  1. Azkaban Flow 2.0 Design
  2. Getting started with Azkaban Flow 2.0

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南