1. 程式人生 > >開源ETL工具kettle系列之常見問題

開源ETL工具kettle系列之常見問題

摘要:本文主要討論如何在你自己的Java應用程式中整合Kettle

如果你需要在自己的Java應用程式中整合Kettle , 一般來說有兩種應用需求,一種是通過純設計器來設計ETL轉換任務,然後儲存成某種格式,比如xml或者在資料庫中都可以,然後自己呼叫程式解析這個格式,執行這種轉換,是比較抽象的一種執行方式,ETL裡面轉換了什麼東西我們並不關心,只關心它有沒有正常執行。另一種是通過完全程式設計的方式來實現,詳細的控制每一個步驟,需要知道轉換執行的成功與否,這種方式可能需要更多的理解kettle的API 以便更好的跟你的應用程式緊密結合,不過難度也比較大,可以很好的定製你的應用程式,代價自然是入門門檻比較高。本文主要向你解釋第一種Kettle的整合方式,文中所列出的程式碼節選自pentaho ,不過應用程式本身跟pentaho 沒有什麼關係。
    Pentaho 整合kettle的程式碼主要是兩個類,KettleSystemListener和 KettleComponent,看名字就猜出KettleSystemListener 主要是起監聽器的作用,它主要負責初始化kettle的一些環境變數,這個類主要包含四個方法: startup() , readProperties(),environmentInit(),shutdown(),程式入口自然是startup()方法,然後它會呼叫 environmentInit() 方法,這個方法就呼叫readProperties()方法讀一個配置檔案kettle.properties,這個檔案主要記錄者kettle執行時可以呼叫的一些環境變數,關於kettle.properties檔案怎麼用,第二篇文章“使用Kettle設計動態轉換”有提到,readProperties()方法讀完這個檔案之後就把裡面的鍵值對轉換成變數傳給kettle執行環境.當kettle執行完了之後就呼叫 shutdown()方法結束轉換. KettleSystemListener相對邏輯比較簡單,就不多介紹,下面主要介紹重點類:
KettleComponent
KettleComponent的方法主要有三種類型,一類是用來初始化工作,做一些驗證工作,第二類是執行轉換的方法,也是主要需要討論的方法,第三類是取得資料結果的,有時候你需要得到轉換的結果交給下一個步驟處理.下面分別討論這三類方法。

初始化
   KettleComponent的初始化工作主要是驗證這個轉換,包括有 validateSystemSettings(),init(),validateAction(),全部都是public 方法,validateSystemSettings()會檢查kettle 使用何種方式來連線資源庫。
kettle有兩種方式連線資源庫,一種是純資料庫式,也就是你所有的轉換全部都儲存在一個數據庫中,一般你在開始使用kettle的時候,它都會要求你建立一個資源倉庫,這個資源倉庫的連線方式就是你的資料庫連線,你需要能夠有相應的資料庫驅動和對應的連線使用者名稱和密碼。另外一種連線方式是使用文字檔案,也就是xml檔案,在做完任何轉換之後,我們都可以把轉換或者Job變成xml檔案輸出,這個輸出檔案包含你所有轉換的全部資訊。
在示例應用中使用的是檔案的連線方式,下面看一下初始化的一段程式碼:
Boolean useRepository = PentahoSystem.getSystemSetting("kettle/settings.xml",
                             "repository.type","files").equals("rdbms");
PentahoSystem.getSystemSetting()方法只是返回一個字串,使用的xpath讀一個xml的對應欄位,下面列出settings.xml檔案:
<kettle-repository>
  <!-- The values within <properties> are passed directly to the Kettle Pentaho components. -->       
 <!-- This is the location of the Kettle repositories.xml file, leave empty if the default is used: $HOME/.kettle/repositories.xml -->
 <repositories.xml.file></repositories.xml.file>
 <repository.type>files</repository.type>
 <!--  The name of the repository to use -->
 <repository.name></repository.name> 
 <repository.userid>admin</repository.userid> 
 <repository.password>admin</repository.password>  
</kettle-repository>
可以看到其中的repositories.xml.file 上面的一段註釋,如果這個值為空會預設使用$HOME/.kettle/repository.xml檔案當作資源庫的連線檔案,由於示例中使用的是文字檔案所以沒有用資料庫連線,下面的repository.userid和repository.password是指的kettle的資源庫連線的使用者名稱和密碼,一般預設安裝就兩個,admin/admin  和guest/guest , 這裡的使用者名稱和密碼不是連線資料庫的使用者名稱和密碼,連線資料庫的使用者名稱和密碼是在另外一個檔案repositories.xml.file指定的值所定義的
一般預設的kettle安裝並且運行了一段時間之後,會在$HOME/.kettle 目錄下建立一些檔案,如果你要在自己的系統中整合kettle的話,也需要保留這些檔案,當然不一定位置是在原來的位置,關鍵是要讓kettle知道這些檔案放在哪。

執行轉換
當讀完了這些配置檔案並且驗證了之後,KettleComponent就開始把前面讀到的轉換檔案或者資源庫型別變成Kettle的API,這主要是在executeAction()方法裡面進行,它當然根據連線方式也分兩種執行型別:
1. 文字執行方式
2. 資源庫連線方式

文字執行方式需要接受一個你指定的執行轉換的檔案或者Job的檔案,然後把這個xml檔案解析成Kettle能夠執行的模式,
根據執行的型別又可以分成兩種:
1. Trans任務
2. Job任務
兩個執行的邏輯差不多,下面先介紹Trans的執行方式:

執行Trans任務
transMeta = new TransMeta(fileAddress, repository, true);
      transMeta.setFilename(fileAddress);
然後它會呼叫:
executeTransformation(TransMeta transMeta, LogWriter logWriter)
這個方法是真正的把前面的來的transMeta轉換成trans物件,等待下一步的執行:
Trans trans = new Trans(logWriter, transMeta);
List stepList = trans.getSteps();
for (int stepNo = 0; stepNo < stepList.size(); stepNo++) {
      StepMetaDataCombi step = (StepMetaDataCombi) stepList.get(stepNo);
      if (step.stepname.equals(stepName)) {                  
   ①          Row row = transMeta.getStepFields(stepName);
              // create the metadata that the Pentaho result set needs
              String fieldNames[] = row.getFieldNames();
              String columns[][] = new String[1][fieldNames.length];
              for (int column = 0; column < fieldNames.length; column++) {
                    columns[0][column] = fieldNames[column];
              }
    ②        MemoryMetaData metaData = new MemoryMetaData(columns, null);
              results = new MemoryResultSet(metaData);
              // add ourself as a row listener
    ③       step.step.addRowListener(this);
             foundStep = true;
             break;
      }
}
1. Row物件是kettle用來表示一行資料的標準物件,跟jdbc取出來的一條資料轉化後成為的一個POJO是一樣的。裡面可以包含多個欄位。
2 . MemoryMetaData物件是pentaho特有的,是專門用來返回ETL任務執行後的結果的,與標準的JDBC裡面的resultSet 對應的resultSetMetaData  是一樣的。
3. 對於如何處理資料的一個Listener,實現的是一個RowListener,資料是每一行每一行處理的,後面會介紹如果需要輸出資料怎麼取得這些輸出資料。如果不需要放回任何物件,則從1處開始都可以不要,只要初始化step物件即可。

所有的step物件都已經初始化之後就可以開始執行了,
trans.startThreads();
trans.waitUntilFinished();
結束之後還有一些清理工作就不列出了。

執行Job任務
執行Job任務之前還是會讀取Job任務的描述檔案,然後把這個描述檔案(kettle的 .ktr檔案)變成一個xml文件的dom :
org.w3c.dom.Document doc = XmlW3CHelper.getDomFromString(jobXmlStr);
之後也是初始化對應的元資料物件JobMeta
jobMeta = new JobMeta(logWriter, doc.getFirstChild(), repository);
得到了jobMeta 之後就可以執行這個Job了,這裡跟trans是一樣的。
job = new Job(logWriter, StepLoader.getInstance(), repository, jobMeta);
由於Job一般都沒有什麼返回值,所以Job不需要初始化它下面的物件,直接開始執行就可以了
job.start();
job.waitUntilFinished(5000000);

連線資源庫
連線資源庫使用的是connectToRepository()方法,先取得RepositoriesMeta物件,然後根據你在setting.xml檔案裡面定義的repository的名字來連線對應的repository.理論上來說我們一般都只使用一個 repository ,但如果在產品中需要使用多個repository的話,你需要自己配置多個repository的名字和對應的使用者名稱和密碼。只列出幾行關鍵程式碼,
repositoriesMeta = new RepositoriesMeta(logWriter);
repositoriesMeta.readData(); // 從$HOME/.kettle/repositories.xml 讀資料.
repositoryMeta = repositoriesMeta.findRepository(repositoryName);
repository = new Repository(logWriter, repositoryMeta, userInfo);
userInfo = new UserInfo(repository, username, password);

從資源庫讀取Trans
連線到資源庫之後自然是想辦法讀取資料庫的表,把裡面的記錄轉換成為Trans 物件,使用的是loadTransformFromRepository,這個方法的函式原型需要解釋一下:
TransMetaloadTransformFromRepository(String directoryName, String transformationName, Repository repository,LogWriter logWriter)
第一個引數String directoryName 代表是你儲存轉換的目錄,當你使用kettle 圖形介面的時候,點選repository選單的explorer repository , 你會發現你所有的東西都是儲存在一個虛擬的類似與目錄結構的地方,其中包括database connections , transformations , job , users 等,所以你需要的是指定你連線的目錄位置,你也可以在目錄裡面再建立目錄。 
String transformationName 自然指的就是你轉換的名字.
Repository repository 指的是你連線的資源庫。
LogWriter logWriter 指定你的日誌輸出,這個log 指的是你kettle 轉換的日誌輸出,不是應用程式本身的輸出。
讀取TransMeta的步驟也相對比較簡單
repositoryDirectory=repository.getDirectoryTree().findDirectory(directoryName);
transMeta = new TransMeta(repository, transformationName, repositoryDirectory);

從資源庫讀取Job
從資源庫讀取Job跟Trans 的步驟基本是一樣的,同樣需要指定你儲存Job的目錄位置.
JobMeta loadJobFromRepository(String directoryName, String jobName, 
Repository repository, LogWriter logWriter)

讀取結果集
一般Job都是不會返回任何結果集的,大部分Trans也不會返回結果集,應為結果集一般都會直接從一個數據庫到另一個數據庫了,但是如果你需要返回轉換的結果集,那麼這一小結將會向你解釋如何從一個Trans裡面讀取這些結果集
首先,你需要一個容納Result的容器,就是類似與JDBC裡面的resultSet, resultSet當然會有一個resultSetMetadata跟它相關聯,在本文所舉的例項中,使用的是pentaho私有的memoryResultSet,
你可以不用關心它的細節,並且它的型別正如它的名字一樣是存在與Memory的,所以它不能被持久化,這個裡面儲存的是一個二維的Object陣列,裡面的資料就是從kettle轉化之後來的。
要從kettle的轉換中讀取結果集,要實現RowListener 介面,Row 是kettle裡面表示一行資料的一個類,RowListener 自然是指在轉換資料轉換的時候發生的事件,它有三個方法需要實現,
void rowReadEvent(Row)
void rowWrittenEvent(Row)
void errorRowWrittenEvent(Row)
分別對應讀資料時的事件,寫資料事的時間,出錯時的時間,我們需要取得結果集,所以只需要實現rowWrittenEvent(Row)就可以了,Row物件是通過TransMeta取得的,
Row row = transMeta.getStepFields(stepName);
下面給出具體實現取Row轉換成resultSet的程式碼:
Object pentahoRow[] = new Object[results.getColumnCount()];
    for (int columnNo = 0; columnNo < results.getColumnCount(); columnNo++) {
      Value value = row.getValue(columnNo);
      switch (value.getType()) {
        case Value.VALUE_TYPE_BIGNUMBER:
          pentahoRow[columnNo] = value.getBigNumber();
          break;
    ........

results.addRow(pentahoRow);
預設的資料型別是String 型別(在省略部分).
整個程式碼最重要的一行是Value value = row.getValue(columnNo);
這是真正取得實際資料的一行。有時候你會覺得實現一個resultSet比較麻煩,尤其是你還要實現相關的resultSetMetaData,怎麼把資料轉換成你自己的型別,你大可以就用一個List of List 來實現,裡面的List 就代表Row 的對應資料,外面一層List 就是result , 整個程式碼會簡單一些,當然,你要自己知道最後這個List怎麼用.

本文有意隱藏了一些跟pentaho有關的細節,比如 validateSystemSettings(),init(),validateAction()方法,這些都是pentaho私有的,有些方法比如 rowWrittenEvent(Row) 是用來取結果集的,但是很多時候我們不需要取轉換的結果集,文中很多程式碼都只列出主要的部分,省略一些判斷,除錯,log部分的程式碼,大家可以自己下載這些程式碼來研究,
本文並沒有給出一個可以獨立執行的示例,因為這個示例一定會太過於簡單(不超過15行程式碼),但是卻並不能考慮到各種情況,連線資源庫還是檔案,執行轉換還是Job ,metadata怎麼得來的,需不需要轉換之後的結果。
關於在本文一開始提到的使用kettle的兩種方式,對於第二種使用方式:使用完全程式設計的方式來執行轉換,其實它的與第一種方式的區別就好像一個用設計器來寫xml檔案,一個用純手工方式寫xml檔案(用程式碼的xml),大家可以參考官方網站上的一段示例程式碼,地址如下:
http://kettle.pentaho.org/downloads/api.php

開源ETL工具kettle系列之增量更新設計

ETL中增量更新是一個比較依賴與工具和設計方法的過程,Kettle中主要提供Insert / Update 步驟,Delete 步驟和Database Lookup 步驟來支援增量更新,增量更新的設計方法也是根據應用場景來選取的,雖然本文討論的是Kettle的實現方式,但也許對其他工具也有一些幫助。本文不可能涵蓋所有的情況,歡迎大家討論。

應用場景
增量更新按照資料種類的不同大概可以分成:
1. 只增加,不更新,
2. 只更新,不增加
3. 即增加也更新
4. 有刪除,有增加,有更新
其中1 ,2, 3種大概都是相同的思路,使用的步驟可能略有不同,通用的方法是在原資料庫增加一個時間戳,然後在轉換之後的對應表保留這個時間戳,然後每次抽取資料的時候,先讀取這個目標資料庫表的時間戳的最大值,把這個值當作引數傳給原資料庫的相應表,根據這個時間戳來做限定條件來抽取資料,抽取之後同樣要保留這個時間戳,並且原資料庫的時間戳一定是指定預設值為sysdate當前時間(以原資料庫的時間為標準),抽取之後的目標資料庫的時間戳要保留原來的時間戳,而不是抽取時候的時間。
   對於第一種情況,可以使用Kettle的Insert / Update 步驟,只是可以勾選Don’t perform any update選項,這個選項可以告訴Kettle你只會執行Insert 步驟。
對於第二種情況可能比較用在資料出現錯誤然後原資料庫有一些更新,相應的目標資料庫也要更新,這時可能不是更新所有的資料,而是有一些限定條件的資料,你可以使用Kettle的Update 步驟來只執行更新。關於如何動態的執行限定條件,可以參考前一篇文章。
第三種情況是最為常見的一種情況,使用的同樣是 Kettle的Insert / Update 步驟,只是不要勾選Don’t perform any update 選項。
第四種情況有些複雜,後面專門討論。

對於第1,2,3種情況,可以參考下面的例子。
這個例子假設原資料庫表為customers , 含有一個id , firstname , lastname , age 欄位,主鍵為id , 然後還加上一個預設值為sysdate的時間戳欄位。轉換之後的結果類似:id , firstname , lastname , age , updatedate . 整個設計流程大概如下:

 
                                                                        圖1
其中第一個步驟的sql 大概如下模式:
Select max(updatedate) from target_customer ;
你會注意到第二個步驟和第一個步驟的連線是黃色的線,這是因為第二個table input 步驟把前面一個步驟的輸出當作一個引數來用,所有Kettle用黃色的線來表示,第二個table input 的sql 模式大概如下:
Select field1 , field2 , field3 from customers where updatedate > ? 
後面的一個問號就是表示它需要接受一個引數,你在這個table input 下面需要指定replace variable in script 選項和execute for each row 為選中狀態,這樣,Kettle就會迴圈執行這個sql , 執行的次數為前面引數步驟傳入的資料集的大小。
 
                                                                            圖2

關於第三個步驟執行insert / update 步驟需要特別解釋一下,
 
                                                                                    圖3

Kettle執行這個步驟是需要兩個資料流對比,其中一個是目標資料庫,你在Target table 裡面指定的,它放在The keys to look up the values(s) 左邊的Table field 裡面的,另外一個數據流就是你在前一個步驟傳進來的,它放在The keys to look up the value(s) 的右邊,Kettle首先用你傳進來的key 在資料庫中查詢這些記錄,如果沒有找到,它就插入一條記錄,所有的值都跟你原來的值相同,如果根據這個key找到了這條記錄,kettle會比較這兩條記錄,根據你指定update field 來比較,如果資料完全一樣,kettle就什麼都不做,如果記錄不完全一樣,kettle就執行一個update 步驟。所以首先你要確保你指定的key欄位能夠唯一確定一條記錄,這個時候會有兩種情況:
1.維表
2.事實表
維表大都是通過一個主鍵欄位來判斷兩條記錄是否匹配,可能我們的原資料庫的主鍵記錄不一定對應目標資料庫中相應的表的主鍵,這個時候原資料庫的主鍵就變成了業務主鍵,你需要根據某種條件判斷這個業務主鍵是否相等,想象一下如果是多個數據源的話,業務主鍵可能會有重複,這個時候你需要比較的是根據你自定義生成的新的實際的主鍵,這種主鍵可能是根據某種類似與sequence 的生成方式生成的,
事實表在經過轉換之後,進目標資料庫之前往往都是通過多個外來鍵約束來確定唯一一條記錄的,這個時候比較兩條記錄是否相等都是通過所有的維表的外來鍵決定的,你在比較了記錄相等或不等之後,還要自己判斷是否需要新增一個新的主鍵給這個新記錄。
上面兩種情況都是針對特定的應用的,如果你的轉換過程比較簡單,只是一個原資料庫對應一個目標資料庫,業務主鍵跟代理主鍵完全相同的時候完全可以不用考慮這麼多。

有刪除,有增加,有更新
首先你需要判斷你是否在處理一個維表,如果是一個維表的話,那麼這可能是一個SCD情況,可以使用Kettle的Dimension Lookup 步驟來解決這個問題,如果你要處理的是事實表,方法就可能有所不同,它們之間的主要區別是主鍵的判斷方式不一樣。
事實表一般都資料量很大,需要先確定是否有變動的資料處在某一個明確的限定條件之下,比如時間上處在某個特定區間,或者某些欄位有某種限定條件,儘量最大程度的先限定要處理的結果集,然後需要注意的是要先根據id 來判斷記錄的狀態,是不存在要插入新紀錄,還是已存在要更新,還是記錄不存在要刪除,分別對於id 的狀態來進行不同的操作。
處理刪除的情況使用 Delete步驟,它的原理跟Insert / Update 步驟一樣,只不過在找到了匹配的id之後執行的是刪除操作而不是更新操作,然後處理Insert / Update 操作,你可能需要重新建立一個轉換過程,然後在一個Job 裡面定義這兩個轉換之間的執行順序。
如果你的資料變動量比較大的話,比如超過了一定的百分比,如果執行效率比較低下,可以適當考慮重新建表。
另外需要考慮的是維表的資料刪除了,對應的事實表或其他依賴於此維表的表的資料如何處理,外來鍵約束可能不太容易去掉,或者說一旦去掉了就可能再加上去了,這可能需要先處理好事實表的依賴資料,主要是看你如何應用,如果只是簡單的刪除事實表資料的話還比較簡單,但是如果需要保留事實表相應記錄,可以在維表中增加一條記錄,這條記錄只有一個主鍵,其他欄位為空,當我們刪除了維表資料後,事實表的資料就更新指向這條空的維表記錄。

定時執行增量更新
可能有時候我們就是定時執行更新操作,比如每天或者一個星期一次,這個時候可以不需要在目標表中增加一個時間戳欄位來判斷ETL進行的最大時間,直接在取得原資料庫的時間加上限定條件比如:
Startdate > ? and enddate < ? 
或者只有一個startdate
Startdate > ?   (昨天的時間或者上個星期的時間)
這個時候需要傳一個引數,用get System Info 步驟來取得,而且你還可以控制時間的精度,比如到天而不是到秒的時間。
當然,你也需要考慮一下如果更新失敗了怎麼處理,比如某一天因為某種原因沒有更新,這樣可能這一天的記錄需要手工處理回來,如果失敗的情況經常可能發生,那還是使用在目標資料庫中增加一個時間欄位取最大時間戳的方式比較通用,雖然它多了一個很少用的欄位。

執行效率和複雜度
刪除和更新都是一項比較耗費時間的操作,它們都需要不斷的在資料庫中查詢記錄,執行刪除操作或更新操作,而且都是一條一條的執行,執行效率低下也是可以預見的,儘量可能的縮小原資料集大小。減少傳輸的資料集大小,降低ETL的複雜程度

時間戳方法的一些優點和缺點
優點:  實現方式簡單,很容易就跨資料庫實現了,執行起來也容易設計
缺點: 浪費大量的儲存空間,時間戳欄位除ETL過程之外都不被使用,如果是定時執行的,某一次執行失敗了,就有可能造成資料有部分丟失.

其他的增量更新辦法:
增量更新的核心問題在與如何找出自上次更新以後的資料,其實大多數資料庫都能夠有辦法捕捉這種資料的變化,比較常見的方式是資料庫的增量備份和資料複製,利用資料庫的管理方式來處理增量更新就是需要有比較好的資料庫管理能力,大多數成熟的資料庫都提供了增量備份和資料複製的方法,雖然實現上各不一樣,不過由於ETL的增量更新對資料庫的要求是隻要資料,其他的資料庫物件不關心,也不需要完全的備份和完全的stand by 資料庫,所以實現方式還是比較簡單的.,只要你建立一個與原表結構類似的表結構,然後建立一個三種類型的觸發器,分別對應insert , update , delete 操作,然後維護這個新表,在你進行ETL的過程的時候,將增量備份或者資料複製停止,然後開始讀這個新表,在讀完之後將這個表裡面的資料刪除掉就可以了,不過這種方式不太容易定時執行,需要一定的資料庫特定的知識。如果你對資料的實時性要求比較高可以實現一個數據庫的資料複製方案,如果對實時性的要求比較低,用增量備份會比較簡單一點。

幾點需要注意的地方:
1.觸發器
無論是增量備份還是資料複製,如果原表中有觸發器,在備份的資料庫上都不要保留觸發器,因為我們需要的不是一個備份庫,只是需要裡面的資料,最好所有不需要的資料庫物件和一些比較小的表都不用處理。
2.邏輯一致和物理一致
資料庫在資料庫備份和同步上有所謂邏輯一致和物理一致的區別,簡單來說就是同一個查詢在備份資料庫上和主資料庫上得到的總的資料是一樣的,但是裡面每一條的資料排列方式可能不一樣,只要沒有明顯的排序查詢都可能有這種情況(包括group by , distinct , union等),而這可能會影響到生成主鍵的方式,需要注意在設計主鍵生成方式的時候最好考慮這一點,比如顯式的增加order 排序. 避免在資料出錯的時候,如果需要重新讀一遍資料的時候主鍵有問題.

總結
    增量更新是ETL中一個常見任務,對於不同的應用環境可能採用不同的策略,本文不可能覆蓋所有的應用場景,像是多個數據源匯到一個目標資料庫,id生成策略,業務主鍵和代理主鍵不統一等等,只是希望能給出一些思路處理比較常見的情況,希望能對大家有所幫助。

 開源ETL工具kettle系列之動態轉換

摘要:本文主要討論使用Kettle來設計一些較為複雜和動態的轉換可能使用到的一些技巧,這些技巧可能會讓你在使用Kettle的時候更加容易的設計更強大的ETL任務。

動態引數的傳遞
Kettle 在處理執行時輸入引數可以使用JavaScript 來實現,大部分工作只是按照一個模板來處理的
動態引數傳遞主要使用在像資料清理,調式,測試,完成複雜的條件過濾等等,這種方式一般不會在產品已經執行穩定了一段時間之後使用,因為我們一般仍然是做定時任務來自動轉換資料,所以在開始介紹如何使用動態引數之前,希望大家能明白不要在產品資料庫上做實驗,即使你已經知道你的轉換有什麼影響並且做了備份,因為這種方法是不可能自動執行的。
Kettle有兩種動態引數傳遞的方法,一種是非常輕量級的傳argument , 另一種是對付較複雜一點情況使用JavaScript . 下面分別介紹這兩種方法。
1. argument
當你在執行一個轉換的時候,不管這個轉換是一個Job的一部分還是隻有這個轉換,你都可以傳遞引數給它,當你執行一個轉換的時候,會彈出一個 Execution a Transformation 的對話方塊,讓你選擇執行轉換的方式,本地執行,遠端執行,分散式執行,下面就是日誌記錄的級別和回放時間,然後是argument 和 variables 的設定。Argument 和 variables 的區別在官方FAQ裡面也有解釋。你也可以參考一下官方的解釋和下面解釋的異同。
Q : Argument 和 variables 的區別 /
A : variables 也可以認為叫做environment variables , 就像它的名字一樣,主要是用來設定環境變數的,比如最常見的:檔案的存放地址,smtp的配置等等,你也可以把它認為是程式語言裡面的全域性變數,即使是不同的轉換它們也擁有同樣的值,而argument 自然就類似與區域性變數,只針對一個特定的轉換,比如像是限定結果集的大小和過濾條件。

取得argument的值
我們在轉換之前設定了argument的值,需要用到的時候就使用get system info 步驟,這個步驟取得在執行時引數,需要注意的是我們是先設定get system info ,然後在裡面決定要使用多少個引數,最多10個,每個引數名叫什麼,然後我們才能在執行時看到你設定了的引數名後面跟一個要你輸入的值,並且引數型別是不能夠指定,全部都當作字串處理,如果你需要對引數型別有要求,你需要自己轉換,使用一個Mapping步驟或者Select values步驟。
取得variable的值
Variable 的值個數不受限制,你可以在kettle選單的set environment裡面設定,也可以使用檔案儲存這些值,在第一次執行kettle之後,kettle會在%HOME_USER_FOLDER%選單裡面建立一個 .kettle資料夾,如果是windows 使用者可能就是C:/Documents and Settings/${your user name}/.kettle這個資料夾,如果是linux使用者可能就是/home/${your user name }/.kettle資料夾,這個資料夾下面有kettle.properties檔案,如果你開啟這個檔案,你會發現裡面有一些以#開頭的註釋,其中設定了一些像是:PRODUCTION_SERVER = Hercules 這樣的鍵值對,你可以自己定義一些環境變數比如像是smtp的地址,ftp伺服器的地址,你放log檔案的目錄名等等,當然不能直接編輯這個檔案就設定環境變數,要先設定KETTLE_HOME環境變數,windows就是點我的電腦,然後在設定path的那個地方新增一個KETTLE_HOME變數,linux就是export KETTLE_HOME=’一個目錄’,這個目錄可以任意地方,不過一般還是指向kettle的安裝目錄或是你自己的文件目錄,然後啟動kettle它會建立一個新的.kettle目錄,編輯裡面的kettle.properties檔案就可以設定環境變量了.


2. 使用指令碼
Kettle使用的是JavaScript來作為它的指令碼實現,使用的是mozilla 的rhino 1.5r5版本實現,如果你打算實現一些複雜的計算過程,比如字串分割,資料型別轉換,條件計算等等,你都應該使用指令碼語言來搞定。
我們在某種應用環境下使用指令碼語言來實現一些動態的功能大部分原因都是為了避免程式設計,一個複雜一點的應用程式,比如像是Kettle這種工具,或是報表工具,它們不可能提供全部功能,把什麼都做成圖形化,應用條件永遠都是複雜的,如果你不想研究程式碼和程式的結構,甚至你都不知道怎樣程式設計,指令碼語言絕對是一種簡單的解決方案,而JavaScript語言又是其中入門門檻非常低的一種,你完全可以多看一些例子,嘗試模仿一些指令碼來解決問題,也許會有一點難以除錯和測試,但總比自己程式設計要好的多。
下面的這個例子將會使用JavaScript彈出一個對話方塊來接受兩個引數,都是時間型別,其中的UI元件是使用的swt 的一些類,Kettle使用的是swt 作為其UI元件,如果你對swt 有了解的話會更容易理解這些UI元件,當然這並不需要你有swt 程式設計的經驗或者其他GUI設計的經驗。
開啟Kettle 下載目錄下的samples / transformation / JavaScript dialog.ktr 檔案(使用Kettle File 選單裡面的import from an xml file 。你會看到一個包含3個步驟的轉換。
第一個步驟使用generate rows 產生一條測試資料,測試資料包含一個DateFromProposal 時間欄位和一個DateToProposal時間欄位。
第二個步驟使用JavaScript 來實現動態的引數轉變,它會連續彈出兩次對話方塊,要求輸入一個起始值和結束值,然後它會呼叫一些JavaScript 函式來對日期格式做一些處理,
第三個步驟使用Dummy 來接受輸入,你完全可以使用File output 步驟來檢視輸出。

我們先看一下第二部中的JavaScript程式碼:(刪掉了開頭的註釋)
var display;
var displayHasToBeDisposed=false;
var shell=null;

try {
    display=Packages.org.eclipse.swt.widgets.Display.getCurrent();
    shell=display.getActiveShell();
} catch(e) {
    // if it runs in batch mode (Pan or preview mode) no Display is available, so we have to create one
    display=new Packages.org.eclipse.swt.widgets.Display();
    displayHasToBeDisposed=true;
    shell=new Packages.org.eclipse.swt.widgets.Shell(display);
}

// if we run in Pan we need to load the properties:
if(!Packages.org.pentaho.di.ui.core.PropsUI.isInitialized()) {
    Packages.org.pentaho.di.ui.core.PropsUI.init(display,2); //2=TYPE_PROPERTIES_PAN
}

var dateDefaultFrom=DateFromProposal.getString().substr(0,10); //only the date and not the time
var dialogDateFrom=new Packages.org.pentaho.di.ui.core.dialog.EnterTextDialog(shell, "Date from", "Please enter the beginning date", dateDefaultFrom);
var dateFromAsString=dialogDateFrom.open();

if(dateFromAsString!=null && dateFromAsString.length()>0) {
    var dateDefaultTo=DateToProposal.getString().substr(0,10); //only the date and not the time;
    var dialogDateTo=new Packages.org.pentaho.di.ui.core.dialog.EnterTextDialog(shell, "Date to", "Please enter the ending date", dateDefaultTo);
    var dateToAsString=dialogDateTo.open();
    if(dateToAsString!=null && dateToAsString.length()>0) {
        // here you could check or change formats a.s.o
    } else {
        // stop transformation when user cancels
        throw new Packages.java.lang.RuntimeException("Input canceled by the user.");
    }
} else {
    // stop transformation when user cancels
    throw new Packages.java.lang.RuntimeException("Input canceled by the user.");
}

if(displayHasToBeDisposed) {
  display.dispose();
}
Display 和 shell 都是swt 裡面的物件,你只用知道他們是表示UI的就可以了.
DateFromProposal和DateToProposal都是前面傳過來的欄位,dateFromAsString和dateToAsString都是需要輸出的內容,整個指令碼只是簡單的取了兩個日期變數的時間部分,使用了字串操作的substr()函式。
其中有三點需要注意:
1. dialog物件的初始化方式:使用的建構函式型別為
EnterTextDialog(Shell parent, String title, String message, String text) , 另一種建構函式型別是加一個引數fixed : 
EnterTextDialog(Shell parent, String title, String message, String text, boolean fixed) 
fixed代表字型是否用固定寬度,text引數代表的是輸入在對話方塊裡面的值,一般可以預設為空或輸入一段使用者提示資訊,例子中是設定成原先轉換之前的值,相當於預設值。
2. 使用open()函式取得輸入值
我們呼叫dialog 的open()函式取得輸入的值。
3. 異常的處理方式
基本上是一個標準的java 語法的try catch throw .
最後執行一下並檢視輸出,執行的時候什麼都不輸入接受預設值就可以了,最後檢視輸出,以下是文字方式的輸出,以分號分割
DateFromProposal;DateToProposal;dateFromAsString;dateToAsString
2006/01/01 00:00:00.000;2006/12/31 00:00:00.000;2006/01/01;2006/12/31

最後需要注意的是這種方式的實現可能將來會直接用一個新的step來實現,不用這樣寫指令碼。

除錯
除錯可不是程式的專利,ETL過程同樣需要除錯過程,Kettle同樣支援比較簡單的除錯過程,你可能已經發現了在選單下面的工具欄下面有一個debug 和preview 按鈕來支援除錯過程,這種除錯的技巧同樣可以用來幫助你完成一些複雜的ETL工程,下面以一個例子來解釋除錯過程.
首先,開啟samples / add sequence specify a common counter.ktr 檔案,你會發現一個定義了兩個sequence 的轉換,點選debug按鈕,它會彈出一個Transformation debug dialog 視窗。
 
                                                                          圖1

這個視窗左邊列出了在這個轉換中所有的步驟,我們選取Generate ID 步驟,然後設定斷點的條件:
Kettle支援兩種斷點的方法,一種基於限定結果集的數量大小,另一種是基於條件的判斷過程。
我們選擇基於結果集大小的方式,只檢視前面5條資料。
你會看到Kettle列出了Generate ID 步驟產生的前面5條資料,
 
                                      圖2

從上圖中可以看到這個generate id 步驟產生了5個值並不是連續的,下面的按鈕Close ,Stop 可以控制當前執行緒是繼續還是停止.

利用除錯的方法可以幫住我們設計一些需要基於條件判斷的複雜ETL過程,我們使用除錯的方法來檢視資料中是否可能存在某些特定資料,以此來設計一些ETL過程針對這些資料進行處理。

開源ETL工具kettle系列之建立緩慢增長維

摘要:本文主要介紹使用kettle 來建立一個Type 2的Slowly Changing Dimension 以及其中一些細節問題 

1. Kettle 簡介 
Kettle 是一個強大的,元資料驅動的ETL工具被設計用來填補商業和IT之前的差距,將你公司的資料變成可增長的利潤. 

我們先來看看Kettle能做什麼: 
1. Data warehouse population with built-in support for slowly changing dimensions, junk dimensions and much, much more. 
2. Export of database(s) to text-file(s) or other databases 
3. Import of data into databases, ranging from text-files to excel sheets 
4. Data migration between database applications 
5. Exploration of data in existing databases. (tables, views, synonyms, ) 
6. Information enrichment by looking up data in various information stores (databases, text-files, excel sheets, ) 
7. Data cleaning by applying complex conditions in data transformations 
8. Application integration 

本系列文章主要介紹如下幾點: 
1. 資料倉庫內建支援緩慢增長維SCD , 
2. 在資料轉換中使用複雜條件判斷來清理資料 
3. 如何使用kettle 來處理增量更形 
4. 將Kettle 整合到你的應用程式裡 
5. 使用kettle中應該注意的一些地方 
2. Kettle 文件 
最好的kettle教程就在你身邊,我們下載的kettle-version. zip 檔案裡其實已經包括了非常多的示例和文件,在你的kettle資料夾下,docs 資料夾下包含了所有的文件,samples資料夾下包含了一些示例,後面的介紹中一部分示例都來自kettle自帶的這個示例資料夾下。docs裡面最主要的是Spoon-version-User-Guide. zip ,裡面記錄了kettle 的技術性文件,包括支援的作業系統,資料庫平臺,文字格式,圖形化的介面,其中最重要的是所有的轉換物件(Transformation Core Objects) 和Job物件(Job Core Objects) 的解釋,包括截圖和每一個引數的解釋。 

3. Kettle與Slowly Changing Dimension 
  我們使用kettle自帶的samples檔案下的示例,來看kettle如何支援SCD的。 
開啟samples / jobs / Slowly Changing Dimension 資料夾,發現裡面有三個檔案, 
create - populate - update slowly changing dimension.kjb 
DimensionLookup - update dimension table 2.ktr 
DimensionLookup - update dimension table.ktr 
其中字尾以 .kjb 結尾的是kettle 的job 檔案匯出的格式,而以ktr 結尾的是kettle 的transformation 匯出的格式,開啟其中的DimensionLookup - update dimension table.ktr , 出現如下所示 :
  
    圖1 
1. 最左邊的是產生測試資料,如果是實際環境的話應該是連線真實的資料庫,產生的真實資料格式開啟如下: 

    圖2 
2 第二個步驟Dummy 就是把前面的資料合併起來,Dummy 步驟本身不做任何事情,不過由於前面有四個輸入指向它,所以它在第二步的作用等同於資料合併。 
3 第三個步驟是取得系統引數(get system date) , 它取得當前系統時間的日期,並且格式是當天的 00:00:00 , 如圖所示 


    4. 最後一步是真正的重點,執行Dimension Lookup / Update 步驟來更新和插入資料,以此來實現Type 1 ,2 ,3 的不同Slowly Changing Dimension 
    
       圖4

                                                                  圖5
   在開始介紹Dimension Lookup / Update 之前,先看看在執行這個步驟之前的輸入和輸入: 
   輸入: 
欄位名 資料型別 說明
id int 前面步驟的輸入
name Varchar(50) 前面步驟的輸入
firstname Varchar(50) 前面步驟的輸入
updated time 從第三步來的時間引數
輸出:
欄位名 資料型別 說明
id INT 來自輸入
name      varchar(50) 來自輸入
firstname varchar(50) 來自輸入
customer_tk BIGINT 代理主鍵
version INT 版本變更號
Date_from datetime 有效期起始日期
Date_to Datetime 有效期失效日期
   注意: 上圖中所使用的是mysql 5 資料庫做測試,所以資料型別一欄都是mysql 的資料型別,如果你使用其他資料庫,可能資料型別會有所不同,其中的datetime 的格式 yyyy/mon/day hh:mm:ss:sss

  我們再來看看當我們第一次執行以後出現的資料輸出:
 
圖6 注意圖6中所有的 version 值都是 1  
Date_from 都是 1900/01/01 00:00:00.000
Date_to   都是 2199/12/31 23:59:59.000      這兩列都是根據圖4下面部分定義的
Id , name , firstname 都是測試資料,從前面步驟來的. 然後我們修改圖1中generate row 的部分資料(一共兩條),並且只有測試資料變了的情況下,我們再次執行轉換,檢視資料輸出:
 
                                                                          圖7
注意到其中customer_tk 並沒有什麼變化,仍然在產生類似序列的輸出
Version 的值中出現了 2 , 並且只有在我們改變的資料中
在出現了改變的行中的date_from 變成了2007/11/28/ 00:00:00.000
在出現了改變的行中原來資料的date_to 變成了 2007-11-28 00:00:00.000
Id 列沒有變化,(變化了也沒用,圖5中的中間部分 Field 選項卡沒有選id)
Name , firstname 有兩個值變了(我們手工改變的) Dimension Lookup / Update 引數解釋 
Step name 步驟的名稱,在一個轉換中必須是唯一的
Update the dimension? 當找到符合條件記錄的時候更新這條記錄,如果這個複選框沒有選擇,找到了符合條件記錄的時候就是插入新紀錄而不是更新
Connection 資料庫連線的名字
Target schema
Target table 要更新的維表的名稱
Commit size 批處理更新的記錄數
Cache size in rows 這是把維表的資料放在快取中用來提高資料查詢速度從而減少資料庫查詢的次數 注意只有最近一次的記錄會被放在快取中,如果記錄數超過快取大小,最有最有關的最近的最高版本號記錄會被放在快取中 如果把cache size 設定成0 ,kettle會一直把記錄放在快取中直到JVM沒有記憶體了,如果你這樣設定要確保維的記錄數不要太大 設定成 1 表示不使用快取
Keys tab 設定在流中的主鍵和目標維表的業務主鍵,當兩個鍵相等時認為這條記錄匹配
Fields tab 設定要更新的欄位,當主鍵記錄匹配的時候,只有設定更新的欄位不一樣才認為是這條記錄是不一樣的,需要更新或者插入(注意圖5的中間部分,Fields tab 右邊設定的是Insert ,所以實現的是Type2 的SCD)
Technical key field 維的主鍵,也可以叫做代理主鍵(Surrogate Key)
Creation of technical key 指定技術主鍵的生成方式,對於你資料庫連線不適合的方式會自動被去掉,一共有三種: 1 .Use table maximum + 1 : 使用當前表最大記錄數加一的方式產生新主鍵,注意新的最大值會被快取,所以不用每次需要產生新記錄的時候就計算 2 . Use sequence : 使用一個數據庫支援的序列來產生技術主鍵(比如Oracle ,你也可以看到圖4中這一條是灰色的因為使用的是mysql 資料庫) 3. Use auto increment field : 使用一個數據庫支援的自動增長來產生技術主鍵(比如DB2)
Version field 使用這個欄位來儲存版本號
Stream Datafield 你可以指定維記錄最後一次被更改的時間,它能指定你要更新的維的精度,如果不指定,就會預設是系統時間
Date range start field 維記錄其實有效時間
Table daterange end 維記錄失效時間
Get Fields button 指定所有你想要更新的欄位,除了你指定的主鍵
SQL button 產生sql 來建立維表

官方文件中提到的注意事項: 

1. Stream date field : 如果你不想每次都改變時間的範圍,你需要新增一個額外的這個欄位,比如你打算每天的午夜來進行ETL過程,可以考慮加一個Join 步驟”Yesterday 23:59:59” 作為輸入的時間欄位. 
2. 這必須是一個Date 欄位(不能是轉換後的字串,即使他們有相同的格式也不行),我們(Kettle 的開發小組)把功能實現隔離出來,如果你需要的話自己要先轉換. 
3. 對於Date range start and end fields : 你只能指定一個表示年的資料,而不是時間戳,如果你輸入YYYY(比如2100) ,這將會被當成一個時間戳來用: YYYY-01-01 00:00:00.000 ,(注意圖6中的格式) 

另外需要注意的地方: 
1. Technical key field : 其他一些ETL工具(比如OWB)也許叫做代理主鍵,只是名字上不同而已. 
2. SQL Button : 當你在目標資料庫中還沒有建立維表的時候,你點選SQL Button ,Kettle 會彈出如下對話方塊幫你建立維表,你會發現它預設幫你在代理主鍵和業務主鍵上建立索引。 

    圖8 

3 Creation of technical key : 在這個選項的第二種實現方式上,Use sequence ,這個要視你資料庫支援而定,mysql 就不支援,Oracle 支援sequence , 但是你要自己建立和管理這個sequence , 如果這個sequence 的值因某種外部因素改變了,你要自己確定sequence 產生的值處於何種狀態,如果可以的話儘量不要用,儘量用第一種:table maximum + 1 ,這種方式永遠不要擔心資料庫的不同和實現方式的不同,而且簡單易懂。 

4 Stream Datefield 
    4.1 這個選項是用來控制時間的精度的,有的時候我們可能只是一個月進行一次ETL,這個時候Datefield 顯然沒有必要到秒的精度,而且這個選項嚴重影響你後面如果使用緩慢增長維的sql 的複雜度,因為你需要先把時間的精度調到你需要的精度,比如你使用的資料是到秒的精度,但是你實際需要的只是天的精度,你在sql 裡面有大量的時間都浪費在toString( stream date field) ,然後把這個字串substring() ,執行效率會低一些. 

    4.2 不要輕易改這個精度,一旦你確定了精度問題,不要嘗試改變它,尤其是當精度變細的時候,你可能會損失掉已經存在與資料庫中的資料的精度,如果你只是從 “Today 00:00:00.000” 改成 “Today 23.59.59.000” 的情況,需要手動處理好已經存在的資料格式問題. 

    4.3 執行ETL的時間可能決定這個值,如果你一天可能存在5次執行ETL過程(包括自動執行或者手工執行)那麼你顯然不希望時間的精度是按天來計算的(比如Today 00.00.00這種格式) 

    4.4 精度的損失並不可怕:考慮一下你的應用場景,比如我們要做表,列出2006年11月份和2006年12月份的所有銷售總和,結合上圖中的customer 的例子,假設是按客戶聚合的, 我們對於customer 的精度要求只要求到月,沒有要求到天,如果我們執行ETL的過程是一個星期執行一次,可能一個客戶在一個星期內改變了三次他的名字(雖然不是個好例子,完全是為了配合上面的圖),而只有最後一次的改變被記錄了下來,這完全跟你執行ETL的頻度有關,但是考慮到使用者需求,只要精度到月就夠了,即使這種精度有資料損失也完全沒關係,所以你如何指定你的Stream date field 的精度主要是看使用者需求的精度。 

4.5 如果以上四點你覺得只是一堆讓你頭疼的字串,那你完全可以把stream date field 設定成空(預設的到時間戳的精度) 
執行Type 2 SCD 
1. “Update the dimension?” 選中 
2. 在Field tabs 裡面,對於每一個你想要保持全部記錄的欄位都要選擇Insert 方式. 

錯誤處理和依賴問題 
    如果你運行了這個轉換,你會發現你的輸出中有一條customer_tk為1,version為1的資料,你在圖6和圖7中沒有看到這條資料是因為我不想一開始把這條資料跟SCD的實現混在一起,SCD的實現本身並不會告訴你要新增這條資料,這完全是跟資料建模有關係,為了理解這個問題,我們看一下如下情況該如何處理: 

    一個產品銷售的記錄是作為一個立方體的主要事實表,它包括一個客戶維,現在因為某種原因客戶維需要刪除掉一部分資料,但是對映的產品銷售記錄卻要儲存起來,該如何處理外來鍵約束的問題? 

    SCD實現本身並不會考慮這個問題,因為它跟維表沒有什麼關係,你要處理的是事實表裡面那些引用了維表的記錄,如果你沒有這個空行(它唯一的一個值就是 id ,而且是為了滿足主鍵約束,version那個欄位有沒有值不重要),事實表中的記錄就不好處理這種情況,因為你把它賦予任何一個值都是不合適的。這種方法是為了處理像資料依賴(外來鍵的關係)和錯誤處理比較常見的方法。