1. 程式人生 > >[從原始碼學設計] Flume 之 memory channel

[從原始碼學設計] Flume 之 memory channel

# [從原始碼學設計] Flume 之 memory channel [toc] ## 0x00 摘要 在使用Flume時,有時遇到如下錯誤資訊:Space for commit to queue couldn't be acquired。 究其原因,是在memory channel的使用中出現了問題。 本文就以此為切入點,帶大家一起剖析下 Flume 中 MemoryChannel 的實現 ## 0x01 業務範疇 ### 1.1 用途和特點 Flume的用途:高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。 這裡我們介紹與本文相關的特點: - Flume的管道是基於事務,保證了資料在傳送和接收時的一致性. - Flume是可靠的,容錯性高的,可升級的,易管理的,並且可定製的。 - 當收集資料的速度超過將寫入資料的時候,也就是當收集資訊遇到峰值時,這時候收集的資訊非常大,甚至超過了系統的寫入資料能力,這時候,Flume會在資料生產者和資料收容器間做出調整
,保證其能夠在兩者之間提供平穩的資料. ### 1.2 Channel 這裡就要介紹channel的概念。channel是一種短暫的儲存容器,它將從source處接收到的event格式的資料快取起來,直到它們被sinks消費掉,它在source和sink間起著橋樑的作用,channel是一個完整的事務,這一點保證了資料在收發的時候的一致性。並且它可以和任意數量的source和sink連結。 支援的型別主要有: JDBC channel , File System channel , Memory channel等,大致區別如下: - Memory Channel:events儲存在Java Heap,即記憶體佇列中(記憶體的大小是可以指定的)。對於流量較高和由於agent故障而準備丟失資料的流程來說,這是一個理想的選擇; - File Channel:event儲存在本地檔案中,可靠性高,但吞吐量低於Memory Channel; - JDBC Channel :event儲存在持久化儲存庫中(其背後是一個數據庫),JDBC channel目前支援嵌入式Derby。這是一個持續的channel,對於可恢復性非常重要的流程來說是理想的選擇; - Kafka Channel:events儲存在Kafka叢集中。Kafka提供高可用性和高可靠性,所以當agent或者kafka broker 崩潰時,events能馬上被其他sinks可用。 本文主要涉及Memory Channel,所以看看其特性。 - **好處**:速度快,吞吐量大; - **壞處**:根據計算機工作的原理就可以得知,凡是在記憶體中計算的資料,只要電腦出現故障導致停機,那麼記憶體中資料是不會進行儲存的; - **所適用的場景**:高吞吐量,允許資料丟失的業務中; ### 1.3 研究重點 由此,我們可以總結出來 Flume 的一些重點功能: - 可靠的,容錯性高的; - 實現事務; - 速度快,吞吐量大; - 可以調節收集的速度以解決生產者消費者不一致; - 可升級的,易管理,可定製的; 因為MemoryChannel屬於Flume的重要模組,所以,我們本文就看看是MemoryChannel是如何確保Flume以上特點的
,這也是本文的學習思路。 ### 1.4 實際能夠學到什麼 如何回滾,使用鎖,訊號量 ,動態擴容,如何解決生產者消費者不一致問題。 ### 1.5 總述 MemoryChannel還是比較簡單的,主要是通過MemoryTransaction中的putList、takeList與MemoryChannel中的queue進行資料流轉和事務控制,這裡的queue相當於持久化層,只不過放到了記憶體中,如果是FileChannel的話,會把這個queue放到本地檔案中。 MemoryChannel受記憶體空間的影響,如果資料產生的過快,同時獲取訊號量超時容易造成資料的丟失。而且Flume程序掛掉,資料也會丟失。 具體是: - 維持一個佇列,佇列的兩端分別是source和sink。 - source使用doPut方法往putList插入Event - sink使用doTake方法從queue中獲取event放入takeList,並且提供rollback方法,用於回滾。 - commit方法作用是把putList中的event一次性寫到queue; 下面表示了Event在一個使用了MemoryChannel的agent中資料流向: ```java source ---> putList ---> queue ---> takeList ---> sink ``` 為了大家更好的理解,我們提前把最終圖例發到這裡。 具體如下圖: ```java +----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+ ``` 手機上如圖: ![img](https://img2020.cnblogs.com/blog/1850883/202101/1850883-20210126200336587-1768894223.png) ## 0x02 定義 我們要看看MemoryChannel重要變數的定義,這裡我們沒有按照程式碼順序來,而是重新整理。 ### 2.1 介面 MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三個介面。 **Channel介面** 主要聲明瞭Channel中的三個方法,就是佇列基本功能
: ```java public void put(Event event) throws ChannelException; //從指定的Source中獲得Event放入指定的Channel中 public Event take() throws ChannelException; //從Channel中取出event放入Sink中 public Transaction getTransaction(); //獲得當前Channel的事務例項 ``` **Transaction介面** 主要聲明瞭flume中事務機制的四個方法,就是事務功能: ```java enum TransactionState { Started, Committed, RolledBack, Closed } //列舉型別,指定了事務的四種狀態,事務開始、提交、失敗回滾、關閉 void begin(); void commit(); void rollback(); void close(); ``` **Configurable介面** 主要是和flume配置元件相關的,需要從flume配置系統獲取配置資訊的任何元件,都必須實現該介面。該介面中只聲明瞭一個context方法,用於獲取配置資訊。 大體邏輯如下: ```java +-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +-----------+ +--------------+ +---------------+ ^ ^ ^ | | | | | | | | | | +-------------+--------------+ | | | | | | | MemorChannel +---------+ +-------+ | | | | | | | | | | | | | | +----------------------------+ ``` 下面我們具體講講成員變數。 ### 2.2 配置引數 首先是一系列業務配置引數。 ```java //定義佇列中一次允許的事件總數 private static final Integer defaultCapacity = 100; //定義一個事務中允許的事件總數 private static final Integer defaultTransCapacity = 100; //將實體記憶體轉換成槽(slot)數,預設是100 private static final double byteCapacitySlotSize = 100; //定義佇列中事件所使用空間的最大位元組數(預設是JVM最大可用記憶體的0.8) private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80); //定義byteCapacity和預估Event大小之間的緩衝區百分比: private static final Integer defaultByteCapacityBufferPercentage = 20; //新增或者刪除一個event的超時時間,單位秒: private static final Integer defaultKeepAlive = 3; // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; private volatile int byteCapacity; private volatile int lastByteCapacity; private volatile int byteCapacityBufferPercentage; private ChannelCounter channelCounter; ``` 這些引數基本都在configure(Context context)中設定,基本邏輯如下: - 設定 capacity:MemroyChannel的容量,預設是100。 - 設定 transCapacity:每個事務最大的容量,也就是每個事務能夠獲取的最大Event數量。預設也是100。事務容量必須小於等於Channel Queue容量。 - 設定 byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比引數,即我們定義的位元組容量和實際事件容量的百分比,因為我們定義的位元組容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的記憶體佔用,可以認為該引數定義了Event header佔了實際位元組容量的百分比,預設20%; - 設定 byteCapacity:byteCapacity等於設定的byteCapacity值或堆的80%乘以1減去byteCapacityBufferPercentage的百分比,然後除以100。具體是首先讀取配置檔案定義的byteCapacity,如果沒有定義,則使用預設defaultByteCapacity,而defaultByteCapacity預設是JVM實體記憶體的80%(Runtime.getRuntime().maxMemory() * .80);那麼實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize預設100,即計算百分比的一個係數。 - 設定 keep-alive:增加和刪除一個Event的超時時間(單位:秒)。 - 設定初始化 LinkedBlockingDeque物件,大小為capacity。以及各種訊號量物件。 - 最後初始化計數器。 配置程式碼摘要如下: ```java public void configure(Context context) { capacity = context.getInteger("capacity", defaultCapacity); transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } keepAlive = context.getInteger("keep-alive", defaultKeepAlive); resizeQueue(capacity); if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } } ``` #### 2.2.1 channel屬性 ChannelCounter 需要單獨說一下。其就是把channel的一些屬性封裝了一下,初始化了一個ChannelCounter,是一個計數器,記錄如當前佇列放入Event數、取出Event數、成功數等。 ```java private ChannelCounter channelCounter; ``` 定義如下: ```java public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean { private static final String COUNTER_CHANNEL_SIZE = "channel.current.size"; private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt"; private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt"; private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success"; private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity"; } ``` ### 2.4 Semaphore和Queue 其次是Semaphore和Queue。主要就是用來協助制事務。 MemoryChannel有三個訊號量用來控制事務,防止容量越界:queueStored,queueRemaining,bytesRemaining。 - queueLock:建立一個Object當做佇列鎖,操作佇列的時候保證資料的一致性; - queue:使用LinkedBlockingDeque queue維持一個佇列,佇列的兩端分別是source和sink; - queueStored:來儲存queue中當前的儲存的event的數目,即已經儲存的容量大小,後面tryAcquire方法可以判斷是否可以take到一個event; - queueRemaining:來儲存queue中當前可用的容量,即空閒的容量大小,可以用來判斷當前是否有可以提交一定數量的event到queue中; - bytesRemaining : 表示可以使用的記憶體大小。該大小就是計算後的byteCapacity值。 ```java private Object queueLock = new Object(); @GuardedBy(value = "queueLock") private LinkedBlock