1. 程式人生 > >支付寶資深架構師的分散式追蹤 & APM 系統 SkyWalking 原始碼分析— DataCarrier 非同步處理庫

支付寶資深架構師的分散式追蹤 & APM 系統 SkyWalking 原始碼分析— DataCarrier 非同步處理庫

1. 概述

本文主要分享 SkyWalking DataCarrier 非同步處理庫

基於生產者消費者的模式,大體結構如下圖:

實際專案中,沒有 Producer 這個類。所以本文提到的 Producer ,更多的是一種角色

下面我們來看看整體的專案結構,如下圖所示 :

org.skywalking.apm.commons.datacarrier.buffer 包,主要包含 Channels 、Buffer 兩個類。Channels 是 Buffer 陣列的封裝。

具有1-5工作經驗的,面對目前流行的技術不知從何下手,需要突破技術瓶頸的可以加群。在公司待久了,過得很安逸,
但跳槽時面試碰壁。需要在短時間內進修、跳槽拿高薪的可以加群。如果沒有工作經驗,但基礎非常紮實,對java工作
機制,常用設計思想,常用java開發框架掌握熟練的可以加群。java架構群:582505643一起交流。

org.skywalking.apm.commons.datacarrier.buffer.Buffer ,快取區。

buffer 屬性,緩衝陣列。Producer 儲存的資料到 buffer 裡。

Buffer 在儲存資料時,把 buffer 作為一個 ““,使用 index 記錄最後儲存的位置,不斷向下,迴圈儲存到 buffer 中。通過這樣的方式,帶來良好的儲存效能,避免擴容問題。But ,儲存會存在衝突的問題:buffer 寫入位置,暫未被消費,已經存在值。此時,根據不同的 BufferStrategy 進行處理。整體流程見 #save(data) 方法。

當 Buffer 被 Consumer 消費時,被呼叫 

#obtain(start, end) 方法,獲得資料並清空。為什麼會帶 start 、end 方法引數呢?下文揭曉答案。

org.skywalking.apm.commons.datacarrier.buffer.Channels ,內嵌多個 Buffer 的通道。

Channels 在儲存資料時,相比 Buffer ,從 buffer 變成了多 buffer ,因此需要先選一個 buffer 。通過使用不同的 IDataPartitioner 實現類,進行 Buffer 的選擇。當緩衝策略為 BufferStrategy.IF_POSSIBLE 時,根據 IDataPartitioner 定義的重試次數,進行多次儲存資料直到成功。整體流程見 

#save(data) 方法。

IDataPartitioner 目前有兩個子類實現:

org.skywalking.apm.commons.datacarrier.consumer 包,主要包含 ConsumerPool 、ConsumerThread 、IConsumer 三個類。

ConsumerThread 使用 IConsumer ,消費資料

ConsumerPool 是 ConsumerThread 的執行緒池封裝

org.skywalking.apm.commons.datacarrier.consumer.IConsumer ,消費者介面。定義瞭如下方法:

#init() 介面方法,初始化消費者。

#onExit() 介面方法,處理當消費結束。此處的結束時,ConsumerThread 關閉。

我們在使用時,自定義 Consumer 類,實現 IConsumer 介面。例如:RemoteMessageConsumer 。

org.skywalking.apm.commons.datacarrier.consumer.ConsumerThread ,繼承 java.lang.Thread ,消費執行緒。

dataSources 屬性,消費訊息的資料來源( DataSource )陣列。一個 ConsumerThread ,可以消費多個 Buffer ,並且單個 Buffer 消費的分割槽範圍可配置,即一個 Buffer 可以被多個 ConsumerThread 同時無衝突的消費。在 「4.3 ConsumerPool」詳細解析 ConsumerThread 分配 Buffer 的方式。

#run() 實現方法,不斷批量的消費資料。程式碼如下:

第 78 至 88 行:不斷消費,直到執行緒關閉( #shutdown() )。

第 80 行:呼叫 #consume() 方法,批量消費資料。

第 82 至 87 行:當未消費到資料,說明 dataSources 為空,等待 20 ms ,避免 CPU 空跑。

第 93 行:當執行緒關閉,呼叫 #consume() 方法,消費完 dataSources 剩餘的資料。

第 95 行:呼叫 IConsumer#onExit() 方法,處理當消費結束。

#consume() 方法,批量消費資料。程式碼如下:

第 107 至 117 行:從 dataSources 中,獲取要消費的資料。

第 120 至 126 行:當有資料可消費時,呼叫 IConsumer#consume(List) 方法。當消費發生異常時,呼叫IConsumer#onError(List, Throwable) 方法。

第 127 行:返回是否有消費資料。

org.skywalking.apm.commons.datacarrier.consumer.ConsumerPool ,消費者池,提供了對 Channels 啟動指定數量的 ConsumerThread 進行消費。

consumerThreads 屬性,ConsumerThread 陣列,通過構造方法的 num 引數進行指定。

lock 屬性,鎖。保證 ConsumerPool 啟動或關閉時的執行緒安全。

#begin() 方法,啟動 ConsumerPool ,進行資料消費。程式碼如下:

第 97 至 99 行:正在執行中,直接返回。

第 101 行:獲得鎖。

第 104 行:呼叫 #allocateBuffer2Thread() 方法,將 channels 的多個 Buffer ,分配給 consumerThreads 的多個ConsumerThread。

第 107 至 109 行:啟動每個 ConsumerThread ,開始消費。

第 112 行:標記正在執行中。

第 114 行:釋放鎖。

close() 方法,關閉 ConsumerPool 。程式碼如下:

第 168 行:獲得鎖。

第 169 行:標記不在執行中。

第 170 至 172 行:關閉每個 ConsumerThread ,結束消費。

第 174 行:釋放鎖。

#allocateBuffer2Thread() 方法,將 channels 的多個 Buffer ,分配給 consumerThreads 的多個 ConsumerThread。一共會有三種情況:

Buffer 數量等於 ConsumerThread 數量,這個十分好分配,一比一。

Buffer 數量大於 ConsumerThread 數量,那麼按照 Buffer 數量 % ConsumerThread 數量進行分組,分配給 ConsumerThread ,如下圖所示:

Buffer 數量大於 ConsumerThread 數量,那麼按照 ConsumerThread 數量 % Buffer 數量進行分組,分配給 Buffer 。其中,一個 Buffer 會被均分給多個 ConsumerThread ,如下圖所示:

具有1-5工作經驗的,面對目前流行的技術不知從何下手,需要突破技術瓶頸的可以加群。在公司待久了,過得很安逸,
但跳槽時面試碰壁。需要在短時間內進修、跳槽拿高薪的可以加群。如果沒有工作經驗,但基礎非常紮實,對java工作
機制,常用設計思想,常用java開發框架掌握熟練的可以加群。java架構群:582505643一起交流。

org.skywalking.apm.commons.datacarrier.DataCarrier ,DataCarrier 非同步處理庫的入口程式。通過建立 DataCarrier 物件,使用生產者消費者的模式,執行非同步執行邏輯。

構造方法 ,程式碼如下:

channels 屬性,資料通道。在構造方法中,我們可以看到預設使用 SimpleRollingPartitioner 作為資料分割槽分配者,使用BufferStrategy.BLOCKING 作為緩衝策略。

設定消費者和消費執行緒數量

生產訊息

關閉消費