1. 程式人生 > >Flume NG原始碼分析(二)支援執行時動態修改配置的配置模組

Flume NG原始碼分析(二)支援執行時動態修改配置的配置模組

在上一篇中講了Flume NG配置模組基本的介面的類,PropertiesConfigurationProvider提供了基於properties配置檔案的靜態配置的能力,這篇細說一下PollingPropertiesFileConfigurationProvider提供的執行時動態修改配置並生效的能力。


要實現動態修改配置檔案並生效,主要有兩個待實現的功能

1. 觀察配置檔案是否修改

2. 如果修改,將修改的內容通知給觀察者


對於第一點,監控配置檔案是否修改,Flume NG定義了一個FileWatcherRunnable物件來監控配置檔案,啟動了一個單獨的執行緒採用定時輪詢的方式來監控,輪詢頻率是30毫秒一次,比較file.lastModified屬性與lastChange時間戳,當file.lastModified > lastChange時表示檔案被修改

public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      super();
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;
    }

    @Override
    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }
  }

// PollingPropertiesFileConfigurationProvider.start()啟動一個單獨的執行緒來監控properties配置檔案
 public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }

對於第二點,利用Guava EventBus提供的釋出訂閱模式機制,將配置修改封裝成事件傳遞給Application,來重新載入配置

// FileWatcherRunnable.run方法 釋出配置修改的事件
   eventBus.post(getConfiguration());

// Application.main方法來註冊事件訂閱
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      }


// Application類採用@Subscribe標註來定義訂閱方法,即配置修改後會執行handleConfigurationEvent方法,這個方法是執行緒安全的

@Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }