1. 程式人生 > >Datax 與 Azkaban 實現資料抽取與排程

Datax 與 Azkaban 實現資料抽取與排程

1.什麼是DataX

DataX 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平臺,實現包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各種異構資料來源之間高效的資料同步功能。DataX採用了框架 + 外掛 的模式,目前已開源,程式碼託管在github

DataX的安裝省略

配置詳情可見 https://github.com/alibaba/DataX

 執行原理介紹:

  1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個程序來完成整個作業同步過程。DataX Job模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。
  3. 切分多個Task之後,DataX Job會呼叫Scheduler模組,根據配置的併發資料量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發(可在json配置檔案中配置)執行完畢分配好的所有Task,預設單個任務組的併發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的執行緒來完成任務同步工作。
  5. DataX作業執行起來之後, Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出。

工作流程介紹:

工作流程大概就是用Reader模組從源資料庫讀資料,在Storage模組裡將Reader模組讀到的資料交換給Write模組,

Write模組將資料寫進目的資料庫。

DoubleQueue:

設立兩塊空間,一個儲存源資料,一個儲存目標資料。在開始,空間A和空間B都是空的,loading 任務從源資料庫向A空間載入資料,A空間滿後再向B空間載入資料,同時dumping任務將A空間資料轉儲到目的資料庫。A空間清空後,交換AB兩者的任務,即A空間的任務換成loading,B空間的任務換成dumping。不斷重複上述操作。

RAMStorage:基於DoubleQueue,用記憶體作為資料交換的空間

基於RAMStorage的資料操縱介面:LineSender和LineReceiver

LineSender的作用:Reader用LineSender來放資料到Storage物件中。

在LineSender接口裡,主要有這幾個介面:

createLine():構造一個將要被用來交換資料的Line物件

sendToWriter(Line line): 用來將一個Line物件put到Storage抽象類裡。

flush()用來將buffer的資料flush到Storage物件中。

LineReceiver的作用:Writer用LineReceiver來從到Storage物件中獲取資料。在LineReceiver接口裡,主要有一個介面:getFromReader():獲取下一個Storage中的Line物件。

基於RAMStorage的批量資料交換:BufferedLineExchanger

內部初始化一個指定大小的陣列緩衝,預設大小64 ,在push資料時會先寫滿64個數組再單次寫入DoubleQueue佇列,Poll時返回的大小可能會小於64個單位,由當時陣列的實際大小決定。

2.什麼是Azkaban

Azkaban是由Linkedin公司推出的一個批量工作流任務排程器,主要用於在一個工作流內以一個特定的順序執行一組工作和流程,它的配置是通過簡單的key:value對的方式,通過配置中的dependencies 來設定依賴關係,這個依賴關係必須是無環的,否則會被視為無效的工作流。Azkaban使用job配置檔案建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。

需求:從Oracle抽取每天的使用者行為日表到HIVE分割槽表中,即資料的ODS層

技術選型:利用Datax與azkaban,其中Datax可方便配置能與關係型資料庫進行互動

其中Azkabn的排程job為:

把此job上傳到azkaban的介面如圖:

其中DataX的配置為:

問題解決:

azkaban-web-start.sh啟動時出現Table 'execution_flows' is marked as crashed and should be repaired Query

由於azkaban非正常關閉,導致資料表損壞現對資料進行修復

修復後對資料進行check如圖,說明表已經repair

對錶進行修復後,順利啟動了azkaban