1. 程式人生 > >基於Hadoop生態圈的資料倉庫實踐 —— ETL(三)

基於Hadoop生態圈的資料倉庫實踐 —— ETL(三)

三、使用Oozie定期自動執行ETL
1. Oozie簡介

(1)Oozie是什麼
        Oozie是一個管理Hadoop作業、可伸縮、可擴充套件、可靠的工作流排程系統,其工作流作業是由一系列動作構成的有向無環圖(DAGs),協調器作業是按時間頻率週期性觸發的Oozie工作流作業。Oozie支援的作業型別有Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop和Distcp,及其Java程式和shell指令碼等特定的系統作業。
        第一版Oozie是一個基於工作流引擎的伺服器,通過執行Hadoop Map/Reduce和Pig作業的動作執行工作流作業。第二版Oozie是一個基於協調器引擎的伺服器,按時間和資料觸發工作流執行。它可以基於時間(如每小時執行一次)或資料可用性(如等待輸入資料完成後再執行)連續執行工作流。第三版Oozie是一個基於Bundle引擎的伺服器。它提供更高級別的抽象,批量處理一系列協調器應用。使用者可以在bundle級別啟動、停止、掛起、繼續、重做協調器作業,這樣可以更好地簡化操作控制。
(2)為什麼需要Oozie
  • 在Hadoop中執行的任務有時候需要把多個Map/Reduce作業連線到一起執行,或者需要多個作業並行處理。Oozie可以把多個Map/Reduce作業組合到一個邏輯工作單元中,從而完成更大型的任務。
  • 從排程的角度看,如果使用crontab的方式呼叫多個工作流作業,可能需要編寫大量的指令碼,還要通過指令碼來控制好各個工作流作業的執行時序問題,不但指令碼不好維護,而且監控也不方便。基於這樣的背景,Oozie提出了Coordinator的概念,它能夠將每個工作流作業作為一個動作來執行,相當於工作流定義中的一個執行節點,這樣就能夠將多個工作流作業組成一個稱為Coordinator Job的作業,並指定觸發時間和頻率,還可以配置資料集、併發數等。
(3)Oozie的體系結構(摘自http://www.infoq.com/cn/articles/introductionOozie/
        Oozie的體系結構如下圖所示。


        Oozie是一種Java Web應用程式,它執行在Java servlet容器——即Tomcat——中,並使用資料庫來儲存以下內容:

  • 工作流定義
  • 當前執行的工作流例項,包括例項的狀態和變數
        Oozie工作流是放置在控制依賴DAG(有向無環圖 Direct Acyclic Graph)中的一組動作(例如,Hadoop的Map/Reduce作業、Pig作業等),其中指定了動作執行的順序。我們會使用hPDL(一種XML流程定義語言)來描述這個圖。
        hPDL是一種很簡潔的語言,只會使用少數流程控制和動作節點。控制節點會定義執行的流程,幷包含工作流的起點和終點(start、end和fail節點)以及控制工作流執行路徑的機制(decision、fork和join節點)。動作節點是一些機制,通過它們工作流會觸發執行計算或者處理任務。Oozie為以下型別的動作提供支援: Hadoop map-reduce、Hadoop檔案系統、Pig、Java和Oozie的子工作流(SSH動作已經從Oozie schema 0.2之後的版本中移除了)。
        所有由動作節點觸發的計算和處理任務都不在Oozie之中——它們是由Hadoop的Map/Reduce框架執行的。這種方法讓Oozie可以支援現存的Hadoop用於負載平衡、災難恢復的機制。這些任務主要是非同步執行的(只有檔案系統動作例外,它是同步處理的)。這意味著對於大多數工作流動作觸發的計算或處理任務的型別來說,在工作流操作轉換到工作流的下一個節點之前都需要等待,直到計算或處理任務結束了之後才能夠繼續。Oozie可以通過兩種不同的方式來檢測計算或處理任務是否完成,也就是回撥和輪詢。當Oozie啟動了計算或處理任務的時候,它會為任務提供唯一的回撥URL,然後任務會在完成的時候傳送通知給特定的URL。在任務無法觸發回撥URL的情況下(可能是因為任何原因,比方說網路閃斷),或者當任務的型別無法在完成時觸發回撥URL的時候,Oozie有一種機制,可以對計算或處理任務進行輪詢,從而保證能夠完成任務。
        Oozie工作流可以引數化(在工作流定義中使用像${inputDir}之類的變數)。在提交工作流操作的時候,我們必須提供引數值。如果經過合適地引數化(比方說,使用不同的輸出目錄),那麼多個同樣的工作流操作可以併發。
        一些工作流是根據需要觸發的,但是大多數情況下,我們有必要基於一定的時間段和(或)資料可用性和(或)外部事件來執行它們。Oozie協調系統(Coordinator system)讓使用者可以基於這些引數來定義工作流執行計劃。Oozie協調程式讓我們可以以謂詞的方式對工作流執行觸發器進行建模,那可以指向資料、事件和(或)外部事件。工作流作業會在謂詞得到滿足的時候啟動。
        經常我們還需要連線定時執行、但時間間隔不同的工作流操作。多個隨後執行的工作流的輸出會成為下一個工作流的輸入。把這些工作流連線在一起,會讓系統把它作為資料應用的管道來引用。Oozie協調程式支援建立這樣的資料應用管道。

(4)CDH 5.7.0中的Oozie

        CDH 5.7.0中,Oozie的版本是4.1.0,元資料儲存使用MySQL。關於CDH 5.7.0中Oozie的屬性,參考以下連結:
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_props_cdh570_oozie.html

2. 建立定期裝載工作流
(1)修改資源配置
        需要將以下兩個引數的值調高:
yarn.nodemanager.resource.memory-mb = 2000
yarn.scheduler.maximum-allocation-mb = 2000
        否則會在執行工作流作業時報類似下面的錯誤:
org.apache.oozie.action.ActionExecutorException: JA009: org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=1500
        具體做法是,從CDH Web控制檯修改相關引數,儲存更改並重啟叢集。
        yarn.nodemanager.resource.memory-mb引數在YARN服務的NodeManager範圍裡,如下圖所示。
        yarn.scheduler.maximum-allocation-mb引數在YARN服務的ResourceManager範圍裡,如下圖所示。
        從Web控制檯重啟叢集的介面如下圖所示。
(2)啟用Oozie Web Console
        預設配置時,Oozie Web Console是禁用的,為了後面方便監控Oozie作業的執行,需要將其改為啟用。“啟用 Oozie 伺服器 Web 控制檯”引數在Oozie服務的主要範圍裡,如下圖所示。


        具體的做法是:

  • 下載安裝ext-2.2。
  • 從CDH Web控制檯修改相關引數,儲存更改並重啟Oozie服務。
        詳細步驟參考以下連結:http://www.cloudera.com/documentation/enterprise/5-5-x/topics/admin_oozie_console.html

(3)啟動sqoop的share metastore service
        定期裝載工作流需要用Oozie呼叫Sqoop執行,這需要開啟Sqoop元資料共享儲存,命令如下:
sqoop metastore > /tmp/sqoop_metastore.log 2>&1 &
        關於Oozie無法執行Sqoop Job的問題,參考以下連結:http://www.lamborryan.com/oozie-sqoop-fail/

(4)連線metastore重建sqoop job
        前面建立的sqoop job,其元資料並沒有儲存在share metastore裡,所以需要使用以下的命令重建。
last_value=`sqoop job --show myjob_incremental_import | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value $last_value
        其中$last-value是上次ETL執行後的值。

(5)定義工作流
        建立內容如下的workflow.xml檔案:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="sqoop-customer" />
        <path start="sqoop-product" />
        <path start="sqoop-sales_order" />
    </fork>
    <action name="sqoop-customer">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>customer</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.customer</arg>
            <arg>--hive-overwrite</arg>            
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
	<action name="sqoop-product">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>product</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.product</arg>
            <arg>--hive-overwrite</arg>            
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sqoop-sales_order">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop</command>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <join name="joining" to="hive-node"/>
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node> 
            <job-xml>/tmp/hive-site.xml</job-xml>
            <script>/tmp/regular_etl.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
        其DAG如下圖所示。

        該工作流包括9個節點,其中有5個控制節點,4個動作節點:工作流的起點(start)、終點(end)、失敗處理節點(fail,DAG圖中未顯示),兩個執行路徑控制節點(fork-node和joining,fork與join節點必須成對出現),三個並行處理的Sqoop行動節點(sqoop-customer、sqoop-product、sqoop-sales_order)用作資料抽取,一個Hive行動節點(hive-node)用作資料轉換與裝載。
(6)部署工作流
hdfs dfs -put -f workflow.xml /user/root/
hdfs dfs -put /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/
hdfs dfs -put /root/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /tmp/
hdfs dfs -put /root/regular_etl.sql /tmp/
(7)建立作業屬性檔案
        建立內容如下的job.properties檔案:
nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}
(8)執行工作流
oozie job -oozie http://cdh2:11000/oozie -config /root/job.properties -run
        此時從Oozie Web Console可以看到正在執行的作業,如下圖所示。
        點選作業所在行,可以開啟作業的詳細資訊視窗,如下圖所示。
        點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
        可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
        當Oozie作業執行完,可以在“All Jobs”標籤頁看到,Status列已經從RUNNING變為SUCCEEDED,如下圖所示。
        此時檢視cdc_time表的資料,可以看到日期已經改為當前日期,如下圖所示。

3. 建立協調作業定期自動執行工作流
(1)建立協調作業屬性檔案
        建立內容如下的job-coord.properties檔案:
nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}
timezone=UTC
start=2016-07-11T06:00Z
end=2020-12-31T07:15Z
workflowAppUri=${nameNode}/user/${user.name}
(2)建立協調作業配置檔案
        建立內容如下的coordinator.xml檔案:
<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
(3)部署協調作業
hdfs dfs -put -f coordinator.xml /user/root/
(4)執行協調作業
oozie job -oozie http://cdh2:11000/oozie -config /root/job-coord.properties -run
        此時從Oozie Web Console可以看到準備執行的協調作業,作業的狀態為PREP,如下圖所示。
        此協調作業自2016年7月11日開始,每天14點執行一次。結束日期非常晚,這裡設定的是2020年12月31日。需要注意一下時區的設定。Oozie預設的時區是UTC,而且即便在屬性檔案中設定了timezone=GMT+0800也不起作用,所以start屬性設定的是06:00,實際就是北京時間14:00。
        當時間到達14:00時,協調作業開始執行,狀態由PREP變為RUNNING,如下圖所示。
        點選作業所在行,可以開啟協調作業的詳細資訊視窗,如下圖所示。
        點選協調作業所在行,可以開啟工作流作業的詳細資訊視窗,如下圖所示。
        點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
        可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
        至此介紹了使用Oozie定期自動執行ETL的一般方法。Oozie 4.1.0的官方文件連結地址如下:http://oozie.apache.org/docs/4.1.0/index.html