
Flume NG Source Code Analysis (1): The Configuration Module Based on Static Properties Files

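Log collection is an important service at internet companies. Flume NG is an Apache top-level project and an open-source implementation of a distributed log collection service; it is highly extensible and integrates cleanly with many other open-source components. A search turned up plenty of articles introducing Flume NG, but none that analyze its source code in depth, so I plan to write a series that does. Let's start with the basic configuration module.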


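Flume NG supports two configuration modes. One is static configuration based on a properties file, which is loaded only once. The other is dynamic configuration built on the publish/subscribe model of Guava's EventBus, which can reload a modified configuration at runtime. This article covers the static, properties-file-based configuration.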


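Below is a typical flume-conf.properties configuration.

1. producer is the name of the agent; one agent corresponds to one Flume NG process

2. producer.sources lists the log sources this agent monitors; multiple sources can be configured

3. producer.channels and producer.sinks specify the channels and sinks; these concepts are covered later

4. producer.sources.sX.XXX specifies how each source obtains its logs. When collecting from a local log file, as the exec sources here do, the source actually runs tail -F to follow the end of the log file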

producer.sources = s1 s2 s3
producer.channels = c
producer.sinks = r


producer.sources.s1.type = exec
producer.sources.s1.channels = c
producer.sources.s1.command = tail -F /data/logs/s1.log

producer.sources.s2.type = exec
producer.sources.s2.channels = c
producer.sources.s2.command = tail -F /data/logs/s2.log

producer.sources.s3.type = exec
producer.sources.s3.channels = c
producer.sources.s3.command = tail -F /data/logs/s3.log

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=server1:9092,server2:9092,server3:9092
producer.sinks.r.zk.connect=server1:2181,server2:2181,server3:2181,server4:2181,server5:2181
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=topic.xxx

#Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

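Next, how are the agent name producer and the configuration file selected? Below is the Flume NG startup command: -f gives the path to the configuration file, and -n gives the agent name, which is the prefix of every entry in flume-conf.properties.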

/flume-ng  agent -c conf -f ../conf/flume-conf.properties -n producer -Dflume.root.logger=INFO,console > flume-ng.log 2>&1 &
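To make the role of the -n prefix concrete, here is a minimal sketch of selecting one agent's entries from a properties file. The class AgentPrefixDemo and the method settingsFor are hypothetical names invented for this illustration; Flume's own parsing lives in FlumeConfiguration and is more involved.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Flume: shows how the agent name passed
// with -n acts as the key prefix inside flume-conf.properties.
class AgentPrefixDemo {

  // Returns the entries that belong to agentName, with the "<agentName>." prefix stripped.
  static Map<String, String> settingsFor(String agentName, Properties props) {
    String prefix = agentName + ".";
    Map<String, String> result = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), props.getProperty(key));
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    String text = String.join("\n",
        "producer.sources = s1",
        "producer.channels = c",
        "producer.sources.s1.type = exec",
        "other.sources = x");
    Properties props = new Properties();
    props.load(new StringReader(text));
    // Only the keys that start with "producer." are printed.
    settingsFor("producer", props).forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

Entries that belong to another agent, such as other.sources in the sample text, are ignored, which is why several agents can share one properties file.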

Let's see how Flume NG reads the command-line arguments and how it turns the contents of flume-conf.properties into its internal data structures.

org.apache.flume.node.Application is the startup class of Flume NG. Here is what its main method does:

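1. It uses the command-line parsing provided by commons-cli.jar to read the -n/--name, -f/--conf-file and --no-reload-conf options into variables

2. It locates the configuration file given by -f and checks that it exists. If --no-reload-conf is not passed (reload is true), the configuration is reloaded at runtime: main creates an EventBus for publishing and subscribing to configuration-change events, and a PollingPropertiesFileConfigurationProvider that polls the properties file and reloads it when it changes

3. If --no-reload-conf is passed (reload is false), the configuration is static and loaded only once at startup, so a PropertiesFileConfigurationProvider that reads the properties file is all that is needed. Note that because of the line boolean reload = !commandLine.hasOption("no-reload-conf"), reloading is the default in this version of main; static configuration has to be requested explicitly with the flag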

 public static void main(String[] args) {

    try {

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true, "specify a conf file");
      option.setRequired(true);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false, "do not reload " +
        "conf file if changed");
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      File configurationFile = new File(commandLine.getOptionValue('f'));
      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf");

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);
        return;
      }
      /*
       * The following is to ensure that by default the agent
       * will fail on startup if the file does not exist.
       */
      if (!configurationFile.exists()) {
        // If command line invocation, then need to fail fast
        if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
          String path = configurationFile.getPath();
          try {
            path = configurationFile.getCanonicalPath();
          } catch (IOException ex) {
            logger.error("Failed to read canonical path for file: " + path, ex);
          }
          throw new ParseException(
              "The specified configuration file does not exist: " + path);
        }
      }
      List<LifecycleAware> components = Lists.newArrayList();
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
      application.start();

      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.",
          e);
    }
  }
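The reload branch of main relies on polling. The following is a rough sketch of that idea using only the JDK; PollingWatcher is a hypothetical class written for this article and is not Flume's PollingPropertiesFileConfigurationProvider, which additionally parses the file and posts the result to the EventBus. The timestamp source is passed in as a LongSupplier so the logic can be exercised without touching the file system.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical class for illustration; not Flume's polling provider.
class PollingWatcher {
  private final LongSupplier lastModified; // e.g. configurationFile::lastModified
  private final Runnable onChange;         // e.g. reload and publish the new configuration
  private long lastSeen = 0L;

  PollingWatcher(LongSupplier lastModified, Runnable onChange) {
    this.lastModified = lastModified;
    this.onChange = onChange;
  }

  // One polling step: fires the callback only if the timestamp moved forward.
  synchronized boolean checkOnce() {
    long current = lastModified.getAsLong();
    if (current > lastSeen) {
      lastSeen = current;
      onChange.run();
      return true;
    }
    return false;
  }

  // Runs checkOnce() repeatedly on a single background thread.
  ScheduledExecutorService start(long intervalSeconds) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleWithFixedDelay(this::checkOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    return executor;
  }
}
```

With a real file this could be started as new PollingWatcher(file::lastModified, () -> System.out.println("changed")).start(30). The value 30 mirrors the last constructor argument in main, which I assume to be the polling interval in seconds.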

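The interfaces and classes related to Flume NG configuration are structured as follows:

1. ConfigurationProvider, the top-level interface, defines the method MaterializedConfiguration getConfiguration()

2. The MaterializedConfiguration interface represents a materialized configuration, that is, the settings defined in flume-conf.properties instantiated as concrete objects. SimpleMaterializedConfiguration implements it and holds the configuration data structures used at runtime

3. AbstractConfigurationProvider implements the ConfigurationProvider interface and declares the abstract method abstract FlumeConfiguration getFlumeConfiguration()

4. FlumeConfiguration, AgentConfiguration, SourceConfiguration, ChannelConfiguration and SinkConfiguration help parse flume-conf.properties and store the fields defined in it

5. PropertiesFileConfigurationProvider reads the configuration from the file given by -f/--conf-file, and reads it only once

6. PollingPropertiesFileConfigurationProvider polls the configuration file for changes and supports modifying the configuration dynamically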
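The relationship between the interface, the abstract class and the concrete provider is a template-method arrangement: the abstract class owns the shared materialization step, and subclasses only decide where the raw settings come from. The sketch below uses deliberately simplified stand-in types (SketchProvider, SketchAbstractProvider, SketchStaticProvider, all invented here) and returns plain lists of component names rather than real Channel or runner objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; these are not the real Flume types.
interface SketchProvider {
  // Plays the role of MaterializedConfiguration getConfiguration().
  Map<String, List<String>> getConfiguration();
}

abstract class SketchAbstractProvider implements SketchProvider {
  private final String agentName;

  SketchAbstractProvider(String agentName) {
    this.agentName = agentName;
  }

  // Plays the role of getFlumeConfiguration(): subclasses decide where raw settings come from.
  protected abstract Map<String, String> loadRawSettings();

  // Shared step: turn "<agent>.sources = s1 s2" style entries into lists of component names.
  @Override
  public final Map<String, List<String>> getConfiguration() {
    Map<String, String> raw = loadRawSettings();
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (String kind : new String[] {"sources", "channels", "sinks"}) {
      String value = raw.get(agentName + "." + kind);
      List<String> names = new ArrayList<>();
      if (value != null && !value.trim().isEmpty()) {
        names.addAll(Arrays.asList(value.trim().split("\\s+")));
      }
      result.put(kind, names);
    }
    return result;
  }
}

// Static variant: the settings are fixed when the provider is created.
class SketchStaticProvider extends SketchAbstractProvider {
  private final Map<String, String> settings;

  SketchStaticProvider(String agentName, Map<String, String> settings) {
    super(agentName);
    this.settings = new HashMap<>(settings);
  }

  @Override
  protected Map<String, String> loadRawSettings() {
    return settings;
  }
}
```

A polling variant would extend the same abstract class and re-read its source before each call, which is roughly how the two Flume providers can share one getConfiguration implementation.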



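The implementation of PropertiesFileConfigurationProvider is simple:

1. Its getFlumeConfiguration method reads the properties file and converts it into a FlumeConfiguration object

2. The getConfiguration method of the parent class AbstractConfigurationProvider then builds the MaterializedConfiguration instance, that is, it creates the Channel, SourceRunner and SinkRunner objects used at runtime, reading each object's fields from the FlumeConfiguration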

public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      Properties properties = new Properties();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }
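getFlumeConfiguration calls a toMap helper that is not shown in the listing. A plausible sketch of such a conversion is given below; the class name PropertiesToMap is made up here, and the real helper in Flume may be written differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the toMap helper used by getFlumeConfiguration().
class PropertiesToMap {

  // Copies every string-valued property into a plain Map<String, String>.
  static Map<String, String> toMap(Properties properties) {
    Map<String, String> result = new HashMap<>();
    for (String name : properties.stringPropertyNames()) {
      result.put(name, properties.getProperty(name));
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    Properties properties = new Properties();
    // Properties.load drops the whitespace around '=', so the value here is just "c".
    properties.load(new StringReader("producer.channels = c\n"));
    System.out.println(toMap(properties));
  }
}
```

Note also what the listing does on failure: if reading the file throws an IOException, the error is logged and an empty FlumeConfiguration is returned, so the agent starts with no components instead of aborting.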

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
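One detail in getConfiguration worth isolating is how unused channels are dropped: the loop iterates over a copy of the key set, so entries can be removed from the map without a ConcurrentModificationException. The sketch below reproduces only that pattern; ChannelPruningDemo and pruneUnused are hypothetical names, and the components are represented by plain strings instead of Flume's ChannelComponent.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical demo of the "copy the keys, then remove" pattern from getConfiguration().
class ChannelPruningDemo {

  // Removes channels that no source or sink is connected to; returns the removed names.
  static Set<String> pruneUnused(Map<String, List<String>> channelToComponents) {
    Set<String> removed = new HashSet<String>();
    // Iterate over a snapshot of the keys so that removing from the map is safe.
    for (String name : new HashSet<String>(channelToComponents.keySet())) {
      if (channelToComponents.get(name).isEmpty()) {
        channelToComponents.remove(name);
        removed.add(name);
      }
    }
    return removed;
  }
}
```

In the sample configuration earlier in the article, channel c is referenced by three sources and one sink, so it would be kept; a channel declared in producer.channels but never referenced would be logged and removed.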