[從原始碼學設計] Flume 之 memory channel
阿新 • • 發佈:2021-01-29
# [從原始碼學設計] Flume 之 memory channel
[toc]
## 0x00 摘要
在使用Flume時,有時遇到如下錯誤資訊:Space for commit to queue couldn't be acquired。
究其原因,是在memory channel的使用中出現了問題。
本文就以此為切入點,帶大家一起剖析下 Flume 中 MemoryChannel 的實現
## 0x01 業務範疇
### 1.1 用途和特點
Flume的用途:高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。
這裡我們介紹與本文相關的特點:
- Flume的管道是基於事務,保證了資料在傳送和接收時的一致性.
- Flume是可靠的,容錯性高的,可升級的,易管理的,並且可定製的。
- 當收集資料的速度超過將寫入資料的時候,也就是當收集資訊遇到峰值時,這時候收集的資訊非常大,甚至超過了系統的寫入資料能力,這時候,Flume會在資料生產者和資料收容器間做出調整 ,保證其能夠在兩者之間提供平穩的資料.
### 1.2 Channel
這裡就要介紹channel的概念。channel是一種短暫的儲存容器,它將從source處接收到的event格式的資料快取起來,直到它們被sinks消費掉,它在source和sink間起著橋樑的作用,channel是一個完整的事務,這一點保證了資料在收發的時候的一致性。並且它可以和任意數量的source和sink連結。
支援的型別主要有: JDBC channel , File System channel , Memory channel等,大致區別如下:
- Memory Channel:events儲存在Java Heap,即記憶體佇列中(記憶體的大小是可以指定的)。對於流量較高和由於agent故障而準備丟失資料的流程來說,這是一個理想的選擇;
- File Channel:event儲存在本地檔案中,可靠性高,但吞吐量低於Memory Channel;
- JDBC Channel :event儲存在持久化儲存庫中(其背後是一個數據庫),JDBC channel目前支援嵌入式Derby。這是一個持續的channel,對於可恢復性非常重要的流程來說是理想的選擇;
- Kafka Channel:events儲存在Kafka叢集中。Kafka提供高可用性和高可靠性,所以當agent或者kafka broker 崩潰時,events能馬上被其他sinks可用。
本文主要涉及Memory Channel,所以看看其特性。
- **好處**:速度快,吞吐量大;
- **壞處**:根據計算機工作的原理就可以得知,凡是在記憶體中計算的資料,只要電腦出現故障導致停機,那麼記憶體中資料是不會進行儲存的;
- **所適用的場景**:高吞吐量,允許資料丟失的業務中;
### 1.3 研究重點
由此,我們可以總結出來 Flume 的一些重點功能:
- 可靠的,容錯性高的;
- 實現事務;
- 速度快,吞吐量大;
- 可以調節收集的速度以解決生產者消費者不一致;
- 可升級的,易管理,可定製的;
因為MemoryChannel屬於Flume的重要模組,所以,我們本文就看看是MemoryChannel是如何確保Flume以上特點的 ,這也是本文的學習思路。
### 1.4 實際能夠學到什麼
如何回滾,使用鎖,訊號量 ,動態擴容,如何解決生產者消費者不一致問題。
### 1.5 總述
MemoryChannel還是比較簡單的,主要是通過MemoryTransaction中的putList、takeList與MemoryChannel中的queue進行資料流轉和事務控制,這裡的queue相當於持久化層,只不過放到了記憶體中,如果是FileChannel的話,會把這個queue放到本地檔案中。
MemoryChannel受記憶體空間的影響,如果資料產生的過快,同時獲取訊號量超時容易造成資料的丟失。而且Flume程序掛掉,資料也會丟失。
具體是:
- 維持一個佇列,佇列的兩端分別是source和sink。
- source使用doPut方法往putList插入Event
- sink使用doTake方法從queue中獲取event放入takeList,並且提供rollback方法,用於回滾。
- commit方法作用是把putList中的event一次性寫到queue;
下面表示了Event在一個使用了MemoryChannel的agent中資料流向:
```java
source ---> putList ---> queue ---> takeList ---> sink
```
為了大家更好的理解,我們提前把最終圖例發到這裡。
具體如下圖:
```java
+----------+ +-------+
| Source | +----------------------------------------------------------------+ | Sink |
+-----+----+ | [MemoryChannel] | +---+---+
| | +--------------------------------------------------------+ | ^
| | | [MemoryTransaction] | | |
| | | | | |
| | | | | |
| | | channelCounter | | |
| | | | | |
| | | putByteCounter takeByteCounter | | |
| | | | | |
| | | +-----------+ +------------+ | |doTake |
+----------------> | putList | | takeList +----------------+
doPut | | +----+--+---+ +----+---+---+ | |
| | | ^ | ^ | |
| | | | | | | |
| +--------------------------------------------------------+ |
| | | | | poll |
| | | | | |
| | | rollback rollback | | |
| | +--------------+ +-------------+ | |
| | | | | |
| | | v | |
| | doCommit +--+--+---+ doCommit | |
| +------------> | queue | +-----------+ |
| +---------+ |
+----------------------------------------------------------------+
```
手機上如圖:
![img](https://img2020.cnblogs.com/blog/1850883/202101/1850883-20210126200336587-1768894223.png)
## 0x02 定義
我們要看看MemoryChannel重要變數的定義,這裡我們沒有按照程式碼順序來,而是重新整理。
### 2.1 介面
MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三個介面。
**Channel介面** 主要聲明瞭Channel中的三個方法,就是佇列基本功能 :
```java
public void put(Event event) throws ChannelException; //從指定的Source中獲得Event放入指定的Channel中
public Event take() throws ChannelException; //從Channel中取出event放入Sink中
public Transaction getTransaction(); //獲得當前Channel的事務例項
```
**Transaction介面** 主要聲明瞭flume中事務機制的四個方法,就是事務功能:
```java
enum TransactionState { Started, Committed, RolledBack, Closed } //列舉型別,指定了事務的四種狀態,事務開始、提交、失敗回滾、關閉
void begin();
void commit();
void rollback();
void close();
```
**Configurable介面** 主要是和flume配置元件相關的,需要從flume配置系統獲取配置資訊的任何元件,都必須實現該介面。該介面中只聲明瞭一個context方法,用於獲取配置資訊。
大體邏輯如下:
```java
+-----------+ +--------------+ +---------------+
| | | | | |
| Channel | | Transaction | | Configurable |
| | | | | |
+-----------+ +--------------+ +---------------+
^ ^ ^
| | |
| | |
| | |
| +-------------+--------------+ |
| | | |
| | MemorChannel +---------+
+-------+ | |
| |
| |
| |
| |
| |
| |
+----------------------------+
```
下面我們具體講講成員變數。
### 2.2 配置引數
首先是一系列業務配置引數。
```java
//定義佇列中一次允許的事件總數
private static final Integer defaultCapacity = 100;
//定義一個事務中允許的事件總數
private static final Integer defaultTransCapacity = 100;
//將實體記憶體轉換成槽(slot)數,預設是100
private static final double byteCapacitySlotSize = 100;
//定義佇列中事件所使用空間的最大位元組數(預設是JVM最大可用記憶體的0.8)
private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);
//定義byteCapacity和預估Event大小之間的緩衝區百分比:
private static final Integer defaultByteCapacityBufferPercentage = 20;
//新增或者刪除一個event的超時時間,單位秒:
private static final Integer defaultKeepAlive = 3;
// maximum items in a transaction queue
private volatile Integer transCapacity;
private volatile int keepAlive;
private volatile int byteCapacity;
private volatile int lastByteCapacity;
private volatile int byteCapacityBufferPercentage;
private ChannelCounter channelCounter;
```
這些引數基本都在configure(Context context)中設定,基本邏輯如下:
- 設定 capacity:MemroyChannel的容量,預設是100。
- 設定 transCapacity:每個事務最大的容量,也就是每個事務能夠獲取的最大Event數量。預設也是100。事務容量必須小於等於Channel Queue容量。
- 設定 byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比引數,即我們定義的位元組容量和實際事件容量的百分比,因為我們定義的位元組容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的記憶體佔用,可以認為該引數定義了Event header佔了實際位元組容量的百分比,預設20%;
- 設定 byteCapacity:byteCapacity等於設定的byteCapacity值或堆的80%乘以1減去byteCapacityBufferPercentage的百分比,然後除以100。具體是首先讀取配置檔案定義的byteCapacity,如果沒有定義,則使用預設defaultByteCapacity,而defaultByteCapacity預設是JVM實體記憶體的80%(Runtime.getRuntime().maxMemory() * .80);那麼實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize預設100,即計算百分比的一個係數。
- 設定 keep-alive:增加和刪除一個Event的超時時間(單位:秒)。
- 設定初始化 LinkedBlockingDeque物件,大小為capacity。以及各種訊號量物件。
- 最後初始化計數器。
配置程式碼摘要如下:
```java
public void configure(Context context) {
capacity = context.getInteger("capacity", defaultCapacity);
transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage",
defaultByteCapacityBufferPercentage);
byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);
if (byteCapacity < 1) {
byteCapacity = Integer.MAX_VALUE;
}
keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
resizeQueue(capacity);
if (channelCounter == null) {
channelCounter = new ChannelCounter(getName());
}
}
```
#### 2.2.1 channel屬性
ChannelCounter 需要單獨說一下。其就是把channel的一些屬性封裝了一下,初始化了一個ChannelCounter,是一個計數器,記錄如當前佇列放入Event數、取出Event數、成功數等。
```java
private ChannelCounter channelCounter;
```
定義如下:
```java
public class ChannelCounter extends MonitoredCounterGroup implements
ChannelCounterMBean {
private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt";
private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt";
private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success";
private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success";
private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity";
}
```
### 2.4 Semaphore和Queue
其次是Semaphore和Queue。主要就是用來協助制事務。
MemoryChannel有三個訊號量用來控制事務,防止容量越界:queueStored,queueRemaining,bytesRemaining。
- queueLock:建立一個Object當做佇列鎖,操作佇列的時候保證資料的一致性;
- queue:使用LinkedBlockingDeque queue維持一個佇列,佇列的兩端分別是source和sink;
- queueStored:來儲存queue中當前的儲存的event的數目,即已經儲存的容量大小,後面tryAcquire方法可以判斷是否可以take到一個event;
- queueRemaining:來儲存queue中當前可用的容量,即空閒的容量大小,可以用來判斷當前是否有可以提交一定數量的event到queue中;
- bytesRemaining : 表示可以使用的記憶體大小。該大小就是計算後的byteCapacity值。
```java
private Object queueLock = new Object();
@GuardedBy(value = "queueLock")
private LinkedBlock