基於Hadoop生態圈的資料倉庫實踐 —— ETL(三)
阿新 • • 發佈:2019-01-29
三、使用Oozie定期自動執行ETL
1. Oozie簡介
(1)Oozie是什麼
Oozie是一個管理Hadoop作業、可伸縮、可擴充套件、可靠的工作流排程系統,其工作流作業是由一系列動作構成的有向無環圖(DAGs),協調器作業是按時間頻率週期性觸發的Oozie工作流作業。Oozie支援的作業型別有Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop和Distcp,及其Java程式和shell指令碼等特定的系統作業。
第一版Oozie是一個基於工作流引擎的伺服器,通過執行Hadoop Map/Reduce和Pig作業的動作執行工作流作業。第二版Oozie是一個基於協調器引擎的伺服器,按時間和資料觸發工作流執行。它可以基於時間(如每小時執行一次)或資料可用性(如等待輸入資料完成後再執行)連續執行工作流。第三版Oozie是一個基於Bundle引擎的伺服器。它提供更高級別的抽象,批量處理一系列協調器應用。使用者可以在bundle級別啟動、停止、掛起、繼續、重做協調器作業,這樣可以更好地簡化操作控制。
(2)為什麼需要Oozie
Oozie的體系結構如下圖所示。
hPDL是一種很簡潔的語言,只會使用少數流程控制和動作節點。控制節點會定義執行的流程,幷包含工作流的起點和終點(start、end和fail節點)以及控制工作流執行路徑的機制(decision、fork和join節點)。動作節點是一些機制,通過它們工作流會觸發執行計算或者處理任務。Oozie為以下型別的動作提供支援: Hadoop map-reduce、Hadoop檔案系統、Pig、Java和Oozie的子工作流(SSH動作已經從Oozie schema 0.2之後的版本中移除了)。
所有由動作節點觸發的計算和處理任務都不在Oozie之中——它們是由Hadoop的Map/Reduce框架執行的。這種方法讓Oozie可以支援現存的Hadoop用於負載平衡、災難恢復的機制。這些任務主要是非同步執行的(只有檔案系統動作例外,它是同步處理的)。這意味著對於大多數工作流動作觸發的計算或處理任務的型別來說,在工作流操作轉換到工作流的下一個節點之前都需要等待,直到計算或處理任務結束了之後才能夠繼續。Oozie可以通過兩種不同的方式來檢測計算或處理任務是否完成,也就是回撥和輪詢。當Oozie啟動了計算或處理任務的時候,它會為任務提供唯一的回撥URL,然後任務會在完成的時候傳送通知給特定的URL。在任務無法觸發回撥URL的情況下(可能是因為任何原因,比方說網路閃斷),或者當任務的型別無法在完成時觸發回撥URL的時候,Oozie有一種機制,可以對計算或處理任務進行輪詢,從而保證能夠完成任務。
Oozie工作流可以引數化(在工作流定義中使用像${inputDir}之類的變數)。在提交工作流操作的時候,我們必須提供引數值。如果經過合適地引數化(比方說,使用不同的輸出目錄),那麼多個同樣的工作流操作可以併發。
一些工作流是根據需要觸發的,但是大多數情況下,我們有必要基於一定的時間段和(或)資料可用性和(或)外部事件來執行它們。Oozie協調系統(Coordinator system)讓使用者可以基於這些引數來定義工作流執行計劃。Oozie協調程式讓我們可以以謂詞的方式對工作流執行觸發器進行建模,那可以指向資料、事件和(或)外部事件。工作流作業會在謂詞得到滿足的時候啟動。
經常我們還需要連線定時執行、但時間間隔不同的工作流操作。多個隨後執行的工作流的輸出會成為下一個工作流的輸入。把這些工作流連線在一起,會讓系統把它作為資料應用的管道來引用。Oozie協調程式支援建立這樣的資料應用管道。
CDH 5.7.0中,Oozie的版本是4.1.0,元資料儲存使用MySQL。關於CDH 5.7.0中Oozie的屬性,參考以下連結:
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_props_cdh570_oozie.html
2. 建立定期裝載工作流
(1)修改資源配置
需要將以下兩個引數的值調高:
org.apache.oozie.action.ActionExecutorException: JA009: org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=1500
具體做法是,從CDH Web控制檯修改相關引數,儲存更改並重啟叢集。
yarn.nodemanager.resource.memory-mb引數在YARN服務的NodeManager範圍裡,如下圖所示。
yarn.scheduler.maximum-allocation-mb引數在YARN服務的ResourceManager範圍裡,如下圖所示。
從Web控制檯重啟叢集的介面如下圖所示。
(2)啟用Oozie Web Console
預設配置時,Oozie Web Console是禁用的,為了後面方便監控Oozie作業的執行,需要將其改為啟用。“啟用 Oozie 伺服器 Web 控制檯”引數在Oozie服務的主要範圍裡,如下圖所示。
(3)啟動sqoop的share metastore service
定期裝載工作流需要用Oozie呼叫Sqoop執行,這需要開啟Sqoop元資料共享儲存,命令如下:
(4)連線metastore重建sqoop job
前面建立的sqoop job,其元資料並沒有儲存在share metastore裡,所以需要使用以下的命令重建。
(5)定義工作流
建立內容如下的workflow.xml檔案:
該工作流包括9個節點,其中有5個控制節點,4個動作節點:工作流的起點(start)、終點(end)、失敗處理節點(fail,DAG圖中未顯示),兩個執行路徑控制節點(fork-node和joining,fork與join節點必須成對出現),三個並行處理的Sqoop行動節點(sqoop-customer、sqoop-product、sqoop-sales_order)用作資料抽取,一個Hive行動節點(hive-node)用作資料轉換與裝載。
(6)部署工作流
建立內容如下的job.properties檔案:
點選作業所在行,可以開啟作業的詳細資訊視窗,如下圖所示。
點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
當Oozie作業執行完,可以在“All Jobs”標籤頁看到,Status列已經從RUNNING變為SUCCEEDED,如下圖所示。
此時檢視cdc_time表的資料,可以看到日期已經改為當前日期,如下圖所示。
3. 建立協調作業定期自動執行工作流
(1)建立協調作業屬性檔案
建立內容如下的job-coord.properties檔案:
建立內容如下的coordinator.xml檔案:
此協調作業自2016年7月11日開始,每天14點執行一次。結束日期非常晚,這裡設定的是2020年12月31日。需要注意一下時區的設定。Oozie預設的時區是UTC,而且即便在屬性檔案中設定了timezone=GMT+0800也不起作用,所以start屬性設定的是06:00,實際就是北京時間14:00。
當時間到達14:00時,協調作業開始執行,狀態由PREP變為RUNNING,如下圖所示。
點選作業所在行,可以開啟協調作業的詳細資訊視窗,如下圖所示。
點選協調作業所在行,可以開啟工作流作業的詳細資訊視窗,如下圖所示。
點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
至此介紹了使用Oozie定期自動執行ETL的一般方法。Oozie 4.1.0的官方文件連結地址如下:http://oozie.apache.org/docs/4.1.0/index.html
1. Oozie簡介
(1)Oozie是什麼
Oozie是一個管理Hadoop作業、可伸縮、可擴充套件、可靠的工作流排程系統,其工作流作業是由一系列動作構成的有向無環圖(DAGs),協調器作業是按時間頻率週期性觸發的Oozie工作流作業。Oozie支援的作業型別有Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop和Distcp,及其Java程式和shell指令碼等特定的系統作業。
第一版Oozie是一個基於工作流引擎的伺服器,通過執行Hadoop Map/Reduce和Pig作業的動作執行工作流作業。第二版Oozie是一個基於協調器引擎的伺服器,按時間和資料觸發工作流執行。它可以基於時間(如每小時執行一次)或資料可用性(如等待輸入資料完成後再執行)連續執行工作流。第三版Oozie是一個基於Bundle引擎的伺服器。它提供更高級別的抽象,批量處理一系列協調器應用。使用者可以在bundle級別啟動、停止、掛起、繼續、重做協調器作業,這樣可以更好地簡化操作控制。
(2)為什麼需要Oozie
- 在Hadoop中執行的任務有時候需要把多個Map/Reduce作業連線到一起執行,或者需要多個作業並行處理。Oozie可以把多個Map/Reduce作業組合到一個邏輯工作單元中,從而完成更大型的任務。
- 從排程的角度看,如果使用crontab的方式呼叫多個工作流作業,可能需要編寫大量的指令碼,還要通過指令碼來控制好各個工作流作業的執行時序問題,不但指令碼不好維護,而且監控也不方便。基於這樣的背景,Oozie提出了Coordinator的概念,它能夠將每個工作流作業作為一個動作來執行,相當於工作流定義中的一個執行節點,這樣就能夠將多個工作流作業組成一個稱為Coordinator Job的作業,並指定觸發時間和頻率,還可以配置資料集、併發數等。
Oozie的體系結構如下圖所示。
Oozie是一種Java Web應用程式,它執行在Java servlet容器——即Tomcat——中,並使用資料庫來儲存以下內容:
- 工作流定義
- 當前執行的工作流例項,包括例項的狀態和變數
hPDL是一種很簡潔的語言,只會使用少數流程控制和動作節點。控制節點會定義執行的流程,幷包含工作流的起點和終點(start、end和fail節點)以及控制工作流執行路徑的機制(decision、fork和join節點)。動作節點是一些機制,通過它們工作流會觸發執行計算或者處理任務。Oozie為以下型別的動作提供支援: Hadoop map-reduce、Hadoop檔案系統、Pig、Java和Oozie的子工作流(SSH動作已經從Oozie schema 0.2之後的版本中移除了)。
所有由動作節點觸發的計算和處理任務都不在Oozie之中——它們是由Hadoop的Map/Reduce框架執行的。這種方法讓Oozie可以支援現存的Hadoop用於負載平衡、災難恢復的機制。這些任務主要是非同步執行的(只有檔案系統動作例外,它是同步處理的)。這意味著對於大多數工作流動作觸發的計算或處理任務的型別來說,在工作流操作轉換到工作流的下一個節點之前都需要等待,直到計算或處理任務結束了之後才能夠繼續。Oozie可以通過兩種不同的方式來檢測計算或處理任務是否完成,也就是回撥和輪詢。當Oozie啟動了計算或處理任務的時候,它會為任務提供唯一的回撥URL,然後任務會在完成的時候傳送通知給特定的URL。在任務無法觸發回撥URL的情況下(可能是因為任何原因,比方說網路閃斷),或者當任務的型別無法在完成時觸發回撥URL的時候,Oozie有一種機制,可以對計算或處理任務進行輪詢,從而保證能夠完成任務。
Oozie工作流可以引數化(在工作流定義中使用像${inputDir}之類的變數)。在提交工作流操作的時候,我們必須提供引數值。如果經過合適地引數化(比方說,使用不同的輸出目錄),那麼多個同樣的工作流操作可以併發。
一些工作流是根據需要觸發的,但是大多數情況下,我們有必要基於一定的時間段和(或)資料可用性和(或)外部事件來執行它們。Oozie協調系統(Coordinator system)讓使用者可以基於這些引數來定義工作流執行計劃。Oozie協調程式讓我們可以以謂詞的方式對工作流執行觸發器進行建模,那可以指向資料、事件和(或)外部事件。工作流作業會在謂詞得到滿足的時候啟動。
經常我們還需要連線定時執行、但時間間隔不同的工作流操作。多個隨後執行的工作流的輸出會成為下一個工作流的輸入。把這些工作流連線在一起,會讓系統把它作為資料應用的管道來引用。Oozie協調程式支援建立這樣的資料應用管道。
(4)CDH 5.7.0中的Oozie
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_props_cdh570_oozie.html
2. 建立定期裝載工作流
(1)修改資源配置
需要將以下兩個引數的值調高:
yarn.nodemanager.resource.memory-mb = 2000
yarn.scheduler.maximum-allocation-mb = 2000
否則會在執行工作流作業時報類似下面的錯誤:org.apache.oozie.action.ActionExecutorException: JA009: org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=1500
具體做法是,從CDH Web控制檯修改相關引數,儲存更改並重啟叢集。
yarn.nodemanager.resource.memory-mb引數在YARN服務的NodeManager範圍裡,如下圖所示。
yarn.scheduler.maximum-allocation-mb引數在YARN服務的ResourceManager範圍裡,如下圖所示。
從Web控制檯重啟叢集的介面如下圖所示。
(2)啟用Oozie Web Console
預設配置時,Oozie Web Console是禁用的,為了後面方便監控Oozie作業的執行,需要將其改為啟用。“啟用 Oozie 伺服器 Web 控制檯”引數在Oozie服務的主要範圍裡,如下圖所示。
具體的做法是:
- 下載安裝ext-2.2。
- 從CDH Web控制檯修改相關引數,儲存更改並重啟Oozie服務。
(3)啟動sqoop的share metastore service
定期裝載工作流需要用Oozie呼叫Sqoop執行,這需要開啟Sqoop元資料共享儲存,命令如下:
sqoop metastore > /tmp/sqoop_metastore.log 2>&1 &
關於Oozie無法執行Sqoop Job的問題,參考以下連結:http://www.lamborryan.com/oozie-sqoop-fail/(4)連線metastore重建sqoop job
前面建立的sqoop job,其元資料並沒有儲存在share metastore裡,所以需要使用以下的命令重建。
last_value=`sqoop job --show myjob_incremental_import | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value $last_value
其中$last-value是上次ETL執行後的值。(5)定義工作流
建立內容如下的workflow.xml檔案:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
<start to="fork-node"/>
<fork name="fork-node">
<path start="sqoop-customer" />
<path start="sqoop-product" />
<path start="sqoop-sales_order" />
</fork>
<action name="sqoop-customer">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>root</arg>
<arg>--password</arg>
<arg>mypassword</arg>
<arg>--table</arg>
<arg>customer</arg>
<arg>--hive-import</arg>
<arg>--hive-table</arg>
<arg>rds.customer</arg>
<arg>--hive-overwrite</arg>
<file>/tmp/hive-site.xml#hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-product">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>root</arg>
<arg>--password</arg>
<arg>mypassword</arg>
<arg>--table</arg>
<arg>product</arg>
<arg>--hive-import</arg>
<arg>--hive-table</arg>
<arg>rds.product</arg>
<arg>--hive-overwrite</arg>
<file>/tmp/hive-site.xml#hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-sales_order">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop</command>
<file>/tmp/hive-site.xml#hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="hive-node"/>
<action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>/tmp/hive-site.xml</job-xml>
<script>/tmp/regular_etl.sql</script>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
其DAG如下圖所示。該工作流包括9個節點,其中有5個控制節點,4個動作節點:工作流的起點(start)、終點(end)、失敗處理節點(fail,DAG圖中未顯示),兩個執行路徑控制節點(fork-node和joining,fork與join節點必須成對出現),三個並行處理的Sqoop行動節點(sqoop-customer、sqoop-product、sqoop-sales_order)用作資料抽取,一個Hive行動節點(hive-node)用作資料轉換與裝載。
(6)部署工作流
hdfs dfs -put -f workflow.xml /user/root/
hdfs dfs -put /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/
hdfs dfs -put /root/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /tmp/
hdfs dfs -put /root/regular_etl.sql /tmp/
(7)建立作業屬性檔案建立內容如下的job.properties檔案:
nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}
(8)執行工作流oozie job -oozie http://cdh2:11000/oozie -config /root/job.properties -run
此時從Oozie Web Console可以看到正在執行的作業,如下圖所示。點選作業所在行,可以開啟作業的詳細資訊視窗,如下圖所示。
點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
當Oozie作業執行完,可以在“All Jobs”標籤頁看到,Status列已經從RUNNING變為SUCCEEDED,如下圖所示。
此時檢視cdc_time表的資料,可以看到日期已經改為當前日期,如下圖所示。
3. 建立協調作業定期自動執行工作流
(1)建立協調作業屬性檔案
建立內容如下的job-coord.properties檔案:
nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}
timezone=UTC
start=2016-07-11T06:00Z
end=2020-12-31T07:15Z
workflowAppUri=${nameNode}/user/${user.name}
(2)建立協調作業配置檔案建立內容如下的coordinator.xml檔案:
<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
(3)部署協調作業hdfs dfs -put -f coordinator.xml /user/root/
(4)執行協調作業oozie job -oozie http://cdh2:11000/oozie -config /root/job-coord.properties -run
此時從Oozie Web Console可以看到準備執行的協調作業,作業的狀態為PREP,如下圖所示。此協調作業自2016年7月11日開始,每天14點執行一次。結束日期非常晚,這裡設定的是2020年12月31日。需要注意一下時區的設定。Oozie預設的時區是UTC,而且即便在屬性檔案中設定了timezone=GMT+0800也不起作用,所以start屬性設定的是06:00,實際就是北京時間14:00。
當時間到達14:00時,協調作業開始執行,狀態由PREP變為RUNNING,如下圖所示。
點選作業所在行,可以開啟協調作業的詳細資訊視窗,如下圖所示。
點選協調作業所在行,可以開啟工作流作業的詳細資訊視窗,如下圖所示。
點選動作所在行,可以開啟動作的詳細資訊視窗,如下圖所示。
可以點選Console URL右側的圖示,可以開啟Map/Reduce作業的跟蹤視窗,如下圖所示。
至此介紹了使用Oozie定期自動執行ETL的一般方法。Oozie 4.1.0的官方文件連結地址如下:http://oozie.apache.org/docs/4.1.0/index.html