1. 程式人生 > >Kettle外掛開發流程

Kettle外掛開發流程

最近正好做了有關Kettle中外掛開發的工作,對Kettle外掛的原始碼進行了一定的研究,並開發了自定義的外掛,在此有些感悟,記錄下來。

一 Kettle外掛概述
Kettle的開發體系是基於外掛的,平臺本身提供了介面,開發者按照相關規範就可以開發出相應的外掛新增到Kettle中使用,感覺這個體系設計思路很不錯,非常有利於Kettle後續的擴充套件。
初次接觸Kettle外掛開發可以參考GitHub上有關外掛模板DummyPlugin的原始碼,通過對原始碼的分析,發現Kettle外掛開發的流程還是比較簡單的,以DummyPlugin為例,主要包括以下幾個類:

    DummyPlugin.java
DummyPluginData.java DummyPluginDialog.java DummyPluginMeta.java

Kettle外掛的開發遵循了MVC設計模式,其中DummyPlugin.java類實現了Control功能,當轉換執行時,負責按照預設的邏輯處理輸入資料;DummyPluginDialog.java類實現了View功能,即對話方塊的實現;而DummyPluginData.java和DummyPluginMeta.java用來儲存使用者在對話方塊的配置引數,實現了Model功能。

二 Kettle中的Solr外掛開發
由於Kettle中沒有預先整合Solr外掛,因為專案開發的需要,對Solr外掛進行了編寫測試,主要功能是讀取輸入的資料流傳送到Solr叢集中,開發的外掛也比較簡單,分享一下。
其實主要就實現了3個類,SolrPluginMeta、SolrPluginDialog、SolrPlugin,分別實現Model、View、Control功能:

/**
 * SolrPluginMeta 類主要用來儲存使用者的配置資料,頁面上的配置包括zk地址、collection名以及分割槽策略等
 */
public class SolrPluginMeta extends BaseStepMeta implements     StepMetaInterface{

    private String zkHost;

    private String collectionName;

    /**
     * 分割槽策略 0:不分割槽 1:欄位分割槽 2:大小分割槽
     */
    private String mode;

    /**
     * 選擇的分割槽欄位
     */
private String filedselected; /** * 每個shard的最大容量 */ private long countsize; public SolrPluginMeta(){ super(); } public void setZkHost(String zkHost) { this.zkHost = zkHost; } public String getZkHost() { return zkHost; } public void setCollectionName(String collectionName) { this.collectionName = collectionName; } public String getCollectionName() { return collectionName; } public void setMode(String mode) { this.mode = mode; } public String getMode() { return mode; } public void setFiledselected(String filedselected) { this.filedselected = filedselected; } public String getFiledselected() { return filedselected; } public void setCountsize(long countsize) { this.countsize = countsize; } public long getCountsize() { return countsize; } /** * 這個函式的作用是在複製SolrPluginMeta 時獲取原物件引數的 */ public String getXML(){ StringBuilder retval = new StringBuilder(); retval.append("<values>").append(Const.CR); retval.append(" ").append(XMLHandler.addTagValue("zkHost", zkHost)); retval.append(" ").append(XMLHandler.addTagValue("collectionName", collectionName)); retval.append(" ").append(XMLHandler.addTagValue("filedselected", filedselected)); retval.append(" ").append(XMLHandler.addTagValue("countsize", countsize)); retval.append(" ").append(XMLHandler.addTagValue("mode", mode)); retval.append("</values>").append(Const.CR); return retval.toString(); } public Object clone(){ return super.clone(); } /** * 複製物件時傳遞引數 */ public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String,Counter> counters){ Node valnode = XMLHandler.getSubNode(stepnode, "values", "zkHost"); if(null!=valnode){ zkHost = valnode.getTextContent(); } valnode = XMLHandler.getSubNode(stepnode, "values", "collectionName"); if(null!=valnode){ collectionName = valnode.getTextContent(); } valnode = XMLHandler.getSubNode(stepnode, "values", "filedselected"); if(null!=valnode){ filedselected = valnode.getTextContent(); } valnode = XMLHandler.getSubNode(stepnode, "values", "countsize"); if(null!=valnode){ countsize = Long.parseLong(valnode.getTextContent()); } valnode = XMLHandler.getSubNode(stepnode, "values", "mode"); if(null!=valnode){ mode = valnode.getTextContent(); } } @Override public void setDefault() { this.zkHost = "localhost:2181,localhost:2182,localhost:2183"; this.collectionName = "collection1234"; this.mode = "0"; } public StepDialogInterface getDialog(Shell shell, StepMetaInterface meta, TransMeta transMeta, String name){ return new SolrPluginDialog(shell, meta, transMeta, name); } @Override public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans disp) { return new SolrPlugin(stepMeta, stepDataInterface, cnr, transMeta, disp); } @Override public StepDataInterface getStepData() { return new SolrPluginData(); } }

SolrPluginMeta 類儲存了使用者的配置資訊,在開發中Solr的分割槽分為三種模式:不分割槽(所有記錄傳送到一個shard中)、欄位分割槽(每個欄位單獨一個分割槽)、大小分割槽(指定數量的記錄劃分在一個分割槽中)。
當從Kettle中拖拽一個外掛到面板上時,其實就生成了一個SolrPluginMeta 物件,這個物件將儲存使用者在對話方塊中輸入的配置資訊,而當轉換執行時,Kettle會重新生成一個SolrPluginMeta 物件並獲取原物件的配置引數(這一點我還不太明白為啥Kettle採用這種方式),因此getXML()和loadXML函式就是在複製配置引數時使用的。
SolrPluginDialog就是編寫對話方塊供使用者輸入引數,並且將引數儲存到SolrPluginMeta中,具體程式碼就不帖出來了。
SolrPlugin類也比較簡單,是轉換操作的核心邏輯,其實主要的方法就是processRow,Kettle中資料按照流的形式傳遞,因此processRow方法會分批次對輸入流進行處理。

    public class SolrPlugin extends BaseStep implements StepInterface{

    private SolrPluginData data;
    private SolrPluginMeta meta;

    /**
     * Zk叢集地址
     */
    private String zkHost;

    /**
     * collection名
     */
    private String collectionName;

    /**
     * 輸入的資料總量
     */
    private long send_count = 0l;
    /**
     * 欄位名列表
     */
    private String[] fieldNames;

    /**
     * shard與傳送文件的對映
     */
    private Map<String, List<SolrInputDocument>> send_list;

    /**
     * 當前shard與hostIp的對映
     */
    private Map<String, String> shard_hostIp;

    /**
     * shard與對應的solr地址的對映
     */
    private Map<String, HttpSolrClient> solrserver_url;

    public SolrPlugin(StepMeta s, StepDataInterface stepDataInterface, 
                      int c, TransMeta t, Trans dis){
        super(s,stepDataInterface,c,t,dis);
    }

    public boolean init(StepMetaInterface smi, StepDataInterface sdi){
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;
        return super.init(smi, sdi);
    }

    public void dispose(StepMetaInterface smi, StepDataInterface sdi){
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;
        super.dispose(smi, sdi);
    }

    /**
     * 獲取route欄位
     * @param mode 分割槽策略
     * @param doc  輸入文件
     * @param site 文件位置
     * @return
     */
    public String getRoute(String mode, SolrInputDocument doc, long site){

        //不分割槽模式
        if(mode.equals("0")){
            return "shard1";
         //欄位分割槽
        }else if(mode.equals("1")){

            String filed = meta.getFiledselected();
            String shardname = doc.getFieldValue(filed)==null ? "shard1" : 
                                      doc.getFieldValue(filed).toString();
            return PinyinUtil.getInstance().getStringPinyin(shardname);
         //大小分割槽
        }else{
            long shard_num = meta.getCountsize();
            int index = (int)(site/shard_num)+1;
            return "shard"+index;
        }   
    }

    /**
     * 傳送本地快取的list至solr中
     * @param doclist
     */
    public void sendList(Map<String, List<SolrInputDocument>> doclist) throws KettleException{

        if(null==doclist){
            return;
        }

        for(String shard : doclist.keySet()){
            if(StringUtils.isEmpty(shard)){
                continue;
            }

            //獲取該shard對應的hostIp
            String hostIp = shard_hostIp.get(shard);
            if(StringUtils.isEmpty(hostIp)){
                logBasic("準備建立shard:"+shard);
                SolrService.createShard(zkHost, collectionName, shard, this);
                hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, shard, this);
                shard_hostIp.put(shard, hostIp);
            }

            //獲取shard對應的url
            HttpSolrClient client = solrserver_url.get(shard);
            if(client==null){
                String url = "http://"+hostIp+"/solr/"+collectionName;
                client = new HttpSolrClient(url);
                solrserver_url.put(shard, client);
            }

            long time = System.currentTimeMillis();
            //待發送的文件集合
            List<SolrInputDocument> list = doclist.get(shard);

            if(list.size()<=0){
                continue;
            }

            try {
                client.add(list);
                client.commit();
            } catch (Exception e) {
                logError(String.format("傳送到shard:%s出錯,地址:%s,原因:%s", 
                                        shard, client.getBaseURL(), e.getMessage()));
                throw new KettleException(e.getMessage());
            } 
            logBasic(String.format("成功傳送到%s %s條記錄, 耗時%s毫秒", shard, list.size(), 
                                    System.currentTimeMillis()-time));
            list.clear();
        }
    }

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;

        //獲取上一個步驟的輸入流
        Object[] r=getRow();
        if(r==null){
            logBasic("無輸入資料");
            sendList(send_list);
            setOutputDone();
            return false;
        }

        //first為true則表明是第一行資料,可以在此完成相關的初始化工作
        if(first){
            first = false;

            zkHost = meta.getZkHost();
            collectionName = meta.getCollectionName();

            SolrService.createCollection(zkHost, collectionName, this);

            //獲取輸入欄位名集合
            RowMetaInterface fields = getInputRowMeta().clone();
            fieldNames = fields.getFieldNames();
            for(String o : fieldNames){
                logBasic(o);
            }

            send_list = new HashMap<String, List<SolrInputDocument>>();
            solrserver_url = new HashMap<String, HttpSolrClient>();
            shard_hostIp = new HashMap<String, String>();

            String hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, "shard1", this);
            shard_hostIp.put("shard1", hostIp);
        }

        if(r.length<fieldNames.length){
            logError("輸入資料有誤, 本次資料忽略");
            return true;
        }

        //儲存輸入資料至document
        SolrInputDocument input = new SolrInputDocument();
        for(int i=0; i<fieldNames.length; i++){
            input.addField(fieldNames[i], r[i]);
        }
        String shardname = getRoute(meta.getMode(), input, ++send_count);
        input.addField("_route_", shardname);

        List<SolrInputDocument> documentlist = send_list.get(shardname);
        if(documentlist==null){
            documentlist = new ArrayList<SolrInputDocument>();
            send_list.put(shardname, documentlist);
        }
        documentlist.add(input);

        if(send_count%20000==0){
            sendList(send_list);
        }
        return true;
    }
}

processRow返回true則表明資料處理沒有結束,則Kettle會繼續呼叫processRow處理輸入資料;返回false則表明處理完成,記住在返回false之前要呼叫基類的setOutputDone()方法。

三 外掛部署到Kettle中
原始碼寫好後,打成jar包,接下來還要編寫plugin.xml配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<plugin id="SolrPlugin" iconfile="solr.png" description="SolrPlugin" 
tooltip="This is a solr plugin step" category="TestDemo"
classname="com.iflytek.kettle.solr.SolrPluginMeta" >

    <libraries>
      <library name="SolrPlugin.jar"/>
    </libraries>

    <localized_category>
      <category locale="en_US">TestDemo</category>
      <category locale="zh_CN">外掛測試</category>
    </localized_category>

    <localized_description>
      <description locale="en_US">SolrPlugin</description>
    </localized_description>

    <localized_tooltip>
      <tooltip locale="en_US">傳送記錄到Solr中</tooltip>
    </localized_tooltip>
</plugin>

其中的id是外掛註冊的標識,iconfile指定了外掛的圖示,classname指定了外掛的入口類,就是SolrPluginMeta類;指定了外掛在Kettle左側列表中的位置。將打好的jar包、plugin.xml配置檔案、圖示等放置在單獨的資料夾中,並將該資料夾Kettle目錄下的plugins\steps中(如果沒有steps目錄則新建),重啟Kettle就可看到自定義的外掛:
這裡寫圖片描述
四 總結
Kettle外掛的開發並不複雜,掌握了基本的開發流程就可以自己開發需要的外掛了,寫得比較亂,歡迎各位多多交流指正。