1. 程式人生 > >【Flume】【原始碼分析】flumeng的事務控制的原理詳解【記憶體通道memory channel】

【Flume】【原始碼分析】flumeng的事務控制的原理詳解【記憶體通道memory channel】

一開始我也是以為flume ng的事務控制是在sink端的,因為只看到那裡有事務的使用,但是今天看了一下fluem的整個事務控制,我才後知後覺,特此寫了這篇文章,望各位不吝指教。

先來一張圖吧!!!


從圖中可以看出,flume的事務控制在source端和sink端都有,具體的事務是依賴於通道的。這裡將的事務和檔案通道中的事務控制有個小區別【檔案通道中的事務是記錄在磁碟上

1、獲取事務

Transaction transaction = channel.getTransaction();
方法定義:
public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
內部呼叫createTransaction();方法,具體定義如下【看記憶體通道】:
protected BasicTransactionSemantics createTransaction() {
    return new MemoryTransaction(transCapacity, channelCounter);
  }
public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }
事務初始化了三個變數,分別是:事件放入列表【一次事務中可以放入的event數量】,事件取出列表【一次事務中可以取走的event數量】,通道監控度量資料
transCapacity就是我們配置的事務容量,也就是一次事務中最多可以容下多少個event

事務,以及事務中的變數都定義好了,下面就是事務中具體的方法定義了:

以下只講述記憶體通道的相關方法定義:【關於檔案通道的講解

1、doPut

putList放入一個event,代表一個event已經納入到事務中了;這個put的操作肯定是由source端發起的,看個例子:【關於ExexSource的原始碼分析

 for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
這是channelprocessor的方法,迴圈遍歷reqChannelQueue這個map物件,對立面每個通道對應的批量event進行put操作,納入事務的過程中。

event在放入eventQueue是通過list的add方法,所以放在列表尾部

2、doTake

記憶體通道中有個變數

queue——LinkedBlockingDeque

在通道初始化的時候,初始化該變量了

synchronized(queueLock) {
        queue = new LinkedBlockingDeque<Event>(capacity);
        queueRemaining = new Semaphore(capacity);
        queueStored = new Semaphore(0);
      }
分別記錄的通道的總容量、剩餘空閒容量、佔用容量

take的時候首先從queue中取出隊頭event,前面第一步說了放入的時候是放在尾部,現在從頭部取,保證先入先出;這裡是從總體的通道容量中取出一個,還需要操作一下takeList,將其納入事務中,takeList中也put一次

3、doCommit

 synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        putList.clear();
        takeList.clear();
      }

offer——放隊尾【從putList中取頭一個放】,一直迴圈到putList取完了

這裡commit都做完了,說明doTake肯定沒問題的,所以takeList要清空了

4、doRollback

 synchronized(queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        putList.clear();
      }
回滾,肯定回滾的是sink寫出失敗的event,在takeList中,走到這個方法的時候,doCommit肯定是沒做的,只有發生異常了,才會到回滾,所以takeList並未clear。

這裡將takeList中的最後一個元素,迴圈取出放回通道佇列的第一個【因為doTake會從頭取,當然要把剛剛失敗的event繼續放到頭部,下次繼續操作這些event

同理,這裡rollback了,說明putList肯定也沒用了,清空,重新再放


無論是往通道中放event還是從通道中取event,都有一個超時控制

// this does not need to be in the critical section as it does not
      // modify the structure of the log or queue.
      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelFullException("The channel has reached it's capacity. "
            + "This might be the result of a sink on the channel having too "
            + "low of batch size, a downstream system running slower than "
            + "normal, or that the channel capacity is just too low. "
            + channelNameDescriptor);
      }
這個keepAlive就是我們配置的超時時間,超時則會丟擲異常!


以上所有操作方法中,大家看到有一些變數XXXCounter的操作,這些都是監控度量的資料,詳見【flume中度量監控的分析