Flume NG原始碼分析(二)支援執行時動態修改配置的配置模組
阿新 • • 發佈:2018-11-01
在上一篇中講了Flume NG配置模組基本的介面的類,PropertiesConfigurationProvider提供了基於properties配置檔案的靜態配置的能力,這篇細說一下PollingPropertiesFileConfigurationProvider提供的執行時動態修改配置並生效的能力。
要實現動態修改配置檔案並生效,主要有兩個待實現的功能
1. 觀察配置檔案是否修改
2. 如果修改,將修改的內容通知給觀察者
對於第一點,監控配置檔案是否修改,Flume NG定義了一個FileWatcherRunnable物件來監控配置檔案,啟動了一個單獨的執行緒採用定時輪詢的方式來監控,輪詢頻率是30毫秒一次,比較file.lastModified屬性與lastChange時間戳,當file.lastModified > lastChange時表示檔案被修改
public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()啟動一個單獨的執行緒來監控properties配置檔案 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); }
對於第二點,利用Guava EventBus提供的釋出訂閱模式機制,將配置修改封裝成事件傳遞給Application,來重新載入配置
// FileWatcherRunnable.run方法 釋出配置修改的事件 eventBus.post(getConfiguration()); // Application.main方法來註冊事件訂閱 Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } // Application類採用@Subscribe標註來定義訂閱方法,即配置修改後會執行handleConfigurationEvent方法,這個方法是執行緒安全的 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }