1. 程式人生 > >手把手教你使用Kettle JAVA API進行資料抽取

手把手教你使用Kettle JAVA API進行資料抽取

Kettle作為一款優秀的資料抽取程式,因為高效穩定的效能,一直被廣大使用者所喜愛,並且還在國內廣受好評。因為其本身使用純JAVA編寫,所以其JAVA API使用起來自然也是非常簡便。雖然其本身自帶的元件已經非常好用,並且能夠滿足豐富的場景。但可能有些場景下,我們可能需要通過其他的方式來實現,本篇我們將介紹Kettle的JAVA API的使用。

一、環境搭建

核心jar包的pom.xml配置如下:

<dependency>
	<groupId>pentaho-kettle</groupId>
	<artifactId>kettle-engine</artifactId>
	<version>4.4.0-stable</version>
</dependency>
<dependency>
	<groupId>pentaho-kettle</groupId>
	<artifactId>kettle-core</artifactId>
	<version>4.4.0-stable</version>
</dependency>
<dependency>
	<groupId>pentaho-kettle</groupId>
	<artifactId>kettle-db</artifactId>
	<version>4.4.0-stable</version>
</dependency>

二、程式碼部分

1、初始化環境

public void initKettleEnvironment(HttpServletRequest request) throws KettleException {
	if (KettleEnvironment.isInitialized()) {
		return;
	}
	/**
	 * 為避免在部分網路環境中無法完成初始化,需要自行處理
	 */
	if (request == null) {
		// 執行環境初始化
		KettleEnvironment.init();
	} else {
		String userDir = System.getProperty("user.dir");
		String kettleHome = request.getSession().getServletContext().getRealPath(File.separator   "WEB-INF");
		// 設定使用者路徑和系統環境,包括使用者路徑和主目錄
		System.setProperty("user.dir", kettleHome);
		System.setProperty("KETTLE_HOME", kettleHome);
		// 執行環境初始化
		KettleEnvironment.init();
		// 避免造成影響其他程式的執行,還原使用者路徑
		System.setProperty("user.dir", userDir);
	}
}

2、建立轉化元

新增配置陣列,配置轉化元

public TransMeta buildTransMeta(String metaName, String[] transXML) throws KettleXMLException {
	TransMeta transMeta = new TransMeta();
	// 設定轉化元的名稱
	transMeta.setName(metaName);
	// 新增轉換的資料庫連線
	for (int i = 0; i < transXML.length; i    ) {
		DatabaseMeta databaseMeta = new DatabaseMeta(transXML[i]);
		transMeta.addDatabase(databaseMeta);
	}
	return transMeta;
}

3、新增日誌(可選操作)

public void setStepLogTable(TransMeta transMeta, String connDbName, String tableName) {
	VariableSpace space = new Variables();
	// 將step日誌資料庫配置名加入到變數集中
	space.setVariable(Const.KETTLE_TRANS_LOG_DB, connDbName);
	space.initializeVariablesFrom(null);
	StepLogTable stepLogTable = StepLogTable.getDefault(space, transMeta);
	// 配置StepLogTable使用的資料庫配置名稱
	stepLogTable.setConnectionName(connDbName);
	// 設定Step日誌的表名
	stepLogTable.setTableName(tableName);
	// 設定TransMeta的StepLogTable
	transMeta.setStepLogTable(stepLogTable);
}

4、建立外掛註冊器

public PluginRegistry getRegistry() {
    // 外掛註冊,用於註冊轉換中需要用到的外掛
    return PluginRegistry.getInstance();
}

5、設定表輸入步驟元

該步驟用於獲取源資料

/**
 * 設定表輸入步驟
 * @param transMeta
 * @param registry
 * @param sourceDbName
 * @param sql
 * @param stepName
 * @return
 */
public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql,
		String stepName) {
	// 建立表輸入
	TableInputMeta tableInputMeta = new TableInputMeta();
	String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta);
	// 指定資料來源資料庫配置名
	DatabaseMeta source = transMeta.findDatabase(sourceDbName);
	tableInputMeta.setDatabaseMeta(source);
	tableInputMeta.setSQL(sql);
	// 將表輸入新增到轉換中
	StepMeta stepMeta = new StepMeta(pluginId, stepName, tableInputMeta);
	// 給步驟新增在spoon工具中的顯示位置
	stepMeta.setDraw(true);
	stepMeta.setLocation(100, 100);
	// 將表輸入新增到步驟中
	transMeta.addStep(stepMeta);
	return stepMeta;
}

6、更新步驟元

該步驟用於將獲取到的資料更新到目標資料庫中

/**
 * 設定表輸出步驟,用於整表抽取
 * @param transMeta
 * @param registry
 * @param targetDbName
 * @param targetTableName
 * @param stepName
 * @return
 */
public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName,
		String targetTableName, String stepName) {
	// 建立表輸出
	TableOutputMeta tableOutputMeta = new TableOutputMeta();
	String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta);
	// 配置表輸出的目標資料庫配置名
	DatabaseMeta targetDb = transMeta.findDatabase(targetDbName);
	tableOutputMeta.setDatabaseMeta(targetDb);
	tableOutputMeta.setTableName(targetTableName);
	// 將表輸出新增到轉換中
	StepMeta stepMeta = new StepMeta(pluginId, stepName, tableOutputMeta);
	transMeta.addStep(stepMeta);
	return stepMeta;
}

/**
 * 設定表插入與更新步驟,用於表中部分欄位更新
 * @param transMeta
 * @param registry
 * @param targetDbName
 * @param targetTableName
 * @param updatelookup lookup檢索欄位
 * @param updateStream lookup更新欄位
 * @param updateStream2 lookup更新欄位2
 * @param conditions lookup條件
 * @param updateOrNot lookup更新標記
 * @param stepName
 * @return
 */
public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName,
		String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2,
		String[] conditions, Boolean[] updateOrNot, String stepName) {
	// 建立插入與更新
	InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
	String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
	// 配置目標資料庫配置名
	DatabaseMeta database_target = transMeta.findDatabase(targetDbName);
	insertUpdateMeta.setDatabaseMeta(database_target);
	// 設定目標表名
	insertUpdateMeta.setTableName(targetTableName);
	// 設定用來查詢的關鍵字
	insertUpdateMeta.setKeyLookup(updatelookup);
	insertUpdateMeta.setKeyStream(updateStream);
	insertUpdateMeta.setKeyStream2(updateStream2);// 這一步不能省略
	insertUpdateMeta.setKeyCondition(conditions);
	// 設定要更新的欄位
	insertUpdateMeta.setUpdateLookup(updatelookup);
	insertUpdateMeta.setUpdateStream(updateStream);
	insertUpdateMeta.setUpdate(updateOrNot);
	// 新增步驟到轉換中
	StepMeta stepMeta = new StepMeta(pluginId, stepName, insertUpdateMeta);
	stepMeta.setDraw(true);
	stepMeta.setLocation(250, 100);
	transMeta.addStep(stepMeta);
	return stepMeta;
}

7、繫結關聯步驟

該步驟用於將資料獲取和匯入更新的步驟關聯繫結

/**
 * 用於將表輸入步驟與第二步驟繫結
 * @param transMeta
 * @param from
 * @param to
 */
public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) {
	transMeta.addTransHop(new TransHopMeta(from, to));
}

8、執行抽取

執行資料抽取

/**
 * 執行抽取
 * @param transMeta
 * @param targetDbName
 */
public void executeTrans(TransMeta transMeta, String targetDbName) {
	try {
		Database database = new Database(null, transMeta.findDatabase(targetDbName));
		database.connect();
		Trans trans = new Trans(transMeta);
		trans.execute(new String[] { "start..." });
		trans.waitUntilFinished();
		// 關閉資料庫連線
		database.disconnect();
		if (trans.getErrors() > 0) {
			throw new RuntimeException("There were errors during transformation execution.");
		}
	} catch (KettleDatabaseException e) {
		e.printStackTrace();
	} catch (KettleException e) {
		e.printStackTrace();
	}
}