1. 程式人生 > >【Java】【Flume】Flume-NG啟動過程源代碼分析(一)

【Java】【Flume】Flume-NG啟動過程源代碼分析(一)

code extends fix tar top 依據 oid article gif

從bin/flume 這個shell腳本能夠看到Flume的起始於org.apache.flume.node.Application類,這是flume的main函數所在。

  main方法首先會先解析shell命令,假設指定的配置文件不存在就甩出異常。

  依據命令中含有"no-reload-conf"參數,決定採用那種載入配置文件方式:一、沒有此參數。會動態載入配置文件,默認每30秒載入一次配置文件,因此能夠動態改動配置文件。二、有此參數,則僅僅在啟動時載入一次配置文件。

實現動態載入功能採用了公布訂閱模式,使用guava中的EventBus實現。

技術分享
     EventBus eventBus = new
EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); //這裏是公布事件的類,這裏的30則是動態載入配置文件時間間隔,單位是s components.add(configurationProvider); application
= new Application(components); eventBus.register(application); //將訂閱類註冊到Bus中
技術分享

  訂閱類是application = new Application(components);公布代碼在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。

在這僅僅是先構建一個PollingPropertiesFileConfigurationProvider對象,PollingPropertiesFileConfigurationProvider extends PropertiesFileConfigurationProvider implements LifecycleAware,繼續跟蹤PropertiesFileConfigurationProvider extends AbstractConfigurationProvider。再跟蹤AbstractConfigurationProvider implements ConfigurationProvider能夠看到這些類的構造方法都是初始化。AbstractConfigurationProvid的構造方法初始化了sink、channel、source的工廠類。

  Application.handleConfigurationEvent(MaterializedConfiguration conf)[email protected],是訂閱方法,當eventBus.post(MaterializedConfiguration conf)運行時,會觸發運行handleConfigurationEvent方法。

  new Application(components)時,會構建一個對象supervisor = new LifecycleSupervisor()會啟動10個線程用來執行配置文件裏的各個組件,並監控組件的整個執行過程。

  application.start()方法會啟動配置文件的載入過程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START開始執行。在這的component就是上面的PollingPropertiesFileConfigurationProvider對象。supervise方法會對component創建一個MonitorRunnable進程。並放入默認有10個線程的monitorService去執行

技術分享
    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;//組件
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);
    //創建並運行一個在給定初始延遲後首次啟用的定期操作。隨後。在每一次運行終止和下一次運行開始之間都存在給定的延遲。

假設任務的任一運行遇到異常,就會取消興許運行。 ScheduledFuture<?

> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); //啟動MonitorRunnable,結束之後3秒再又一次啟動,能夠用於重試 monitorFutures.put(lifecycleAware, future);

技術分享

  看MonitorRunnable類。其run方法主要是依據supervisoree.status.desiredState的值運行相應的操作。

這裏的lifecycleAware就是上面supervise方法中的componentlifecycleAware在構造之初將lifecycleState=IDLE,application.start()方法通過supervisor.supervise方法將supervisoree.status.desiredState=START。所以在run方法中會運行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。

  PollingPropertiesFileConfigurationProvider.start()方法會啟動一個單線程FileWatcherRunnable每隔30s去載入一次配置文件(假設配置文件有改動):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()這種方法解析了配置文件獲取了全部組件及其配置屬性。這種方法較為復雜。放在興許再解說。

  待eventBus.post(getConfiguration())之後會觸發Application.handleConfigurationEvent方法:

  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  stopAllComponents()方法會依次stop各個組件的執行。順序是:source、sink、channel。

之所以有順序是由於:一、source是不停的讀數據放入channel的;二、sink是不停的從channel拿數據的。channel兩頭都在使用應該最後停止,停止向channel發送數據後sink停止才不會丟數據。stop是通過supervisor.unsupervise方法來完畢的。

  startAllComponents(conf)是啟動各個組件的,順序正好和stopAllComponents()停止順序相反。相信大夥非常easy理解。是通過supervisor.supervise啟動組件的。另外須要註意的是啟動channel組件後須要等待一定時間,是為了讓所有channel所有啟動。

  另外為什麽要先stop再start呢?由於考慮到要動態載入配置文件啊。載入配置文件後就須要又一次啟動全部組件。所以先停止全部的,再又一次啟動全部的。

  main方法的最後另一個鉤子函數Runtime.getRuntime().addShutdownHook,主要是用來進行內存清理、對象銷毀等操作。

【Java】【Flume】Flume-NG啟動過程源代碼分析(一)