1. 程式人生 > >使用者畫像—Airflow作業排程(ETL)

使用者畫像—Airflow作業排程(ETL)

最近在弄畫像標籤每天ETL的排程事情,這篇文章分享一下一個開源的ETL工具Airflow。

 

一、基礎概念

Airflow是Airbnb內部發起並開源的一個ETL管理平臺,使用Python編寫實現的任務管理、排程、監控工作流平臺。這是其官方文件地址:Apache Airflow (incubating) Documentation ,關於airflow產品的使用,裡面有詳細的介紹。

Airflow的排程依賴於crontab命令,與crontab相比airflow可以直觀的看到任務執行情況、任務之間的邏輯依賴關係、可以設定任務出錯時郵件提醒、可以檢視任務執行日誌。

而crontab命令管理的方式存在以下幾方面的弊端:

1、在多工排程執行的情況下,難以理清任務之間的依賴關係;

2、不便於檢視當前執行到哪一個任務;

3、任務執行失敗時不便於檢視執行日誌,也即不方便定位報錯的任務和錯誤原因;

4、不便於檢視排程流下每個任務執行的起止消耗時間,這對於優化task作業是非常重要的;

5、不便於記錄歷史排程任務的執行情況,而這對於優化作業和錯誤排查是很重要的;

Airflow中有兩個最基本的概念:DAG和task,下面主要介紹一下。

DAG是什麼:

DAG是Directed Acyclic Graph的縮寫,即有向無環圖。是所有要執行任務指令碼(即task)的集合,在這個DAG中定義了各個task的依賴關係、排程時間、失敗重啟機制等。通過DAGid來標識每個DAG任務

每個DAG是由1到多個task組成

task是什麼:

task是具體執行的任務指令碼,可以是一個命令列(BashOperator),也可以是python指令碼等。

 

二、主要功能鍵介紹

1、DAG管理

 

在airflow的主頁,可以看到當前所有的DAG列表(通俗點說就是所有的排程任務列表),中間“Task by State”那一列顯示任務的執行狀態。深綠色的表示已執行成功的task,淺綠色的表示當前正在執行的task。

右側“Links”那一列可以連結檢視當前DAG任務的依賴關係、執行時間、執行指令碼等情況。

當點選具體某一個DAG任務時,就可以進去檢視該DAG的排程依賴、執行時長、排程指令碼等具體執行情況

2、排程依賴檢視

通過“Graph View”選項可以檢視當前排程任務的依賴關係,當排程作業較為複雜時,這種圖形化方式展示的依賴關係可以幫助使用者迅速理清。

在使用者畫像的排程管理中,每天需要執行cookieid和userid兩個維度的畫像指令碼,因此可以設定並行執行任務,讓cookieid和userid的指令碼同時執行排程作業

 

3、執行狀態

 

通過“Tree View”選項可以檢視當前任務的執行狀態,包括當前執行到哪一個task,還有哪些task未執行。哪些task執行成功,哪些task執行失敗。

也可以檢視歷史上該DAG下面各task的執行情況。

4、各task執行時間

通過“Gantt”選項可以檢視各task任務的執行起止時間的甘特圖。

瞭解各task執行的時間可以有針對性地優化執行時間長的task對應指令碼。

5、DAG排程指令碼

 

通過“Code”選項,可以檢視當前DAG排程的指令碼。腳本里面定義了需要執行的task、執行順序及依賴、排程時間、失敗傳送郵件或重調機制等方法

三、指令碼例項

在開發過程中,task指令碼是需要被排程的指令碼,在Airflow中主要需要開發的是DAG指令碼,即管理task任務的指令碼。通過一個DAG指令碼,將各個排程作業指令碼串起來,按照業務邏輯去執行。

1、DAG指令碼

下面通過一個具體DAG指令碼例項來了解一下:

 

from airflow.operators.bash_operator import BashOperator import airflow from airflow.models import DAG from airflow import operators from airflow.contrib.hooks import SSHHook from airflow.models import BaseOperator from airflow.contrib.operators import SSHExecuteOperator from airflow.operators.latest_only_operator import LatestOnlyOperator import os import sys from datetime import timedelta,date,datetime import pendulum from airflow.utils.trigger_rule import TriggerRule default_args = {    'owner': 'superuserprofile',    'depends_on_past': False,    'start_date': datetime(2018, 06, 01),    'email': ['[email protected]'],    'email_on_failure': True ,    'email_on_retry': True,    'retries': 1,    'retry_delay': timedelta(minutes=1), } os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6' sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

該段指令碼定義了需要引入的包,以及預設的DAG引數配置,包括task是否依賴上游任務,首次排程時間、任務失敗接收郵箱、任務失敗是否重新調起等

 

dag = DAG(    'superuserprofile',    default_args=default_args,    description='A userprofile test',    schedule_interval='00 08  * * *' )

該段指令碼例項化了DAG,設定了DAGid,排程執行時間

 

gender_task = BashOperator(    task_id='gender',    bash_command=' sudo -E -H -u userprofile spark-submit   --master yarn --deploy-mode client --driver-memory 1g  --executor-memory 8g --executor-cores 2 --num-executors 200  /airflow/userprofile_gender.py  {{ ds_nodash }} ',    dag=dag,    trigger_rule=TriggerRule.ALL_DONE ) country_task = BashOperator(    task_id='country',    bash_command=' sudo -E -H -u userprofile spark-submit   --master yarn --deploy-mode client  --driver-memory 1g  --executor-memory 4g --executor-cores 2 --num-executors 200  /airflow/userprofile_country.py  {{ ds_nodash }} ',    dag=dag,    trigger_rule=TriggerRule.ALL_DONE )

該段指令碼設定了兩個需要執行的task任務(userprofile_gender.py和userprofile_country.py)的例項化。

task直接的排程依賴關係可以通過set_upstream、set_downstream命令或符號>> 、<<來建立。

gender_task .set_upstream(country_task) 命令指gender_task 任務將依賴country_task任務;反之同理

gender_task >> country_task 命令指country_task 任務將依賴gender_task 任務先執行完,反之同理

2、命令列執行

Airflow通過視覺化介面的方式實現了排程管理的介面操作,但在測試指令碼或介面操作失敗的時候,可通過命令列的方式調起任務。下面介紹幾個常用命令

命令1: airflow list_tasks userprofile

該命令用於檢視當前DAG任務下的所有task的列表

其中userprofile是DAGid,加粗的airflow list_tasks是關鍵字命令

-----------------------------------------------------------------------

命令2: airflow test userprofile gender_task 20180601

該命令用於單獨執行DAG下面的某個task

其中userprofile是DAGid,gender_task是要具體某個taskid,20180601是執行日期。加粗部分是關鍵字命令

-----------------------------------------------------------------------

命令3:airflow backfill -s 2018-06-01 -e 2018-06-02 userprofile

該命令用於調起整個DAG指令碼執行

其中2018-06-01是執行指令碼的開始日期, 2018-06-02是結束日期,userprofile是DAGid,加粗部分是關鍵字命令。