【Flume】【原始碼分析】flume中http監控型別的原始碼分析,度量資訊分析,以及flume的事件匯流排
在flume1.5.2中,如果想要通過http方式的監控來獲取flume相關度量值,通過在啟動指令碼後新增如下內容即可:
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
監控
-D的屬性可以通過System.getProerties()直接獲取得到,那麼以上兩個屬性是通過方法loadMonitoring()來讀取,該方法在flume入口中Application
這裡的monitorServer是一個HTTPMetricsServer,該類中是啟動了一個jettyserver例項來監聽的,看它的start()方法private void loadMonitoring() { Properties systemProps = System.getProperties(); Set<String> keys = systemProps.stringPropertyNames(); try { if (keys.contains(CONF_MONITOR_CLASS)) { String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS); Class<? extends MonitorService> klass; try { //Is it a known type? klass = MonitoringType.valueOf( monitorType.toUpperCase()).getMonitorClass(); } catch (Exception e) { //Not a known type, use FQCN klass = (Class<? extends MonitorService>) Class.forName(monitorType); } this.monitorServer = klass.newInstance(); Context context = new Context(); for (String key : keys) { if (key.startsWith(CONF_MONITOR_PREFIX)) { context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key)); } } monitorServer.configure(context); monitorServer.start(); } } catch (Exception e) { logger.warn("Error starting monitoring. " + "Monitoring might not be available.", e); } }
其實這就跟我們java中的socket變成一樣,這裡就啟動了服務端,監聽了配置的埠號public void start() { jettyServer = new Server(); //We can use Contexts etc if we have many urls to handle. For one url, //specifying a handler directly is the most efficient. SelectChannelConnector connector = new SelectChannelConnector(); connector.setReuseAddress(true); connector.setPort(port); jettyServer.setConnectors(new Connector[] {connector}); jettyServer.setHandler(new HTTPMetricsHandler()); try { jettyServer.start(); while (!jettyServer.isStarted()) { Thread.sleep(500); } } catch (Exception ex) { LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex); }
注意看上面有一個處理器HttpMetricsHandler
看它的handle()方法
該方法就是獲取那些度量值的關鍵public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException, ServletException { // /metrics is the only place to pull metrics. //If we want to use any other url for something else, we should make sure //that for metrics only /metrics is used to prevent backward //compatibility issues. if(request.getMethod().equalsIgnoreCase("TRACE") || request.getMethod() .equalsIgnoreCase("OPTIONS")) { response.sendError(HttpServletResponse.SC_FORBIDDEN); response.flushBuffer(); ((Request) request).setHandled(true); return; } if (target.equals("/")) { response.setContentType("text/html;charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().write("For Flume metrics please click" + " <a href = \"./metrics\"> here</a>."); response.flushBuffer(); ((Request) request).setHandled(true); return; } else if (target.equalsIgnoreCase("/metrics")) { response.setContentType("application/json;charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); String json = gson.toJson(metricsMap, mapType); response.getWriter().write(json); response.flushBuffer(); ((Request) request).setHandled(true); return; } response.sendError(HttpServletResponse.SC_NOT_FOUND); response.flushBuffer(); //Not handling the request returns a Not found error page. }
1、首先是禁止了TRACE和OPTIONS兩種請求
TRACE- 回顯伺服器收到的請求,主要用於測試或診斷。
OPTIONS - 返回伺服器針對特定資源所支援的HTTP請求方法。也可以利用向Web伺服器傳送'*'的請求來測試伺服器的功能性。
2、如果你輸入的地址是ip:port,沒有輸入後面的metrics,效果如下:
點選here就可以看到度量資訊了
3、如果你輸入了metrics請求,後面就是具體的處理邏輯了
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
這就是獲取json資訊的核心這後面的內容就是JMX相關的了,與本文沒太大關係,具體請見:http://baike.baidu.com/link?url=k1fgRfj4e720XBCEKIZT1eV1Zfct5LCkufbwKYj1s5MFvaK7kS0Vlb-lfdmkXOWFjJB9vYA1PWgYPYupkIRzJ_
度量
現在來看看某一個度量資訊是如何一步步被獲取到的,以
OpenConnectionCount
為例
在類SourceCounter中
構造方法
public SourceCounter(String name) {
super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
}
被類JMSSource的doConfigure(Context context)呼叫
而該方法又被BasicSourceSemantics類的configure呼叫
在會上就回到了入口Application的loadMonitoring方法了,這一條鏈就通了
上面構造方法呼叫了父類構造方法
protected MonitoredCounterGroup(Type type, String name, String... attrs) {
this.type = type;
this.name = name;
Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>();
// Initialize the counters
for (String attribute : attrs) {
counterInitMap.put(attribute, new AtomicLong(0L));
}
counterMap = Collections.unmodifiableMap(counterInitMap);
startTime = new AtomicLong(0L);
stopTime = new AtomicLong(0L);
}
這裡所有屬性的預設值都是0,用AtomicLong來宣告的
在整個SourceCounter類中,我們看到有很多increment,add,get的方法,那麼這些方法什麼時候被呼叫呢?
我們看一個具體的AvroSource類
public Status append(AvroFlumeEvent avroEvent) {
logger.debug("Avro source {}: Received avro event: {}", getName(),
avroEvent);
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
Event event = EventBuilder.withBody(avroEvent.getBody().array(),
toStringMap(avroEvent.getHeaders()));
try {
getChannelProcessor().processEvent(event);
} catch (ChannelException ex) {
logger.warn("Avro source " + getName() + ": Unable to process event. " +
"Exception follows.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
這裡可以看到呼叫了sourceCounter的increment方法,由原始碼很容易可以看出各個屬性的意思了
各個屬性都是在元件執行過程,實時的去賦值的。
{
"SOURCE.r1": {
"OpenConnectionCount": "0", 當前有效的連線數
"Type": "SOURCE", 元件型別
"AppendBatchAcceptedCount": "0", source端剛剛追加放入channel的批量數
"AppendBatchReceivedCount": "0", source端剛剛追加的批量的數量,比如一批100,該度量為2,就是source端收到了200個events
"EventAcceptedCount": "10", source端目前成功放入channel的event數量
"StopTime": "0",
"AppendReceivedCount": "0", source端剛剛追加的目前收到的event數量
"StartTime": "1422502242340", source元件啟動的時間
"EventReceivedCount": "10", source端已經收到的event數量
"AppendAcceptedCount": "0" source端剛剛追加放入channel的event數量
},
"SOURCE.r2": {
"OpenConnectionCount": "0",
"Type": "SOURCE",
"AppendBatchAcceptedCount": "0",
"AppendBatchReceivedCount": "0",
"EventAcceptedCount": "10",
"StopTime": "0",
"AppendReceivedCount": "0",
"StartTime": "1422502242341",
"EventReceivedCount": "10",
"AppendAcceptedCount": "0"
},
"CHANNEL.c1": {
"EventPutSuccessCount": "20", 成功放入通道的event數量
"ChannelFillPercentage": "0.0", 通道使用比例
"Type": "CHANNEL",
"StopTime": "0",
"EventPutAttemptCount": "20", 正在放進通道的event數量
"ChannelSize": "0",
"StartTime": "1422502242328",
"EventTakeSuccessCount": "20", 從通道中成功取出event的數量
"ChannelCapacity": "10000000",
"EventTakeAttemptCount": "2105" 正在從通道中取event的數量
},
"SINK.k1": {
"BatchCompleteCount": "0", 成功完成輸出的批量事件個數
"ConnectionFailedCount": "0", sink端連線失敗的次數
"EventDrainAttemptCount": "20", 試圖消耗的事件數量,從通道中拿來消耗
"ConnectionCreatedCount": "1", sink端連線數
"Type": "SINK",
"BatchEmptyCount": "1042", 批量取空的次數
"ConnectionClosedCount": "0", 連線關閉的次數
"EventDrainSuccessCount": "20", 成功處理的event數量
"StopTime": "0",
"StartTime": "1422502242340",
"BatchUnderflowCount": "1" 沒有達到batchsize的批量event數目,也就是這一批沒有達到batchsize就處理了,根據這個值可調整batchsize
},
"SINK.k2": {
"BatchCompleteCount": "0",
"ConnectionFailedCount": "0",
"EventDrainAttemptCount": "0",
"ConnectionCreatedCount": "1",
"Type": "SINK",
"BatchEmptyCount": "1042",
"ConnectionClosedCount": "0",
"EventDrainSuccessCount": "0",
"StopTime": "0",
"StartTime": "1422502243048",
"BatchUnderflowCount": "0"
}
}
事件匯流排
以上是監控相關的度量資訊,那麼回到本文的一開始,loadMonitoring方法,我們來看看它的呼叫過程,分析下eventBus相關內容
if(reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(agentName,
configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
還是上面的入口,else分支我們很容易往下找到loadMonitoring方法的整個呼叫樹形結構,但是if分支裡,我們很難捕捉到呼叫的樹形結構,我們來詳細看看:
eventBus是一個事件匯流排
EventBus的程式碼:
public void register(Object object) {
Multimap<Class<?>, EventHandler> methodsInListener =
finder.findAllHandlers(object);
handlersByType.putAll(methodsInListener);
}
在EventBus進行register時,會通過一個finder找到register的object中被標註了@Subscribe的方法。並且按照EventType進行分類,放在handlersByType裡。這樣當EventBus的post新的Event時,就可以根據EventType呼叫相應的EventHandler。
該事件匯流排註冊了application,按照上面的解釋,會尋找Application類中註解了@Subscribe的方法@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
startAllComponents(conf);
}
該方法中有個startAllComponents方法,該方法最後一行呼叫了this.loadMonitoring();方法,所以這個樹形結構就出來了。