
[Flume][Source Code Analysis] HTTP monitoring in Flume: source code analysis, metrics analysis, and Flume's event bus

In Flume 1.5.2, to expose Flume's metrics through HTTP monitoring, add the following options to the startup command:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

Monitoring

Properties passed with -D can be read directly through System.getProperties(). The two properties above are read by the loadMonitoring() method in Application, Flume's entry-point class:

  private void loadMonitoring() {
    Properties systemProps = System.getProperties();
    Set<String> keys = systemProps.stringPropertyNames();
    try {
      if (keys.contains(CONF_MONITOR_CLASS)) {
        String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
        Class<? extends MonitorService> klass;
        try {
          //Is it a known type?
          klass = MonitoringType.valueOf(
                  monitorType.toUpperCase()).getMonitorClass();
        } catch (Exception e) {
          //Not a known type, use FQCN
          klass = (Class<? extends MonitorService>) Class.forName(monitorType);
        }
        this.monitorServer = klass.newInstance();
        Context context = new Context();
        for (String key : keys) {
          if (key.startsWith(CONF_MONITOR_PREFIX)) {
            context.put(key.substring(CONF_MONITOR_PREFIX.length()),
                    systemProps.getProperty(key));
          }
        }
        monitorServer.configure(context);
        monitorServer.start();
      }
    } catch (Exception e) {
      logger.warn("Error starting monitoring. "
              + "Monitoring might not be available.", e);
    }

  }
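At its core, loadMonitoring() does two small things. First, it resolves the monitor type: a known short name goes through the MonitoringType enum, and anything else is treated as a fully qualified class name. Second, it copies every flume.monitoring.* property into a Context with the prefix stripped. Below is a JDK-only sketch of those two steps. It assumes the constants hold "flume.monitoring.type" and "flume.monitoring." (consistent with the -D options above). MonitoringConfigSketch, MonitorKind and the class name it maps to are hypothetical, and the sketch returns class names instead of instantiating them.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Sketch of the loadMonitoring() steps; not Flume code.
class MonitoringConfigSketch {
    // Assumed values, consistent with the -D options used above.
    static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
    static final String CONF_MONITOR_PREFIX = "flume.monitoring.";

    // Hypothetical stand-in for Flume's MonitoringType: short name -> class name.
    enum MonitorKind {
        HTTP("org.example.HypotheticalHttpMonitor");

        final String className;

        MonitorKind(String className) { this.className = className; }
    }

    // Monitoring is only set up when the type property is present.
    static boolean isMonitoringRequested(Properties props) {
        return props.stringPropertyNames().contains(CONF_MONITOR_CLASS);
    }

    // Known short names (any case) go through the enum; anything else is
    // treated as a fully qualified class name, mirroring the try/catch above.
    static String resolveMonitorClass(String monitorType) {
        try {
            return MonitorKind.valueOf(monitorType.toUpperCase(Locale.ROOT)).className;
        } catch (IllegalArgumentException notAKnownType) {
            return monitorType;
        }
    }

    // Copies every prefixed property with the prefix stripped, like the Context
    // built in loadMonitoring(). The type key itself shares the prefix, so it is copied too.
    static Map<String, String> monitoringContext(Properties props) {
        Map<String, String> context = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(CONF_MONITOR_PREFIX)) {
                context.put(key.substring(CONF_MONITOR_PREFIX.length()), props.getProperty(key));
            }
        }
        return context;
    }
}
```

With -Dflume.monitoring.type=http -Dflume.monitoring.port=34545, the context ends up holding type=http and port=34545. The port entry is presumably what the HTTP monitor reads in configure().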
With flume.monitoring.type=http, monitorServer is an HTTPMetricsServer. This class starts an embedded Jetty server to listen for requests. Here is its start() method:
public void start() {
    jettyServer = new Server();
    //We can use Contexts etc if we have many urls to handle. For one url,
    //specifying a handler directly is the most efficient.
    SelectChannelConnector connector = new SelectChannelConnector();
    connector.setReuseAddress(true);
    connector.setPort(port);
    jettyServer.setConnectors(new Connector[] {connector});
    jettyServer.setHandler(new HTTPMetricsHandler());
    try {
      jettyServer.start();
      while (!jettyServer.isStarted()) {
        Thread.sleep(500);
      }
    } catch (Exception ex) {
      LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex);
    }
}
This is essentially ordinary server-side socket programming in Java: it starts a server that listens on the configured port.
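The following sketch makes the comparison with socket programming concrete. It implements the same idea with the JDK's built-in com.sun.net.httpserver.HttpServer instead of Jetty: bind to a port, attach one handler, and start. TinyMetricsServer is a hypothetical name, and its handler returns an empty JSON object rather than real metrics.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Same shape as HTTPMetricsServer.start(): one server, one port, one handler.
class TinyMetricsServer {
    private final HttpServer server;

    TinyMetricsServer(int port) throws IOException {
        // Binds to all interfaces on the given port (0 = let the OS choose).
        server = HttpServer.create(new InetSocketAddress(port), 0);
        // A context on "/" receives every path, like a single Jetty handler.
        server.createContext("/", exchange -> {
            byte[] body = "{}".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json;charset=utf-8");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
    }

    void start() { server.start(); }

    void stop() { server.stop(0); }

    int port() { return server.getAddress().getPort(); }
}
```

Passing 0 as the port lets the OS pick a free one, which is convenient for a quick local test. Flume itself binds the port configured with flume.monitoring.port.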
Note the handler that start() sets on the server: HTTPMetricsHandler.

Here is its handle() method:

public void handle(String target,
            HttpServletRequest request,
            HttpServletResponse response,
            int dispatch) throws IOException, ServletException {
      // /metrics is the only place to pull metrics.
      //If we want to use any other url for something else, we should make sure
      //that for metrics only /metrics is used to prevent backward
      //compatibility issues.
      if(request.getMethod().equalsIgnoreCase("TRACE") || request.getMethod()
        .equalsIgnoreCase("OPTIONS")) {
        response.sendError(HttpServletResponse.SC_FORBIDDEN);
        response.flushBuffer();
        ((Request) request).setHandled(true);
        return;
      }
      if (target.equals("/")) {
        response.setContentType("text/html;charset=utf-8");
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().write("For Flume metrics please click"
                + " <a href = \"./metrics\"> here</a>.");
        response.flushBuffer();
        ((Request) request).setHandled(true);
        return;
      } else if (target.equalsIgnoreCase("/metrics")) {
        response.setContentType("application/json;charset=utf-8");
        response.setStatus(HttpServletResponse.SC_OK);
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        String json = gson.toJson(metricsMap, mapType);
        response.getWriter().write(json);
        response.flushBuffer();
        ((Request) request).setHandled(true);
        return;
      }
      response.sendError(HttpServletResponse.SC_NOT_FOUND);
      response.flushBuffer();
      //Not handling the request returns a Not found error page.
    }
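The branching in handle() boils down to a function of two inputs: the HTTP method and the target path. This sketch pulls that decision out of the servlet plumbing so the rules are easy to see. MetricsRouting and its Route values are hypothetical names; the paths and status codes come from the code above.

```java
import java.util.Locale;

// Pure-function summary of the dispatch rules in HTTPMetricsHandler.handle().
class MetricsRouting {
    enum Route { FORBIDDEN_403, INDEX_PAGE_200, METRICS_JSON_200, NOT_FOUND_404 }

    static Route decide(String method, String target) {
        String m = method.toUpperCase(Locale.ROOT);
        // TRACE and OPTIONS are rejected before the path is looked at.
        if (m.equals("TRACE") || m.equals("OPTIONS")) {
            return Route.FORBIDDEN_403;
        }
        if (target.equals("/")) {
            return Route.INDEX_PAGE_200;      // HTML page linking to ./metrics
        }
        if (target.equalsIgnoreCase("/metrics")) {
            return Route.METRICS_JSON_200;    // JSON built from JMXPollUtil.getAllMBeans()
        }
        return Route.NOT_FOUND_404;
    }
}
```

The sketch makes one consequence visible: any method other than TRACE and OPTIONS, including POST, reaches the same / and /metrics branches.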
handle() is the key to retrieving the metrics. It works as follows:

1. First, TRACE and OPTIONS requests are rejected with 403 Forbidden:

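TRACE: echoes the request received by the server back to the client. It is mainly used for testing or diagnostics.
OPTIONS: returns the HTTP methods the server supports for a given resource. Sending an OPTIONS request for '*' to a web server can also be used to test the server's capabilities.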

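2. If you open ip:port without the /metrics path, you get a short HTML page that reads "For Flume metrics please click here."

Clicking "here" takes you to the metrics.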

3. If you request /metrics, the actual processing logic runs:

Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
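This line is the core of producing the JSON output.
Everything from here on is JMX-related and not central to this article. For background, see: http://baike.baidu.com/link?url=k1fgRfj4e720XBCEKIZT1eV1Zfct5LCkufbwKYj1s5MFvaK7kS0Vlb-lfdmkXOWFjJB9vYA1PWgYPYupkIRzJ_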
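The body of JMXPollUtil.getAllMBeans() is not shown here. The general technique is to ask the platform MBean server for the registered MBeans and read each attribute as a string. The sketch below is a simplified, JDK-only stand-in, not Flume's implementation. It registers a counter group as a DynamicMBean and polls it back into a Map<String, Map<String, String>>. Two details are assumptions chosen to match the JSON sample in the Metrics section: the object-name pattern org.apache.flume.<type>:type=<name> and the "SOURCE.r1"-style keys. CounterGroupDynamicMBean and JmxPollSketch are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Exposes a map of named AtomicLong counters as read-only JMX attributes.
class CounterGroupDynamicMBean implements DynamicMBean {
    private final Map<String, AtomicLong> counters;

    CounterGroupDynamicMBean(Map<String, AtomicLong> counters) { this.counters = counters; }

    @Override
    public Object getAttribute(String name) throws AttributeNotFoundException {
        AtomicLong value = counters.get(name);
        if (value == null) throw new AttributeNotFoundException(name);
        return value.get();
    }

    @Override
    public AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String name : names) {
            AtomicLong value = counters.get(name);
            if (value != null) list.add(new Attribute(name, value.get()));
        }
        return list;
    }

    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException("read-only: " + attribute.getName());
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) { return new AttributeList(); }

    @Override
    public Object invoke(String action, Object[] params, String[] signature) throws MBeanException {
        throw new MBeanException(new UnsupportedOperationException(action));
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        MBeanAttributeInfo[] attrs = counters.keySet().stream()
                .map(n -> new MBeanAttributeInfo(n, "long", n, true, false, false))
                .toArray(MBeanAttributeInfo[]::new);
        return new MBeanInfo(getClass().getName(), "counter group", attrs, null, null, null);
    }
}

class JmxPollSketch {
    // Assumed naming pattern: org.apache.flume.<type>:type=<name>
    static void register(String type, String name, Map<String, AtomicLong> counters) throws JMException {
        ObjectName on = new ObjectName("org.apache.flume." + type + ":type=" + name);
        ManagementFactory.getPlatformMBeanServer().registerMBean(new CounterGroupDynamicMBean(counters), on);
    }

    // Reads every attribute of every MBean under the domain prefix as a string,
    // keyed like "SOURCE.r1".
    static Map<String, Map<String, String>> pollAll(String domainPrefix) throws JMException {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (ObjectName on : mbs.queryNames(null, null)) {
            String domain = on.getDomain();
            if (!domain.startsWith(domainPrefix)) continue;
            Map<String, String> attrs = new TreeMap<>();
            for (MBeanAttributeInfo info : mbs.getMBeanInfo(on).getAttributes()) {
                attrs.put(info.getName(), String.valueOf(mbs.getAttribute(on, info.getName())));
            }
            String kind = domain.substring(domainPrefix.length()).toUpperCase(Locale.ROOT);
            result.put(kind + "." + on.getKeyProperty("type"), attrs);
        }
        return result;
    }
}
```

The sketch uses a DynamicMBean instead of a standard MBean interface, so it also works with package-private classes.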

Metrics

Now let's trace, step by step, how a single metric is obtained, using OpenConnectionCount as an example.

In the SourceCounter class, the constructor is:

public SourceCounter(String name) {
    super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
  }
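It is called from JMSSource's doConfigure(Context context) method, which in turn is called by the configure method of BasicSourceSemantics.
Going further up the call chain leads back to the entry point, Application, and its loadMonitoring method, which completes the chain.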

The constructor above calls the constructor of its superclass, MonitoredCounterGroup:

protected MonitoredCounterGroup(Type type, String name, String... attrs) {
    this.type = type;
    this.name = name;

    Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>();

    // Initialize the counters
    for (String attribute : attrs) {
      counterInitMap.put(attribute, new AtomicLong(0L));
    }

    counterMap = Collections.unmodifiableMap(counterInitMap);

    startTime = new AtomicLong(0L);
    stopTime = new AtomicLong(0L);

  }
All counters start at 0 and are stored as AtomicLong values, so they can be updated safely from multiple threads.
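The pattern is a fixed set of named counters, created up front and held in an unmodifiable map. Each counter is an AtomicLong that starts at 0. Below is a minimal sketch of that idea; MiniCounterGroup and its method names are hypothetical, not Flume's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// A fixed set of named counters, in the spirit of MonitoredCounterGroup.
class MiniCounterGroup {
    private final Map<String, AtomicLong> counterMap;

    MiniCounterGroup(String... attrs) {
        Map<String, AtomicLong> init = new HashMap<>();
        for (String attribute : attrs) {
            init.put(attribute, new AtomicLong(0L)); // every counter starts at 0
        }
        // The key set is frozen; only the values inside each AtomicLong change.
        counterMap = Collections.unmodifiableMap(init);
    }

    long increment(String name) { return counterMap.get(name).incrementAndGet(); }

    long addAndGet(String name, long delta) { return counterMap.get(name).addAndGet(delta); }

    long get(String name) { return counterMap.get(name).get(); }
}
```

The map never changes after construction, so concurrent readers and writers only ever touch the AtomicLong values, and no extra locking is needed.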

The SourceCounter class has many increment, add and get methods. When are they called?

Let's look at a concrete class, AvroSource:

public Status append(AvroFlumeEvent avroEvent) {
    logger.debug("Avro source {}: Received avro event: {}", getName(),
        avroEvent);
    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException ex) {
      logger.warn("Avro source " + getName() + ": Unable to process event. " +
          "Exception follows.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();

    return Status.OK;
  }
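Here you can see the calls to sourceCounter's increment methods, and the source code makes the meaning of each attribute easy to infer. The *ReceivedCount counters are incremented as soon as an event arrives. The *AcceptedCount counters are incremented only after getChannelProcessor().processEvent(event) returns without a ChannelException.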
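The ordering in append() is what gives the counters their meaning. Received is incremented before the event is handed to the channel, and Accepted only after that succeeds. Below is a toy sketch of the same ordering. It assumes a channel that signals a failed put with an IllegalStateException, standing in for ChannelException. ToySource and ToyChannel are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical channel: put() throws IllegalStateException when it cannot take
// the event (a stand-in for Flume's ChannelException).
interface ToyChannel {
    void put(String event);
}

// Mirrors the counter ordering of AvroSource.append(); not Flume code.
class ToySource {
    final AtomicLong eventReceivedCount = new AtomicLong();
    final AtomicLong eventAcceptedCount = new AtomicLong();
    private final ToyChannel channel;

    ToySource(ToyChannel channel) { this.channel = channel; }

    boolean append(String event) {
        eventReceivedCount.incrementAndGet();   // counted as soon as the event arrives
        try {
            channel.put(event);
        } catch (IllegalStateException ex) {
            return false;                        // like returning Status.FAILED
        }
        eventAcceptedCount.incrementAndGet();   // counted only after the channel took it
        return true;
    }
}
```

As a result, EventReceivedCount minus EventAcceptedCount is the number of events that arrived but never made it into the channel.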

Every attribute is updated in real time while the component runs. A sample /metrics response, annotated, looks like this:

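{
    "SOURCE.r1": {
        "OpenConnectionCount": "0",                 // number of currently open connections
        "Type": "SOURCE",                           // component type
        "AppendBatchAcceptedCount": "0",            // number of appended batches the source successfully put into the channel
        "AppendBatchReceivedCount": "0",            // number of batches the source received; e.g. with 100 events per batch, a value of 2 means 200 events were received
        "EventAcceptedCount": "10",                 // number of events the source has successfully put into the channel
        "StopTime": "0",                            // time the component stopped (0 while running)
        "AppendReceivedCount": "0",                 // number of single-event appends the source has received
        "StartTime": "1422502242340",               // time the source component started (epoch milliseconds)
        "EventReceivedCount": "10",                 // number of events the source has received
        "AppendAcceptedCount": "0"                  // number of single-event appends the source successfully put into the channel
    },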
    "SOURCE.r2": {
        "OpenConnectionCount": "0",
        "Type": "SOURCE",
        "AppendBatchAcceptedCount": "0",
        "AppendBatchReceivedCount": "0",
        "EventAcceptedCount": "10",
        "StopTime": "0",
        "AppendReceivedCount": "0",
        "StartTime": "1422502242341",
        "EventReceivedCount": "10",
        "AppendAcceptedCount": "0"
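    },
    "CHANNEL.c1": {
        "EventPutSuccessCount": "20",               // number of events successfully put into the channel
        "ChannelFillPercentage": "0.0",             // how full the channel is, as a percentage
        "Type": "CHANNEL",
        "StopTime": "0",
        "EventPutAttemptCount": "20",               // number of attempts to put an event into the channel
        "ChannelSize": "0",                         // number of events currently in the channel
        "StartTime": "1422502242328",
        "EventTakeSuccessCount": "20",              // number of events successfully taken from the channel
        "ChannelCapacity": "10000000",              // maximum number of events the channel can hold
        "EventTakeAttemptCount": "2105"             // number of attempts to take an event from the channel, including attempts that found it empty
    },
    "SINK.k1": {
        "BatchCompleteCount": "0",                  // number of batches that reached the full batch size
        "ConnectionFailedCount": "0",               // number of failed sink connections
        "EventDrainAttemptCount": "20",             // number of events the sink attempted to drain (take from the channel and write out)
        "ConnectionCreatedCount": "1",              // number of connections the sink has created
        "Type": "SINK",
        "BatchEmptyCount": "1042",                  // number of times a batch taken from the channel was empty
        "ConnectionClosedCount": "0",               // number of connections closed
        "EventDrainSuccessCount": "20",             // number of events successfully drained
        "StopTime": "0",
        "StartTime": "1422502242340",
        "BatchUnderflowCount": "1"                  // number of batches smaller than batchSize (processed before filling up); useful for tuning batchSize
    },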
    "SINK.k2": {
        "BatchCompleteCount": "0",
        "ConnectionFailedCount": "0",
        "EventDrainAttemptCount": "0",
        "ConnectionCreatedCount": "1",
        "Type": "SINK",
        "BatchEmptyCount": "1042",
        "ConnectionClosedCount": "0",
        "EventDrainSuccessCount": "0",
        "StopTime": "0",
        "StartTime": "1422502243048",
        "BatchUnderflowCount": "0"
    }
}
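All values in the response are strings, so a monitoring script has to parse them before doing arithmetic. The small sketch below derives two numbers from one component's entry. The first is the count of events a source received but could not put into the channel, which follows the AvroSource counting pattern above. The second is the channel fill percentage, assumed here to be ChannelSize * 100 / ChannelCapacity; this is consistent with 0.0 for c1, whose ChannelSize is 0. MetricsMath is a hypothetical name, and JSON parsing itself is left out.

```java
import java.util.Map;

// Derived values from one component's entry in the /metrics JSON,
// e.g. the "SOURCE.r1" or "CHANNEL.c1" object (all values are strings).
class MetricsMath {
    static long asLong(Map<String, String> metrics, String key) {
        return Long.parseLong(metrics.get(key));
    }

    // Events that reached the source but were not accepted by the channel.
    static long sourceRejected(Map<String, String> source) {
        return asLong(source, "EventReceivedCount") - asLong(source, "EventAcceptedCount");
    }

    // Assumed definition: current size as a percentage of capacity.
    static double channelFillPercent(Map<String, String> channel) {
        return asLong(channel, "ChannelSize") * 100.0 / asLong(channel, "ChannelCapacity");
    }
}
```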

Event bus

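That covers the monitoring metrics. Now back to the loadMonitoring method from the beginning of this article: let's trace how it gets called and look at the EventBus involved.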

      if (reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
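This code is in the same entry point, Application. In the else branch, it is easy to follow the call tree down to loadMonitoring, because handleConfigurationEvent is called directly. In the if branch, however, the call tree is hard to spot, so let's look at it in detail: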

eventBus is an event bus (Guava's com.google.common.eventbus.EventBus).

Its register method:

    public void register(Object object) {
        Multimap<Class<?>, EventHandler> methodsInListener =
                finder.findAllHandlers(object);
        handlersByType.putAll(methodsInListener);
    }

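When register is called, EventBus uses a finder to locate the methods of the registered object that are annotated with @Subscribe. It groups them by event type and stores them in handlersByType. Later, when a new event is posted to the EventBus, it can invoke the matching EventHandlers based on the event's type.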
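The register/post mechanism can be reproduced in a few lines of reflection. The sketch below is not Guava's code: MiniSubscribe is a hypothetical stand-in for @Subscribe, and MiniEventBus keeps the same idea of a handlersByType map from parameter type to handler methods. It leaves out Guava features such as DeadEvent handling and thread safety.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Guava's @Subscribe; RUNTIME retention lets reflection see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MiniSubscribe {}

class MiniEventBus {
    // One registered handler: the listener object plus its annotated method.
    private static final class Handler {
        final Object target;
        final Method method;

        Handler(Object target, Method method) { this.target = target; this.method = method; }
    }

    // Event type (the handler's parameter type) -> handlers for that type.
    private final Map<Class<?>, List<Handler>> handlersByType = new LinkedHashMap<>();

    // Finds one-parameter @MiniSubscribe methods and files them by parameter type.
    void register(Object listener) {
        for (Method m : listener.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(MiniSubscribe.class) && m.getParameterCount() == 1) {
                m.setAccessible(true);
                handlersByType.computeIfAbsent(m.getParameterTypes()[0], k -> new ArrayList<>())
                        .add(new Handler(listener, m));
            }
        }
    }

    // Delivers the event to every handler whose parameter type accepts it.
    void post(Object event) {
        for (Map.Entry<Class<?>, List<Handler>> entry : handlersByType.entrySet()) {
            if (!entry.getKey().isInstance(event)) continue;
            for (Handler h : entry.getValue()) {
                try {
                    h.method.invoke(h.target, event);
                } catch (IllegalAccessException | InvocationTargetException ex) {
                    throw new IllegalStateException("handler failed: " + h.method, ex);
                }
            }
        }
    }
}
```

A posted String reaches only handlers whose parameter type accepts a String. Unannotated methods are ignored, and in this sketch an event with no matching handler is simply dropped.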

Since application is registered with the event bus, the finder looks for methods in Application annotated with @Subscribe:
@Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }
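handleConfigurationEvent calls startAllComponents, and the last line of startAllComponents calls this.loadMonitoring(). That completes the call tree. The event that triggers it comes from PollingPropertiesFileConfigurationProvider. The provider checks the configuration file periodically (the 30 passed to its constructor is the interval in seconds) and posts a new MaterializedConfiguration to the event bus when the file changes.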
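Putting the pieces together gives the call tree of the reload branch:

1. The provider polls the configuration file.
2. It posts a configuration to the bus.
3. The bus calls handleConfigurationEvent, which stops and restarts the components.
4. startAllComponents ends with loadMonitoring().

The sketch below replays that chain with plain Java objects. It assumes the provider detects changes by comparing the file's last-modified time. A Consumer stands in for eventBus.post(). ToyProvider, ToyApplication and the string configurations are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Assumed behaviour: post a new configuration whenever the file's
// last-modified time moves forward.
class ToyProvider {
    private final LongSupplier lastModified; // e.g. () -> file.lastModified()
    private final Consumer<String> bus;      // stand-in for eventBus::post
    private long lastSeen = 0L;
    private int version = 0;

    ToyProvider(LongSupplier lastModified, Consumer<String> bus) {
        this.lastModified = lastModified;
        this.bus = bus;
    }

    // Flume schedules this check periodically; here it is called by hand.
    void poll() {
        long current = lastModified.getAsLong();
        if (current > lastSeen) {
            lastSeen = current;
            bus.accept("conf-v" + (++version));
        }
    }
}

// Records the order of calls that Application goes through on each event.
class ToyApplication {
    final List<String> calls = new ArrayList<>();

    // In Flume this method is annotated with @Subscribe.
    synchronized void handleConfigurationEvent(String conf) {
        stopAllComponents();
        startAllComponents(conf);
    }

    private void stopAllComponents() { calls.add("stopAll"); }

    private void startAllComponents(String conf) {
        calls.add("startAll(" + conf + ")");
        loadMonitoring(); // last step, as in startAllComponents
    }

    private void loadMonitoring() { calls.add("loadMonitoring"); }
}
```

Calling poll() twice without touching the file produces only one reload. A later change to the modification time triggers the whole stop, start, loadMonitoring sequence again.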