1. 程式人生 > >Flume快速入門(三):File Channel之寫Event

Flume快速入門(三):File Channel之寫Event

       有了前兩篇博文的基礎,相信大家對Flume Agent的內部結構已經有了個初步的瞭解,現在我們來詳細介紹最常用的檔案通道——File Channel,本篇部落格主要介紹Eevnt是如何完成寫到File Channel這一操作的。

       Channel是聯絡Source和Sink的橋樑,記憶體Memory Channel效能雖高,但對於日誌資料處理這塊,實時並不是第一重要的,幾乎所有以日誌作為資料來源的資料分析都只能說是近乎實時的。對於大多數資料分析來說,日誌丟失是不可忍受的,所以,現在線上使用最多的Channel,就是File Channel。在大多數系統的設計中,為了保證高吞吐量,都會允許一小部分資料損失(比如每隔幾秒再進行刷盤操作,將緩衝區的資料寫如磁碟檔案),但

File Channel沒有這樣設計,它通過在一次事務中提交多個Event來提高吞吐量,做到了只要事務被提交,那麼資料就不會有丟失。但需要注意,Flume Channel是沒有資料副本的,意味著如果磁碟損壞,那麼資料就無法恢復了。

我們先來看看File Channel有哪些屬性可以配置:


我們在來看看,在Flume Agent的配置檔案中,File Channel是怎麼配置的,見下圖:

 

上圖是官網中的截圖,可以看到,圖中設定了兩個目錄,第一個是檢查點的目錄(checkpointDir),第二個是資料的目錄(dataDirs)。。那麼,檢查點是做什麼用的?我們知道,File Channel

中,有一個記憶體佇列來儲存已被Source寫入但還未被Sink消費的Event資料的指標,Event指標指向的就是Event在資料目錄下資料檔案中存放位置,所以,你很自然的就能想到檢查點指的就是記憶體佇列在某一穩定時刻的“快照”,而且每隔一段時間(checkpointIntervalFile Channel會將記憶體佇列持久化到磁碟檔案,也就是我們配置的檢查點目錄下。為了保證記憶體佇列“快照”的完整性,再將記憶體佇列持久到磁碟檔案時需要鎖定記憶體佇列,就是說此過程不Source不能寫ChannelSink也不能讀Channel你沒猜錯,上面的backupCheckpointDir就是檢查點目錄的備份目錄,因為檢查點檔案是經常讀寫的,很容易在
Flume Crash時導致檔案損壞,所以如果要做到快速恢復,就可以給檢查點配置一個複本。

從GitHub上下載Flume的原始碼,直接定位到flume-ng-channels模組,在該模組中,你肯定直接找到的是FileChannel.java類,在FileChannel中,我們看到了如下常見屬性(省略了部分其他屬性):

private Integer capacity = 0;
private int keepAlive;
protected Integer transactionCapacity = 0;
private Long checkpointInterval = 0L;
private long maxFileSize;
private long minimumRequiredSpace;
private File checkpointDir;
private File backupCheckpointDir;
private File[] dataDirs;
private Log log;
private final ThreadLocal<FileBackedTransaction> transactions =
    new ThreadLocal<FileBackedTransaction>();
private boolean fsyncPerTransaction;
private int fsyncInterval;

我接著看了下FileChannel的方法,發現大部分操作的實現是在上面的log屬性中,於是,我點開了Log.java。用Eclipse的Ctrl+O的快捷鍵,我們直接預覽Log類中的內部方法和屬性,發現我們想要的都在裡面,將Source將Event放入Channel肯定呼叫的是Log#put(long transactionID, Event event)方法,Sink從Channel取Event肯定呼叫的是Log#get(FlumeEventPointer pointer)方法等。如果直接看Log的這些方法,是不容易看明白它是怎麼被呼叫,是被誰呼叫這些問題的,於是我們先直接從Source的角度來看Channel的Event寫入過程。

       每個Source都需要設定一個通道處理器(ChannelProcessor),寫入Channel不是由Source來完成的,通道處理器用於暴露服務來將Event寫入Channel,它是單執行緒的(Executors.newSingleThreadExecutor)。通道處理器寫入Event的put操作有兩種:processEvent和processEventBatch,分別用來寫入單個Event和一次性批量寫入多個Event,兩個方法的內部處理過程都差不多。通道處理器將一個Event寫入到Channel的整個過程包含5部分:1.將Event進行攔截器鏈進行過濾;2.通過通道選擇器來選擇該Event需要寫入的哪些Channel;3.從Channel中獲取一個事務;4.呼叫Channel的put方法將Event寫入Channel;5.提交事務或回滾。其中如果某(批次)Event需要寫入多個Channel,則步驟3-5是在for迴圈中執行的,可見,如果要將Event寫入n個通道,則整個過程將產生n次事務操作。所以,縱觀Source、Channel和Sink的實現,Channel的實現無疑是最重量級的。

        咱們先看第1步攔截器構造工廠(InterceptorBuilderFactory)將使用者配置的攔截器組裝成攔截器鏈,通道處理器將需要寫入Channel的Event先放入攔截器鏈進行過濾,如果最後返回的Event不為空,說明沒被過濾掉,攔截器不一定只是為了過濾Event,它還可以給Event的頭部新增一些必要資訊,比如資料的日誌檔案來源等。

       第2步是確定該(批次)Event需要寫入哪些Channel,這些Channel包括要求(required)的和可選(optional)的,要求的Channel是需要保證寫入成功的(如果失敗則會重試),可選的Channel只會嘗試寫入一次,不管失敗與否。每個通道處理器例項都配置有一個通道選擇器(ChannelSelector),通道處理器的構造器的入參就是通道選擇器,Channel的選擇就是由通道選擇器來做的。通道選擇器會對Event的頭部資訊來進行篩選,決定該Event需要寫入到哪些Channel,具體配置可以參考官方文件,該部分不看程式碼也能明白其實現,所以通道選擇器不做具體介紹了。

         知道要寫入哪些Channel後,通道抽利器將for迴圈遍歷Channel列表將該(批次)Event依次放入Channel中,將該(批次)Event寫入該Channel前,會先通過改Channel建立事務(步驟3),全部寫入成功(步驟4)後將提交事務(步驟5,commit),否則會回滾(步驟5,rollback)並丟擲異常。這將會有個潛在的問題,如果一個Event需要寫入多個Channel,當寫入其中某個Channel導致事務回滾時,由於在他之前的Channel已經寫入成功了,所以當該(批次)Event再次被交給通道處理器時,上次那些已經成功寫入Event的Channel將會被重複寫入,這似乎是Flume設計的一個缺陷,所以只能下游的系統自己來處理重複訊息了。寫完要求(required)的Channel,將會繼續將Event寫入可選(optional)的Channel(如果配置了的話),寫入可選的Event也是會開啟事務的,但如果出錯只會回滾但不會對外丟擲異常。

        咱們接著看第3步:從Channel中獲取一個事務。在Flume中,Transaction介面定義了事務的四種狀態(Started,Committed,RolledBack,Closed)和對應四種操作(begin,commit,rollback,close),將事務物件藉助ThreadLocal來進行管理,便於跟蹤和使用,但事務物件在Flume中不會重複使用,也就是說事務在提交或回滾後將會關閉,然後當重新從FileChannel中獲取一個新的事務時,發現原有的事務時關閉狀態時,將會新建一個。抽象事務原語類BasicTransactionSemantics實現了Transaction介面,增加了幾個成員屬性,最重要的就是初始化事務ID(initialThreadId)了,它在事務的構造方法中會記錄建立它的執行緒ID,並在事務提供的所有操作之前都會通過initialThreadId來檢查當前的執行緒是否是建立它的執行緒,如果不是則拋異常(

Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, "XXX called from different thread than getTransaction()!");),。BasicTransactionSemantics在Transaction原有的基礎上增加了put和take方法,併為這6種方法都添加了對應的do操作方法,而原方法的操作只是在為了在呼叫其do方法前做狀態檢查(如put將呼叫doPut,begin將呼叫doBegin等),於是,你一定想到了,從Channel中獲取一個事務其實就是呼叫了它ThreadLocal屬性(currentTransaction)的get方法來獲取當前執行緒繫結的事務物件,沒錯,就是這樣。由於每種通道的實現都不同,所以都需要自己去實現事務的具體細節,而檔案通道File Channel的事務是由直接寫在FileChannel類的靜態內部類FileBackedTransaction來實現的。Channel可以設定最大的事務數量(transactionCapacity,預設10000),該數量不能超過通道的容量(當然也沒必要超過,capacity,預設1000000),比如我們設定檔案通道的最大事務數量是10000,表明Source往Channel寫時最多能開啟10000個事務,Sink從Channel中取Event時最多也能開啟10000個事務。。

         繼續看第4步,即Source將Event寫Channel。通道處理器將呼叫各自Channel中的put方法將一個Event寫入Channel例項,而不用關心其具體細節。抽象通道原語類BasicChannelSemantics對事務進行了ThreadLocal包裝(步驟3中已經說了,即 private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>();),保證了事務實現的單執行緒呼叫,BasicChannelSemantics中實現了Channel的put方法,它直接呼叫的就是當前執行緒中事務例項的put方法:

  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

 這下又直接將實現細節交給了不同Channel事務類的實現。BasicTransactionSemantics是實現了Transaction介面的抽象類,我們來看下它的put實現:

  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");

    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

 可見,它直接呼叫的是doPut方法(步驟3提到過)。FileBackedTransaction為FileChannel的事務實現類,它實現了BasicTransactionSemantics,FileBackedTransaction中有兩個FlumeEventPointer的雙端佇列:takeList和putList,用於儲存一個事務中(Source)需要寫入Channel或(Sink)需要從Channel讀取的Event的指標,因為在Flume中,事務只包含Source寫Channel或者Sink讀Channel這兩個操作中的一個,所以在實際情況中,takeList和putList有一個會為空。FileBackedTransaction的doPut實現中,先會去檢查putList佇列是否已經滿了(putList.remainingCapacity() == 0),如果滿了則直接拋異常,FileBackedTransaction中還有個queueRemaining的訊號量型別(Semaphore),它由其建構函式傳入,queueRemaining的許可數正好是File Channel的容量,所以它還會通過它檢查File Channel是否還有剩餘空間(queueRemaining.tryAcquire(keepAlive,TimeUnit.SECONDS)),如果你看過Flume官方文件中File Channel的屬性配置,一定對keep-alive這個屬性有過疑問,文件中對它的描述是:Amount of time (in sec) to wait for a put operation,即等候put操作的時間數量,預設值是3,所以,這時你就明白了,它就是剛才queueRemaining獲取一個可用許可的等待的秒數(keepAlive),就是說,如果檔案通道滿了,它最多等它3秒,所以,當對於該File Channel來說,如果它的Sink消費訊息的能力低於Source往Channel寫入訊息的能力,這個值可以稍微設定大點。當成功拿到寫入許可後,它會呼叫Log物件來獲取共享鎖(log.lockShared()),它其實是檢查點的讀鎖(checkpointReadLock.lock(),那什麼是檢查點呢?本文稍後會說)。當讀鎖獲取到以後,它會呼叫Log的put方法(FlumeEventPointer ptr = log.put(transactionID, event);)來開始真正進入“寫磁碟檔案”的流程。我們現在來仔細瞅瞅log.put的過程,Log不是簡單的將Event寫入檔案,它先對它進行了Put物件的包裝,Put類除了包含一個Event的成員屬性,還包含了事務ID(transactionID)和寫順序ID(logWriteOrderID),事務ID即通道獲取的事務物件中的事務ID,而這個寫順序ID是由WriteOrderOracle.next()直接生成的,在單個Flume Agent例項中是唯一的。所以,Log是將Put物件寫入到檔案中。由於我們可以在File Channel中配置多個數據儲存目錄(dataDirs),之所以Flume讓你配置多個數據目錄,是為了提升併發寫的能力,這也從側面說明,“寫”是個多執行緒操作。為了達到負載均衡,Log使用事務ID對dataDirs數量取模來決定寫入哪個資料儲存目錄((int)Math.abs(transactionID % (long)logFiles.length());),每個資料目錄其實是一個LogFileV3類的例項,當知道寫入哪個資料目錄後,將先檢查該資料目錄下面的檔案是否還有足夠的可供空間(程式碼中寫死的15L*1000L,不可配,為什麼會有這個空間,還有待稍後分析),如果有足夠的可用空間,則直接呼叫LogFileV3的put方法,將Put物件寫入磁碟檔案中,寫入時,還會對Put物件的ByteBuffer進行進一步包裝,比如在前面加上ByteBuffer的長度(limit)等,寫之前會記錄當前資料檔案指標的位置,即offset(getFileChannel().position()),寫完(int wrote = getFileChannel().write(toWrite);)後,將該資料檔案ID和offset一併返回(return Pair.of(getLogFileID(), offset);),便於後面追查記錄位置。

這裡第一次提到了資料檔案ID一詞,為了保證每個資料目錄(dataDir)下資料檔案的合理大小,當資料超過一個數據檔案的最大容量時,Flume會自動在當前目錄新建一個數據檔案,為了區分同一個資料目錄下的資料檔案,Flume採用檔名加數字字尾的形式(比如log-1log-2,其中的數字字尾就是資料檔案ID,它由Log例項中的nextFileID屬性來維護,它是原子整形,由於在Flume Agent例項中,一個File Channel會對應一個Log例項,所以資料檔案ID是唯一的,即使你配置了多個數據目錄。每個資料檔案都有一個對應的元資料檔案(MetaDataFile它和資料檔案在同一目錄,命名則是在資料檔案後面加上.meta,比如log-1對應的元資料檔案是log-1.meta,其實就是將Checkpoint物件通過谷歌的Protos協議序列化到元資料檔案中,Checkpoint儲存了對應資料檔案的諸多重要資訊,比如版本、寫順序ID(logWriteOrderID)、佇列大小(queueSize)和佇列頭索引(queueHead)等。元資料檔案主要用於快速將資料檔案的資訊載入到記憶體中,即重播(replay)時使用,關於重播,請參考我的博文Flume快速入門(五)。

這種資料檔案ID+位置的組合就是Event指標物件FlumeEventPointer,它有兩個int屬性,fileId(檔案ID)和offset(位置偏移量)。當拿到FlumeEventPointer物件,說明Event已經寫入磁碟緩衝區成功(還未強制刷盤)。接下來FileBackedTransaction.doPut需要將得到的FlumeEventPointer和事務ID臨時存放到記憶體佇列queueinflightPuts屬性中(queue.addWithoutCommit(ptr,transactionID);),之所以說臨時存放,是因為事務還未提交,還不能直接將FlumeEventPointer放入queue的已提交資料容器backingStore中,不然就提前暴露給Sink了,這也方便區分哪些Event還未被真正提交,方便後面回滾。queue就是FlumeEventQueue的一個例項,對於File Channel來說,採用的是檔案作為儲存介質,突然的服務宕機重啟不會帶來資料丟失(沒有刷盤的資料或磁碟損壞除外),但為了高效索引檔案資料,我們得在記憶體儲存寫入到檔案的Event的位置資訊,前面說了,Event的位置資訊可由FlumeEventPointer物件來儲存,FlumeEventPointer是由FlumeEventQueue來管理的,FlumeEventQueue有兩個InflightEventWrapper型別的屬性:inflightPuts和inflightTakes,InflightEventWrapper是FlumeEventQueue的內部類,專門用來儲存未提交Event(位置指標),這種未提交的Event資料稱為飛行中的Event(in flight events)。因為這是往Channel的寫操作,所以呼叫InflightEventWrapper的addEvent(Long transactionID,Long pointer)方法將未提交的資料儲存在inflightPuts中(inflightEvents.put(transactionID, pointer);),注意,這裡有個小細節,InflightEventWrapper的addEvent的第二個入參是Long型別的,而我們要儲存的是FlumeEventPointer型別,所以,在FlumeEventPointer.toLong方法中,會將FlumeEventPointer的兩個int屬性合成一個long屬性,以方便儲存,方法如下:

  public long toLong() {
    long result = fileID;
    result = (long)fileID << 32;
    result += (long)offset;
    return result;
  }

這一步做完之後,寫入Channel的操作就完成了,立馬釋放共享鎖(log.unlockShared();)和訊號量(queueRemaining.release();)。

       我們看最後的5,事務提交或回滾,我們這裡重點關注提交過程。大家知道,當一個事務提交後,Channel需要告知Source Event已經成功儲存了,而Source需要通知寫入源已經寫入成功,即成功ACK.那麼,事務提交的過程是不是很簡單呢?其實不然,我們接著看。File Channel的事務提交過程由FileBackedTransaction.doCommit方法來完成,對於SourceChannel的寫操作(putListsize大於0)來說,還是會先通過Log物件拿到檢查點的共享鎖(log.lockShared();),拿到共享鎖後,將呼叫Log物件的commit方法,commit方法先將事務ID、新生成的寫順序ID和操作型別(這裡是put)來構建一個Commit物件,該物件和上面步驟4提到過的Put物件類似,然後再將該Commit寫入資料檔案,至於是寫入哪個資料目錄下的檔案,和當時寫Put物件一樣,有同樣的事務ID來決定,這意味著,在一次事務操作中,一個Commit物件和對應的Put(如果是批次提交,則有多少個Event就有多少個Put)會被寫入同一個資料目錄下。當然,寫入Commit物件到檔案後並不需要保留其位置指標。做完這一步,將putList中的Event檔案位置指標FlumeEventPointer物件依次移除並放入記憶體佇列queue(FlumeEventQueue)的隊尾部while(!putList.isEmpty()){if(!queue.addTail(putList.removeFirst())){,這樣Sink才有機會從記憶體佇列queue中索引要取的Event在資料目錄中的位置,同時也保證了這批次的Event誰第一個寫入的Event也將第一個被放入記憶體佇列也將會第一個被Sink消費。當FlumeEventPointer被全部寫入記憶體佇列queue後,則需要將儲存在queue.inflightPuts中的該事務ID移除(queue.completeTransaction(transactionID);,因為該事務不再處於“飛行中in flight events”了,於是,一個事務真正的被commit了。

有人會說,第5步也不復雜啊!如果我們回頭想想,Event雖被寫入了本地磁碟檔案,Event的檔案指標也被寫入了記憶體佇列,似乎是萬無一失了,但這是一旦Flume被重啟,會怎麼樣?對,記憶體佇列FlumeEventQueue直接消失了!記憶體佇列資料沒了,檔案指標也沒了,你再也無法定位那些已經準備好的可以被Sink讀取的Event在檔案中的位置了!所以你立馬想到了,我應該經常對記憶體佇列做“備份”,沒錯,Flume也是會對記憶體佇列做檔案備份的,於是,檢查點(checkpoint)的概念出現了!還記得Flume官方文件中File Channel中checkpointDir屬性的設定了嗎?它就是設定檢查點檔案儲存的目錄的。那什麼時候會將記憶體佇列備份到檢查點檔案呢?對於寫Channel來說,就是步驟5queue.addTail操作,這也是為什麼步驟5我們沒仔細分析該步驟的原因。其實,寫檢查點檔案也是比較複雜的一個過程,因為它要保證寫時效率,也要保證恢復時能將資料重新載入到記憶體佇列中。

 我們接下來繼續分析第5步queue(FlumeEventQueue)中的addTail方法,在介紹該方法之前,我們來看看記憶體佇列FlumeEventQueue中最重要的成員屬性:backingStore,它是EventQueueBackingStore介面的實現,有興趣的讀者可以看看EventQueueBackingStoreFile類的實現。那麼backingStore是用來做什麼的呢?backingStore主要有兩個作用,第一是儲存已經提交了的Event位置指標,放在一個Map結構的overwriteMap屬性中,另一個作用就是將已提交但還未來得及被消費的Event位置指標資料寫入檢查點檔案,這是通過將檢查點檔案對映到一塊虛擬記憶體中來完成的,這個虛擬記憶體結構就是backingStore的mappedBuffer屬性,它是MappedByteBuffer型別。overwriteMap和mappedBuffer中的資料是不相交的,因為有個後臺排程任務(預設30秒執行一次,最短能每1秒鐘執行一次,該排程任務稍後會介紹)會定時將overwriteMap的資料一個一個寫入mappedbuffer,每寫一個Event指標就把該Event指標從overwriteMap中移除,所以overwriteMap + mappedbuffer幾乎就可以說是記憶體佇列中未被消費的全量Event指標資料。這下你就明白了,如果Sink想從File Channel取出一個Event來消費,FlumeEventQueue會先嚐試從overwriteMap中取,如果找不到,就從mappedBuffer中取。當然,由於mappedBuffer是ByteBuffer,所以為了操作方便,在mappedBuffer之上建立了一個LongBuffer的檢視,即backingStore的elementsBuffer屬性(elementsBuffer = mappedBuffer.asLongBuffer();)。好了,回過頭來,我們繼續說addTail方法的實現,它主要分為如下幾個步驟:

1.先檢查backingStore的容量(backingStore的容量和File Channel的容量一致,這也保證了及時整個File Channel中的資料沒被消費,記憶體佇列也能完整備份到檢查點檔案中),如果滿了則告知失敗,返回false,這時候會列印錯誤日誌,但不會拋異常,也不會影響整個事務的提交(為什麼只是列印錯誤日誌呢?因為Flume的作者認為這中情況不太可能發生,因為事務佇列中保留了部分空餘空間)。

2.還是老規矩,將Event指標物件FlumeEventPointer整合成一個long型別。

3.backingStore中有個屬性logFileIDReferenceCounts,它是個Map結構,key為資料檔案logFile的檔案IDvalue為該檔案中還未被消費的Event指標計數器。於是需要將該Event指標所寫的資料檔案ID對應的計數器+1?大家有沒有想過這是為什麼?對,如果當某個logFile檔案的計數器為0,表明該檔案的Event全被Sink消費了,也就是說Flume可以在需要時刪除該logFile及其相關的檢查點或備份檔案了。

4.backingStore有個int型的queueSize屬性,用啦記錄放入overwriteMapEvent指標數,於是,這一步是將queueSizeEvent指標放入overwriteMap中。

5.queueSize+1.

6.backingStore還有個Set<Long>型別的queueSet屬性,用來存放所有的Event指標的資料,並對映到檔案,queueSet中包含了overwriteMapmappedBuffer中所有的Event指標。所以這一步就是將該Event指標放入queueSet中。

經過這6步,記憶體佇列FlumeEventQueueaddTail操作就完成了。

       為了便於理解,這裡用幾張圖來說明整個流程。

       先看整體流程:

 

 

        我們再來看看將Event寫入資料檔案的過程:

 

 

將Event寫入資料檔案後、Event指標放入FlumeEventQueue的已提交資料backingStore之前,會將事務ID和Event指標臨時放入inflightPuts中,這一步很簡單,感興趣的讀者可以看InflightEventWrapper#addEvent.