1. 程式人生 > >5分鐘試作一個最簡單的airflow plugin « Terrence的宅宅幻想

5分鐘試作一個最簡單的airflow plugin « Terrence的宅宅幻想

因為想要讓我的使用者去可以更容易的跟既有的平臺做排程工作

最近想試著製作airflow的operator

airflow本身提供了operator, menu_link, hook等等的介面

照著官方網站,簡單試做了一個簡單版的myoperator plugin

要製作自己的plugin方法很單純

去plugin目錄下塞自己的python檔就好了

假設我做了一個plugin檔案$AIRFLOW_HOMR/plugins/my.py

內容如下

from airflow.plugins_manager import AirflowPlugin
from airflow.models
import BaseOperator from airflow.utils.decorators import apply_defaults class MyOperator(BaseOperator): @apply_defaults def __init__(self, message, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.message = message def
execute(self, context): print("Message:[%s]" % self.message) def on_kill(self): print("Killed MyOperator") # Defining the plugin class class AirflowTestPlugin(AirflowPlugin): name = "myplugin" operators = [MyOperator]

這邊重點是要製作一個class去繼承AirflowPlugin

他宣告了這個python檔要涵蓋哪些type的plugin跟這個plugin的名字

name這個屬性會影響之後要import時候的路徑,官方說明如下

from airflow.{type, like "operators", "sensors"}.{name specificed inside the plugin class} import *

因為我這邊只想做一個myoperator,所以只做了一個MyOperator去繼承BaseOperator

當實做一個Operator的時候有兩個要點

  1. 使用@apply_defaults去包住__init__然後用super去傳遞初始化引數
  2. 實作execute的內容

一個Operator可以覆寫的函式有

  • pre_execute
  • execute
  • post_execute
  • on_kill

可以照自己的需求去實作

之後只要重啟airflow的webserver/scheduler/worker等就行了

這邊再給一個範例dag $AIRFLOW_HOMR/dags/hello_world.py

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

from airflow.operators.myplugin import MyOperator

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='10 * * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = MyOperator(task_id='dummy_task', message="Hello World", dag=dag)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)


dummy_operator >> hello_operator >> t1

這邊的重點就只有import的方式

我用from airflow.operators.myplugin import MyOperator去import自己做的MyOperator

當初一直不知道怎麼import才正確,因為一些古早範例都是寫from airflow.operators import xxxx

讓我卡了一小段時間

.....

題外話之前把airflow從1.8升級到1.10之後遇到timezone被鎖死在UTC的狀況

現階段只能用降版回1.8的方式解決

這幾天airflow試用下來他的好處我還沒體會到他的坑倒是踩了不少

現在想想自己還是習慣用Azkaban

但是客戶想用airflow,也只能硬著頭皮去踩坑