1. 程式人生 > >使用Kettle API實現資料的遷移

使用Kettle API實現資料的遷移

利用 Kettle 的 API,將一個資料來源中的資料匯入到另一個資料來源中:

package cn.com.saidi.job;
 
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleDatabaseException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
 
import java.io.File;

/**
 * Demo that assembles a two-step Kettle transformation (table input followed by
 * insert/update) through the Kettle Java API and saves it as a .ktr file.
 */
public class TransDemo {

	/** Instance created by {@link #main(String[])}; kept public for backward compatibility. */
	public static TransDemo transDemo;

	/**
	 * Table names used in the two databases: {@code bjdt_tablename} is read by the
	 * table-input step, {@code kettle_tablename} is written by the insert/update step.
	 */
	public static String bjdt_tablename = "test1";
	public static String kettle_tablename = "test2";

	/**
	 * Database connection definitions as XML, in the form passed to the
	 * DatabaseMeta(String xml) constructor. The first entry is named "bjdt",
	 * the second "kettle".
	 */
	public static final String[] databasesXML = {

			"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
					"<connection>" +
					"<name>bjdt</name>" +
					"<server>192.168.1.122</server>" +
					"<type>Mysql</type>" +
					"<access>Native</access>" +
					"<database>daiqiaobing</database>" +
					"<port>3306</port>" +
					"<username>root</username>" +
					"<password>root</password>" +
					"</connection>",
			"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
					"<connection>" +
					"<name>kettle</name>" +
					"<server>192.168.1.122</server>" +
					"<type>Mysql</type>" +
					"<access>Native</access>" +
					"<database>daiqiaobing</database>" +
					"<port>3306</port>" +
					"<username>root</username>" +
					"<password>root</password>" +
					"</connection>"

	};

	/** Name of the connection the table-input step reads from. */
	private static final String SOURCE_CONNECTION = "bjdt";
	/** Name of the connection the insert/update step writes to. */
	private static final String TARGET_CONNECTION = "kettle";
	/** Path of the generated transformation file. */
	private static final String OUTPUT_FILE = "etl/update_insert_Trans.ktr";

	/**
	 * Generates the transformation and writes its XML to {@code etl/update_insert_Trans.ktr}.
	 * On any failure the stack trace is printed and the JVM exits with status 1.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			KettleEnvironment.init();
			transDemo = new TransDemo();
			TransMeta transMeta = transDemo.generateMyOwnTrans();
			File file = new File(OUTPUT_FILE);
			FileUtils.writeStringToFile(file, transMeta.getXML(), "UTF-8");
			// Report what was written without echoing the connection XML,
			// which contains the database passwords in plain text.
			System.out.println("wrote transformation with " + databasesXML.length
					+ " database connection(s) to " + file.getPath());
		} catch (Exception e) {
			e.printStackTrace();
			// Make the failure visible to the calling shell/scheduler.
			System.exit(1);
		}
	}

	/**
	 * Builds a transformation that moves data from one database to another. It has
	 * exactly two steps: a table input on {@code bjdt_tablename} and an insert/update
	 * on {@code kettle_tablename}, joined by a single hop.
	 *
	 * @return the assembled transformation metadata, named "insert_update"
	 * @throws KettleXMLException declared by the original signature; presumably raised
	 *         when a connection XML cannot be parsed (confirm against DatabaseMeta)
	 * @throws KettleDatabaseException declared by the original signature; kept so that
	 *         existing callers still compile
	 */
	public TransMeta generateMyOwnTrans() throws KettleXMLException, KettleDatabaseException {
		System.out.println("************start to generate my own transformation***********");
		TransMeta transMeta = new TransMeta();
		// Name of the transformation.
		transMeta.setName("insert_update");
		// Register every database connection with the transformation.
		for (String connectionXml : databasesXML) {
			transMeta.addDatabase(new DatabaseMeta(connectionXml));
		}
		// The registry supplies the plugin id of each step type.
		PluginRegistry registry = PluginRegistry.getInstance();

		StepMeta tableInputStep = addTableInputStep(transMeta, registry);
		StepMeta insertUpdateStep = addInsertUpdateStep(transMeta, registry);

		// Connect the two steps with a hop.
		transMeta.addTransHop(new TransHopMeta(tableInputStep, insertUpdateStep));
		System.out.println("***********the end************");
		return transMeta;
	}

	/** Creates the table-input step that selects {@code name} from the source table and adds it to the transformation. */
	private StepMeta addTableInputStep(TransMeta transMeta, PluginRegistry registry) {
		TableInputMeta tableInput = new TableInputMeta();
		String pluginId = registry.getPluginId(StepPluginType.class, tableInput);
		// Read through the "bjdt" connection registered on the transformation.
		tableInput.setDatabaseMeta(transMeta.findDatabase(SOURCE_CONNECTION));
		tableInput.setSQL("SELECT name  FROM " + bjdt_tablename);

		StepMeta step = new StepMeta(pluginId, "table input", tableInput);
		// Position of the step on the Spoon canvas.
		step.setDraw(true);
		step.setLocation(100, 100);
		transMeta.addStep(step);
		return step;
	}

	/** Creates the insert/update step keyed on {@code name} for the target table and adds it to the transformation. */
	private StepMeta addInsertUpdateStep(TransMeta transMeta, PluginRegistry registry) {
		InsertUpdateMeta insertUpdate = new InsertUpdateMeta();
		String pluginId = registry.getPluginId(StepPluginType.class, insertUpdate);
		// Write through the "kettle" connection registered on the transformation.
		insertUpdate.setDatabaseMeta(transMeta.findDatabase(TARGET_CONNECTION));
		// Table the step operates on.
		insertUpdate.setTableName(kettle_tablename);

		// Lookup key: target column "name" = stream field "name".
		insertUpdate.setKeyLookup(new String[]{"name"});
		insertUpdate.setKeyStream(new String[]{"name"});
		// The original author notes this must be set even though "=" takes no second
		// value; presumably all key arrays must have equal length (confirm against InsertUpdateMeta).
		insertUpdate.setKeyStream2(new String[]{""});
		insertUpdate.setKeyCondition(new String[]{"="});

		// Fields to update: target column "name" from stream field "name".
		insertUpdate.setUpdateLookup(new String[]{"name"});
		insertUpdate.setUpdateStream(new String[]{"name"});
		insertUpdate.setUpdate(new Boolean[]{true});

		StepMeta step = new StepMeta(pluginId, "insert_update", insertUpdate);
		// Position of the step on the Spoon canvas.
		step.setDraw(true);
		step.setLocation(250, 100);
		transMeta.addStep(step);
		return step;
	}

}

上述程式會產生一個 .ktr 檔案,接下來載入這個 .ktr 檔案並執行其中定義的轉換:

/**
 * Loads the transformation stored in {@code etl/update_insert_Trans.ktr}, runs it,
 * waits for it to finish and prints a failure message if it reported any errors.
 *
 * @param args unused
 * @throws KettleException propagated from the Kettle calls made here
 */
public static void main(String[] args) throws KettleException {
	// Initialise Kettle.
	KettleEnvironment.init();

	// Build the transformation metadata from the .ktr file.
	final String ktrPath = "etl/update_insert_Trans.ktr";
	TransMeta transMeta = new TransMeta(ktrPath);

	// Prepare, start and block until the transformation has finished.
	Trans transformation = new Trans(transMeta);
	transformation.prepareExecution(null);
	transformation.startThreads();
	transformation.waitUntilFinished();

	boolean failed = transformation.getErrors() != 0;
	if (failed) {
		System.out.println("執行失敗!");
	}
}