Kettle 7 (Pentaho Data Integration) Source Code Analysis: Every Step Has a Thread Responsible for Running It

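Kettle 7 requires Java 8.
A Transformation in Kettle contains multiple step components. When the transformation is executed, these steps are not initialized serially. (Note that it is the data that flows between the steps as a stream.) A step runs in order to process data, so streaming data from step to step does not conflict with starting the steps non-serially.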
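
To make the point above concrete, here is a minimal sketch in plain Java, not Kettle code. Two hypothetical "steps" run on their own threads and exchange rows through a `java.util.concurrent.BlockingQueue`, which stands in for whatever buffer Kettle places between steps. The class name, the `END` sentinel and the `runPipeline` method are all assumptions made for illustration. The downstream thread is deliberately started first to show that startup order does not affect the order in which rows arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StreamingStepsSketch {

  // Sentinel marking the end of the stream (an assumption of this sketch).
  private static final Integer END = Integer.MIN_VALUE;

  /** Streams the rows 1..rowCount from a generator thread to a multiplier thread. */
  static List<Integer> runPipeline( int rowCount ) {
    // Small bounded buffer: the generator blocks when the downstream step falls behind.
    BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>( 2 );
    List<Integer> output = new ArrayList<>();

    // Downstream "step": takes rows as they arrive and multiplies them by 10.
    Thread multiplier = new Thread( () -> {
      try {
        for ( Integer row = buffer.take(); !row.equals( END ); row = buffer.take() ) {
          output.add( row * 10 );
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
      }
    }, "multiplier" );

    // Upstream "step": produces the rows and then signals the end of the stream.
    Thread generator = new Thread( () -> {
      try {
        for ( int i = 1; i <= rowCount; i++ ) {
          buffer.put( i );
        }
        buffer.put( END );
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
      }
    }, "generator" );

    // Start the downstream step first on purpose: it simply waits for rows.
    multiplier.start();
    generator.start();

    try {
      generator.join();
      multiplier.join();
    } catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException( "Interrupted while waiting for the steps", e );
    }
    return output;
  }

  public static void main( String[] args ) {
    System.out.println( runPipeline( 3 ) ); // prints [10, 20, 30]
  }
}
```

The rows come out in the order they were produced even though the consumer was started before the producer, because ordering is imposed by the queue and not by thread startup.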

The code of the prepareExecution method in the class Trans.java supports the point above.

......
......
......
    StepInitThread[] initThreads = new StepInitThread[steps.size()];
    Thread[] threads = new Thread[steps.size()];

    // Initialize all the threads...
    //
    for ( int i = 0; i < steps.size(); i++ ) {
      final StepMetaDataCombi sid = steps.get( i );

      // Do the init code in the background!
      // Init all steps at once, but ALL steps need to finish before we can
      // continue properly!
      //
      initThreads[i] = new StepInitThread( sid, log );

      // Put it in a separate thread!
      //
      threads[i] = new Thread( initThreads[i] );
      threads[i].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[i].getName() + ")" );

      ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[i] );

      threads[i].start();
    }
......
......
......
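
The comment in the excerpt says that all steps are initialized at once, but that all of them must finish before execution can continue. The excerpt elides how that wait is done, so the following is only a sketch of the general pattern in plain Java: start one thread per initializer, then wait for every thread with `Thread.join()`. The use of `join()`, the class name `ParallelInitSketch` and the `BooleanSupplier` initializers are assumptions of this sketch, not Kettle's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class ParallelInitSketch {

  /**
   * Runs every initializer on its own thread and waits for all of them.
   * Returns one flag per initializer: true if it reported success.
   */
  static boolean[] initAll( List<BooleanSupplier> initializers ) {
    boolean[] ok = new boolean[initializers.size()];
    Thread[] threads = new Thread[initializers.size()];

    // Start all initializations at once, each on a separate thread.
    for ( int i = 0; i < initializers.size(); i++ ) {
      final int index = i;
      threads[i] = new Thread( () -> {
        ok[index] = initializers.get( index ).getAsBoolean();
      } );
      threads[i].setName( "init of step " + i );
      threads[i].start();
    }

    // ALL initializations need to finish before the caller may continue.
    for ( Thread thread : threads ) {
      try {
        thread.join();
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException( "Interrupted while waiting for init", e );
      }
    }
    return ok;
  }

  public static void main( String[] args ) {
    List<BooleanSupplier> initializers = new ArrayList<>();
    initializers.add( () -> true );  // a step whose init succeeds
    initializers.add( () -> false ); // a step whose init fails
    boolean[] ok = initAll( initializers );
    System.out.println( ok[0] + " " + ok[1] ); // prints true false
  }
}
```

Each thread writes only its own slot of the result array, and `join()` guarantees those writes are visible to the caller once it returns, so no extra synchronization is needed in this sketch.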

The run() method of the class StepInitThread is shown below. (run calls step.init, which performs the step's initialization.)

  public void run() {
    // Set the internal variables also on the initialization thread!
    // ((BaseStep)combi.step).setInternalVariables();

    if ( !doIt ) {
      // An extension point plugin decided we should not initialize the step.
      // Logging, error handling, finished flag... should all be handled in the extension point.
      //
      return;
    }

    try {
      combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_START );
      // What combi.step.init does: "Initialize and do work where other steps need to wait for..."
      if ( combi.step.init( combi.meta, combi.data ) ) {
        combi.data.setStatus( StepExecutionStatus.STATUS_IDLE );
        ok = true;
      } else {
        combi.step.setErrors( 1 );
        log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) );
      }
    } catch ( Throwable e ) {
      log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) );
      log.logError( Const.getStackTracker( e ) );
    } finally {
      combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_STOP );
    }

    finished = true;
  }
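
The flag handling in run() is easy to misread, so here is a stripped-down sketch of the same control flow in plain Java. It has no Kettle dependencies: the `BooleanSupplier` stands in for the call to `combi.step.init( combi.meta, combi.data )`, and the class name and the `errors` field are hypothetical. Logging and metrics are left out.

```java
import java.util.function.BooleanSupplier;

class InitRunnableSketch implements Runnable {

  // Hypothetical stand-in for combi.step.init( combi.meta, combi.data ).
  private final BooleanSupplier init;

  volatile boolean doIt = true;      // may be cleared before the thread starts
  volatile boolean ok = false;       // true only if init reported success
  volatile boolean finished = false; // true once run() got past the init attempt
  volatile int errors = 0;

  InitRunnableSketch( BooleanSupplier init ) {
    this.init = init;
  }

  @Override
  public void run() {
    if ( !doIt ) {
      // Vetoed: as in the original, 'finished' is NOT set on this path.
      return;
    }
    try {
      if ( init.getAsBoolean() ) {
        ok = true;
      } else {
        errors = 1;
      }
    } catch ( Throwable e ) {
      // The original logs the error and the stack trace here; ok stays false.
    }
    finished = true;
  }

  public static void main( String[] args ) {
    InitRunnableSketch success = new InitRunnableSketch( () -> true );
    success.run();
    System.out.println( success.ok + " " + success.finished ); // prints true true

    InitRunnableSketch vetoed = new InitRunnableSketch( () -> true );
    vetoed.doIt = false;
    vetoed.run();
    System.out.println( vetoed.ok + " " + vetoed.finished ); // prints false false
  }
}
```

There are four outcomes: init returns true (ok and finished are set), init returns false (an error is recorded and finished is set), init throws (ok stays false, finished is still set), and the veto path, where the method returns before reaching `finished = true`. That last case matches the comment in the original, which leaves the finished flag to the extension point.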