1. 程式人生 > >Airflow 使用隨筆(內涵 TimeZone 和 Backfill 等的詳解)

Airflow 使用隨筆(內涵 TimeZone 和 Backfill 等的詳解)

其實怎麼部署  airflow 又哪些特性,然後功能又是如何全面都可以在 Reference 的文章裡面找到,都不是重點這裡就不贅述了。

這裡重點談一下我在部署完成仔細閱讀文件之後覺得可以總結的一些東西,或者踩到的一些坑。

首選明確 airflow 中最重要的幾個概念:

  • DAG

    DAG 意為有向無迴圈圖,在 Airflow 中則定義了整個完整的作業。同一個 DAG 中的所有 Task 擁有相同的排程時間。

  • Task

    Task 為 DAG 中具體的作業任務,它必須存在於某一個 DAG 之中。Task 在 DAG 中配置依賴關係,跨 DAG 的依賴是可行的,但是並不推薦。跨 DAG 依賴會導致 DAG 圖的直觀性降低,並給依賴管理帶來麻煩。

  • DAG Run

    當一個 DAG 滿足它的排程時間,或者被外部觸發時,就會產生一個 DAG Run。可以理解為由 DAG 例項化的例項。

  • Task Instance

    當一個 Task 被排程啟動時,就會產生一個 Task Instance。可以理解為由 Task 例項化的例項。

在使用過程中,需要謹記這些概念不然很容易就被繞暈。

 

TimeZone:

時區問題可以說是 airflow 中根本繞不開的一個問題,官方文件也拿整整一章來闡述時區給 airflow 帶來的影響。

官方預設使用 UTC+0 時間來作為 airflow 的時間,並且在 airflow 提供的 webserver 上,這個時區直到現在 1.10.1 版本上依舊是無法被配置的。所以我們在 server 上看到的時間預設都是 UTC+0 的時間。但是系統啟動的時間,以及寫入資料庫元資料的時間確是可以配置時區的。他的格式遵循 IANA 格式。可以通過配置 airflow 的配置檔案 airflow.cfg 改變這個時區。例如我在啟動的時候就配置了中國時間:

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
#default_timezone = utc
default_timezone = Asia/Chongqing

當配置了這個生效之後,我們給任務配置的啟動時間,以及 runtime 之類的時間會遵循這個時間(雖然在 ui 介面上看還是 UTC+0 的非常分裂。。。)

例如我們來看一個執行任務:

 可以看到 Last Run 欄位指明瞭該欄位是 2018-12-09 16:00 被執行了。按照我們剛才的配置 UTC+8 應該是 10號的 0點被執行了。我們來看下資料庫裡是否真的是這樣呢?

 

可以通過下圖發現完全按照我們預期:

mysql> select * from task_instance where task_id = 'sensors'\G;
*************************** 1. row ***************************
        task_id: sensors
         dag_id: sensors_dag
 execution_date: 2018-12-10 00:00:00.000000
     start_date: 2018-12-11 00:00:04.547631
       end_date: 2018-12-11 00:04:50.686937
       duration: 286.139
          state: success
     try_number: 1
       hostname: zed-2
       unixname: apache-airflow
         job_id: 26975
           pool: NULL
          queue: default
priority_weight: 1
       operator: BashOperator
    queued_dttm: 2018-12-11 00:00:03.302928
            pid: 19962
      max_tries: 3
executor_config: }q .

 

所以說,證明了只要我們設定了剛才的引數之後,只有 ui 介面依然繼續展示 UTC+0 的時間,其實我們真正的執行操作的時區已經被修改過來了。

另外還需要注意的一個地方在於任務的申明那裡,在申明一個 dag 的時候同樣需要指明自己的時區,否則一臉懵逼為啥到了時間為什麼沒有正常排程起來。

import pendulum
import datetime

local_tz = pendulum.timezone("Asia/Chongqing")


# start 接收一個 %Y-%m-%d %H:%M:%S 的字串時間
def init_default_args(owner, start_time, retry=3, email=None):
    if not email:
        email = []
    if not isinstance(start_time, datetime.datetime):
        raise TypeError('start_date must be datetime.')

    d = start_time.replace(tzinfo=local_tz)

    return {
        'owner': owner,
        'depends_on_past': False,
        'start_date': d,
        'email': email,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': retry,
        'retry_delay': datetime.timedelta(minutes=5),
    }

這裡我在申明預設 default_args 引數的時候就顯示的指明瞭時區並且賦值給 start_date 讓他開始的時間複合我們的預期。

 

Backfill:

感覺這個也非常讓人懵逼值得拿出來多多少少談一下。

光從字面意思來理解作為一箇中國人我真的是一下沒有 get 到他的點,但是後來去查了一下文件結合他的行為看了一下可以把它簡單的理解成當你錯過了某一次執行時間之後,往回去補充執行的行為。我們可以使用手動方法來執行這個行為

airflow backfill sensors -s 2015-06-01 -e 2015-06-07

他會回補這個時間段開始的 和 -e 後面時間段結束期間所有的任務執行。回補的意思就是把沒有執行的操作都執行一遍。

這個特性想法很好,但是自自動觸發的時候不注意就會產生非常不可預期的問題。

比如剛才在上面我們談到的在給 dag 配置的時候指定的 default_args 上面有一個引數 start_date。如果我們不給 dag 指定不回補,那麼 airflow 會預設回補從系統當前時間到我們指定的 start_date 期間的任務。如果這個引數設定得不恰當會打來恐怖的回補,所以一般我都會禁用回補。

sensors_dag = DAG(
    'sensors_dag',
    default_args=default_args,
    schedule_interval=u'0 0 * * *',
    catchup=False)

指定 catchup=False 。讓他從最新的任務時間點開始執行。這個在官方文件 Scheduling & Triggers 一章有詳細提到。

 

Operator:

現在這一章我只有一個地方覺得有點坑。可能是我還沒有深度使用吧

由於 airflow 支援模版 jinja 的模版功能,所以說在使用 BashOperator 的時候要注意自己的寫法,官方文件對此有描述

Troubleshooting
Jinja template not found
Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow tries to apply a Jinja template to it, which will fail.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

我覺得這個。。。還蠻坑的- -

這一章其實應該有蠻多的內容可以延伸,包括社群也給 airflow 貢獻了非常多的 Operator。等我深度使用之後再來 backfill 吧哈哈哈哈。

 

 

Reference:

https://airflow.apache.org  airflow 官方文件

https://zhuanlan.zhihu.com/p/44768244  如何部署一個健壯的 apache-airflow 排程系統

http://www.shuhegroup.com/newsmedias/%E6%B5%85%E8%B0%88%E8%B0%83%E5%BA%A6%E5%B7%A5%E5%85%B7airflow/  淺談排程工具——Airflow