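Flume NG Source Code Analysis (5): Collecting Logs over RPC with ThriftSource
The previous article covered how ExecSource collects logs asynchronously from local log files. This one covers collecting logs synchronously over RPC. I am most familiar with Thrift, so I use ThriftSource to illustrate the RPC approach.
The overall structure is as follows:
1. ThriftSource contains a Thrift server and an implementation of a Thrift service. The service is defined by ThriftSourceProtocol.
2. The application calls the Thrift service's client, which sends logs over RPC to the server side of the Thrift service in ThriftSource. This completes collection of the application's logs.
First, look at the Thrift service defined by ThriftSourceProtocol. It is defined in flume.thrift in the flume-ng-sdk project:
1. It defines the ThriftFlumeEvent data structure; logs are wrapped in Events as they move through Flume NG.
2. It defines the ThriftSourceProtocol service, which has two methods: append and appendBatch.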
namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}
The file Thrift generates from this definition is ThriftSourceProtocol, which is shared by the server and the client.
public class ThriftSourceProtocol {
  public interface Iface {
    public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException;
    public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException;
  }
  // ...
}
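The private class ThriftSourceHandler inside ThriftSource implements the ThriftSourceProtocol service. The append method works as follows:
1. Convert the ThriftFlumeEvent into a SimpleEvent.
2. Update the counters.
3. Hand the SimpleEvent to the ChannelProcessor, which passes it to the downstream Channel.
As you can see, ThriftSourceHandler follows essentially the same logic as ExecRunnable.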
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

  @Override
  public Status append(ThriftFlumeEvent event) throws TException {
    // Convert the Thrift struct into a Flume Event.
    Event flumeEvent = EventBuilder.withBody(event.getBody(),
      event.getHeaders());

    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    try {
      getChannelProcessor().processEvent(flumeEvent);
    } catch (ChannelException ex) {
      logger.warn("Thrift source " + getName() + " could not append events " +
        "to the channel.", ex);
      return Status.FAILED;
    }
    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();
    return Status.OK;
  }

  @Override
  public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> flumeEvents = Lists.newArrayList();
    for (ThriftFlumeEvent event : events) {
      flumeEvents.add(EventBuilder.withBody(event.getBody(),
        event.getHeaders()));
    }

    try {
      getChannelProcessor().processEventBatch(flumeEvents);
    } catch (ChannelException ex) {
      // The logger does not expand "%s", so build the message the same way
      // append() does and pass the exception along.
      logger.warn("Thrift source " + getName() + " could not append events " +
        "to the channel.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());
    return Status.OK;
  }
}
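To make the received/accepted bookkeeping concrete, here is a minimal sketch of the same pattern using only the JDK. It is not Flume code: `MiniHandler`, its two counters, and the bounded queue standing in for the Channel are hypothetical names chosen for illustration, and a full queue plays the role of a ChannelException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a source handler; not part of Flume.
final class MiniHandler {
    enum Status { OK, FAILED }

    private final BlockingQueue<byte[]> channel;   // stands in for the Channel
    final AtomicLong received = new AtomicLong();  // events that arrived
    final AtomicLong accepted = new AtomicLong();  // events the channel took

    MiniHandler(int capacity) {
        this.channel = new ArrayBlockingQueue<>(capacity);
    }

    // Count on arrival, hand the event to the channel, count again on success.
    Status append(byte[] body) {
        received.incrementAndGet();
        if (!channel.offer(body)) {
            // A full channel is reported to the caller rather than hidden.
            return Status.FAILED;
        }
        accepted.incrementAndGet();
        return Status.OK;
    }

    public static void main(String[] args) {
        MiniHandler handler = new MiniHandler(1);
        System.out.println(handler.append("first".getBytes()));   // OK
        System.out.println(handler.append("second".getBytes()));  // FAILED
        System.out.println(handler.received.get() + " received, "
                + handler.accepted.get() + " accepted");           // 2 received, 1 accepted
    }
}
```

Because the call is synchronous, the caller learns from the returned status whether the event reached the channel, and the gap between the two counters grows only when the channel rejects events.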
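With the Thrift service implemented, ThriftSource sets up the Thrift server. The default is TThreadedSelectorServer. If loading TThreadedSelectorServer fails with a ClassNotFoundException, ThriftSource creates a TThreadPoolServer instead; if that class cannot be found either, ThriftSource fails to start.
For more on Thrift servers, see the article "Thrift Source Code Analysis (7): TServer Analysis".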
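The fallback described above is an ordinary "first class that loads wins" lookup. The sketch below shows that idea in isolation using only the JDK; `ServerClassResolver` and `firstAvailable` are hypothetical names and do not appear in Flume or Thrift.

```java
// Hypothetical helper illustrating the fallback; not part of Flume or Thrift.
final class ServerClassResolver {
    // Returns the first class in the list that is present on the classpath.
    static Class<?> firstAvailable(String... classNames) {
        for (String name : classNames) {
            try {
                return Class.forName(name);
            } catch (ClassNotFoundException e) {
                // Not on the classpath: try the next candidate.
            }
        }
        throw new IllegalStateException(
                "None of the candidate classes could be loaded");
    }

    public static void main(String[] args) {
        try {
            Class<?> serverClass = firstAvailable(
                    "org.apache.thrift.server.TThreadedSelectorServer",
                    "org.apache.thrift.server.TThreadPoolServer");
            System.out.println("Using " + serverClass.getName());
        } catch (IllegalStateException e) {
            // Mirrors the source failing to start when neither server exists.
            System.out.println("Cannot start: " + e.getMessage());
        }
    }
}
```

Loading by name keeps the code free of a compile-time dependency on a class that older Thrift versions may not ship, which is presumably why ThriftSource uses reflection here.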
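Creating and starting the Thrift server involves the following steps:
1. Create the server socket, here a TNonblockingServerSocket (a non-blocking server socket).
2. Create the server arguments object, a TNonblockingServer.AbstractNonblockingServerArgs. All server settings are passed through the Args class.
3. TThreadedSelectorServer is a Reactor-pattern server and needs a thread pool. Here it is Executors.newFixedThreadPool(maxThreads, threadFactory), or Executors.newCachedThreadPool(threadFactory) when maxThreads is 0.
4. Set the protocol and transport: TCompactProtocol for encoding and TFastFramedTransport for framing.
5. Set the Processor for the Thrift service, backed by the ThriftSourceHandler class described above.
6. Start the Thrift server on a separate thread: servingExecutor.submit(new Runnable() {public void run() {server.serve();}})
The main reason for starting the Thrift server on a separate thread is so that the original thread can handle cleanup after the Thrift server stops.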
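The reason for the separate thread is easier to see in a stripped-down form. The following is a minimal sketch using only the JDK: `BlockingServerDemo` is a hypothetical class whose `serve()` blocks on a latch, standing in for a blocking server loop; it is not a Thrift TServer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical blocking server used only for illustration.
final class BlockingServerDemo {
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private final ExecutorService servingExecutor =
            Executors.newSingleThreadExecutor();

    // Blocks until stop() is called, like a server's accept loop.
    private void serve() {
        try {
            stopSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns immediately; the blocking serve() call runs on its own thread.
    void start() {
        servingExecutor.submit(this::serve);
    }

    // Runs on the caller's thread: stop the server, then clean up the executor.
    boolean stop() {
        stopSignal.countDown();
        servingExecutor.shutdown();
        try {
            return servingExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingServerDemo demo = new BlockingServerDemo();
        demo.start();  // does not block the calling thread
        System.out.println("stopped cleanly: " + demo.stop());
    }
}
```

If `serve()` ran on the calling thread, `start()` would never return and nothing would be left to call `stop()` and release the executor.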
Class<?> serverClass = null;
Class<?> argsClass = null;
TServer.AbstractServerArgs args = null;

/*
 * Use reflection to determine if TThreadedSelectorServer is available. If
 * it is not available, use TThreadPoolServer
 */
try {
  serverClass = Class.forName("org.apache.thrift" +
    ".server.TThreadedSelectorServer");
  argsClass = Class.forName("org.apache.thrift" +
    ".server.TThreadedSelectorServer$Args");

  // Looks like TThreadedSelectorServer is available, so continue..
  ExecutorService sourceService;
  ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
    "Flume Thrift IPC Thread %d").build();
  if (maxThreads == 0) {
    sourceService = Executors.newCachedThreadPool(threadFactory);
  } else {
    sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
  }
  serverTransport = new TNonblockingServerSocket(
    new InetSocketAddress(bindAddress, port));
  args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass
    .getConstructor(TNonblockingServerTransport.class)
    .newInstance(serverTransport);
  Method m = argsClass.getDeclaredMethod("executorService",
    ExecutorService.class);
  m.invoke(args, sourceService);
}
// This excerpt omits the catch blocks that follow the try above, including
// the ClassNotFoundException branch that falls back to TThreadPoolServer.

try {
  args.protocolFactory(new TCompactProtocol.Factory());
  args.inputTransportFactory(new TFastFramedTransport.Factory());
  args.outputTransportFactory(new TFastFramedTransport.Factory());
  args.processor(new ThriftSourceProtocol
    .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
  server = (TServer) serverClass.getConstructor(argsClass).newInstance
    (args);
} catch (Throwable ex) {
  throw new FlumeException("Cannot start Thrift Source.", ex);
}

servingExecutor = Executors.newSingleThreadExecutor(new
  ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
  .build());

/**
 * Start serving.
 */
servingExecutor.submit(new Runnable() {
  @Override
  public void run() {
    server.serve();
  }
});
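The branch on maxThreads in the excerpt picks between an unbounded cached pool and a fixed-size pool. Here is a small sketch of that choice using only the JDK. `WorkerPools`, `named`, and `create` are hypothetical names, and the hand-written thread factory is an assumption standing in for Guava's ThreadFactoryBuilder used by the excerpt.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not part of Flume.
final class WorkerPools {
    // Names threads "<prefix> 0", "<prefix> 1", ... using only the JDK.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable ->
                new Thread(runnable, prefix + " " + counter.getAndIncrement());
    }

    // maxThreads == 0 means "no fixed limit", matching the branch in the excerpt.
    static ExecutorService create(int maxThreads, ThreadFactory factory) {
        return maxThreads == 0
                ? Executors.newCachedThreadPool(factory)
                : Executors.newFixedThreadPool(maxThreads, factory);
    }

    public static void main(String[] args) {
        // The casts rely on both factory methods returning ThreadPoolExecutor,
        // which holds for the standard JDK implementation.
        ThreadPoolExecutor fixed =
                (ThreadPoolExecutor) create(4, named("Flume Thrift IPC Thread"));
        ThreadPoolExecutor cached =
                (ThreadPoolExecutor) create(0, named("Flume Thrift IPC Thread"));
        System.out.println(fixed.getMaximumPoolSize());   // 4
        System.out.println(cached.getMaximumPoolSize());  // 2147483647
        fixed.shutdown();
        cached.shutdown();
    }
}
```

A fixed pool caps how many requests are processed at once, while the cached pool grows on demand, so leaving maxThreads at 0 trades a bound on concurrency for never queuing work behind busy threads.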
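To summarize, collecting logs over RPC takes the following steps:
1. Define an RPC service for collecting logs.
2. Implement the RPC service and provide a client to applications. The application uses the client to wrap logs into Events and sends them to the RPC-type Source through RPC calls.
3. The RPC-type Source starts an RPC server, exposes the RPC service, and passes the Events it receives to the downstream Channel.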