1. 程式人生 > >Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌

Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌

常見的日誌收集方式有兩種,一種是經由本地日誌檔案做媒介,非同步地傳送到遠端日誌倉庫,一種是基於RPC方式的同步日誌收集,直接傳送到遠端日誌倉庫。這篇講講Flume NG如何從本地日誌檔案中收集日誌。


ExecSource是用來執行本地shell命令,並把本地日誌檔案中的資料封裝成Event事件流在Flume NG中流動。它的典型配置如下,指定source型別是exec,指定Source下游的Channel是哪個,指定要執行的shell命令。最常用的命令就是tail -F命令,可以從本地日誌檔案中獲取新追加的日誌。

producer.sources.s1.type = exec
producer.sources.s1.channels = channel
producer.sources.s1.command = tail -F /data/logs/test.log

看一下ExecSource的實現流程

1. ExecSource維護了一個單執行緒的執行緒池executor,以及配置的shell命令,計數器等屬性

2. ExecRunnable物件實現了Runnable介面,被executor執行緒池執行。 ExecRunnable實現了獲取本地日誌的主要流程

3. ExecRunnable維護了一個定時執行的執行緒池timedFlushService,定時去檢查Event列表,如果符合批量輸出的要求,就批量flush event

4. ExecRunnable使用Runtime.getRuntime().exec以及java.lang.ProcessBuilder來使用Java平臺執行作業系統的Shell命令,並把這個Shell命令建立的程序的輸出流重定向到Java平臺的流,從而在Java平臺可以獲取到本地日誌檔案的資料。這裡的Shell命令是tail -F

這裡最主要的是步驟是在Java平臺中使用Shell命令來獲取本地日誌檔案的資料,主要的程式碼如下

// ExecRuannable.run()

try {
          if(shell != null) {
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          }  else {
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));
          // 當tail -F沒有資料時,reader.readLine會阻塞,直到有資料到達
          while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

 

將java.lang.Process代表的本地程序的輸出流重定向到Java的輸入流中,當tail -F沒有資料時,Java輸入流的reader.readLine會阻塞,直到有新資料到達。獲取到新資料後,首先是將資料封裝成Event,如果超過了批量限制,就flushEventBatch

flushEventBatch會將Event列表交給ChannelProcessor批量處理。

// EventBuilder.withBdoy

 public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();

    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);

    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }

    return event;
  }


// ExecSource.flushEventBatch

private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

ExecSource是非同步收集本地日誌的實現,它不保證可靠性,比如Java平臺建立的tail -F程序出問題了,那麼目標日誌檔案的收集會收到影響。ExecSource的好處是效能比RPC方式要好,減少了網路的流量,同時避免了對應用程式的傾入性,可以無縫地接入。