
Flume NG Source Code Analysis (5): Collecting Logs over RPC with ThriftSource

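The previous post covered collecting logs asynchronously from local log files with ExecSource. This post looks at collecting logs synchronously over RPC. Since I am most familiar with Thrift, I use ThriftSource to illustrate the RPC approach.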

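The overall structure is as follows:

1. ThriftSource contains a Thrift server and an implementation of a Thrift service. The service is defined by ThriftSourceProtocol.

2. The application calls the Thrift service through its client, sending logs by RPC to the server side of ThriftSource's Thrift service. This completes the collection of the application's logs.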


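First, look at the Thrift service defined by ThriftSourceProtocol. The definition lives in flume.thrift in the flume-ng-sdk project.

1. It defines the ThriftFlumeEvent data structure. Logs are wrapped in Events to travel through Flume NG.

2. It defines the ThriftSourceProtocol service, which has two methods: append and appendBatch.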

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}
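
Because the service offers both append and appendBatch, a client holding many log lines will usually want to group them before calling appendBatch. The sketch below shows only that grouping step, using plain JDK collections. BatchSplitter and its split method are hypothetical names for illustration, not part of Flume or Thrift; a real client would wrap each item in a ThriftFlumeEvent before sending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Flume or Thrift class): groups items into
// batches so that each batch could be sent with one appendBatch call.
final class BatchSplitter {

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> split(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      int end = Math.min(i + batchSize, items.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(items.subList(i, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("log-1", "log-2", "log-3", "log-4", "log-5");
    for (List<String> batch : split(lines, 2)) {
      System.out.println("would send one appendBatch with " + batch);
    }
  }
}
```

The batch size trades latency against RPC overhead: larger batches mean fewer round trips, but each failed call leaves more events to resend.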

The class that Thrift generates from this definition is ThriftSourceProtocol, which is shared by the server and the client.

public class ThriftSourceProtocol {

  public interface Iface {

    public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException;

    public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException;

  }

  // ...
}

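The private class ThriftSourceHandler inside ThriftSource implements the ThriftSourceProtocol service. The append method works as follows:

1. Convert the ThriftFlumeEvent into a SimpleEvent

2. Update the counters

3. Hand the SimpleEvent to the ChannelProcessor, which passes it on to the downstream Channel


As you can see, the logic of ThriftSourceHandler is essentially the same as that of ExecRunnable.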

private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
        getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
        logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
        return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
        getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
        logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
        return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
  }

}
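
The append path above (count the event as received, hand it downstream, then report OK or FAILED) can be modelled without Flume or Thrift. The sketch below is a simplified stand-in, not the Flume implementation: AppendSketch, its Status enum, and its two counters are hypothetical names, and a bounded queue that refuses events when full plays the role of a Channel that throws ChannelException.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the handler's append logic (assumption: a full
// bounded queue stands in for a Channel that rejects the event).
final class AppendSketch {

  enum Status { OK, FAILED }

  private final BlockingQueue<byte[]> channel;
  final AtomicLong received = new AtomicLong();
  final AtomicLong accepted = new AtomicLong();

  AppendSketch(int channelCapacity) {
    this.channel = new ArrayBlockingQueue<>(channelCapacity);
  }

  /** Counts the event as received, then tries to pass it downstream. */
  Status append(byte[] body) {
    received.incrementAndGet();
    if (!channel.offer(body)) {
      // Downstream refused the event: report failure so the caller can retry.
      return Status.FAILED;
    }
    accepted.incrementAndGet();
    return Status.OK;
  }

  public static void main(String[] args) {
    AppendSketch source = new AppendSketch(1);
    System.out.println(source.append("first".getBytes(StandardCharsets.UTF_8)));
    System.out.println(source.append("second".getBytes(StandardCharsets.UTF_8)));
    System.out.println("received=" + source.received + " accepted=" + source.accepted);
  }
}
```

Because the status is returned to the caller within the same RPC, the client learns immediately whether the event reached the channel. This is what makes the RPC approach synchronous.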


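With the service implemented, ThriftSource sets up the Thrift server. The default is TThreadedSelectorServer. If loading that class throws ClassNotFoundException, a TThreadPoolServer is created instead, and if that cannot be found either, ThriftSource fails to start.

For more on Thrift servers, see the post Thrift Source Code Analysis (7): TServer Analysis.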
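
The fallback described above comes down to trying class names in order with Class.forName. A minimal sketch of that pattern follows. ServerClassPicker and firstAvailable are hypothetical names; the actual ThriftSource code does the lookup inline and also builds the matching Args object.

```java
import java.util.Optional;

// Hypothetical helper illustrating "use the preferred server class if it
// is on the classpath, otherwise fall back to the next candidate".
final class ServerClassPicker {

  /** Returns the first loadable class among the candidates, or empty if none can be loaded. */
  static Optional<Class<?>> firstAvailable(String... classNames) {
    for (String name : classNames) {
      try {
        return Optional.<Class<?>>of(Class.forName(name));
      } catch (ClassNotFoundException e) {
        // Not on the classpath: try the next candidate.
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Optional<Class<?>> server = firstAvailable(
        "org.apache.thrift.server.TThreadedSelectorServer",
        "org.apache.thrift.server.TThreadPoolServer");
    System.out.println(server
        .map(c -> "would use " + c.getName())
        .orElse("no Thrift server class found; the source would fail to start"));
  }
}
```

Looking the class up by name rather than referencing it directly lets the same code run against Thrift versions that do not ship TThreadedSelectorServer.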


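Creating and starting the Thrift server involves the following steps:

1. Create the server socket. Here it is TNonblockingServerSocket, a non-blocking server socket.

2. Create the server arguments object, TNonblockingServer.AbstractNonblockingServerArgs. All server settings are passed through the Args class.

3. TThreadedSelectorServer is a Reactor-style server and needs a thread pool. The code uses Executors.newFixedThreadPool(maxThreads, threadFactory), or Executors.newCachedThreadPool(threadFactory) when maxThreads is 0.

4. Set the protocol and transport: TCompactProtocol for encoding and TFastFramedTransport for the transport.

5. Set the Processor that wraps the service implementation, which is the ThriftSourceHandler class defined above.

6. Start the Thrift server on a dedicated thread: servingExecutor.submit(new Runnable() {public void run() {server.serve();}})

server.serve() blocks until the server stops, so running it on a separate thread mainly leaves the original thread free to handle the cleanup after the Thrift server stops.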

    Class<?> serverClass = null;
    Class<?> argsClass = null;
    TServer.AbstractServerArgs args = null;
    /*
     * Use reflection to determine if TThreadedSelectServer is available. If
     * it is not available, use TThreadPoolServer
     */
    try {
      serverClass = Class.forName("org.apache.thrift" +
        ".server.TThreadedSelectorServer");

      argsClass = Class.forName("org.apache.thrift" +
        ".server.TThreadedSelectorServer$Args");

      // Looks like TThreadedSelectorServer is available, so continue..
      ExecutorService sourceService;
      ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
        "Flume Thrift IPC Thread %d").build();
      if (maxThreads == 0) {
        sourceService = Executors.newCachedThreadPool(threadFactory);
      } else {
        sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
      }
      serverTransport = new TNonblockingServerSocket(
        new InetSocketAddress(bindAddress, port));
      args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass
        .getConstructor(TNonblockingServerTransport.class)
        .newInstance(serverTransport);
      Method m = argsClass.getDeclaredMethod("executorService",
        ExecutorService.class);
      m.invoke(args, sourceService);
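    }
    // The catch (ClassNotFoundException) branch that falls back to
    // TThreadPoolServer is omitted from this excerpt.

    try {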

      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());
      args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
     
      server = (TServer) serverClass.getConstructor(argsClass).newInstance
        (args);
    } catch (Throwable ex) {
      throw new FlumeException("Cannot start Thrift Source.", ex);
    }


    servingExecutor = Executors.newSingleThreadExecutor(new
      ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
      .build());

    /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
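
The "serve on a dedicated thread" step can be tried in isolation with plain JDK concurrency classes. In this sketch BlockingServer is a hypothetical stand-in for a TServer whose serve() blocks until stop() is called; it is not a Thrift class, and the five-second timeout is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ServeInBackground {

  /** Hypothetical stand-in for a TServer: serve() blocks until stop() is called. */
  static final class BlockingServer {
    private final CountDownLatch stopped = new CountDownLatch(1);

    void serve() {
      try {
        stopped.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    void stop() {
      stopped.countDown();
    }
  }

  /** Starts the server on its own thread, stops it, and reports whether the serving thread exited. */
  static boolean startThenStop() {
    BlockingServer server = new BlockingServer();
    ExecutorService servingExecutor = Executors.newSingleThreadExecutor();
    Runnable serveTask = server::serve;
    servingExecutor.submit(serveTask); // returns at once; serve() blocks on the other thread
    // The calling thread stays free, so it can stop the server and clean up.
    server.stop();
    servingExecutor.shutdown();
    try {
      return servingExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("serving thread terminated: " + startThenStop());
  }
}
```

If serve() were called directly on the starting thread, the call to start the source would never return, and nothing would be left to run the shutdown logic.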




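To summarize, collecting logs over RPC takes the following steps:

1. Define an RPC service for collecting logs.

2. Implement the RPC service and provide a client to the application. The application uses the client to wrap logs in Events and passes them to the RPC-type Source through RPC calls.

3. The RPC-type Source starts an RPC server, exposes the RPC service, and passes the Events it receives to the downstream Channel.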