1. 程式人生 > >Kettle變數和自定義java程式碼的例項應用

Kettle變數和自定義java程式碼的例項應用

1  kettle.properties引數配置資料來源連線和FTP連線

由於測試環境和生產環境中資料庫連線FTP等配置會在部署過程中變更,所以預先定義成配置項,在配置檔案中修改,這樣測試和釋出將會變得簡單,下面以資料庫為例說明這類配置的使用。

(1)      首先要找到配置檔案,不同的作業系統路徑也不一樣,本人用win7進行開發,配置檔案的路徑為“C:\Users\chenpeng\.kettle\kettle.properties”,如下:

技術分享

(2)      配置檔案中的具體配置如下:

技術分享

還可以視覺化設定:

技術分享

(3)      具體使用示例

l  下方是資料庫連線配置:

技術分享

技術分享

技術分享

l  下方是FTP連線配置:

 技術分享

1.1.2  kettle.properties引數設定和路徑及與正則配合使用

(1)在kettle.properties中設定變數值

 技術分享

(2)kettle.properties設定如下(win7下的路徑為:C:\Users\chenpeng\.kettle)

技術分享

(4)      在輸出檔案時使用引數中指定的路徑:

技術分享

技術分享

技術分享

注:如果路徑出現錯誤,如把某個資料夾刪除則會報錯,只要是報錯,即使把資料夾建好了也仍然會報目錄錯誤(好像有記憶功能),這時必須重新啟動kettle才能正常執行。

l  在輸出檔案時使用引數中和正則表示式混合使用場景:

技術分享

1.1.3  kettle.properties引數在java程式碼中的應用

技術分享

1.1.4  作業中變數使用並用javascript設定變數值

上面的例子都是kettle.properties中宣告的變數的應用,這些都是全域性範圍內通用的,但很多時間,子作業需要有內部專用的變數引數,這時就不能使用kettle.properties中宣告的變量了,需要在流程中宣告變數,並把作用域設定為當前作業有效。以下應用場景業務如下:檔名要命名成當前日期格式的,所以在作業級別定義了一個變數,但無法給它賦值,如是採用了javascript指令碼方式給該變數賦值,然後在輸出檔名的位置應用該變數即可,後面檔案的刪除上傳都是公用部分都需要用到這個變數做為介面引數來做處理。

(1)      主流程如下:

技術分享

(2)      定義變數:

 技術分享

(3)      子過程中呼叫javascript指令碼修改值:

技術分享

技術分享

Date.prototype.Format = function (fmt) { //author: meizz   
   var o = {  
       "M+": this.getMonth() + 1, //月份   
       "d+": this.getDate(), //日   
       "h+": this.getHours(), //小時   
        "m+": this.getMinutes(), //分   
      "s+": this.getSeconds(), //秒   
       "q+": Math.floor((this.getMonth() + 3) / 3), //季度   
      "S": this.getMilliseconds() //毫秒   
    };  
    if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));  
   for (var k in o)  
    if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr

(("" + o[k]).length)));  
    return fmt;  
}  
var dateTime = new Date().Format("yyyyMMdd"); // gives back today at yyyy/MM/dd HH:mm:ss.000
setVariable("curdate",dateTime,"s");


另外,如果是用上一節點的欄位值,修改變數值則更為簡單,如下:

技術分享

技術分享

(4)      使用變數(按常規使用):

技術分享

1.1.5  Java程式碼訪問變數呼叫jar包並生成驗證檔案

這是一個較為綜合性的示例,首先定義了一條記錄(可能理解為很多個變數),然後通過java程式碼來呼叫jar包,計算出記錄數、檔案大小、MD5值等,賦值給相應記錄欄位,並輸出到檔案,形成資料檔案的校驗檔案。

(1)      主流程如下:

技術分享

技術分享

(2)      生成記錄

 技術分享

(3)      Java處理

技術分享

技術分享

//匯入在eclipse中編輯好的包,主要用於計算檔案行數、MD5值
import cgb.tools.KettleHelper;
import java.io.File;
import java.io.IOException; 

//kettle中已定義好的行處理方法,每行記錄都會執行一次
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
	//(1)獲取到上一個步驟的輸入行
	Object[] r = getRow();
	if (r == null) {
	  setOutputDone();
	  return false;
	}
	r = createOutputRow(r, data.outputRowMeta.size());

	//(2)讀取出引數變數值
	String kettleoutputdir = getVariable("kettleoutputdir", "");
	String hbprovince_code = getVariable("hbprovince_code", "");
	String item_code = getVariable("item_code", "");
	String curdate = getVariable("curdate", "");

	String filename = "";
	String onlyfilename = "";
	String recordcount = "";
	String bytecount = "";
	String md5code = "";

	//(3)呼叫jar包計算出MD5值、行數、位元組數
	try {
		filename = kettleoutputdir + "\\" + hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv";
		onlyfilename = hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv";
		md5code = KettleHelper.getFileMD5String(filename);
		recordcount = String.valueOf(KettleHelper.getFileRecordCount(filename, true));
		bytecount = String.valueOf(KettleHelper.getFileByteCount(filename));
	} catch (IOException e) {
		e.printStackTrace();
	}

	//(4)把計算好的值放入到輸出記錄中
	get(Fields.Out, "filename").setValue(r, onlyfilename);
	get(Fields.Out, "recordcount").setValue(r, recordcount);
	get(Fields.Out, "byteCount").setValue(r, bytecount);
	get(Fields.Out, "md5code").setValue(r, md5code);

	//(5)輸出到下一個節點做處理
    putRow(data.outputRowMeta, r);
	return true;
}

(4)      Jar包的開發和匯出

其中呼叫的jar包,可以選擇用eclipse來生成,如下:

技術分享

package XXX.tools;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
  
public class KettleHelper {  
    /** 
     * 預設的密碼字串組合,用來將位元組轉換成 16 進製表示的字元,apache校驗下載的檔案的正確性用的就是預設的這個組合 
     */  
    private static char hexDigits[] = { '0', '1', '2', '3', '4', '5', '6',  
            '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };  
  
    private static MessageDigest messagedigest = null;  
    static {  
        try {  
            messagedigest = MessageDigest.getInstance("MD5");  
        } catch (NoSuchAlgorithmException nsaex) {  
            System.err.println("MD5Util.class.getName()" + "初始化失敗,MessageDigest不支援MD5Util。");  
            nsaex.printStackTrace();  
        }  
    }  
      
    /** 
     * 生成字串的md5校驗值 
     *  
     * @param s 
     * @return 
     */  
    private static String getMD5String(String s) {  
        return getMD5String(s.getBytes());  
    }  
      
    /** 
     * 判斷字串的md5校驗碼是否與一個已知的md5碼相匹配 
     *  
     * @param password 要校驗的字串 
     * @param md5PwdStr 已知的md5校驗碼 
     * @return 
     */  
    public static boolean checkPassword(String password, String md5PwdStr) {  
        String s = getMD5String(password);  
        return s.equals(md5PwdStr);  
    } 
    
    /** 
     * 生成檔案的md5校驗值 
     *  
     * @param file 
     * @return 
     * @throws IOException 
     */  
    public static String getFileMD5String(String fileName) throws IOException { 
    	File file = new File(fileName);
        InputStream fis;  
        fis = new FileInputStream(file);  
        byte[] buffer = new byte[1024];  
        int numRead = 0;  
        while ((numRead = fis.read(buffer)) > 0) {  
            messagedigest.update(buffer, 0, numRead);  
        }  
        fis.close();  
        return bufferToHex(messagedigest.digest());  
    }
    
    /** 
     * 獲取檔案的記錄條數 
     *  
     * @param file 
     * @return 
     * @throws IOException 
     */  
    public static int getFileRecordCount(String fileName,boolean hasHeadRow) throws IOException { 
    	
    	File inFile = new File(fileName); // 讀取的CSV檔案
        @SuppressWarnings("unused")
		String inString = "";
        int count = 0;
        try {
            BufferedReader reader = new BufferedReader(new FileReader(inFile));
            while((inString = reader.readLine())!= null){
            	count++; 
            }
            reader.close();
        } catch (FileNotFoundException ex) {
            System.out.println("沒找到檔案!");
        } catch (IOException ex) {
            System.out.println("讀寫檔案出錯!");
        }
    	if(hasHeadRow)
    	{
    		count--;
    	}
    	return count; 
    }
    
    /** 
     * 生成檔案的md5校驗值 
     *  
     * @param file 
     * @return 
     * @throws IOException 
     */  
    public static String getFileMD5String(File file) throws IOException {         
        InputStream fis;  
        fis = new FileInputStream(file);  
        byte[] buffer = new byte[1024];  
        int numRead = 0;  
        while ((numRead = fis.read(buffer)) > 0) {  
            messagedigest.update(buffer, 0, numRead);  
        }  
        fis.close();  
        return bufferToHex(messagedigest.digest());  
    }
  
    private static String getMD5String(byte[] bytes) {  
        messagedigest.update(bytes);  
        return bufferToHex(messagedigest.digest());  
    }  
  
    private static String bufferToHex(byte bytes[]) {  
        return bufferToHex(bytes, 0, bytes.length);  
    }  
  
    private static String bufferToHex(byte bytes[], int m, int n) {  
        StringBuffer stringbuffer = new StringBuffer(2 * n);  
        int k = m + n;  
        for (int l = m; l < k; l++) {  
            appendHexPair(bytes[l], stringbuffer);  
        }  
        return stringbuffer.toString();  
    }  
  
    private static void appendHexPair(byte bt, StringBuffer stringbuffer) {  
        char c0 = hexDigits[(bt & 0xf0) >> 4];// 取位元組中高 4 位的數字轉換, >>> 為邏輯右移,將符號位一起右移,此處未發現兩種符號有何不同   
        char c1 = hexDigits[bt & 0xf];// 取位元組中低 4 位的數字轉換   
        stringbuffer.append(c0);  
        stringbuffer.append(c1);  
    } 
    
    
    /** 
     * 獲取文個把的位元組數
     * @param fileName
     * @return
     * @throws IOException
     */
    public static long getFileByteCount(String fileName) throws IOException
    {
    	File file = new File(fileName);
    	return file.length();
    }

    
}

匯出jar包

技術分享

放入到kettle的jar包目錄,它會自載入,作業系統不同,目錄也會不同,本人使用的是win7,目錄如下:

 技術分享

(5)      檔案輸出

技術分享

(6)   最終生成檔案的效果如下:

技術分享

1.1.6     SQL中使用變數

下面的查詢語句用問號佔位符,當開始日期(第一個?號)和結束日期(第二個?號)繫結到SQL的問號佔位符,在查詢入職日期在一定期間的總統資訊:

SELECTname,took_office FROMpresidents WHEREtook_officeBETWEEN? AND?

技術分享

示例中,首先使用生成行步驟(“Generdate Rows”)生成一行帶有兩個欄位的記錄,分別按順序代替表輸入SQL語句中的佔位符。實際場景中,通常使用動態處理結果產生期望值代替生成行步驟。

技術分享

接下來是表輸入步驟,其中配置SQL查詢語句,包含問號佔位符,通過在“Insert Data Step”的下拉框中選擇前一步驟,來替換問號的值。

通過傳輸不同的值多次執行查詢

如果你想迴圈執行查詢,使用不同值替換佔位符;就需要佔位符生產步驟生成多行資料,並把表輸入的選項“Execute for each row”選中。本示例檔名稱為placeholders_in_loop.ktr

技術分享

佔位符的侷限性

雖然通過給佔位符繫結值查詢非常有效,但也有一些場景不能使用,下面一些SQL不能使用佔位符。這些示例都非常通用,但是不能使用佔位符。

不能用佔位符代替表名詞,否則查詢將不執行。

SELECT some_fieldFROM ?

不能使用佔位符代替查詢的欄位名稱,下面的查詢可以成功繫結引數,但只是作為一個常量,而不是欄位的名稱。

SELECT ? asmy_field FROM table

不能使用佔位符繫結逗號分隔的多個列表項值;如果你繫結1,2,3給下面的查詢語句,將得到意外的結果。

SELECT * FROM testWHERE id IN(?)

你期望得到的結果是:

SELECT * FROM testWHERE id IN("1,2,3")

但是執行的結果卻是這樣,傳輸一個字串,卻得到三個值,而實際情況完全不確定有幾個值傳輸進來。

SELECT * FROM testWHERE id IN(1,2,3)

為了解決這些場景的問題,需要使用kettle的變數動態構造查詢文字,下面詳細說明。

SQL查詢中使用kettle變數

表輸入步驟支援替換查詢中的變數或引數,假設有一系列結構完全相關的表,分別是: mammals, birds, insects(動物、鳥、昆蟲),可以使用kettle變數作為表的名稱。假設我們有一個變數,名稱為:ANIMALS_TABLE,賦值為birds,我們設定“Replace Vaiables”選項選中。如果我們寫下面的查詢:

SELECT name,population FROM${ANIMALS_TABLE}

在執行一定被成功的替換成:

SELECT name,population FROM birds

如果設定變數的值為“mammals”或“insects”,則將動態查詢不同的表。當佔位符不能勝任是,使用變數技術可以幫助我們解決。示例的名稱為variables.ktr,執行時不要忘了給parameter(命名引數)賦值進行測試。

技術分享

變數和佔位符一起使用

如果有必要,我們可以混合這兩種技術;本示例中使用變數作為表名詞,同時使用佔位符作為前面步驟的輸入值。示例檔案variables_and_placeholders.ktr

SELECT name, population FROM${ANIMALS_TABLE}WHERE population > ?

技術分享